欢迎访问宙启技术站
智能推送

Python中使用celery.schedulescrontab()设置定时的计划

发布时间:2023-12-23 23:00:00

在Python中,使用Celery库可以轻松地设置定时任务。Celery提供了一个方便的方法schedules.crontab()来定义任务的执行计划。schedules.crontab()返回一个crontab对象,该对象可以传递给任务的@periodic_task装饰器。

下面是一个例子,演示如何使用Celery的crontab来设置一个每天晚上10点运行的任务:

首先,你需要安装Celery库。你可以使用以下命令安装:

pip install celery

接下来,创建一个名为tasks.py的Python文件,并将下面的代码添加到文件中:

from celery import Celery
from celery.schedules import crontab

# 创建Celery应用
app = Celery('tasks', broker='amqp://guest@localhost//', backend='rpc://')

# 定义任务
@app.task
def my_task():
    print("Running task")

# 设置任务的执行计划
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # 每天晚上10点执行任务
    sender.add_periodic_task(crontab(hour=22, minute=0), my_task.s(), name='run_task')

if __name__ == '__main__':
    app.start()

在上面的代码中:

- Celery类用于创建一个Celery应用。broker参数指定了消息代理的地址,backend参数指定了任务结果的存储方式。

- my_task函数是我们要定时执行的任务。你可以根据自己的需求进行修改。

- setup_periodic_tasks函数使用@app.on_after_configure.connect装饰器来定义在应用启动后的回调函数。在该函数中,我们使用add_periodic_task方法来设置任务的执行计划。在此例中,我们设置任务每天晚上10点执行一次。

- 最后一行代码app.start()用于启动Celery应用。

接下来,打开一个终端窗口,并切换到与tasks.py文件相同的目录。在终端中运行以下命令以启动Celery worker:

celery -A tasks worker --loglevel=info

现在,你的任务将会每天晚上10点被执行。

注意:在上述例子中,我们使用了RabbitMQ作为Celery的消息代理(broker)和结果存储后端(backend),你可以根据自己的需求选择合适的代理和后端。

综上所述,通过使用Celery的schedules.crontab()方法,你可以方便地设置定时任务的执行计划。