Python中基于celery.schedulescrontab()的定时任务调度器实例
Celery是一个基于分布式消息传递的任务队列,允许您将任务分发给工作进程并跟踪其执行。Celery还提供了一个调度的功能模块,可以通过定时触发任务来实现定时任务。
在Celery中,可以使用celery.schedules.crontab()来定义定时任务的调度时间。celery.schedules.crontab()函数接受类似于Unix crontab的时间表达式,并返回一个Crontab对象。
下面是一个使用基于celery.schedules.crontab()的定时任务调度器的示例:
from celery import Celery
from celery.schedules import crontab
app = Celery('scheduler', broker='redis://localhost:6379/0')
@app.task
def my_task():
print('Hello, celery!')
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
# 定义每分钟调度任务
sender.add_periodic_task(crontab(minute='*'), my_task.s(), name='every_minute_task')
# 定义每天调度任务
sender.add_periodic_task(crontab(hour=0, minute=0), my_task.s(), name='every_day_task')
# 定义每周调度任务
sender.add_periodic_task(crontab(hour=0, minute=0, day_of_week='mon'), my_task.s(), name='every_week_task')
# 定义每月调度任务
sender.add_periodic_task(crontab(hour=0, minute=0, day_of_month='1'), my_task.s(), name='every_month_task')
# 启动定时任务调度器
app.conf.beat_schedule = {
'every_10_seconds': {
'task': 'my_task',
'schedule': 10.0,
},
}
if __name__ == '__main__':
app.start()
在上面的例子中,首先我们创建了一个Celery实例,并定义了一个my_task函数作为我们要定时执行的任务。
然后,我们使用app.on_after_configure.connect装饰器来注册一个回调函数setup_periodic_tasks,该回调函数将在Celery配置完成后进行调用。在setup_periodic_tasks函数中,我们通过sender.add_periodic_task()方法定义了不同时间间隔的定时任务。比如,我们定义了every_minute_task每分钟执行一次,every_day_task每天执行一次,every_week_task每周一执行一次,every_month_task每月一号执行一次。
最后,我们使用app.conf.beat_schedule配置项来定义一个定时的调度任务,该任务将每隔10秒执行一次。
在运行这个例子之前,需要确保已经安装了Celery和Redis。然后,可以在命令行中输入以下命令来启动这个定时任务调度器:
celery -A <文件名>.app worker --loglevel=info
这样,就启动了Celery的worker和beat两个服务,定时任务调度器会按照预定时间触发任务,并将任务分发给worker进行执行。
总结:
使用Celery的定时任务调度器非常简单,只需定义任务和调度时间,并启动Celery的worker和beat服务即可。以上是一个基于celery.schedules.crontab()的定时任务调度器的示例,你可以根据自己的需求修改调度时间和任务处理逻辑。
