欢迎访问宙启技术站
智能推送

_single()在Python中实现全局 的任务调度器

发布时间:2023-12-16 20:24:05

在Python中实现全局 的任务调度器可以使用单例模式来实现。单例模式确保一个类只有一个实例,并提供一个全局访问点。

下面是一个实现全局 的任务调度器的例子:

import threading
import time

class TaskScheduler:
    _singleton_lock = threading.Lock()
    _singleton_instance = None

    def __init__(self):
        self.tasks = {}

    @classmethod
    def instance(cls):
        if not cls._singleton_instance:
            with cls._singleton_lock:
                if not cls._singleton_instance:
                    cls._singleton_instance = TaskScheduler()
        return cls._singleton_instance

    def add_task(self, task_name, task_function, interval):
        if task_name in self.tasks:
            raise ValueError("Task with name '{}' already exists".format(task_name))
        self.tasks[task_name] = {
            'task_function': task_function,
            'interval': interval,
            'next_run': time.time() + interval
        }

    def start(self):
        while True:
            current_time = time.time()
            for task_name, task in self.tasks.items():
                if current_time >= task['next_run']:
                    task['task_function']()
                    task['next_run'] = current_time + task['interval']
            time.sleep(1)

# 使用例子
def task1():
    print("Running task 1")

def task2():
    print("Running task 2")

def task3():
    print("Running task 3")

scheduler = TaskScheduler.instance()
scheduler.add_task("task1", task1, 5)
scheduler.add_task("task2", task2, 10)
scheduler.add_task("task3", task3, 15)
scheduler.start()

在上面的例子中,TaskScheduler 类使用单例模式实现。它包含一个私有的类变量 _singleton_instance 来保存单例实例。instance() 方法使用线程锁确保只有在该变量为空时才创建单例实例。

add_task() 方法用于向任务调度器中添加任务,包括任务的名称、任务函数和运行间隔。start() 方法是一个无限循环,每隔1秒检查各个任务的下次运行时间,并执行任务函数。

在使用例子中,我们定义了三个任务函数 task1(), task2(), task3(),并分别将它们添加到任务调度器中。task1() 每隔5秒执行一次,task2() 每隔10秒执行一次,task3() 每隔15秒执行一次。最后,调用 scheduler.start() 方法启动任务调度器。

通过这种方式,我们可以实现一个全局 的任务调度器,可以在不同的地方添加任务,并确保每个任务按照其指定的时间间隔执行。