Python 最强大的任务调度框架 Celery!( 七 )


创建任务工厂的另一种方式
之前在创建任务工厂的时候 , 是将函数导入到 app.py 中 , 然后通过 add = app.task(add) 的方式手动装饰 , 因为有哪些任务工厂必须要让 worker 知道 , 所以一定要在 app.py 里面出现 。但是这显然不够优雅 , 那么可不可以这么做呢?
# celery_demo/tasks/task1.py
from app import app
# celery_demo 所在路径位于 sys.path 中
# 因此这里可以直接 from app import app
@app.task
def add(x, y):
return x + y

 
@app.task
def sub(x, y):
return x - y

# celery_demo/app.py
from tasks.task1 import add, sub

按照上面这种做法 , 理想上可以 , 但现实不行 , 因为会发生循环导入 。
所以 celery 提供了一个办法 , 我们依旧在 task1.py 中 import app , 但在 app.py 中不再使用 import , 而是通过 include 加载的方式 , 我们看一下:
# celery_demo/tasks/task1.py
from app import app

 
@app.task
def add(x, y):
return x + y

@app.task
def sub(x, y):
return x - y

# celery_demo/app.py
from celery import Celery
import config

# 通过 include 指定存放任务的 py 文件
# 注意它和 worker 启动路径之间的关系
# 我们是在 celery_demo 目录下启动的 worker
# 所以应该写成 "tasks.task1"
# 如果是在 celery_demo 的上一级目录启动 worker
# 那么这里就要指定为 "celery_demo.tasks.task1"
# 当然启动时的 -A app 也要换成 -A celery_demo.app
app = Celery(__name__, include=["tasks.task1"])
# 如果还有其它文件 , 比如 task2.py, task3.py
# 那么就把 "tasks.task2", "tasks.task3" 加进去
app.config_from_object(config)

在 celery_demo 目录下重新启动 worker 。

Python 最强大的任务调度框架 Celery!

文章插图
为了方便 , 我们只保留了两个任务工厂 。可以看到此时就成功启动了 , 并且也更加方便和优雅一些 。之前是在 task1.py 中定义函数 , 然后再把 task1.py 中的函数导入到 app.py 里面 , 然后手动进行装饰 。虽然这么做是没问题的 , 但很明显这种做法不适合管理 。
所以还是要将 app.py 中的 app 导入到 task1.py 中直接创建任务工厂 , 但如果再将 task1.py 中的任务工厂导入到 app.py 中就会发生循环导入 。于是 celery 提供了一个 include 参数 , 可以在创建 app 的时候自动将里面所有的任务工厂加载进来 , 然后启动并告诉 worker 。
我们来测试一下:
# 通过 tasks.task1 导入任务工厂
# 然后创建任务 , 发送至队列
>>> from tasks.task1 import add, sub
>>> add.delay(11, 22).get()
33
>>> sub.delay(11, 22).get()
-11

查看一下 worker 的输出:
Python 最强大的任务调度框架 Celery!

文章插图
结果一切正常 。
Task 对象
我们之前通过对一个函数使用 @app.task 即可将其变成一个任务工厂 , 而这个任务工厂就是一个 Task 实例对象 。而我们在使用 @app.task 的时候 , 其实是可以加上很多的参数的 , 常用参数如下: