一. 如何调用
def f1(arg1, arg2): print('f1', arg1, arg2) def f2(arg1): print('f2', arg1) def f3(): print('f3') def f4(): print('周期任务', int(time.time())) timer = TaskTimer() # 把任务加入任务队列 timer.join_task(f1, [1, 2], timing=15.5) # 每天15:30执行 timer.join_task(f2, [3], timing=14) # 每天14:00执行 timer.join_task(f3, [], timing=15) # 每天15:00执行 timer.join_task(f4, [], interval=10) # 每10秒执行1次 # 开始执行(此时才会创建线程) timer.start()
f1~f4是我们需要定时执行的函数。
首先创建TaskTimer对象(TaskTimer的代码在下面)。调用join_task函数,把需要执行的函数加入到任务队列。最后调用start,任务就开始执行了。
join_task参数:
fun:需要执行的函数
arg:fun的参数,如果没有就传一个空列表
interval:如果有此参数,说明任务是周期任务,单位为秒(注意interval最少5秒)
timing:如果有此参数,说明任务是定时任务,单位为时
注意:interval和timing只能选填1个
二. 源码
import datetime import time from threading import Thread from time import sleep class TaskTimer: __instance = None def __new__(cls, *args, **kwargs): """ 单例模式 """ if not cls.__instance: cls.__instance = object.__new__(cls) return cls.__instance def __init__(self): if not hasattr(self, 'task_queue'): setattr(self, 'task_queue', []) if not hasattr(self, 'is_running'): setattr(self, 'is_running', False) def write_log(self, level, msg): cur_time = datetime.datetime.now() with open('./task.log', mode='a+', encoding='utf8') as file: s = "[" + str(cur_time) + "][" + level + "] " + msg print(s) file.write(s + "\n") def work(self): """ 处理任务队列 """ while True: for task in self.task_queue: if task['interval']: self.cycle_task(task) elif task['timing']: self.timing_task(task) sleep(5) def cycle_task(self, task): """ 周期任务 """ if task['next_sec'] <= int(time.time()): try: task['fun'](*task['arg']) self.write_log("正常", "周期任务:" + task['fun'].__name__ + " 已执行") except Exception as e: self.write_log("异常", "周期任务:" + task['fun'].__name__ + " 函数内部异常:" + str(e)) finally: task['next_sec'] = int(time.time()) + task['interval'] def timing_task(self, task): """ 定时任务 """ # 今天已过秒数 today_sec = self.get_today_until_now() # 到了第二天,就重置任务状态 if task['today'] != self.get_today(): task['today'] = self.get_today() task['today_done'] = False # 第一次执行 if task['first_work']: if today_sec >= task['task_sec']: task['today_done'] = True task['first_work'] = False else: task['first_work'] = False # 今天还没有执行 if not task['today_done']: if today_sec >= task['task_sec']: # 到点了,开始执行任务 try: task['fun'](*task['arg']) self.write_log("正常", "定时任务:" + task['fun'].__name__ + " 已执行") except Exception as e: self.write_log("异常", "定时任务:" + task['fun'].__name__ + " 函数内部异常:" + str(e)) finally: task['today_done'] = True if task['first_work']: task['first_work'] = False def get_today_until_now(self): """ 获取今天凌晨到现在的秒数 """ i = datetime.datetime.now() return i.hour * 3600 + i.minute * 60 + i.second def get_today(self): """ 获取今天的日期 """ i = datetime.datetime.now() return i.day def join_task(self, fun, arg, interval=None, timing=None): """ interval和timing只能存在1个 :param fun: 你要调用的任务 :param arg: fun的参数 :param interval: 周期任务,单位秒 :param timing: 定时任务,取值:[0,24) """ # 参数校验 if (interval != None and timing != None) or (interval == None and timing == None): raise Exception('interval和timing只能选填1个') if timing and not 0 <= timing < 24: raise Exception('timing的取值范围为[0,24)') if interval and interval < 5: raise Exception('interval最少为5') # 封装一个task task = { 'fun': fun, 'arg': arg, 'interval': interval, 'timing': timing, } # 封装周期或定时任务相应的参数 if timing: task['task_sec'] = timing * 3600 task['today_done'] = False task['first_work'] = True task['today'] = self.get_today() elif interval: task['next_sec'] = int(time.time()) + interval # 把task加入任务队列 self.task_queue.append(task) self.write_log("正常", "新增任务:" + fun.__name__) def start(self): """ 开始执行任务 返回线程标识符 """ if not self.is_running: thread = Thread(target=self.work) thread.start() self.is_running = True self.write_log("正常", "TaskTimer已开始运行!") return thread.ident self.write_log("警告", "TaskTimer已运行,请勿重复启动!")
以上这篇python异步实现定时任务和周期任务的方法就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持。
免责声明:本站文章均来自网站采集或用户投稿,网站不提供任何软件下载或自行开发的软件! 如有用户或公司发现本站内容信息存在侵权行为,请邮件告知! 858582#qq.com
《魔兽世界》大逃杀!60人新游玩模式《强袭风暴》3月21日上线
暴雪近日发布了《魔兽世界》10.2.6 更新内容,新游玩模式《强袭风暴》即将于3月21 日在亚服上线,届时玩家将前往阿拉希高地展开一场 60 人大逃杀对战。
艾泽拉斯的冒险者已经征服了艾泽拉斯的大地及遥远的彼岸。他们在对抗世界上最致命的敌人时展现出过人的手腕,并且成功阻止终结宇宙等级的威胁。当他们在为即将于《魔兽世界》资料片《地心之战》中来袭的萨拉塔斯势力做战斗准备时,他们还需要在熟悉的阿拉希高地面对一个全新的敌人──那就是彼此。在《巨龙崛起》10.2.6 更新的《强袭风暴》中,玩家将会进入一个全新的海盗主题大逃杀式限时活动,其中包含极高的风险和史诗级的奖励。
《强袭风暴》不是普通的战场,作为一个独立于主游戏之外的活动,玩家可以用大逃杀的风格来体验《魔兽世界》,不分职业、不分装备(除了你在赛局中捡到的),光是技巧和战略的强弱之分就能决定出谁才是能坚持到最后的赢家。本次活动将会开放单人和双人模式,玩家在加入海盗主题的预赛大厅区域前,可以从强袭风暴角色画面新增好友。游玩游戏将可以累计名望轨迹,《巨龙崛起》和《魔兽世界:巫妖王之怒 经典版》的玩家都可以获得奖励。