1.1 定时任务APScheduler

1.简介

APScheduler是基于Quartz的一个Python定时任务框架。提供了基于日期、固定时间间隔以及crontab类型的任务,并且可以持久化任务。

官方文档

2.安装

$ pip install apscheduler

3.基本概念

APScheduler有四大组件:

触发器 triggers 触发器包含调度逻辑。每个作业都有自己的触发器,用于确定下一个任务何时运行。除了初始配置之外,触发器是完全无状态的。

有三种内建的trigger:

  • date: 特定的时间点触发

  • interval: 固定时间间隔触发

  • cron: 在特定时间周期性地触发

任务储存器 job stores

用于存放任务,把任务存放在内存(默认MemoryJobStore)或数据库中。

执行器 executors

执行器是将任务提交到线程池或进程池中运行,当任务完成时,执行器通知调度器触发相应的事件。

调度器 schedulers

把上方三个组件作为参数,通过创建调度器实例来运行

根据开发需求选择相应的组件,下面是不同的调度器组件:

  • BlockingScheduler 阻塞式调度器:适用于只跑调度器的程序。

  • BackgroundScheduler 后台调度器:适用于非阻塞的情况,调度器会在后台独立运行。

  • AsyncIOScheduler AsyncIO调度器,适用于应用使用AsnycIO的情况。

  • GeventScheduler Gevent调度器,适用于应用通过Gevent的情况。

  • TornadoScheduler Tornado调度器,适用于构建Tornado应用。

  • TwistedScheduler Twisted调度器,适用于构建Twisted应用。

  • QtScheduler Qt调度器,适用于构建Qt应用。

4.使用步骤

新建一个调度器schedulers

添加调度任务

运行调度任务

5.示例

5.1 触发器date

特定的时间点触发,只执行一次。参数如下:

参数

说明

run_date (datetime 或 str)

作业的运行日期或时间

timezone (datetime.tzinfo 或 str)

指定时区

使用例子:

from datetime import datetime
from datetime import date
from apscheduler.schedulers.blocking import BlockingScheduler
def job(text):
    print(text)
scheduler = BlockingScheduler()
# 在 2019-8-30 运行一次 job 方法
scheduler.add_job(job, 'date', run_date=date(2019, 8, 30), args=['text1'])
# 在 2019-8-30 01:00:00 运行一次 job 方法
scheduler.add_job(job, 'date', run_date=datetime(2019, 8, 30, 1, 0, 0), args=['text2'])
# 在 2019-8-30 01:00:01 运行一次 job 方法
scheduler.add_job(job, 'date', run_date='2019-8-30 01:00:00', args=['text3'])
scheduler.start()

5.2 触发器interval

固定时间间隔触发。参数如下:

参数

说明

weeks (int)

间隔几周

days (int)

间隔几天

hours (int)

间隔几小时

minutes (int)

间隔几分钟

seconds (int)

间隔多少秒

start_date (datetime 或 str)

开始日期

end_date (datetime 或 str)

结束日期

timezone (datetime.tzinfo 或str)

指定时区

使用例子:

import time
from apscheduler.schedulers.blocking import BlockingScheduler
def job(text):
    t = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
    print('{} --- {}'.format(text, t))
scheduler = BlockingScheduler()
# 每隔 1分钟 运行一次 job 方法
scheduler.add_job(job, 'interval', minutes=1, args=['job1'])
# 在 2019-08-29 22:15:00至2019-08-29 22:17:00期间,每隔1分30秒 运行一次 job 方法
scheduler.add_job(job, 'interval', minutes=1, seconds=30, start_date='2019-08-29 22:15:00', end_date='2019-08-29 22:17:00', args=['job2'])
scheduler.start()

5.3 触发器cron

在特定时间周期性地触发。参数如下:

参数

说明

year (int 或 str)

年,4位数字

month (int 或 str)

月 (范围1-12)

day (int 或 str)

日 (范围1-31)

week (int 或 str)

周 (范围1-53)

day_of_week (int 或 str)

周内第几天或者星期几 (范围0-6 或者 mon,tue,wed,thu,fri,sat,sun)

hour (int 或 str)

时 (范围0-23)

minute (int 或 str)

分 (范围0-59)

second (int 或 str)

秒 (范围0-59)

start_date (datetime 或 str)

最早开始日期(包含)

end_date (datetime 或 str)

最晚结束时间(包含)

timezone (datetime.tzinfo 或str)

指定时区

这些参数支持算数表达式,取值格式有如下:

Express ion

**Fi eld* *

Description

*

any

Fire on every value

*/a

any

Fire every a values, starting from the minimum

a-b

any

Fire on any value within the a-b range (a must be smaller than b)

a-b/c

any

Fire every c values within the a-b range

xth y

day

Fire on the x -th occurrence of weekday y within the month

last x

day

Fire on the last occurrence of weekday x within the month

last

day

Fire on the last day within the month

x,y,z

any

Fire on any matching expression; can combine any number of any of the above expressio

使用例子:

import time
from apscheduler.schedulers.blocking import BlockingScheduler
def job(text):
    t = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
    print('{} --- {}'.format(text, t))
scheduler = BlockingScheduler()
# 在每天22点,每隔 1分钟 运行一次 job 方法
scheduler.add_job(job, 'cron', hour=22, minute='*/1', args=['job1'])
# 在每天22和23点的25分,运行一次 job 方法
scheduler.add_job(job, 'cron', hour='22-23', minute='25', args=['job2'])
scheduler.start()

5.4 通过装饰器scheduled_job()添加方法

添加任务的方法有两种:

(1)通过调用add_job()

(2)通过装饰器scheduled_job():

第一种方法是最常用的方法。第二种方法主要是方便地声明在应用程序运行时不会更改的任务。该 add_job()方法返回一个apscheduler.job.Job实例,可以使用该实例稍后修改或删除该任务。

使用例子:

import time
from apscheduler.schedulers.blocking import BlockingScheduler
scheduler = BlockingScheduler()
@scheduler.scheduled_job('interval', seconds=5)
def job1():
    t = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
    print('job1 --- {}'.format(t))
@scheduler.scheduled_job('cron', second='*/7')
def job2():
    t = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
    print('job2 --- {}'.format(t))
scheduler.start()

6.配置调度程序

生成一个名为“default”的RedisJobStore和名为“default”的ThreadPoolExecutor的BackgroundScheduler,默认最大线程数为10

default_redis_jobstore = RedisJobStore(
        db=2,
        jobs_key="apschedulers.default_jobs",
        run_times_key="apschedulers.default_run_times",
        host="",
        port=6379,
        password="xxx"
    )

    executor = ThreadPoolExecutor(10)
    init_scheduler_options = {
        "jobstores": {
            "default": default_redis_jobstore
        },
        "executors": {
            "default": executor
        },
        "job_defaults": {
            'misfire_grace_time': 20 * 60,  # 最大错过时间
            'coalesce': False,  # 是否合并执行
            'max_instances': 20  # 最大实例数
        }
    }
    sched = BackgroundScheduler(**init_scheduler_options)
    sched.start()

7.控制调度程序

7.1 启动调度程序

调用start()方法

scheduler.start()

7.2 新增工作

  • 调用方法add_job()

  • 通过装饰器scheduled_job()

7.3 删除工作**

  • 通过调用remove_job(),使用作业ID或别名删除工作

scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.remove_job('my_job_id')
  • 通过调用Job实例的remove()方法

job = scheduler.add_job(myfunc, 'interval', minutes=2)
job.remove()

7.4 暂停和恢复工作

scheduler.pause() # 暂停
scheduler.resume() # 恢复

7.5 修改工作

scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')

7.6 获取计划的作业列表

scheduler.get_jobs()    # 作业列表
scheduler.get_job('job_id') # 指定id的作业

7.7 关闭调度程序

scheduler.shutdown()

7.8 首次执行时间问题

scheduler.add_job(job1, 'interval', seconds=5, max_instances=1, next_run_time=datetime.datetime.now())