Python定时任务:schedule+Windows任务计划,让脚本自己按时跑
定时任务是自动化的核心,可以实现定期执行数据处理、发送报告、备份文件等操作。本篇将介绍Python中实现定时任务的多种方式,以及如何编写健壮的自动化脚本。
1. 定时任务方案概览
| 方案 | 特点 | 适用场景 |
|---|---|---|
| time.sleep | 简单,阻塞式 | 简单的循环任务 |
| schedule | 轻量级,语法友好 | 单进程定时任务 |
| APScheduler | 功能强大,支持持久化 | 复杂的调度需求 |
| Windows任务计划 | 系统级,可靠 | 生产环境推荐 |
| Celery | 分布式,支持消息队列 | 大规模任务调度 |
2. time模块
import time
from datetime import datetime, timedelta
# 简单的循环定时
def simple_scheduler():
    """Run do_task() once a minute, forever (blocking busy loop).

    NOTE(review): do_task is not defined in this file — presumably the
    reader's own job function; confirm before running.
    """
    while True:
        print(f"[{datetime.now()}] 执行任务...")
        do_task()
        time.sleep(60)  # wait 60 seconds between runs
# 指定时间执行
def run_at_time(target_hour, target_minute, task_func):
    """Run task_func every day at the given local time (blocking loop).

    Args:
        target_hour: Hour of day (0-23) to fire at.
        target_minute: Minute (0-59) to fire at.
        task_func: Zero-argument callable to execute.

    Note: loops forever; blocks the calling thread between runs.
    """
    while True:
        now = datetime.now()
        target = now.replace(hour=target_hour, minute=target_minute,
                             second=0, microsecond=0)
        if now >= target:
            # Already past today's target — schedule for tomorrow.
            # timedelta handles month/year rollover; the original
            # replace(day=target.day + 1) raised ValueError on the
            # last day of any month.
            target += timedelta(days=1)
        wait_seconds = (target - now).total_seconds()
        print(f"等待 {wait_seconds:.0f} 秒后执行...")
        time.sleep(wait_seconds)
        task_func()
# 间隔执行
def interval_runner(interval_seconds, task_func, max_runs=None):
    """Execute task_func repeatedly at a fixed interval.

    Runs max_runs times, or forever when max_runs is None.  Exceptions
    raised by the task are printed and do not stop the loop.  The wait
    between runs is shortened by however long the task itself took.
    """
    completed = 0
    while True:
        if max_runs is not None and completed >= max_runs:
            break
        started = time.time()
        try:
            task_func()
        except Exception as e:
            print(f"任务执行错误:{e}")
        completed += 1
        # Sleep only for the remainder of the interval (never negative).
        remaining = interval_seconds - (time.time() - started)
        time.sleep(remaining if remaining > 0 else 0)
3. schedule库
pip install schedule
import schedule
import time
from datetime import datetime
def job():
    """Demo job: print a timestamped message."""
    print(f"[{datetime.now()}] 执行任务")
# The many ways to express a schedule
schedule.every(10).seconds.do(job)  # every 10 seconds
schedule.every(5).minutes.do(job)  # every 5 minutes
schedule.every().hour.do(job)  # every hour
schedule.every().day.at("09:00").do(job)  # every day at 09:00
schedule.every().monday.do(job)  # every Monday
schedule.every().wednesday.at("13:15").do(job)  # every Wednesday at 13:15
# A job that takes arguments
def greet(name):
    """Print a greeting for *name*."""
    print(f"Hello, {name}!")
schedule.every().day.at("08:00").do(greet, name="张三")
# A one-shot job
def run_once():
    """Run once, then cancel itself."""
    print("只运行一次")
    return schedule.CancelJob  # returning CancelJob removes the job
schedule.every().day.at("10:00").do(run_once)
# Tag-based job management
schedule.every().hour.do(job).tag('hourly', 'important')
schedule.every().day.do(job).tag('daily')
# Cancel every job carrying a given tag
schedule.clear('hourly')
# Run the scheduler loop
def run_scheduler():
    """Block forever, firing due jobs roughly once per second."""
    print("调度器启动...")
    while True:
        schedule.run_pending()
        time.sleep(1)
# run_scheduler()
# 在后台线程运行
import threading
def run_continuously(interval=1):
    """Run the schedule polling loop in a background thread.

    Returns a threading.Event; call .set() on it to stop the loop.
    """
    stop_event = threading.Event()

    class _Runner(threading.Thread):
        def run(self):
            # Poll pending jobs until asked to stop.
            while not stop_event.is_set():
                schedule.run_pending()
                time.sleep(interval)

    _Runner().start()
    return stop_event
# 启动后台调度
# stop_run = run_continuously()
# 停止:stop_run.set()
4. APScheduler
pip install apscheduler
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
from datetime import datetime
def my_job():
    """Demo job: print a timestamped message."""
    print(f"[{datetime.now()}] 任务执行")
def job_with_args(name, value):
    """Demo job taking positional and keyword arguments."""
    print(f"任务:{name} = {value}")
# Blocking scheduler: start() takes over the current thread
def blocking_scheduler_example():
    """Show interval, cron and one-shot date triggers on BlockingScheduler."""
    scheduler = BlockingScheduler()
    # Interval triggers
    scheduler.add_job(my_job, 'interval', seconds=30)
    scheduler.add_job(my_job, 'interval', minutes=5)
    scheduler.add_job(my_job, 'interval', hours=1)
    # Cron triggers (like Linux crontab)
    scheduler.add_job(my_job, 'cron', hour=9, minute=0)  # daily at 09:00
    scheduler.add_job(my_job, 'cron', day_of_week='mon-fri', hour=8)  # weekdays 08:00
    scheduler.add_job(my_job, 'cron', day=1, hour=0)  # 1st of each month, 00:00
    # Passing arguments to the job
    scheduler.add_job(job_with_args, 'interval', seconds=60,
                      args=['test'], kwargs={'value': 100})
    # Run once at a specific time
    from datetime import datetime, timedelta
    run_time = datetime.now() + timedelta(seconds=10)
    scheduler.add_job(my_job, 'date', run_date=run_time)
    try:
        scheduler.start()  # blocks until interrupted
    except KeyboardInterrupt:
        scheduler.shutdown()
# Background scheduler: runs in its own thread
def background_scheduler_example():
    """Start a BackgroundScheduler and return it for later management."""
    scheduler = BackgroundScheduler()
    scheduler.add_job(my_job, 'interval', seconds=10, id='my_job_id')
    scheduler.start()
    # Main program keeps running
    print("调度器在后台运行...")
    # Managing jobs dynamically by id:
    # scheduler.pause_job('my_job_id')    # pause
    # scheduler.resume_job('my_job_id')   # resume
    # scheduler.remove_job('my_job_id')   # remove
    # scheduler.reschedule_job('my_job_id', trigger='interval', seconds=5)  # reschedule
    return scheduler
# Cron表达式详解
# 秒 分 时 日 月 周 年(年可选)
# * 任意值
# */n 每n个单位
# a-b 范围
# a,b,c 列表
# Human-readable schedules mapped to (trigger, kwargs) pairs that can be
# passed to scheduler.add_job(func, trigger, **kwargs).
# Fix: the original literal ('每分钟': 'cron', {'minute': '*'},) was a
# syntax error — a dict value must be a single object, so use tuples.
cron_examples = {
    '每分钟': ('cron', {'minute': '*'}),
    '每小时': ('cron', {'hour': '*'}),
    '每天9点': ('cron', {'hour': 9}),
    '工作日9点': ('cron', {'day_of_week': 'mon-fri', 'hour': 9}),
    '每月1号': ('cron', {'day': 1}),
    '每周一9点': ('cron', {'day_of_week': 'mon', 'hour': 9}),
}
# 持久化存储(SQLite)
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
def persistent_scheduler():
    """Scheduler whose jobs survive process restarts via a SQLite job store."""
    jobstores = {
        'default': SQLAlchemyJobStore(url='sqlite:///jobs.db')
    }
    scheduler = BackgroundScheduler(jobstores=jobstores)
    # replace_existing avoids a duplicate-id conflict after a restart
    scheduler.add_job(my_job, 'interval', minutes=1, id='persistent_job',
                      replace_existing=True)
    scheduler.start()
    return scheduler
5. Windows任务计划程序
"""
使用Windows任务计划程序运行Python脚本
这是生产环境推荐的方式
"""
# 1. 创建可执行的Python脚本
# daily_report.py
import sys
import logging
from datetime import datetime
# Logging configuration: writes to a file next to the script
logging.basicConfig(
    filename='daily_report.log',
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
def main():
    """Entry point for the daily-report job; exits with code 1 on failure."""
    logging.info("开始执行每日报告任务")
    try:
        # Run the task steps in order
        generate_report()
        send_email()
        logging.info("任务执行成功")
    except Exception as e:
        logging.error(f"任务执行失败:{e}")
        # Non-zero exit code so Task Scheduler records the run as failed
        sys.exit(1)
def generate_report():
    """Build the daily report (placeholder)."""
    pass
def send_email():
    """Send the report by email (placeholder)."""
    pass
if __name__ == '__main__':
    main()
# 2. 创建批处理文件 run_report.bat
"""
@echo off
cd /d E:\Python-Programing\scripts
call .venv\Scripts\activate
python daily_report.py
"""
# 3. 使用PowerShell创建任务计划
"""
# 创建任务
$action = New-ScheduledTaskAction -Execute "E:\Python-Programing\scripts\run_report.bat"
$trigger = New-ScheduledTaskTrigger -Daily -At 9am
$principal = New-ScheduledTaskPrincipal -UserId "SYSTEM" -LogonType ServiceAccount
Register-ScheduledTask -TaskName "DailyReport" -Action $action -Trigger $trigger -Principal $principal
# 查看任务
Get-ScheduledTask -TaskName "DailyReport"
# 手动运行
Start-ScheduledTask -TaskName "DailyReport"
# 删除任务
Unregister-ScheduledTask -TaskName "DailyReport" -Confirm:$false
"""
# 4. 使用Python创建任务计划
import subprocess
def create_scheduled_task(task_name, script_path, schedule_time="09:00"):
    """Create a daily Windows scheduled task that runs a Python script.

    Args:
        task_name: Name shown in Task Scheduler.
        script_path: Path of the .py file to run.
        schedule_time: Daily start time as "HH:MM".
    """
    # Pass arguments as a list with shell=False so task names or paths
    # containing spaces/quotes cannot break the command line or inject
    # extra shell commands (the original f-string + shell=True could).
    cmd = [
        'schtasks', '/create',
        '/tn', task_name,
        '/tr', f'python {script_path}',
        '/sc', 'daily',
        '/st', schedule_time,
        '/f',  # overwrite an existing task with the same name
    ]
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode == 0:
        print(f"任务 {task_name} 创建成功")
    else:
        print(f"创建失败:{result.stderr}")
def delete_scheduled_task(task_name):
    """Delete a Windows scheduled task (/f suppresses the confirmation prompt).

    Uses an argument list with shell=False so a task name containing
    spaces or quotes cannot break or inject into the command.
    """
    subprocess.run(['schtasks', '/delete', '/tn', task_name, '/f'])
def run_scheduled_task(task_name):
    """Trigger an existing Windows scheduled task immediately.

    Uses an argument list with shell=False so a task name containing
    spaces or quotes cannot break or inject into the command.
    """
    subprocess.run(['schtasks', '/run', '/tn', task_name])
6. 自动化脚本最佳实践
"""
自动化脚本模板
"""
import sys
import os
import logging
import argparse
from pathlib import Path
from datetime import datetime
# 添加项目路径
PROJECT_ROOT = Path(__file__).parent.parent
sys.path.insert(0, str(PROJECT_ROOT))
class AutomationScript:
    """Base class for automation scripts: logging setup plus a run lifecycle
    (execute with success / error / complete hooks)."""
    def __init__(self, name: str):
        # name doubles as the logger name and the log-file prefix
        self.name = name
        self.start_time = None
        self.setup_logging()
    def setup_logging(self):
        """Configure file + console logging under PROJECT_ROOT/logs."""
        log_dir = PROJECT_ROOT / 'logs'
        log_dir.mkdir(exist_ok=True)
        # One log file per script per day
        log_file = log_dir / f'{self.name}_{datetime.now():%Y%m%d}.log'
        # NOTE(review): basicConfig is a no-op if the root logger already has
        # handlers — fine for a single script per process, confirm otherwise.
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(log_file, encoding='utf-8'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(self.name)
    def run(self):
        """Template method: call execute() and fire the lifecycle hooks."""
        self.start_time = datetime.now()
        self.logger.info(f"{'='*50}")
        self.logger.info(f"脚本开始执行:{self.name}")
        try:
            result = self.execute()
            self.on_success(result)
        except Exception as e:
            self.on_error(e)
            raise  # re-raise so the caller / scheduler sees the failure
        finally:
            self.on_complete()
    def execute(self):
        """Do the actual work — subclasses must override."""
        raise NotImplementedError
    def on_success(self, result):
        """Hook invoked with execute()'s return value on success."""
        self.logger.info(f"执行成功:{result}")
    def on_error(self, error):
        """Hook invoked with the exception on failure (logs the traceback)."""
        self.logger.error(f"执行失败:{error}", exc_info=True)
    def on_complete(self):
        """Hook invoked in all cases; logs the elapsed time."""
        duration = datetime.now() - self.start_time
        self.logger.info(f"执行完成,耗时:{duration}")
        self.logger.info(f"{'='*50}")
# 使用示例
class DailyReportScript(AutomationScript):
    """Example subclass: generates and emails the daily report."""
    def __init__(self):
        super().__init__('daily_report')
    def execute(self):
        self.logger.info("生成报告...")
        # report-generation logic goes here
        self.logger.info("发送邮件...")
        # email-sending logic goes here
        return "报告已发送"
if __name__ == '__main__':
    script = DailyReportScript()
    script.run()
7. 日志记录
import logging
from logging.handlers import RotatingFileHandler, TimedRotatingFileHandler
from pathlib import Path
def setup_logger(name, log_dir='logs', level=logging.INFO):
    """Create (or return) a logger with console, size-rotating and
    time-rotating file handlers.

    Args:
        name: Logger name; also used as the log-file name prefix.
        log_dir: Directory for log files (created if missing).
        level: Threshold set on the logger itself.

    Returns:
        The configured logging.Logger instance.
    """
    logger = logging.getLogger(name)
    # getLogger returns the same object for the same name; bail out if it is
    # already configured, otherwise every call would stack duplicate handlers
    # and each message would be written multiple times.
    if logger.handlers:
        return logger
    log_path = Path(log_dir)
    log_path.mkdir(exist_ok=True)
    logger.setLevel(level)
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    # Console output (INFO and above)
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)
    # Size-based rotation: 10 MB per file, keep 5 backups
    file_handler = RotatingFileHandler(
        log_path / f'{name}.log',
        maxBytes=10*1024*1024,
        backupCount=5,
        encoding='utf-8'
    )
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)
    # Daily rotation at midnight, keep 30 days
    time_handler = TimedRotatingFileHandler(
        log_path / f'{name}_daily.log',
        when='midnight',
        interval=1,
        backupCount=30,
        encoding='utf-8'
    )
    time_handler.setLevel(logging.INFO)
    time_handler.setFormatter(formatter)
    logger.addHandler(time_handler)
    return logger
# Usage
logger = setup_logger('my_script')
logger.info("脚本启动")
logger.debug("调试信息")
logger.warning("警告信息")
logger.error("错误信息")
# Logging an exception together with its traceback
try:
    1 / 0
except Exception:
    logger.exception("发生异常")
8. 配置管理
import json
import yaml # pip install pyyaml
from pathlib import Path
from dataclasses import dataclass
from typing import Optional
# JSON配置
def load_json_config(config_file):
    """Read a UTF-8 JSON file and return the parsed object."""
    text = Path(config_file).read_text(encoding='utf-8')
    return json.loads(text)
# YAML配置
def load_yaml_config(config_file):
    """Read a UTF-8 YAML file; safe_load refuses arbitrary-object tags."""
    with open(config_file, 'r', encoding='utf-8') as f:
        return yaml.safe_load(f)
# 配置类
# SMTP settings for outgoing mail
@dataclass
class EmailConfig:
    smtp_server: str
    smtp_port: int
    sender: str
    password: str
# Database connection settings
@dataclass
class DatabaseConfig:
    host: str
    port: int
    database: str
    username: str
    password: str
# Top-level application config, aggregating the sections above
@dataclass
class AppConfig:
    email: EmailConfig
    database: DatabaseConfig
    debug: bool = False
    log_level: str = 'INFO'
def load_config(config_file: str) -> AppConfig:
    """Parse a YAML file into an AppConfig.

    Expects top-level 'email' and 'database' mappings whose keys match the
    corresponding dataclass fields; 'debug' and 'log_level' are optional.
    """
    data = load_yaml_config(config_file)
    return AppConfig(
        email=EmailConfig(**data['email']),
        database=DatabaseConfig(**data['database']),
        debug=data.get('debug', False),
        log_level=data.get('log_level', 'INFO')
    )
# config.yaml 示例
"""
email:
smtp_server: smtp.qq.com
smtp_port: 465
sender: [email protected]
password: your_auth_code
database:
host: localhost
port: 3306
database: mydb
username: root
password: password
debug: false
log_level: INFO
"""
# 环境变量配置
import os
class EnvConfig:
    """Configuration read from environment variables (at import time)."""
    SMTP_SERVER = os.getenv('SMTP_SERVER', 'smtp.qq.com')
    SMTP_PORT = int(os.getenv('SMTP_PORT', '465'))
    SENDER_EMAIL = os.getenv('SENDER_EMAIL')  # no default — must be set
    SENDER_PASSWORD = os.getenv('SENDER_PASSWORD')  # no default — must be set
    @classmethod
    def validate(cls):
        """Raise ValueError if any required variable is unset or empty."""
        required = ['SENDER_EMAIL', 'SENDER_PASSWORD']
        missing = [k for k in required if not getattr(cls, k)]
        if missing:
            raise ValueError(f"缺少必要配置:{missing}")
9. 错误处理与重试
import time
import functools
from typing import Callable, Type, Tuple
def retry(
    max_attempts: int = 3,
    delay: float = 1,
    backoff: float = 2,
    exceptions: Tuple[Type[Exception], ...] = (Exception,)
):
    """Decorator that retries a function with exponential backoff.

    Args:
        max_attempts: Total number of attempts before giving up.
        delay: Wait before the first retry, in seconds.
        backoff: Multiplier applied to the wait after each failure.
        exceptions: Exception types that trigger a retry.
    """
    def decorator(func: Callable):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            wait = delay
            caught = None
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions as exc:
                    caught = exc
                    if attempt == max_attempts:
                        print(f"所有尝试都失败了")
                    else:
                        print(f"尝试 {attempt}/{max_attempts} 失败:{exc}")
                        print(f"等待 {wait:.1f} 秒后重试...")
                        time.sleep(wait)
                        wait *= backoff
            # Every attempt raised: surface the last exception.
            raise caught
        return wrapper
    return decorator
# 使用
@retry(max_attempts=3, delay=1, backoff=2, exceptions=(ConnectionError, TimeoutError))
def fetch_data(url):
    """Demo fetch that fails ~70% of the time to exercise the retry decorator."""
    import random
    if random.random() < 0.7:
        raise ConnectionError("连接失败")
    return "数据"
# 带回调的重试
def retry_with_callback(
    func: Callable,
    max_attempts: int = 3,
    on_retry: Callable = None,
    on_failure: Callable = None,
    delay: float = 1,
):
    """Call func, retrying on any exception, with optional callbacks.

    Args:
        func: Zero-argument callable to execute.
        max_attempts: Total attempts before giving up.
        on_retry: Called as on_retry(attempt, error) before each retry.
        on_failure: Called as on_failure(error) when all attempts fail.
        delay: Seconds to wait between attempts (previously hard-coded
            to 1; default preserves the old behavior).

    Returns:
        Whatever func returns on the first successful attempt.

    Raises:
        The last exception if every attempt fails.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except Exception as e:
            if attempt < max_attempts:
                if on_retry:
                    on_retry(attempt, e)
                time.sleep(delay)
            else:
                if on_failure:
                    on_failure(e)
                raise
# 使用
def on_retry(attempt, error):
    """Log one failed attempt."""
    print(f"第{attempt}次尝试失败:{error}")
def on_failure(error):
    """Log the final failure."""
    print(f"最终失败:{error}")
    # e.g. send an alert email here
# retry_with_callback(fetch_data, on_retry=on_retry, on_failure=on_failure)
10. 实战案例
案例:数据同步自动化脚本
"""
实战案例:数据同步自动化脚本
定时从数据源获取数据,处理后保存并发送报告
"""
import logging
import schedule
import time
from datetime import datetime
from pathlib import Path
from dataclasses import dataclass
from typing import List, Optional
import json
# 配置
# Runtime configuration for the syncer
@dataclass
class SyncConfig:
    source_path: str  # JSON file to read raw records from
    output_path: str  # JSON file to write processed records to
    report_recipients: List[str]  # addresses for the daily report
    sync_interval_minutes: int = 30  # period between syncs
# 数据同步器
class DataSyncer:
    """Reads JSON records from source_path, cleans them, writes them to
    output_path, and keeps simple run statistics for reporting."""
    def __init__(self, config: SyncConfig):
        self.config = config
        self.logger = self._setup_logger()
        self.last_sync_time: Optional[datetime] = None
        self.sync_count = 0   # number of successful syncs
        self.error_count = 0  # number of failed syncs
    def _setup_logger(self):
        """Build a logger writing to logs/sync.log and to the console.

        NOTE(review): handlers are added on every instantiation; creating
        several DataSyncer objects in one process would duplicate output.
        """
        logger = logging.getLogger('DataSyncer')
        logger.setLevel(logging.INFO)
        # File handler
        log_dir = Path('logs')
        log_dir.mkdir(exist_ok=True)
        handler = logging.FileHandler(
            log_dir / 'sync.log',
            encoding='utf-8'
        )
        handler.setFormatter(logging.Formatter(
            '%(asctime)s - %(levelname)s - %(message)s'
        ))
        logger.addHandler(handler)
        # Console handler
        console = logging.StreamHandler()
        console.setFormatter(logging.Formatter(
            '%(asctime)s - %(levelname)s - %(message)s'
        ))
        logger.addHandler(console)
        return logger
    def sync(self):
        """Run one fetch -> process -> save cycle.

        Returns:
            True on success, False on failure.  Errors are logged, never
            raised, so the schedule loop keeps running.
        """
        self.logger.info("="*50)
        self.logger.info("开始数据同步...")
        start_time = datetime.now()
        try:
            # 1. Fetch
            data = self._fetch_data()
            self.logger.info(f"获取到 {len(data)} 条数据")
            # 2. Process
            processed = self._process_data(data)
            self.logger.info(f"处理完成 {len(processed)} 条数据")
            # 3. Save
            self._save_data(processed)
            self.logger.info("数据保存成功")
            # 4. Update statistics
            self.last_sync_time = datetime.now()
            self.sync_count += 1
            duration = datetime.now() - start_time
            self.logger.info(f"同步完成,耗时:{duration}")
            return True
        except Exception as e:
            self.error_count += 1
            self.logger.error(f"同步失败:{e}", exc_info=True)
            return False
    def _fetch_data(self) -> List[dict]:
        """Load the source JSON file; raises FileNotFoundError if absent."""
        source = Path(self.config.source_path)
        if not source.exists():
            raise FileNotFoundError(f"数据源不存在:{source}")
        with open(source, 'r', encoding='utf-8') as f:
            return json.load(f)
    def _process_data(self, data: List[dict]) -> List[dict]:
        """Clean and normalize each record, stamping it with the sync time."""
        processed = []
        for item in data:
            # Coerce fields to stable types; missing values get defaults
            processed_item = {
                'id': item.get('id'),
                'name': str(item.get('name', '')).strip(),
                'value': float(item.get('value', 0)),
                'sync_time': datetime.now().isoformat()
            }
            processed.append(processed_item)
        return processed
    def _save_data(self, data: List[dict]):
        """Write processed records as pretty-printed UTF-8 JSON."""
        output = Path(self.config.output_path)
        output.parent.mkdir(parents=True, exist_ok=True)
        with open(output, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
    def get_status(self) -> dict:
        """Snapshot of run statistics (for monitoring / health checks)."""
        return {
            'last_sync': self.last_sync_time.isoformat() if self.last_sync_time else None,
            'sync_count': self.sync_count,
            'error_count': self.error_count,
            'running': True
        }
    def send_daily_report(self):
        """Log (and optionally email) a summary of today's sync activity."""
        self.logger.info("生成每日报告...")
        report = f"""
数据同步每日报告
================
日期:{datetime.now():%Y-%m-%d}
同步次数:{self.sync_count}
错误次数:{self.error_count}
最后同步:{self.last_sync_time}
"""
        self.logger.info(report)
        # Hook an email-sending function in here if needed
        # send_email(self.config.report_recipients, "每日同步报告", report)
    def run(self):
        """Start the blocking schedule loop: sync immediately, then on the
        configured interval, plus a daily 18:00 report. Ctrl-C stops it."""
        self.logger.info("数据同步服务启动")
        # Run once immediately
        self.sync()
        # Periodic sync
        schedule.every(self.config.sync_interval_minutes).minutes.do(self.sync)
        # Daily report
        schedule.every().day.at("18:00").do(self.send_daily_report)
        self.logger.info(f"调度配置:每{self.config.sync_interval_minutes}分钟同步一次")
        try:
            while True:
                schedule.run_pending()
                time.sleep(1)
        except KeyboardInterrupt:
            self.logger.info("服务停止")
# 主程序
# Entry point: build a config and start the blocking sync service
if __name__ == '__main__':
    config = SyncConfig(
        source_path='data/source.json',
        output_path='data/output.json',
        report_recipients=['[email protected]'],
        sync_interval_minutes=30
    )
    syncer = DataSyncer(config)
    syncer.run()
11. 总结
🔑 核心要点
| 知识点 | 要点 |
|---|---|
| schedule | 轻量级,语法友好,适合简单任务 |
| APScheduler | 功能强大,支持持久化和多种触发器 |
| Windows任务计划 | 生产环境推荐,系统级可靠 |
| 日志记录 | 使用logging模块,配置文件轮转 |
| 配置管理 | YAML/JSON配置文件,环境变量 |
| 错误处理 | 重试机制,异常回调 |
✅ 学习检查清单
- 能使用schedule创建定时任务
- 了解APScheduler的使用
- 能配置Windows任务计划
- 能编写健壮的自动化脚本
- 掌握日志记录最佳实践
📖 下一步学习
办公自动化部分学习完成!接下来进入FPGA开发辅助应用部分。
常见问题 FAQ
💬 schedule和APScheduler怎么选?
简单的定时任务用schedule,几行代码搞定。需要持久化、cron表达式、任务管理的用APScheduler。生产环境推荐APScheduler+Windows任务计划双保险。
💬 脚本挂了怎么自动重启?
Windows任务计划程序设置“如果任务失败则重新启动”。也可以用try-except包住主循环,捕获异常后记录日志并继续运行。
📘 系列导航
- 上一篇:20 - Python Socket通信
- 当前:21 - Python定时任务与自动化脚本
- 下一篇:22 - Python串口通信