Python中APScheduler库的奥秘( 二 )

在上述代码中,我们使用默认的内存存储来存储任务 。如果需要将任务存储在数据库中,可以使用jobstores参数来设置 。
from apscheduler.schedulers.background import BackgroundSchedulerfrom apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStoreimport time# 创建后台调度器scheduler = BackgroundScheduler()# 创建数据库存储jobstores = {'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')}# 定义任务函数def job():print("定时任务执行:", time.strftime("%Y-%m-%d %H:%M:%S"))# 添加定时任务 , 每隔5秒执行一次scheduler.add_job(job, 'interval', seconds=5)# 启动调度器scheduler.start()# 主线程等待一段时间后结束time.sleep(20)# 关闭调度器scheduler.shutdown()print("主线程结束")在上述代码中,我们使用了SQLAlchemyJobStore来将任务存储在SQLite数据库中 。
5. 并发执行默认情况下,APScheduler会将任务串行执行,也就是说一个任务结束后才会执行下一个任务 。如果希望并发执行多个任务 , 可以使用max_instances参数来设置 。
from apscheduler.schedulers.background import BackgroundSchedulerimport time# 创建后台调度器scheduler = BackgroundScheduler()# 定义任务函数def job(index):print(f"定时任务{index}执行:", time.strftime("%Y-%m-%d %H:%M:%S"))# 添加定时任务,每隔5秒执行一次,最多并发3个任务scheduler.add_job(job, 'interval', seconds=5, args=[1], max_instances=3)scheduler.add_job(job, 'interval', seconds=5, args=[2], max_instances=3)scheduler.add_job(job, 'interval', seconds=5, args=[3], max_instances=3)# 启动调度器scheduler.start()# 主线程等待一段时间后结束time.sleep(20)# 关闭调度器scheduler.shutdown()print("主线程结束")在上述代码中,我们使用了args参数传递参数给任务函数,并使用max_instances参数设置最多并发3个任务 。
6. 阻塞和非阻塞APScheduler提供了阻塞和非阻塞两种调度器类型 。阻塞调度器: 在调度器启动后,会阻塞主线程直到所有任务完成 。
from apscheduler.schedulers.blocking import BlockingSchedulerimport time# 创建阻塞调度器scheduler = BlockingScheduler()# 定义任务函数def job():print("定时任务执行:", time.strftime("%Y-%m-%d %H:%M:%S"))# 添加定时任务,每隔5秒执行一次scheduler.add_job(job, 'interval', seconds=5)# 启动调度器scheduler.start()print("主线程结束")非阻塞调度器: 在调度器启动后 , 不会阻塞主线程 。
from apscheduler.schedulers.background import BackgroundSchedulerimport time# 创建后台调度器scheduler = BackgroundScheduler()# 定义任务函数def job():print("定时任务执行:", time.strftime("%Y-%m-%d %H:%M:%S"))# 添加定时任务,每隔5秒执行一次scheduler.add_job(job, 'interval', seconds=5)# 启动调度器scheduler.start()# 主线程等待一段时间后结束time.sleep(20)# 关闭调度器scheduler.shutdown()print("主线程结束")在上述代码中,我们分别使用BlockingScheduler和BackgroundScheduler创建了阻塞和非阻塞调度器 。
7. 错误处理在任务执行过程中 , 可能会出现异常 。APScheduler提供了异常处理机制,我们可以通过try...except...捕获任务函数中的异常,并进行相应的处理 。
from apscheduler.schedulers.background import BackgroundSchedulerimport time# 创建后台调度器scheduler = BackgroundScheduler()# 定义任务函数def job():try:print("定时任务执行:", time.strftime("%Y-%m-%d %H:%M:%S"))# 抛出一个异常rAIse ValueError("任务出现异常")except Exception as e:print("任务执行过程中发生异常:", str(e))# 添加定时任务 , 每隔5秒执行一次scheduler.add_job(job, 'interval', seconds=5)# 启动调度器scheduler.start()# 主线程等待一段时间后结束time.sleep(20)# 关闭调度器scheduler.shutdown()print("主线程结束")在上述代码中,我们在任务函数中抛出了一个ValueError异常,并通过try...except...捕获并输出了异常信息 。
8. 立即执行任务有时候我们可能需要立即执行一个任务,而不是等到下次触发时间 。APScheduler提供了run_job方法来立即执行任务 。
from apscheduler.schedulers.background import BackgroundSchedulerimport time# 创建后台调度器scheduler = BackgroundScheduler()# 定义任务函数def job():print("定时任务执行:", time.strftime("%Y-%m-%d %H:%M:%S"))# 添加定时任务 , 每隔5秒执行一次scheduler.add_job(job, 'interval', seconds=5)# 启动调度器scheduler.start()# 立即执行任务scheduler.run_job(job)# 主线程等待一段时间后结束time.sleep(20)# 关闭调度器scheduler.shutdown()print("主线程结束")


推荐阅读