欢迎访问宙启技术站
智能推送

Python中基于celery.schedulescrontab()的定时任务调度器实例

发布时间:2023-12-23 10:33:40

Celery是一个基于分布式消息传递的任务队列,允许您将任务分发给工作进程并跟踪其执行。Celery还提供了一个调度的功能模块,可以通过定时触发任务来实现定时任务。

在Celery中,可以使用celery.schedules.crontab()来定义定时任务的调度时间。celery.schedules.crontab()函数接受类似于Unix crontab的时间表达式,并返回一个Crontab对象。

下面是一个使用基于celery.schedules.crontab()的定时任务调度器的示例:

from celery import Celery
from celery.schedules import crontab

app = Celery('scheduler', broker='redis://localhost:6379/0')

@app.task
def my_task():
    print('Hello, celery!')

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # 定义每分钟调度任务
    sender.add_periodic_task(crontab(minute='*'), my_task.s(), name='every_minute_task')

    # 定义每天调度任务
    sender.add_periodic_task(crontab(hour=0, minute=0), my_task.s(), name='every_day_task')

    # 定义每周调度任务
    sender.add_periodic_task(crontab(hour=0, minute=0, day_of_week='mon'), my_task.s(), name='every_week_task')

    # 定义每月调度任务
    sender.add_periodic_task(crontab(hour=0, minute=0, day_of_month='1'), my_task.s(), name='every_month_task')

# 启动定时任务调度器
app.conf.beat_schedule = {
    'every_10_seconds': {
        'task': 'my_task',
        'schedule': 10.0,
    },
}

if __name__ == '__main__':
    app.start()

在上面的例子中,首先我们创建了一个Celery实例,并定义了一个my_task函数作为我们要定时执行的任务。

然后,我们使用app.on_after_configure.connect装饰器来注册一个回调函数setup_periodic_tasks,该回调函数将在Celery配置完成后进行调用。在setup_periodic_tasks函数中,我们通过sender.add_periodic_task()方法定义了不同时间间隔的定时任务。比如,我们定义了every_minute_task每分钟执行一次,every_day_task每天执行一次,every_week_task每周一执行一次,every_month_task每月一号执行一次。

最后,我们使用app.conf.beat_schedule配置项来定义一个定时的调度任务,该任务将每隔10秒执行一次。

在运行这个例子之前,需要确保已经安装了Celery和Redis。然后,可以在命令行中输入以下命令来启动这个定时任务调度器:

celery -A <文件名>.app worker --loglevel=info

这样,就启动了Celery的worker和beat两个服务,定时任务调度器会按照预定时间触发任务,并将任务分发给worker进行执行。

总结:

使用Celery的定时任务调度器非常简单,只需定义任务和调度时间,并启动Celery的worker和beat服务即可。以上是一个基于celery.schedules.crontab()的定时任务调度器的示例,你可以根据自己的需求修改调度时间和任务处理逻辑。