创建任务工厂的另一种方式
之前在创建任务工厂的时候 , 是将函数导入到 app.py 中 , 然后通过 add = app.task(add) 的方式手动装饰 , 因为有哪些任务工厂必须要让 worker 知道 , 所以一定要在 app.py 里面出现 。但是这显然不够优雅 , 那么可不可以这么做呢?
# celery_demo/tasks/task1.py
from app import app
# celery_demo 所在路径位于 sys.path 中
# 因此这里可以直接 from app import app
@app.task
def add(x, y):
return x + y
@app.task
def sub(x, y):
return x - y
# celery_demo/app.py
from tasks.task1 import add, sub
按照上面这种做法 , 理想上可以 , 但现实不行 , 因为会发生循环导入 。
所以 celery 提供了一个办法 , 我们依旧在 task1.py 中 import app , 但在 app.py 中不再使用 import , 而是通过 include 加载的方式 , 我们看一下:
# celery_demo/tasks/task1.py
from app import app
@app.task
def add(x, y):
return x + y
@app.task
def sub(x, y):
return x - y
# celery_demo/app.py
from celery import Celery
import config
# 通过 include 指定存放任务的 py 文件
# 注意它和 worker 启动路径之间的关系
# 我们是在 celery_demo 目录下启动的 worker
# 所以应该写成 "tasks.task1"
# 如果是在 celery_demo 的上一级目录启动 worker
# 那么这里就要指定为 "celery_demo.tasks.task1"
# 当然启动时的 -A app 也要换成 -A celery_demo.app
app = Celery(__name__, include=["tasks.task1"])
# 如果还有其它文件 , 比如 task2.py, task3.py
# 那么就把 "tasks.task2", "tasks.task3" 加进去
app.config_from_object(config)
在 celery_demo 目录下重新启动 worker 。
文章插图
为了方便 , 我们只保留了两个任务工厂 。可以看到此时就成功启动了 , 并且也更加方便和优雅一些 。之前是在 task1.py 中定义函数 , 然后再把 task1.py 中的函数导入到 app.py 里面 , 然后手动进行装饰 。虽然这么做是没问题的 , 但很明显这种做法不适合管理 。
所以还是要将 app.py 中的 app 导入到 task1.py 中直接创建任务工厂 , 但如果再将 task1.py 中的任务工厂导入到 app.py 中就会发生循环导入 。于是 celery 提供了一个 include 参数 , 可以在创建 app 的时候自动将里面所有的任务工厂加载进来 , 然后启动并告诉 worker 。
我们来测试一下:
# 通过 tasks.task1 导入任务工厂
# 然后创建任务 , 发送至队列
>>> from tasks.task1 import add, sub
>>> add.delay(11, 22).get()
33
>>> sub.delay(11, 22).get()
-11查看一下 worker 的输出:
文章插图
结果一切正常 。
Task 对象
我们之前通过对一个函数使用 @app.task 即可将其变成一个任务工厂 , 而这个任务工厂就是一个 Task 实例对象 。而我们在使用 @app.task 的时候 , 其实是可以加上很多的参数的 , 常用参数如下:
-
name:默认的任务名是一个uuid , 我们可以通过 name 参数指定任务名 , 当然这个 name 就是 apply_async 的参数 name 。如果在 apply_async 中指定了 , 那么以 apply_async 指定的为准;
-
bind:一个 bool 值 , 表示是否和任务工厂进行绑定 。如果绑定 , 任务工厂会作为参数传递到方法中;
-
base:定义任务的基类 , 用于定义回调函数 , 当任务到达某个状态时触发不同的回调函数 , 默认是 Task , 所以我们一般会自己写一个类然后继承 Task;
-
default_retry_delay:设置该任务重试的延迟机制 , 当任务执行失败后 , 会自动重试 , 单位是秒 , 默认是3分钟;
推荐阅读
- 龙之家族|《龙之家族》这1细节,暗示了丹妮莉丝,才是最伟大的坦格利安?
- 小米|37岁财富自由!雷军接受央视采访:我要成伟大的人 小米要影响世界、造车是被逼的
- 中国最大的岛屿是日本岛 中国最大的岛屿是什么岛-张雪峰
- 盛明兰|重温《知否》,王老太师一生最大的败笔,就是娶了偏心的王老太太
- 韩剧|《当你沉睡时》:当噩梦照进现实,救赎和信任才是最强的陪伴
- 赵鹏翔|“李易峰”事件最大的受益者是谁?
- 罚罪|《罚罪》:大王现身,是全剧最大的反转吗,网友:不是的
- |能力,让一个人迅速脱颖而出,如何发挥出你的最强技能?
- 华为|喷问界M7不好看的人品位比较Low!余承东:华为做伟大的设计
- 最大的恐龙有多大 最大的恐龙排行榜前十名
