【Python】日期计算和自动化运行脚本

发布于:2025-05-30 ⋅ 阅读:(16) ⋅ 点赞:(0)

一、日期计算

1. datetime模块介绍

        Python 的 datetime 模块是处理日期和时间的核心模块,提供了多种类和工具,用于操作、格式化和计算日期时间。

object
    timedelta     # 主要用于计算时间跨度
    tzinfo        # 时区相关
    time          # 只关注时间
    date          # 只关注日期
    datetime      # 同时有时间和日期

datetime 模块和 time 模块的区别在于time更接近系统底层,可以理解为 datetime 基于 time 进行了封装,提供了更多实用的函数。 

2. 基础使用

        查询当前时间,并获取时间的每个部分(年月日,时分秒)。

from datetime import datetime, timedelta

# 1. 获取当前时间  /  创建自定义时间
now = datetime.now()
print("当前完整时间:", now)  # 2024-05-20 15:30:45.123456

dt = datetime(2024, 5, 20, 15, 30, 0)  # 年, 月, 日, 时, 分, 秒
print(dt)                   # 2024-05-20 14:30:00


# 2. 访问时间属性 (返回值为int类型)
print("\n时间属性分解:")
print("年份:", now.year)          # 2024
print("月份:", now.month)         # 5
print("日期:", now.day)           # 20
print("小时:", now.hour)          # 15
print("分钟:", now.minute)        # 30
print("秒数:", now.second)        # 45
print("微秒:", now.microsecond)   # 123456


# 3. 转换为 date 和 time 对象
date_part = now.date()
time_part = now.time()
print("\n日期部分:", date_part)  # 2024-05-20
print("时间部分:", time_part)  # 15:30:45.123456


# 3. date 和 time 对象转换为 datetime 对象
new_dt = datetime.combine(date_part, time_part)  
print("\n完整时间:", new_dt)        # 2024-05-20 15:30:45.123456

 3. 时间计算

         具体时间的修改,时间大小的比较(直接使用<, >, ==等运算符直接比较 datetime 对象),时间的差值计算。

from datetime import datetime, timedelta

now = datetime.now()

# 1. 使用 replace 修改时间
new_time = now.replace(year=2025, hour=8)
print("\n修改后的时间:", new_time)  # 2025-05-20 08:30:45.123456

# 2. 时间比较
time_1 = datetime(2025,5,20,13,0,0)
time_2 = datetime(2025,5,21,13,0,0)
print(time_1 > time_2)        # False

# 3. 使用timedelta 计算时间差
delta = timedelta(days=7, hours=3)
future = now + delta
past = now - delta
print("\n7天3小时后:", future)
print("7天3小时前:", past)

4. 时间的表示与输出

        时间戳,字符串解析时间,时间的字符串表示。

from datetime import datetime, timedelta

now = datetime.now()

# 1. 时间戳操作
timestamp = now.timestamp()
print("\n当前时间戳:", timestamp)
print("时间戳还原:", datetime.fromtimestamp(timestamp))


# 2. 从字符串解析时间(strptime)
custom_str = "2023-12-25 20:15:30"
parsed_time = datetime.strptime(custom_str, "%Y-%m-%d %H:%M:%S")
print("\n解析后的时间:", parsed_time)


# 3. 格式化输出(strftime)
formatted = now.strftime("%Y年%m月%d日 %H:%M:%S")
print("\n自定义格式:", formatted)  # 2024年05月20日 15:30:45

5. 工作日计算

        核心方法是weekday方法,基于该方法提供了一些工作日的简单计算函数。

def is_workday(date):
    """判断是否为工作日(周一到周五)"""
    # weekday() 返回 0-4=工作日, 5=周六, 6=周日
    return date.weekday() < 5 

def add_workdays(start_day, days):
    """计算指定工作日后的日期(自动跳过周末)"""
    current = start_day
    added_days = 0
    while added_days < days:
        current += timedelta(days=1)  # 每次加1天
        if is_workday(current):
            added_days += 1
    return current 

def count_workdays(start, end):
    """计算两个日期之间的工作日数量(包含开始,不包含结束)"""
    if start > end:
        start, end = end, start

    total_days = (end - start).days
    workdays = 0

    for day in range(total_days):
        current = start + timedelta(days=day)
        if is_workday(current):
            workdays += 1

    return workdays


# 示例:从2024-05-20(周一)计算5个工作日后的日期
target_date = add_workdays(start_date, 5)
print(f"\n{start_date.date()} 的5个工作日后是 {target_date.date()}")  # 2024-05-27


# 示例:计算2024-05-20 到 2024-05-27 之间的工作日数量
date_a = datetime(2024, 5, 20)
date_b = datetime(2024, 5, 27)
print(f"\n{date_a.date()} 到 {date_b.date()} 之间的工作日数: {count_workdays(date_a, date_b)}")  # 5

二、APSchedule 脚本运行自动化

1. 基于Windows 任务管理器的自动化不详细描述,需要可以查看这篇文章:

【python】在Windows中定时执行Python脚本的详细用法教学_windows定时任务执行python-CSDN博客

2. 打包为 .exe 文件运行的方法在不详细描述,需要可以查看这篇文章:

如何在Windows下将Python项目秒变exe安装包:零基础教程,手把手教你_windows下生成python的exe-CSDN博客

 1. 安装

        APScheduler 支持三种调度任务:固定时间间隔,固定时间点,Linux Crontab 命令。同时还支持异步执行、后台执行调度任务。输入以下命令即可安装。

pip install APScheduler

 2. 简单使用

        简单创建一个调度程序步骤只需要三步:
                1. 创建调度器(scheduler)
                2. 添加任务(job)
                3. 根据触发条件运行调度任务。

from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime

# 任务函数
def job(message):
    now = datetime.now().strftime("%H:%M:%S")
    print(f"[任务执行] {message} | 时间: {now}")

# 创建调度器
scheduler = BlockingScheduler()

# 添加任务时直接打印提示
print("[调度器] 添加间隔任务(每2秒执行)")
scheduler.add_job(job, 'interval', seconds=2, args=['间隔任务'])

# 启动调度器
try:
    scheduler.start()
except KeyboardInterrupt:
    print("\n[调度器] 已停止")

  3. 详细讲解

        APSchedule 有四种组件:
                1. 调度器(Scheduler)
                2. 作业存储(Job Store)
                3. 触发器(Trigger)
                4. 执行器(Executor)

3.1 调度器(Scheduler)

        任务调度器属于控制器角色,提供了 7 种调度器(最常用的是前3种)

种类 用法

 BlockingScheduler()

当前进程的主线程中运行,阻塞当前线程。
 BackgroundScheduler() 后台线程中运行,不会阻塞当前线程。
 AsyncIOScheduler()

结合 asyncio模块使用。

 GeventScheduler() 使用 gevent作为IO模型,和 GeventExecutor 配合使用。
 TornadoScheduler() 使用Tornado的IO模型。用 ioloop.add_timeout 完成定时唤醒。
 TwistedScheduler() 配合TwistedExecutor,用 reactor.callLater 完成定时唤醒。
 QtScheduler() Qt 应用需使用QTimer完成定时唤醒。

        它可以配置作业存储器和执行器。例如添加、修改和移除作业都可以在调度器中完成。

方法 功能说明 示例
add_job()   添加新任务 scheduler.add_job(func, 'interval', seconds=5)
remove_job(job_id) 删除任务 scheduler.remove_job('my_job')
pause_job(job_id) 暂停任务 scheduler.pause_job('email_job')
resume_job(job_id) 恢复暂停的任务 scheduler.resume_job('report_job')
modify_job(job_id, **kwargs) 修改任务属性 scheduler.modify_job('sync_job', seconds=10)
get_jobs() 获取所有任务列表 jobs = scheduler.get_jobs()

3.2 触发器(Trigger)

         APScheduler 有三种内置的 trigger:
        (1)date 是最基本的一种调度,作业任务只会执行一次。它表示特定的时间点触发。

from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime, timedelta

def job():
    print("一次性任务触发")

scheduler = BlockingScheduler()

# 示例1:5秒后执行
scheduler.add_job(job, 'date', 
                 run_date=datetime.now() + timedelta(seconds=5))

# 示例2:指定具体时间(2023-12-31 23:59:59)
scheduler.add_job(job, 'date', 
                 run_date='2023-12-31 23:59:59')

scheduler.start()

        (2)interval 为固定时间间隔触发。interval 间隔调度。

def job():
    print("间隔任务触发")

# 每30分钟执行一次
scheduler.add_job(job, 'interval', 
                 minutes=30,
                 start_date='2023-08-15 09:00:00',
                 end_date='2023-08-20 18:00:00')

# weeks, days, hours, minutes, seconds
# start_date:首次触发时间(默认立即开始), end_date:停止触发时间

# 每2小时15分钟执行一次(组合参数)
scheduler.add_job(job, 'interval', 
                 hours=2, minutes=15)

        (3)cron 在特定时间周期性地触发,和Linux crontab格式兼容。

def job():
    print("Cron任务触发")

# 每天8:30执行(传统Cron风格)
scheduler.add_job(job, 'cron', 
                 hour=8, minute=30)

# 每周一至周五的9:00和17:00执行
scheduler.add_job(job, 'cron', 
                 day_of_week='mon-fri',
                 hour='9,17')

# 每15分钟执行一次(类似 interval)
scheduler.add_job(job, 'cron', 
                 minute='*/15')

# 每月最后一天23:59执行
scheduler.add_job(job, 'cron', 
                 day='last', 
                 hour=23, minute=59)

3.3 作业存储器(Job Stores)

        任务持久化仓库,默认保存任务在内存中,也可将任务保存都各种数据库中,任务中的数据序列化后保存到持久化数据库,从数据库加载后又反序列化。

  • MemoryJobStore
    默认存储器,所有作业保存在内存中,程序终止后丢失。适用于临时任务或测试环境。

  • SQLAlchemyJobStore
    使用关系型数据库(如SQLite、MySQL、PostgreSQL)持久化存储。需指定数据库URL。

  • MongoDBJobStore
    将作业存储在MongoDB中,需指定数据库和集合名。依赖pymongo库。

  • RedisJobStore
    使用Redis存储作业,支持高效的键值存储。依赖redis库。

  • 自定义存储器
    通过继承BaseJobStore实现自定义存储逻辑(如文件系统、云存储)。

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.jobstores.memory import MemoryJobStore

jobstores = {
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.db'),
    'memory': MemoryJobStore()
}
scheduler = BlockingScheduler(jobstores=jobstores)

3.4 执行器(Executors)

        负责处理作业的运行,提交指定的可调用对象到一个线程或进程池完成作业。当作业完成时,执行器将会通知调度器。

        关键参数:
                1. max_workers:
控制线程 / 进程池的大小
                2. misfire_grace_time:任务错过触发后仍允许执行的时间(秒)
                3. coalesce:合并多次未执行的任务。

  • ThreadPoolExecutor:使用线程池管理任务。
from apscheduler.executors.pool import ThreadPoolExecutor
executors = {
    'default': ThreadPoolExecutor(max_workers=5)  # 最大线程数
}
  • ThreadPoolExecutor:使用线程池管理任务。
from apscheduler.executors.pool import ProcessPoolExecutor
executors = {
    'default': ProcessPoolExecutor(max_workers=3)  # 最大进程数
}
  •  AsyncIOExecutor:基于 asyncio 的异步执行。
from apscheduler.executors.asyncio import AsyncIOExecutor
executors = {
    'default': AsyncIOExecutor()
}

网站公告

今日签到

点亮在社区的每一天
去签到