利用调度模块schedule实现定时任务schedule是一个第三方轻量级的任务调度模块,可以按照秒,分,小时,日期或者自定义事件执行时间 。schedule允许用户使用简单、人性化的语法以预定的时间间隔定期运行Python函数(或其它可调用函数) 。
先来看代码,是不是不看文档就能明白什么意思?
import scheduleimport timedef job():print("I'm working...")schedule.every(10).seconds.do(job)schedule.every(10).minutes.do(job)schedule.every().hour.do(job)schedule.every().day.at("10:30").do(job)schedule.every(5).to(10).minutes.do(job)schedule.every().monday.do(job)schedule.every().wednesday.at("13:15").do(job)schedule.every().minute.at(":17").do(job)while True:schedule.run_pending()time.sleep(1)装饰器:通过 @repeat() 装饰静态方法import timefrom schedule import every, repeat, run_pending@repeat(every().second)def job():print('working...')while True:run_pending()time.sleep(1)传递参数:import scheduledef greet(name):print('Hello', name)schedule.every(2).seconds.do(greet, name='Alice')schedule.every(4).seconds.do(greet, name='Bob')while True:schedule.run_pending()装饰器同样能传递参数:from schedule import every, repeat, run_pending@repeat(every().second, 'World')@repeat(every().minute, 'Mars')def hello(planet):print('Hello', planet)while True:run_pending()取消任务:import schedulei = 0def some_task():global ii += 1print(i)if i == 10:schedule.cancel_job(job)print('cancel job')exit(0)job = schedule.every().second.do(some_task)while True:schedule.run_pending()运行一次任务:import timeimport scheduledef job_that_executes_once():print('Hello')return schedule.CancelJobschedule.every().minute.at(':34').do(job_that_executes_once)while True:schedule.run_pending()time.sleep(1)根据标签检索任务:# 检索所有任务:schedule.get_jobs()import scheduledef greet(name):print('Hello {}'.format(name))schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend')schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend')schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest')friends = schedule.get_jobs('friend')print(friends)根据标签取消任务:# 取消所有任务:schedule.clear()import scheduledef greet(name):print('Hello {}'.format(name))if name == 'Cancel':schedule.clear('second-tasks')print('cancel second-tasks')schedule.every().second.do(greet, 'Andrea').tag('second-tasks', 'friend')schedule.every().second.do(greet, 'John').tag('second-tasks', 'friend')schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')schedule.every(5).seconds.do(greet, 'Cancel').tag('daily-tasks', 'guest')while True:schedule.run_pending()运行任务到某时间:import schedulefrom datetime import datetime, timedelta, timedef job():print('working...')schedule.every().second.until('23:59').do(job)# 今天23:59停止schedule.every().second.until('2030-01-01 18:30').do(job)# 2030-01-01 18:30停止schedule.every().second.until(timedelta(hours=8)).do(job)# 8小时后停止schedule.every().second.until(time(23, 59, 59)).do(job)# 今天23:59:59停止schedule.every().second.until(datetime(2030, 1, 1, 18, 30, 0)).do(job)# 2030-01-01 18:30停止while True:schedule.run_pending()马上运行所有任务(主要用于测试):import scheduledef job():print('working...')def job1():print('Hello...')schedule.every().monday.at('12:40').do(job)schedule.every().tuesday.at('16:40').do(job1)schedule.run_all()schedule.run_all(delay_seconds=3)# 任务间延迟3秒并行运行:使用 Python 内置队列实现:import threadingimport timeimport scheduledef job1():print("I'm running on thread %s" % threading.current_thread())def job2():print("I'm running on thread %s" % threading.current_thread())def job3():print("I'm running on thread %s" % threading.current_thread())def run_threaded(job_func):job_thread = threading.Thread(target=job_func)job_thread.start()schedule.every(10).seconds.do(run_threaded, job1)schedule.every(10).seconds.do(run_threaded, job2)schedule.every(10).seconds.do(run_threaded, job3)while True:schedule.run_pending()time.sleep(1)
推荐阅读
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- 一个操作,实现公司和家里硬盘共享!抓紧get
- 微信扫码实现跳转
- nginx简易实现权限登录
- python中两个下划线是什么意思?python中一个下划线表示
- python 二分法求方程的根
- 为实现全球通信,至少需要发射多少颗卫星-全球卫星网络通信-
- Python正则表达式保姆式教学,带你精通大名鼎鼎的正则
- Python中的函数装饰器
- 某些音乐只能听一分钟?Python绕过反爬,完美下载音乐
- Flink的设计与实现:集群资源管理
