_single()在Python中实现全局 的任务调度器
发布时间:2023-12-16 20:24:05
在Python中实现全局 的任务调度器可以使用单例模式来实现。单例模式确保一个类只有一个实例,并提供一个全局访问点。
下面是一个实现全局 的任务调度器的例子:
import threading
import time
class TaskScheduler:
_singleton_lock = threading.Lock()
_singleton_instance = None
def __init__(self):
self.tasks = {}
@classmethod
def instance(cls):
if not cls._singleton_instance:
with cls._singleton_lock:
if not cls._singleton_instance:
cls._singleton_instance = TaskScheduler()
return cls._singleton_instance
def add_task(self, task_name, task_function, interval):
if task_name in self.tasks:
raise ValueError("Task with name '{}' already exists".format(task_name))
self.tasks[task_name] = {
'task_function': task_function,
'interval': interval,
'next_run': time.time() + interval
}
def start(self):
while True:
current_time = time.time()
for task_name, task in self.tasks.items():
if current_time >= task['next_run']:
task['task_function']()
task['next_run'] = current_time + task['interval']
time.sleep(1)
# 使用例子
def task1():
print("Running task 1")
def task2():
print("Running task 2")
def task3():
print("Running task 3")
scheduler = TaskScheduler.instance()
scheduler.add_task("task1", task1, 5)
scheduler.add_task("task2", task2, 10)
scheduler.add_task("task3", task3, 15)
scheduler.start()
在上面的例子中,TaskScheduler 类使用单例模式实现。它包含一个私有的类变量 _singleton_instance 来保存单例实例。instance() 方法使用线程锁确保只有在该变量为空时才创建单例实例。
add_task() 方法用于向任务调度器中添加任务,包括任务的名称、任务函数和运行间隔。start() 方法是一个无限循环,每隔1秒检查各个任务的下次运行时间,并执行任务函数。
在使用例子中,我们定义了三个任务函数 task1(), task2(), task3(),并分别将它们添加到任务调度器中。task1() 每隔5秒执行一次,task2() 每隔10秒执行一次,task3() 每隔15秒执行一次。最后,调用 scheduler.start() 方法启动任务调度器。
通过这种方式,我们可以实现一个全局 的任务调度器,可以在不同的地方添加任务,并确保每个任务按照其指定的时间间隔执行。
