Python中常用的任务调度库推荐
发布时间:2023-12-18 11:22:01
在Python中,有许多任务调度库可以用来实现各种定时任务和周期任务的调度。以下是一些常用的任务调度库以及使用例子:
1. schedule
schedule是一个用于创建周期性任务的库。可以使用schedule库来执行基于时间间隔的任务调度。
import schedule
import time
def job():
print("I'm running...")
# 每分钟执行一次任务
schedule.every().minute.do(job)
while True:
schedule.run_pending()
time.sleep(1)
2. apscheduler
apscheduler是一个功能强大的调度库,支持一次性立即执行的任务、以及基于时间间隔的周期性任务、cron表达式等多种调度方式。
from apscheduler.schedulers.blocking import BlockingScheduler
def job():
print("I'm running...")
# 创建调度器
scheduler = BlockingScheduler()
# 添加一个每三秒执行一次的任务
scheduler.add_job(job, 'interval', seconds=3)
# 启动调度器
scheduler.start()
3. Celery
Celery是一个强大的分布式任务调度框架,可以用于执行异步任务、定时任务等复杂的任务调度。通常与消息队列如RabbitMQ或Redis一起使用。
from celery import Celery
app = Celery('tasks', broker='redis://localhost:6379/0')
@app.task
def add(x, y):
return x + y
# 定时执行任务
app.conf.beat_schedule = {
'add-every-30-seconds': {
'task': 'tasks.add',
'schedule': 30.0,
'args': (16, 16)
},
}
# 启动Celery Worker和Beat进程
# celery -A tasks worker --loglevel=info
# celery -A tasks beat --loglevel=info
4. APScheduler
APScheduler是一个Python实现的任务调度库,支持多种调度方式,如固定时间间隔、固定时间点、基于cron表达式等。
from apscheduler.schedulers.background import BackgroundScheduler
def job():
print("I'm running...")
# 创建调度器
scheduler = BackgroundScheduler()
# 添加一个每五秒执行一次的任务
scheduler.add_job(job, 'interval', seconds=5)
# 启动调度器
scheduler.start()
5. schedule_plus
schedule_plus是一个基于schedule库的任务调度库,提供了更多的功能和调度方式,如固定时间点调度、只运行一次的任务等。
from schedule_plus import every, delay, do, once
@every(10).seconds
def job():
print("I'm running...")
@delay(10).seconds
def delayed_job():
print("I'll be executed 10 seconds later.")
@do(hour=12, minute=0)
def scheduled_job():
print("I'm a scheduled job.")
@once(year=2023, month=5, day=1)
def run_only_once():
print("I'll be executed only once.")
job.start()
delayed_job.start()
scheduled_job.start()
run_only_once.start()
以上是一些常用的Python任务调度库及其使用例子。根据不同的需求,选择适合的任务调度库可以更轻松地管理和调度各种任务。无论是定时执行任务还是周期性任务,这些任务调度库都可以帮助我们简化代码和提高开发效率。
