我有一个Django项目,我正在尝试使用Celery提交任务进行后台处理(http://ask.github.com/celery/introduction.html)。Celery与Django集成良好,我已经能够提交自定义任务并获得结果。
唯一的问题是,我找不到在守护进程中执行自定义初始化的理智方法。在开始处理任务之前,我需要调用一个昂贵的函数,该函数会加载大量内存,而且我无力每次都调用该函数。
有人遇到过这个问题吗?有什么想法如何解决它而不修改Celery源代码?
谢谢
您可以编写自定义加载程序,也可以使用信号。
加载程序具有该on_task_init
方法,该方法将在要执行任务时调用,该方法on_worker_init
由celery +
celerybeat主进程调用。
使用信号可能是最简单的,可用的信号是:
0.8.x:
task_prerun(task_id, task, args, kwargs)
在工作人员即将执行任务时(或在使用apply
/或CELERY_ALWAYS_EAGER
已设置的情况下,在本地)执行任务时调度。
task_postrun(task_id, task, args, kwargs, retval)
在与上述相同的条件下执行任务后调度。
task_sent(task_id, task, args, kwargs, eta, taskset)
应用任务时调用(不适用于长时间运行的操作)
0.9.x中可用的其他信号(github上的当前主分支):
worker_init()
在celeryd启动时调用(在初始化任务之前,因此,如果在支持的系统上fork
,则任何内存更改都将复制到子worker进程)。
worker_ready()
在celeryd能够接收任务时调用。
worker_shutdown()
在celeryd关闭时调用。
这是一个示例,该示例在流程中第一次运行任务时进行预先计算:
from celery.task import Task
from celery.registry import tasks
from celery.signals import task_prerun
_precalc_table = {}
class PowersOfTwo(Task):
def run(self, x):
if x in _precalc_table:
return _precalc_table[x]
else:
return x ** 2
tasks.register(PowersOfTwo)
def _precalc_numbers(**kwargs):
if not _precalc_table: # it's empty, so haven't been generated yet
for i in range(1024):
_precalc_table[i] = i ** 2
# need to use registered instance for sender argument.
task_prerun.connect(_precalc_numbers, sender=tasks[PowerOfTwo.name])
如果要对所有任务运行该函数,只需跳过sender
参数。
问题内容: 我想将我的python软件包设为“ pip installable”。问题在于,程序包具有必须来自用户的初始Shell脚本(例如)的Shell脚本。 但是在安装之后,用户并不完全知道脚本的去向(大概是,但是我们不能保证)。当然,用户可以运行并手动编辑其初始化脚本。 但我想使这一步骤自动化。我可以创建一个新的distutils命令,但不调用它。而且我可以扩展,但是安装会通过pip中断(尽
问题内容: 好像我遇到了本不应该出现的问题……但我想寻求帮助。 这里有一些我没有得到的解释。 具有两个简单的类,其中一个引用另一个,如下所示; 我收到注释的编译错误。有人可以告诉我该怎么办吗? 非常感谢任何好人的帮助! 问题答案: 正如 vadian 正确指出的 那样, 您应该在以下情况下创建一个: 您不能为 依赖 于另一个实例属性的存储属性提供默认值。
问题内容: 我正在使用django和celery(django-celery)进行项目。我们的团队决定将所有数据访问代码包装在其中(不要像django这样包装到Manager中),而将代码放入(应用程序名称)中,仅处理用celery组装和执行任务(因此我们没有django在这一层的ORM依赖性)。 在我的,我有这样的事情: 在我的中,我喜欢将它们包装成任务(然后可以使用这些任务来完成更复杂的任务)
类似于这个问题:如何在Jooq中初始化和创建ResultSet和记录?但使用自定义行类型记录而不是简单的表记录。我正在实例化一个jooq记录以用于模拟,但该记录有超过22列并包含来自许多连接表的行,所以我使用RecordImpl。 这引发异常 java.lang.IllegalArgumentException:行()中不包含字段(“course\u id”) 注意,我没有直接使用RecordIm
问题内容: 我使用celery更新新闻聚合站点中的RSS feed。我为每个提要使用一个@task,看起来一切正常。 有一个细节我不确定如何处理:所有提要每分钟都使用@periodic_task更新一次,但是如果提要仍在启动新任务时从上一个定期任务更新,该怎么办?(例如,如果Feed确实很慢或离线,并且任务在重试循环中进行) 目前,我存储任务结果并按以下方式检查其状态: 也许我错过了一些使用芹菜机
我写了这个计数器,但当我午餐时,应用程序崩溃了,我现在不知道错误在哪里,请提供一些帮助,谢谢你告诉我错误在哪里,或者我如何以正确的方式编写代码 这是我的木头猫
问题内容: 我有一个Django站点,当用户请求时会发生刮擦,并且我的代码在新过程中启动了Scrapy Spider独立脚本。自然,这与增加用户数量无关。 像这样: 我决定使用Celery并使用工作人员将爬网请求排队。 但是,我遇到了无法重新启动龙卷风反应堆的问题。第一个蜘蛛和第二个蜘蛛成功运行,但随后的蜘蛛将引发ReactorNotRestartable错误。 任何人都可以在Celery框架中运
本文向大家介绍Swift使用参数自定义初始化,包括了Swift使用参数自定义初始化的使用技巧和注意事项,需要的朋友参考一下 示例 请注意,您不能省略参数标签: 为了允许省略参数标签,请使用下划线_作为标签: 如果参数标签使用一个或多个属性共享名称,请使用self显式设置属性值: