Python 实现定时任务的八种方案( 二 )

  • run():运行所有预定的事件 。这个函数将等待(使用传递给构造函数的delayfunc()函数),然后执行事件,直到不再有预定的事件 。
  • 个人点评:比threading.Timer更好,不需要循环调用 。
    利用调度模块schedule实现定时任务schedule是一个第三方轻量级的任务调度模块,可以按照秒,分,小时,日期或者自定义事件执行时间 。schedule允许用户使用简单、人性化的语法以预定的时间间隔定期运行Python函数(或其它可调用函数) 。
    先来看代码,是不是不看文档就能明白什么意思?
    import scheduleimport timedef job():print("I'm working...")schedule.every(10).seconds.do(job)schedule.every(10).minutes.do(job)schedule.every().hour.do(job)schedule.every().day.at("10:30").do(job)schedule.every(5).to(10).minutes.do(job)schedule.every().monday.do(job)schedule.every().wednesday.at("13:15").do(job)schedule.every().minute.at(":17").do(job)while True:schedule.run_pending()time.sleep(1)装饰器:通过 @repeat() 装饰静态方法
    import timefrom schedule import every, repeat, run_pending@repeat(every().second)def job():print('working...')while True:run_pending()time.sleep(1)传递参数:
    import scheduledef greet(name):print('Hello', name)schedule.every(2).seconds.do(greet, name='Alice')schedule.every(4).seconds.do(greet, name='Bob')while True:schedule.run_pending()装饰器同样能传递参数:
    from schedule import every, repeat, run_pending@repeat(every().second, 'World')@repeat(every().minute, 'Mars')def hello(planet):print('Hello', planet)while True:run_pending()取消任务:
    import schedulei = 0def some_task():global ii += 1print(i)if i == 10:schedule.cancel_job(job)print('cancel job')exit(0)job = schedule.every().second.do(some_task)while True:schedule.run_pending()运行一次任务:
    import timeimport scheduledef job_that_executes_once():print('Hello')return schedule.CancelJobschedule.every().minute.at(':34').do(job_that_executes_once)while True:schedule.run_pending()time.sleep(1)根据标签检索任务:
    # 检索所有任务:schedule.get_jobs()import scheduledef greet(name):print('Hello {}'.format(name))schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend')schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend')schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest')friends = schedule.get_jobs('friend')print(friends)根据标签取消任务:
    # 取消所有任务:schedule.clear()import scheduledef greet(name):print('Hello {}'.format(name))if name == 'Cancel':schedule.clear('second-tasks')print('cancel second-tasks')schedule.every().second.do(greet, 'Andrea').tag('second-tasks', 'friend')schedule.every().second.do(greet, 'John').tag('second-tasks', 'friend')schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')schedule.every(5).seconds.do(greet, 'Cancel').tag('daily-tasks', 'guest')while True:schedule.run_pending()运行任务到某时间:
    import schedulefrom datetime import datetime, timedelta, timedef job():print('working...')schedule.every().second.until('23:59').do(job)# 今天23:59停止schedule.every().second.until('2030-01-01 18:30').do(job)# 2030-01-01 18:30停止schedule.every().second.until(timedelta(hours=8)).do(job)# 8小时后停止schedule.every().second.until(time(23, 59, 59)).do(job)# 今天23:59:59停止schedule.every().second.until(datetime(2030, 1, 1, 18, 30, 0)).do(job)# 2030-01-01 18:30停止while True:schedule.run_pending()马上运行所有任务(主要用于测试):
    import scheduledef job():print('working...')def job1():print('Hello...')schedule.every().monday.at('12:40').do(job)schedule.every().tuesday.at('16:40').do(job1)schedule.run_all()schedule.run_all(delay_seconds=3)# 任务间延迟3秒并行运行:使用 Python 内置队列实现:
    import threadingimport timeimport scheduledef job1():print("I'm running on thread %s" % threading.current_thread())def job2():print("I'm running on thread %s" % threading.current_thread())def job3():print("I'm running on thread %s" % threading.current_thread())def run_threaded(job_func):job_thread = threading.Thread(target=job_func)job_thread.start()schedule.every(10).seconds.do(run_threaded, job1)schedule.every(10).seconds.do(run_threaded, job2)schedule.every(10).seconds.do(run_threaded, job3)while True:schedule.run_pending()time.sleep(1)


    推荐阅读