Python中任务调度模块APScheduler

APScheduler(Advanced Python Scheduler)是一个功能强大的Python定时任务框架,基于Quartz设计理念,提供了灵活的任务调度功能,支持基于日期、固定时间间隔以及crontab类型的任务调度,并且可以持久化任务。

1. 安装APScheduler

1
2
3
4
5
# 安装最新版本
pip install apscheduler

# 安装指定版本
pip install apscheduler==3.10.4

2. 基本架构

APScheduler由四个核心组件组成:

  1. 触发器(Triggers):定义任务触发的条件

    • 描述任务何时被触发,支持按日期、时间间隔或cron表达式三种方式
  2. 任务存储器(Job Stores):存放任务

    • 可以存储在内存(默认)或数据库中
    • 注意:调度器之间不能共享任务存储器
  3. 执行器(Executors):用于执行任务

    • 将任务提交到线程池或进程池中运行
    • 任务完成后通知调度器触发相应的事件
  4. 调度器(Schedulers):协调三个组件的运行

    • 根据配置将触发器、任务存储器和执行器组合在一起

3. 调度器类型

3.1 BlockingScheduler

阻塞式调度器,适用于作为独立进程运行的情况:

1
2
3
4
5
6
7
8
9
10
from apscheduler.schedulers.blocking import BlockingScheduler
import time

scheduler = BlockingScheduler()

def job():
print(f"{time.asctime()}: 执行任务")

scheduler.add_job(job, 'interval', seconds=3)
scheduler.start() # 此调用会阻塞当前线程

3.2 BackgroundScheduler

后台调度器,适用于在应用程序后台运行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from apscheduler.schedulers.background import BackgroundScheduler
import time

scheduler = BackgroundScheduler()

def job():
print(f"{time.asctime()}: 执行任务")

scheduler.add_job(job, 'interval', seconds=3)
scheduler.start()

# 主线程继续执行其他任务
time.sleep(10)
print("主线程任务完成")

3.3 其他调度器

  • AsyncIOScheduler:适用于使用asyncio的异步应用
  • GeventScheduler:适用于使用Gevent的应用
  • TornadoScheduler:适用于Tornado应用
  • TwistedScheduler:适用于Twisted应用
  • QtScheduler:适用于Qt应用

4. 触发器类型

4.1 date触发器

在指定的日期和时间执行一次:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime

scheduler = BackgroundScheduler()

def job():
print(f"任务执行时间: {datetime.now()}")

# 使用datetime对象
scheduler.add_job(job, 'date', run_date=datetime(2024, 12, 31, 23, 59, 59))

# 使用字符串
scheduler.add_job(job, 'date', run_date="2024-12-31 23:59:59")

# 带时区
scheduler.add_job(job, 'date', run_date="2024-12-31 23:59:59", timezone="Asia/Shanghai")

scheduler.start()

4.2 interval触发器

按照固定的时间间隔重复执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
from apscheduler.schedulers.background import BackgroundScheduler

scheduler = BackgroundScheduler()

def job():
print("执行周期性任务")

# 每3秒执行一次
scheduler.add_job(job, 'interval', seconds=3)

# 每2小时执行一次,设置开始和结束时间
scheduler.add_job(
job,
'interval',
hours=2,
start_date='2024-01-01 00:00:00',
end_date='2024-12-31 23:59:59'
)

# 复杂间隔:每1天2小时30分钟执行一次
scheduler.add_job(
job,
'interval',
days=1,
hours=2,
minutes=30
)

scheduler.start()

4.3 cron触发器

按照cron表达式执行任务,功能最强大:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from apscheduler.schedulers.background import BackgroundScheduler

scheduler = BackgroundScheduler()

def job():
print("执行cron任务")

# 每天早上8点执行
scheduler.add_job(job, 'cron', hour=8)

# 每天8:30执行
scheduler.add_job(job, 'cron', hour=8, minute=30)

# 每个月1号的8:30执行
scheduler.add_job(job, 'cron', day=1, hour=8, minute=30)

# 每周一的8:30执行
scheduler.add_job(job, 'cron', day_of_week=0, hour=8, minute=30)

# 使用cron表达式格式(与Linux crontab类似)
scheduler.add_job(job, 'cron', expression='30 8 * * 1') # 每周一8:30

scheduler.start()

4.4 cron表达式详解

表达式 描述 示例
* 通配符,匹配所有值 minutes=* 每分钟触发
*/a 可被a整除的通配符 minutes=*/5 每5分钟触发
a-b 范围a-b触发 hours=9-17 9点到17点之间每小时触发
a-b/c 范围a-b,且可被c整除时触发 minutes=0-30/10 0-30分钟内每10分钟触发
xth y 第几个星期几触发 day='1st mon' 每个月第一个周一
last x 一个月中最后一个星期几触发 day='last fri' 每个月最后一个周五
last 一个月最后一天触发 day='last' 每个月最后一天
x,y,z 组合表达式 hours='9,12,18' 9点、12点、18点触发

5. 任务存储配置

5.1 内存存储(默认)

1
2
3
4
from apscheduler.schedulers.background import BackgroundScheduler

# 默认使用内存存储
scheduler = BackgroundScheduler()

5.2 数据库存储

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

# 使用SQLite数据库
jobstores = {
'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}

scheduler = BackgroundScheduler(jobstores=jobstores)

# 使用MySQL数据库
# jobstores = {
# 'default': SQLAlchemyJobStore(url='mysql://user:password@localhost/mydb')
# }

6. 执行器配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor

# 配置线程池和进程池执行器
executors = {
'default': ThreadPoolExecutor(20), # 20个线程
'processpool': ProcessPoolExecutor(5) # 5个进程
}

scheduler = BackgroundScheduler(executors=executors)

# 为任务指定执行器
def cpu_intensive_job():
# 耗时的CPU密集型任务
pass

def io_intensive_job():
# IO密集型任务
pass

# 使用进程池执行CPU密集型任务
scheduler.add_job(cpu_intensive_job, 'interval', seconds=60, executor='processpool')

# 使用线程池执行IO密集型任务
scheduler.add_job(io_intensive_job, 'interval', seconds=10, executor='default')

7. 任务管理

7.1 添加任务

1
2
3
4
5
6
7
8
9
10
11
12
# 方法1:add_job()
job = scheduler.add_job(job_func, 'interval', seconds=5, id='my_job')

# 方法2:装饰器
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger

scheduler = BackgroundScheduler()

@scheduler.scheduled_job('interval', seconds=5, id='decorated_job')
def job():
print("使用装饰器添加的任务")

7.2 修改任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 修改任务的触发时间
job.modify(seconds=10)

# 暂停任务
job.pause()

# 恢复任务
job.resume()

# 移除任务
job.remove()

# 或通过ID移除
# scheduler.remove_job('my_job')

7.3 获取任务列表

1
2
3
4
5
6
7
# 获取所有任务
jobs = scheduler.get_jobs()
for job in jobs:
print(f"任务ID: {job.id}, 下次执行时间: {job.next_run_time}")

# 通过ID获取任务
job = scheduler.get_job('my_job')

8. 错误处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.events import EVENT_JOB_ERROR

scheduler = BackgroundScheduler()

def job_error_listener(event):
if event.exception:
print(f"任务执行出错: {event.exception}")
print(f"任务ID: {event.job_id}")

# 添加错误监听器
scheduler.add_listener(job_error_listener, EVENT_JOB_ERROR)

def error_prone_job():
# 会引发异常的任务
raise Exception("任务执行失败")

scheduler.add_job(error_prone_job, 'interval', seconds=5)
scheduler.start()

9. 最佳实践

9.1 配置示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED

# 配置任务存储
jobstores = {
'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}

# 配置执行器
executors = {
'default': ThreadPoolExecutor(20),
'processpool': ProcessPoolExecutor(5)
}

# 配置调度器
job_defaults = {
'coalesce': False, # 任务堆积时是否只执行一次
'max_instances': 3 # 同一任务最多同时运行的实例数
}

scheduler = BackgroundScheduler(
jobstores=jobstores,
executors=executors,
job_defaults=job_defaults
)

# 错误处理
def job_listener(event):
if event.exception:
print(f"任务 {event.job_id} 执行失败: {event.exception}")
else:
print(f"任务 {event.job_id} 执行成功")

scheduler.add_listener(job_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)

# 启动调度器
scheduler.start()

9.2 注意事项

  1. 避免任务执行时间过长:如果任务执行时间超过触发间隔,可能会导致任务堆积
  2. 合理设置max_instances:根据任务性质设置合适的并发实例数
  3. 使用持久化存储:对于重要任务,使用数据库存储以防止进程重启后任务丢失
  4. 错误处理:添加错误监听器,及时发现和处理任务执行失败的情况
  5. 资源管理:根据任务类型选择合适的执行器(线程池或进程池)
  6. 调度器关闭:在应用程序退出前,调用scheduler.shutdown()关闭调度器

10. 实际应用示例

10.1 定时数据备份

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import os
import shutil
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime

def backup_data():
"""备份数据到指定目录"""
source_dir = '/path/to/data'
backup_dir = f'/path/to/backup/{datetime.now().strftime("%Y%m%d_%H%M%S")}'

try:
os.makedirs(backup_dir, exist_ok=True)
shutil.copytree(source_dir, backup_dir)
print(f"数据备份完成: {backup_dir}")
except Exception as e:
print(f"备份失败: {e}")

# 创建调度器
scheduler = BackgroundScheduler()

# 每天凌晨2点执行备份
scheduler.add_job(
backup_data,
'cron',
hour=2,
minute=0,
id='data_backup'
)

scheduler.start()
print("备份调度已启动")

10.2 定时API调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import requests
from apscheduler.schedulers.background import BackgroundScheduler

def fetch_data():
"""定时从API获取数据"""
try:
response = requests.get('https://api.example.com/data')
if response.status_code == 200:
data = response.json()
print(f"获取数据成功: {data}")
# 处理数据...
else:
print(f"API调用失败: {response.status_code}")
except Exception as e:
print(f"请求异常: {e}")

# 创建调度器
scheduler = BackgroundScheduler()

# 每15分钟调用一次API
scheduler.add_job(
fetch_data,
'interval',
minutes=15,
id='api_fetch'
)

scheduler.start()
print("API调用调度已启动")

11. 参考资料