Veloris.
返回索引
设计实战 2026-02-14

Python定时任务:schedule+Windows任务计划,让脚本自己按时跑

2 分钟
620 words

Python定时任务:schedule+Windows任务计划,让脚本自己按时跑

定时任务是自动化的核心,可以实现定期执行数据处理、发送报告、备份文件等操作。本篇将介绍Python中实现定时任务的多种方式,以及如何编写健壮的自动化脚本。


1. 定时任务方案概览

| 方案 | 特点 | 适用场景 |
| --- | --- | --- |
| time.sleep | 简单,阻塞式 | 简单的循环任务 |
| schedule | 轻量级,语法友好 | 单进程定时任务 |
| APScheduler | 功能强大,支持持久化 | 复杂的调度需求 |
| Windows任务计划 | 系统级,可靠 | 生产环境推荐 |
| Celery | 分布式,支持消息队列 | 大规模任务调度 |

2. time模块

import time
from datetime import datetime, timedelta

# Simple loop-based scheduling.
def simple_scheduler():
    """Run the task in an endless loop, roughly once per minute.

    Blocks the calling thread forever. Timing drifts by however long
    each run takes, because the 60s sleep starts only after the task
    finishes. NOTE(review): do_task is not defined in this snippet —
    it must be supplied by the surrounding script.
    """
    while True:
        print(f"[{datetime.now()}] 执行任务...")
        do_task()
        time.sleep(60)  # wait 60 seconds between runs

# Run at a specific wall-clock time every day.
def run_at_time(target_hour, target_minute, task_func):
    """Run task_func once per day at target_hour:target_minute.

    Blocks forever: sleeps until the next occurrence of the target
    time, calls task_func, then waits for the following day's slot.

    Args:
        target_hour: Hour of day (0-23) to fire at.
        target_minute: Minute (0-59) to fire at.
        task_func: Zero-argument callable to execute.
    """
    while True:
        now = datetime.now()
        # Zero out seconds AND microseconds so the comparison below is
        # not skewed by the sub-second part of `now`.
        target = now.replace(hour=target_hour, minute=target_minute,
                             second=0, microsecond=0)

        if now >= target:
            # Today's slot already passed - schedule for tomorrow.
            # timedelta handles month/year rollover; the previous
            # `target.replace(day=target.day + 1)` raised ValueError on
            # the last day of any month (e.g. Jan 31 -> day=32).
            target += timedelta(days=1)

        wait_seconds = (target - now).total_seconds()
        print(f"等待 {wait_seconds:.0f} 秒后执行...")
        time.sleep(wait_seconds)

        task_func()

# Fixed-interval execution.
def interval_runner(interval_seconds, task_func, max_runs=None):
    """Invoke task_func repeatedly, one start every interval_seconds.

    Exceptions raised by the task are printed and swallowed so the
    loop survives individual failures. With max_runs=None the loop
    runs forever; otherwise it stops after max_runs invocations.
    """
    completed = 0
    while max_runs is None or completed < max_runs:
        started_at = time.time()

        try:
            task_func()
        except Exception as e:
            print(f"任务执行错误:{e}")

        completed += 1

        # Sleep only for whatever is left of the interval after the
        # task's own runtime, so start times do not drift.
        remaining = interval_seconds - (time.time() - started_at)
        time.sleep(remaining if remaining > 0 else 0)

3. schedule库

pip install schedule
import schedule
import time
from datetime import datetime

def job():
    """Demo job: print a timestamped message."""
    print(f"[{datetime.now()}] 执行任务")

# The scheduling styles schedule supports (fluent builder API).
schedule.every(10).seconds.do(job)           # every 10 seconds
schedule.every(5).minutes.do(job)            # every 5 minutes
schedule.every().hour.do(job)                # every hour
schedule.every().day.at("09:00").do(job)     # daily at 09:00
schedule.every().monday.do(job)              # every Monday
schedule.every().wednesday.at("13:15").do(job)  # every Wednesday at 13:15

# Jobs with arguments: extra args/kwargs to .do() are forwarded to the job.
def greet(name):
    """Demo job that receives a keyword argument."""
    print(f"Hello, {name}!")

schedule.every().day.at("08:00").do(greet, name="张三")

# One-shot job
def run_once():
    """Job that cancels itself after its first run."""
    print("只运行一次")
    return schedule.CancelJob  # returning this unschedules the job

schedule.every().day.at("10:00").do(run_once)

# Tag management: jobs can carry one or more string tags
schedule.every().hour.do(job).tag('hourly', 'important')
schedule.every().day.do(job).tag('daily')

# Cancel every job carrying a given tag
schedule.clear('hourly')

# Drive the scheduler: run_pending() must be polled in a loop
def run_scheduler():
    """Poll the schedule once per second, forever (blocking)."""
    print("调度器启动...")
    while True:
        schedule.run_pending()
        time.sleep(1)

# run_scheduler()

# Running the scheduler in a background thread.
import threading

def run_continuously(interval=1):
    """Run the schedule polling loop in a background thread.

    Returns:
        A threading.Event; call .set() on it to stop the loop.
    """
    cease_continuous_run = threading.Event()

    class ScheduleThread(threading.Thread):
        @classmethod
        def run(cls):
            # Poll until the stop event is set.
            while not cease_continuous_run.is_set():
                schedule.run_pending()
                time.sleep(interval)

    continuous_thread = ScheduleThread()
    continuous_thread.start()
    return cease_continuous_run

# Start the background scheduler:
# stop_run = run_continuously()
# Stop it with: stop_run.set()

4. APScheduler

pip install apscheduler
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
from datetime import datetime

def my_job():
    """Demo job: print a timestamped message."""
    print(f"[{datetime.now()}] 任务执行")

def job_with_args(name, value):
    """Demo job that receives positional and keyword arguments."""
    print(f"任务:{name} = {value}")

# Blocking scheduler: start() takes over the current thread.
def blocking_scheduler_example():
    """Showcase BlockingScheduler triggers: interval, cron and date.

    start() blocks until the process is interrupted; Ctrl+C shuts the
    scheduler down cleanly.
    """
    scheduler = BlockingScheduler()

    # Interval triggers
    scheduler.add_job(my_job, 'interval', seconds=30)
    scheduler.add_job(my_job, 'interval', minutes=5)
    scheduler.add_job(my_job, 'interval', hours=1)

    # Cron triggers (similar to Linux crontab, but via keyword fields)
    scheduler.add_job(my_job, 'cron', hour=9, minute=0)  # daily at 09:00
    scheduler.add_job(my_job, 'cron', day_of_week='mon-fri', hour=8)  # weekdays at 08:00
    scheduler.add_job(my_job, 'cron', day=1, hour=0)  # 1st of each month at 00:00

    # Forwarding arguments to the job callable
    scheduler.add_job(job_with_args, 'interval', seconds=60, 
                      args=['test'], kwargs={'value': 100})

    # Run once at a specific moment (the 'date' trigger)
    from datetime import datetime, timedelta
    run_time = datetime.now() + timedelta(seconds=10)
    scheduler.add_job(my_job, 'date', run_date=run_time)

    try:
        scheduler.start()
    except KeyboardInterrupt:
        scheduler.shutdown()

# Background scheduler: jobs run in worker threads, control returns.
def background_scheduler_example():
    """Start a BackgroundScheduler and demonstrate the job-management API.

    Returns:
        The running scheduler, so the caller can manage or shut it down.
    """
    scheduler = BackgroundScheduler()

    scheduler.add_job(my_job, 'interval', seconds=10, id='my_job_id')
    scheduler.start()

    # The main program keeps running while jobs fire in the background
    print("调度器在后台运行...")

    # Managing jobs at runtime by id:
    # scheduler.pause_job('my_job_id')   # pause
    # scheduler.resume_job('my_job_id')  # resume
    # scheduler.remove_job('my_job_id')  # remove
    # scheduler.reschedule_job('my_job_id', trigger='interval', seconds=5)  # reschedule

    return scheduler

# APScheduler cron触发器字段(通过关键字参数传入,而不是crontab字符串):
# year, month, day, week, day_of_week, hour, minute, second(均可选)
# * 任意值
# */n 每n个单位
# a-b 范围
# a,b,c 列表

# Human-readable schedule -> (trigger_name, trigger_kwargs) pairs, ready
# to splat into scheduler.add_job(func, trigger, **kwargs).
# NOTE: the original `'key': 'cron', {...}` form was a SyntaxError;
# tuples restore valid Python without changing the intended data.
cron_examples = {
    '每分钟': ('cron', {'minute': '*'}),
    '每小时': ('cron', {'hour': '*'}),
    '每天9点': ('cron', {'hour': 9}),
    '工作日9点': ('cron', {'day_of_week': 'mon-fri', 'hour': 9}),
    '每月1号': ('cron', {'day': 1}),
    '每周一9点': ('cron', {'day_of_week': 'mon', 'hour': 9}),
}

# Persistent job store (SQLite): jobs survive process restarts.
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

def persistent_scheduler():
    """BackgroundScheduler whose jobs are persisted to jobs.db.

    replace_existing=True lets the same job id be re-registered on
    every startup without raising a conflict error.
    """
    jobstores = {
        'default': SQLAlchemyJobStore(url='sqlite:///jobs.db')
    }

    scheduler = BackgroundScheduler(jobstores=jobstores)
    scheduler.add_job(my_job, 'interval', minutes=1, id='persistent_job', 
                      replace_existing=True)
    scheduler.start()
    return scheduler

5. Windows任务计划程序

"""
使用Windows任务计划程序运行Python脚本
这是生产环境推荐的方式
"""

# 1. 创建可执行的Python脚本
# daily_report.py
import sys
import logging
from datetime import datetime

# Logging setup: append to daily_report.log in the working directory.
logging.basicConfig(
    filename='daily_report.log',
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

def main():
    """Entry point: generate and send the daily report.

    Exits with status 1 on any failure so Windows Task Scheduler can
    detect the run as failed and apply its retry policy.
    """
    logging.info("开始执行每日报告任务")
    try:
        # Do the actual work
        generate_report()
        send_email()
        logging.info("任务执行成功")
    except Exception as e:
        logging.error(f"任务执行失败:{e}")
        sys.exit(1)

def generate_report():
    # Report-generation logic (stub in this tutorial)
    pass

def send_email():
    # Email-sending logic (stub in this tutorial)
    pass

if __name__ == '__main__':
    main()

# 2. 创建批处理文件 run_report.bat
"""
@echo off
cd /d E:\Python-Programing\scripts
call .venv\Scripts\activate
python daily_report.py
"""

# 3. 使用PowerShell创建任务计划
"""
# 创建任务
$action = New-ScheduledTaskAction -Execute "E:\Python-Programing\scripts\run_report.bat"
$trigger = New-ScheduledTaskTrigger -Daily -At 9am
$principal = New-ScheduledTaskPrincipal -UserId "SYSTEM" -LogonType ServiceAccount
Register-ScheduledTask -TaskName "DailyReport" -Action $action -Trigger $trigger -Principal $principal

# 查看任务
Get-ScheduledTask -TaskName "DailyReport"

# 手动运行
Start-ScheduledTask -TaskName "DailyReport"

# 删除任务
Unregister-ScheduledTask -TaskName "DailyReport" -Confirm:$false
"""

# 4. 使用Python创建任务计划
import subprocess

def create_scheduled_task(task_name, script_path, schedule_time="09:00"):
    """Create a daily Windows scheduled task via schtasks.

    Args:
        task_name: Name of the task in Task Scheduler.
        script_path: Path of the Python script the task runs.
        schedule_time: Daily start time, "HH:MM" (24-hour clock).
    """
    # Pass the arguments as a list with the default shell=False so a
    # task name or path containing spaces/metacharacters cannot break
    # out of the command line (shell-injection hardening).
    cmd = [
        'schtasks', '/create',
        '/tn', task_name,
        '/tr', f'python {script_path}',
        '/sc', 'daily',
        '/st', schedule_time,
        '/f',
    ]
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode == 0:
        print(f"任务 {task_name} 创建成功")
    else:
        print(f"创建失败:{result.stderr}")

def delete_scheduled_task(task_name):
    """Delete a Windows scheduled task by name (no error if running)."""
    # List form with shell=False avoids quoting/injection issues.
    subprocess.run(['schtasks', '/delete', '/tn', task_name, '/f'])

def run_scheduled_task(task_name):
    """Trigger a Windows scheduled task immediately by name."""
    # List form with shell=False avoids quoting/injection issues.
    subprocess.run(['schtasks', '/run', '/tn', task_name])

6. 自动化脚本最佳实践

"""
自动化脚本模板
"""
import sys
import os
import logging
import argparse
from pathlib import Path
from datetime import datetime

# Make the project root importable regardless of the working directory
# the scheduler launches the script from.
PROJECT_ROOT = Path(__file__).parent.parent
sys.path.insert(0, str(PROJECT_ROOT))

class AutomationScript:
    """Base class for automation scripts.

    Subclasses implement execute(); run() wraps it with logging,
    timing and success/error/completion hooks.
    """

    def __init__(self, name: str):
        # name doubles as the logger name and the log-file prefix
        self.name = name
        self.start_time = None
        self.setup_logging()

    def setup_logging(self):
        """Configure logging to console plus a per-day log file.

        NOTE(review): logging.basicConfig configures the root logger
        only on its first call, so instantiating several scripts in one
        process keeps the first script's handlers — confirm this is
        acceptable before sharing a process between scripts.
        """
        log_dir = PROJECT_ROOT / 'logs'
        log_dir.mkdir(exist_ok=True)

        log_file = log_dir / f'{self.name}_{datetime.now():%Y%m%d}.log'

        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(log_file, encoding='utf-8'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(self.name)

    def run(self):
        """Execute the script with logging, error reporting and timing."""
        self.start_time = datetime.now()
        self.logger.info(f"{'='*50}")
        self.logger.info(f"脚本开始执行:{self.name}")

        try:
            result = self.execute()
            self.on_success(result)
        except Exception as e:
            self.on_error(e)
            raise
        finally:
            self.on_complete()

    def execute(self):
        """Do the actual work (must be implemented by subclasses)."""
        raise NotImplementedError

    def on_success(self, result):
        """Success hook: log the value returned by execute()."""
        self.logger.info(f"执行成功:{result}")

    def on_error(self, error):
        """Error hook: log the exception with its traceback."""
        self.logger.error(f"执行失败:{error}", exc_info=True)

    def on_complete(self):
        """Completion hook: always runs; logs the elapsed wall time."""
        duration = datetime.now() - self.start_time
        self.logger.info(f"执行完成,耗时:{duration}")
        self.logger.info(f"{'='*50}")

# Usage example
class DailyReportScript(AutomationScript):
    """Daily report script built on AutomationScript."""

    def __init__(self):
        super().__init__('daily_report')

    def execute(self):
        """Generate the report, send it by email, return a status string."""
        self.logger.info("生成报告...")
        # report-generation logic goes here

        self.logger.info("发送邮件...")
        # email-sending logic goes here

        return "报告已发送"

if __name__ == '__main__':
    script = DailyReportScript()
    script.run()

7. 日志记录

import logging
from logging.handlers import RotatingFileHandler, TimedRotatingFileHandler
from pathlib import Path

def setup_logger(name, log_dir='logs', level=logging.INFO):
    """Create a logger with console, size-rotated and daily-rotated handlers.

    Idempotent: calling it again with the same name returns the already
    configured logger instead of stacking duplicate handlers (which
    previously made every record appear multiple times per extra call).

    Args:
        name: Logger name, also used as the log-file basename.
        log_dir: Directory for log files; created if missing.
        level: Threshold for the logger itself (handlers filter further).

    Returns:
        The configured logging.Logger instance.
    """
    logger = logging.getLogger(name)
    # logging.getLogger returns one shared object per name - bail out
    # if an earlier call already attached handlers to it.
    if logger.handlers:
        return logger

    log_path = Path(log_dir)
    log_path.mkdir(exist_ok=True)

    logger.setLevel(level)

    # Shared record format
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )

    # Console handler: INFO and above
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)

    # File handler rotated by size: 10 MB per file, 5 backups
    file_handler = RotatingFileHandler(
        log_path / f'{name}.log',
        maxBytes=10*1024*1024,  # 10MB
        backupCount=5,
        encoding='utf-8'
    )
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)

    # File handler rotated at midnight, keeping 30 days of history
    time_handler = TimedRotatingFileHandler(
        log_path / f'{name}_daily.log',
        when='midnight',
        interval=1,
        backupCount=30,
        encoding='utf-8'
    )
    time_handler.setLevel(logging.INFO)
    time_handler.setFormatter(formatter)
    logger.addHandler(time_handler)

    return logger

# Usage
logger = setup_logger('my_script')
logger.info("脚本启动")
logger.debug("调试信息")
logger.warning("警告信息")
logger.error("错误信息")

# Recording an exception: logger.exception logs the full traceback and
# must be called from inside an except block.
try:
    1 / 0
except Exception:
    logger.exception("发生异常")

8. 配置管理

import json
import yaml  # pip install pyyaml
from pathlib import Path
from dataclasses import dataclass
from typing import Optional

# JSON configuration
def load_json_config(config_file):
    """Parse the given JSON configuration file (UTF-8) and return it."""
    with open(config_file, 'r', encoding='utf-8') as fh:
        return json.load(fh)

# YAML configuration
def load_yaml_config(config_file):
    """Parse the given YAML configuration file (UTF-8) via safe_load."""
    with open(config_file, 'r', encoding='utf-8') as fh:
        return yaml.safe_load(fh)

# Typed configuration objects
@dataclass
class EmailConfig:
    """SMTP settings for outgoing mail."""
    smtp_server: str
    smtp_port: int
    sender: str
    password: str

@dataclass
class DatabaseConfig:
    """Connection parameters for the application database."""
    host: str
    port: int
    database: str
    username: str
    password: str

@dataclass
class AppConfig:
    """Top-level application configuration."""
    email: EmailConfig
    database: DatabaseConfig
    debug: bool = False
    log_level: str = 'INFO'

def load_config(config_file: str) -> AppConfig:
    """Load a YAML config file into an AppConfig.

    Expects top-level 'email' and 'database' mappings whose keys match
    the corresponding dataclass fields; 'debug' and 'log_level' are
    optional and fall back to their defaults.
    """
    data = load_yaml_config(config_file)

    return AppConfig(
        email=EmailConfig(**data['email']),
        database=DatabaseConfig(**data['database']),
        debug=data.get('debug', False),
        log_level=data.get('log_level', 'INFO')
    )

# config.yaml 示例
"""
email:
  smtp_server: smtp.qq.com
  smtp_port: 465
  sender: [email protected]
  password: your_auth_code

database:
  host: localhost
  port: 3306
  database: mydb
  username: root
  password: password

debug: false
log_level: INFO
"""

# Environment-variable based configuration
import os

class EnvConfig:
    """Read configuration from environment variables.

    Values are captured once, at class-definition (import) time.
    """

    SMTP_SERVER = os.getenv('SMTP_SERVER', 'smtp.qq.com')
    SMTP_PORT = int(os.getenv('SMTP_PORT', '465'))
    SENDER_EMAIL = os.getenv('SENDER_EMAIL')    # required, no default
    SENDER_PASSWORD = os.getenv('SENDER_PASSWORD')  # required, no default

    @classmethod
    def validate(cls):
        """Raise ValueError if any required variable is unset/empty."""
        required = ['SENDER_EMAIL', 'SENDER_PASSWORD']
        missing = [k for k in required if not getattr(cls, k)]
        if missing:
            raise ValueError(f"缺少必要配置:{missing}")

9. 错误处理与重试

import time
import functools
from typing import Callable, Type, Tuple

def retry(
    max_attempts: int = 3,
    delay: float = 1,
    backoff: float = 2,
    exceptions: Tuple[Type[Exception], ...] = (Exception,)
):
    """Decorator that retries the wrapped function with exponential backoff.

    Args:
        max_attempts: Maximum number of attempts (must be >= 1).
        delay: Initial delay between attempts, in seconds.
        backoff: Multiplier applied to the delay after each failure.
        exceptions: Exception types that trigger a retry; anything else
            propagates immediately.

    Raises:
        ValueError: If max_attempts < 1 (previously this case fell
            through to `raise None`, a confusing TypeError at call time).
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be >= 1")

    def decorator(func: Callable):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            current_delay = delay
            last_exception = None

            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    last_exception = e
                    if attempt < max_attempts:
                        print(f"尝试 {attempt}/{max_attempts} 失败:{e}")
                        print(f"等待 {current_delay:.1f} 秒后重试...")
                        time.sleep(current_delay)
                        current_delay *= backoff
                    else:
                        print("所有尝试都失败了")

            # All attempts exhausted - re-raise the last failure.
            raise last_exception

        return wrapper
    return decorator

# Usage: retry on network-style errors with exponential backoff.
@retry(max_attempts=3, delay=1, backoff=2, exceptions=(ConnectionError, TimeoutError))
def fetch_data(url):
    """Demo flaky fetch: fails ~70% of the time to exercise the retry."""
    import random
    if random.random() < 0.7:
        raise ConnectionError("连接失败")
    return "数据"

# Retry with callbacks instead of a decorator.
def retry_with_callback(
    func: Callable,
    max_attempts: int = 3,
    on_retry: Callable = None,
    on_failure: Callable = None,
    delay: float = 1
):
    """Call func(), retrying on any exception.

    Args:
        func: Zero-argument callable to invoke.
        max_attempts: Total number of attempts.
        on_retry: Optional callback (attempt_number, exception), invoked
            before each retry.
        on_failure: Optional callback (exception), invoked once all
            attempts are exhausted, just before re-raising.
        delay: Seconds to sleep between attempts. New parameter whose
            default matches the previously hard-coded 1-second sleep,
            so existing callers are unaffected.

    Returns:
        Whatever func() returns on the first successful attempt.

    Raises:
        The last exception from func once all attempts are exhausted.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except Exception as e:
            if attempt < max_attempts:
                if on_retry:
                    on_retry(attempt, e)
                time.sleep(delay)
            else:
                if on_failure:
                    on_failure(e)
                raise

# Usage: callbacks for progress reporting and final alerting.
def on_retry(attempt, error):
    """Log each failed attempt before the next retry."""
    print(f"第{attempt}次尝试失败:{error}")

def on_failure(error):
    """Handle the final failure (e.g. send an alert e-mail)."""
    print(f"最终失败:{error}")
    # e.g. send an alert e-mail here

# retry_with_callback(fetch_data, on_retry=on_retry, on_failure=on_failure)

10. 实战案例

案例:数据同步自动化脚本

"""
实战案例:数据同步自动化脚本
定时从数据源获取数据,处理后保存并发送报告
"""
import logging
import schedule
import time
from datetime import datetime
from pathlib import Path
from dataclasses import dataclass
from typing import List, Optional
import json

# Configuration for the sync job
@dataclass
class SyncConfig:
    """Settings for a DataSyncer run."""
    source_path: str
    output_path: str
    report_recipients: List[str]
    sync_interval_minutes: int = 30

# The data syncer
class DataSyncer:
    """Periodically copies records from a source JSON file to an output file.

    Keeps simple run statistics (sync/error counts, last sync time) and
    logs to both logs/sync.log and the console.
    """

    def __init__(self, config: SyncConfig):
        self.config = config
        self.logger = self._setup_logger()
        self.last_sync_time: Optional[datetime] = None
        self.sync_count = 0   # successful syncs
        self.error_count = 0  # failed syncs

    def _setup_logger(self):
        """Build a logger writing to logs/sync.log and the console.

        NOTE(review): handlers are added unconditionally, so creating
        more than one DataSyncer in a process duplicates log output.
        """
        logger = logging.getLogger('DataSyncer')
        logger.setLevel(logging.INFO)

        # File handler
        log_dir = Path('logs')
        log_dir.mkdir(exist_ok=True)

        handler = logging.FileHandler(
            log_dir / 'sync.log',
            encoding='utf-8'
        )
        handler.setFormatter(logging.Formatter(
            '%(asctime)s - %(levelname)s - %(message)s'
        ))
        logger.addHandler(handler)

        # Console handler
        console = logging.StreamHandler()
        console.setFormatter(logging.Formatter(
            '%(asctime)s - %(levelname)s - %(message)s'
        ))
        logger.addHandler(console)

        return logger

    def sync(self):
        """Run one fetch -> process -> save cycle.

        Returns:
            True on success, False on any failure. Failures are logged
            and counted but never propagated, so the scheduling loop
            keeps running.
        """
        self.logger.info("="*50)
        self.logger.info("开始数据同步...")
        start_time = datetime.now()

        try:
            # 1. Fetch
            data = self._fetch_data()
            self.logger.info(f"获取到 {len(data)} 条数据")

            # 2. Process
            processed = self._process_data(data)
            self.logger.info(f"处理完成 {len(processed)} 条数据")

            # 3. Save
            self._save_data(processed)
            self.logger.info("数据保存成功")

            # 4. Update state
            self.last_sync_time = datetime.now()
            self.sync_count += 1

            duration = datetime.now() - start_time
            self.logger.info(f"同步完成,耗时:{duration}")

            return True

        except Exception as e:
            self.error_count += 1
            self.logger.error(f"同步失败:{e}", exc_info=True)
            return False

    def _fetch_data(self) -> List[dict]:
        """Load the source JSON file; raises FileNotFoundError if absent."""
        source = Path(self.config.source_path)
        if not source.exists():
            raise FileNotFoundError(f"数据源不存在:{source}")

        with open(source, 'r', encoding='utf-8') as f:
            return json.load(f)

    def _process_data(self, data: List[dict]) -> List[dict]:
        """Clean/normalise raw records and stamp each with the sync time."""
        processed = []
        for item in data:
            # Normalise field types and append the sync timestamp
            processed_item = {
                'id': item.get('id'),
                'name': str(item.get('name', '')).strip(),
                'value': float(item.get('value', 0)),
                'sync_time': datetime.now().isoformat()
            }
            processed.append(processed_item)
        return processed

    def _save_data(self, data: List[dict]):
        """Write processed records to the output path as pretty JSON."""
        output = Path(self.config.output_path)
        output.parent.mkdir(parents=True, exist_ok=True)

        with open(output, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)

    def get_status(self) -> dict:
        """Snapshot of the syncer's run statistics."""
        return {
            'last_sync': self.last_sync_time.isoformat() if self.last_sync_time else None,
            'sync_count': self.sync_count,
            'error_count': self.error_count,
            'running': True
        }

    def send_daily_report(self):
        """Log (and optionally e-mail) a daily summary of sync activity."""
        self.logger.info("生成每日报告...")

        report = f"""
数据同步每日报告
================
日期:{datetime.now():%Y-%m-%d}
同步次数:{self.sync_count}
错误次数:{self.error_count}
最后同步:{self.last_sync_time}
        """

        self.logger.info(report)
        # Hook up an e-mail sender here if needed, e.g.:
        # send_email(self.config.report_recipients, "每日同步报告", report)

    def run(self):
        """Start the blocking scheduler loop: sync now, then on a timer."""
        self.logger.info("数据同步服务启动")

        # Run once immediately on startup
        self.sync()

        # Periodic sync
        schedule.every(self.config.sync_interval_minutes).minutes.do(self.sync)

        # Daily report
        schedule.every().day.at("18:00").do(self.send_daily_report)

        self.logger.info(f"调度配置:每{self.config.sync_interval_minutes}分钟同步一次")

        try:
            while True:
                schedule.run_pending()
                time.sleep(1)
        except KeyboardInterrupt:
            self.logger.info("服务停止")

# Entry point: build a config and run the syncer until interrupted.
if __name__ == '__main__':
    config = SyncConfig(
        source_path='data/source.json',
        output_path='data/output.json',
        report_recipients=['[email protected]'],
        sync_interval_minutes=30
    )

    syncer = DataSyncer(config)
    syncer.run()

11. 总结

🔑 核心要点

| 知识点 | 要点 |
| --- | --- |
| schedule | 轻量级,语法友好,适合简单任务 |
| APScheduler | 功能强大,支持持久化和多种触发器 |
| Windows任务计划 | 生产环境推荐,系统级可靠 |
| 日志记录 | 使用logging模块,配置文件轮转 |
| 配置管理 | YAML/JSON配置文件,环境变量 |
| 错误处理 | 重试机制,异常回调 |

✅ 学习检查清单

  • 能使用schedule创建定时任务
  • 了解APScheduler的使用
  • 能配置Windows任务计划
  • 能编写健壮的自动化脚本
  • 掌握日志记录最佳实践

📖 下一步学习

办公自动化部分学习完成!接下来进入FPGA开发辅助应用部分:


常见问题 FAQ

💬 schedule和APScheduler怎么选?

简单的定时任务用schedule,几行代码搞定。需要持久化、cron表达式、任务管理的用APScheduler。生产环境推荐APScheduler+Windows任务计划双保险。

💬 脚本挂了怎么自动重启?

Windows任务计划程序设置“如果任务失败则重新启动”。也可以用try-except包住主循环,捕获异常后记录日志并继续运行。


📘 系列导航

End of file.