# 主逻辑代码 (main scheduler logic)
# -*- coding: utf-8 -*-
# import apscheduler
import pandas as pd
import os,copy,json
from datetime import datetime
from loguru import logger
# 导入调度器,此处使用BackgroundScheduler后台调度器(非阻塞,在后台线程中运行)
from apscheduler.schedulers.background import BackgroundScheduler
# 导入触发器,此处使用IntervalTrigger特定时间间隔触发
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.date import DateTrigger
from apscheduler.executors.pool import ThreadPoolExecutor,ProcessPoolExecutor
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.util import undefined
# cron定时调度(某一定时时刻执行)
# (int|str) 表示参数既可以是int类型,也可以是str类型
# (datetime | str) 表示参数既可以是datetime类型,也可以是str类型
# year (int|str) – 4-digit year -(表示四位数的年份,如2008年)
# month (int|str) – month (1-12) -(表示取值范围为1-12月)
# day (int|str) – day of month (1-31) -(表示取值范围为1-31日)
# week (int|str) – ISO week (1-53) -(格里历2006年12月31日可以写成2006年-W52-7(扩展形式)或2006W527(紧凑形式))
# day_of_week (int|str) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun) - (表示一周中的第几天,既可以用0-6表示也可以用其英语缩写表示)
# hour (int|str) – hour (0-23) - (表示取值范围为0-23时)
# minute (int|str) – minute (0-59) - (表示取值范围为0-59分)
# second (int|str) – second (0-59) - (表示取值范围为0-59秒)
# start_date (datetime|str) – earliest possible date/time to trigger on (inclusive) - (表示开始时间)
# end_date (datetime|str) – latest possible date/time to trigger on (inclusive) - (表示结束时间)
# timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations (defaults to scheduler timezone) -(表示时区取值)
# interval 间隔调度(每隔多久执行)
# weeks (int) – number of weeks to wait
# days (int) – number of days to wait
# hours (int) – number of hours to wait
# minutes (int) – number of minutes to wait
# seconds (int) – number of seconds to wait
# start_date (datetime|str) – starting point for the interval calculation
# end_date (datetime|str) – latest possible date/time to trigger on
# timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations
# date 定时调度(作业只会执行一次)
# run_date (datetime|str) – the date/time to run the job at -(任务开始的时间)
# timezone (datetime.tzinfo|str) – time zone for run_date if it doesn’t have one already
def QhRR(QhVALUE, QhVALUE1):
    """Demo job target: print both arguments followed by a fixed message."""
    message = f"{QhVALUE}--{QhVALUE1}执行定时任务..."
    print(message)
class QhApscheduler():
    """Wrapper around an APScheduler ``BackgroundScheduler`` with a CSV job pool.

    Scheduled jobs are stored by APScheduler in a SQLite job store under
    ``<module dir>/QhJobPoolDb``. In parallel, this class keeps a "job pool"
    DataFrame (``self.QhCheduPoolJobDf``), persisted to ``QhJobPoolCsv.csv`` in
    the same directory, that records each job's definition and status
    ("已运行" running, "已暂停" paused, "已关闭" closed) so that closed jobs
    can be re-created later.

    Job functions are looked up by name in this module's ``globals()``.
    Positional arguments are stored as ``"a+b+c"`` and keyword arguments as a
    JSON object whose commas are replaced by ``+``.
    """

    # Column template for one row of the job-pool CSV. Every field defaults to
    # the string "None", which QhGeShiZH() converts back to a real None.
    QhFiled = {
        "QhJabId": "None",            # job id
        "QhJabName": "None",          # job display name
        "QhJabFuncName": "None",      # name of the function to execute
        "QhJabFuncMiaoShu": "None",   # function description
        "QhTimesType": "None",        # trigger type: 重复 (cron) / 间隔 (interval) / 一次 (date)
        "QhYear": "None",             # year
        "QhMonth": "None",            # month
        "QhDay": "None",              # day
        "QhWeek": "None",             # week
        "QhDayOfWeek": "None",        # day of week
        "QhHour": "None",             # hour
        "QhMinute": "None",           # minute
        "QhSecond": "None",           # second
        "QhJabArgs": "None",          # positional arguments of the job function
        "QhJabKwargs": "None",        # keyword arguments of the job function
        "QhJabStartDate": "None",     # start time
        "QhJabNextRunTime": "None",   # next run time
        "QhJabLastRunTime": "None",   # last run time
        "QhJabStatus": "None"         # job status
    }

    def __init__(self,
                 QhThreadCount=20,
                 QhProcessCount=10,
                 *args, **kwargs):
        """Create the storage, start the scheduler and re-register pooled jobs.

        Args:
            QhThreadCount: size of the default thread-pool executor.
            QhProcessCount: size of the ``processpool`` executor.
            *args, **kwargs: accepted for compatibility; not used.
        """
        # Directory of this module; all storage lives below it.
        self.QhCheduPath = os.path.dirname(os.path.abspath(__file__))
        logger.info("【初始化01】:QhApscheduler的绝对路径:[{}]".format(self.QhCheduPath))
        logger.info("【初始化02】:初始化存储位置&初始化任务池文件,QueHui!")
        self.QhCheduPoolJobDf = self._QhinitJobPoolCsv()
        logger.info("【初始化03】:设置任务线程({})和进程({})的数量,QueHui!".format(str(QhThreadCount), str(QhProcessCount)))
        QhExecutors = {
            'default': ThreadPoolExecutor(QhThreadCount),
            'processpool': ProcessPoolExecutor(QhProcessCount)
        }
        logger.info("【初始化04】:设置定时任务序列数据库,QueHui!")
        self.QhJobstores = {
            'default': SQLAlchemyJobStore(url='sqlite:///{}/QhJobPoolDb/QhJabSqlite.sqlite'.format(self.QhCheduPath))
        }
        # Job-level defaults must be passed through ``job_defaults`` (and the
        # key is ``coalesce``); passed as plain top-level options they are
        # not picked up by the scheduler configuration.
        self.Qhscheduler = BackgroundScheduler(
            jobstores=self.QhJobstores,
            executors=QhExecutors,
            job_defaults={'misfire_grace_time': 3, 'coalesce': True})
        logger.info("【初始化05】:创建定时任务对象({}),QueHui!".format(self.Qhscheduler))
        logger.info("【初始化06】:启动定时任务对象({}),QueHui!".format(self.Qhscheduler))
        self.Qhscheduler.start()
        logger.info("【初始化07】:初始化已存在任务池任务,QueHui!")
        self._QhInitAddJobFor()

    def _QhinitJobPoolCsv(self):
        """Ensure the storage directory and job-pool CSV exist; load the pool.

        Sets ``self.QhCheduPoolJobCsv`` (CSV path) and
        ``self.QhCheduPoolJobDf`` (pool DataFrame) and returns the DataFrame.
        """
        # os.path.join keeps the directory consistent with the SQLite URL on
        # every platform (a literal backslash only works on Windows).
        QhCheduPathpd = os.path.join(self.QhCheduPath, "QhJobPoolDb")
        if os.path.exists(QhCheduPathpd):
            logger.info("QhJobPoolDbl路径:[{}] is exist".format(QhCheduPathpd))
        else:
            os.makedirs(QhCheduPathpd, exist_ok=True)
            logger.info("QhJobPoolDbl路径:[{}] is not exist, create it!".format(QhCheduPathpd))
        self.QhCheduPoolJobCsv = os.path.join(QhCheduPathpd, "QhJobPoolCsv.csv")
        if not os.path.exists(self.QhCheduPoolJobCsv):
            logger.info("QhJobPoolCsv:[{}] is not exist, create it!".format(self.QhCheduPoolJobCsv))
            QhFiled = list(QhApscheduler.QhFiled.keys())
            self.QhCheduPoolJobDf = pd.DataFrame(columns=QhFiled)
            # Same encoding as QhSavePoolToCsv() and the read below.
            self.QhCheduPoolJobDf.to_csv(self.QhCheduPoolJobCsv, index=False, encoding='gbk')
        else:
            logger.info("QhJobPoolCsv:[{}] is exist , read it!".format(self.QhCheduPoolJobCsv))
            QhLoaded = pd.read_csv(self.QhCheduPoolJobCsv, encoding='gbk')
            # Work on object columns so that the "None" placeholder and later
            # string updates can be stored in columns pandas inferred as numeric.
            self.QhCheduPoolJobDf = QhLoaded.astype(object).fillna("None")
        return self.QhCheduPoolJobDf

    def _QhRowToJobKwargs(self, QhRow):
        """Convert one job-pool row into keyword arguments for QhAddJob()."""
        QhJobKwargs = {
            "QhTimesType": QhRow["QhTimesType"],
            "QhJabFuncName": QhRow["QhJabFuncName"],
            "QhName": QhRow["QhJabName"],
        }
        # These CSV columns share their name with the QhAddJob() parameter.
        for QhKey in ("QhJabArgs", "QhJabKwargs", "QhYear", "QhMonth", "QhDay",
                      "QhWeek", "QhDayOfWeek", "QhHour", "QhMinute", "QhSecond",
                      "QhJabStatus"):
            QhJobKwargs[QhKey] = self.QhGeShiZH(QhRow[QhKey])
        return QhJobKwargs

    def _QhInitAddJobFor(self, QhIsFor=True):
        """Register every job of the pool with the scheduler.

        Args:
            QhIsFor: when true, the pool is saved once after the whole batch
                instead of after every single job.
        """
        QhJobCount = self.QhCheduPoolJobDf.shape[0]
        if QhJobCount == 0:
            logger.info("【初始化任务01】,任务数量为({})直接返回,QueHui!".format(QhJobCount))
            return
        logger.info("【初始化任务02】,任务数量为({}),批量初始化,QueHui!".format(QhJobCount))
        # Snapshot the rows first: QhAddJob() may modify or replace the pool.
        QhRows = [QhRow for _, QhRow in self.QhCheduPoolJobDf.iterrows()]
        for QhRow in QhRows:
            QhJabId = QhRow["QhJabId"]
            QhJobKwargs = self._QhRowToJobKwargs(QhRow)
            logger.info("【初始化任务02】\n任务ID:{};\n任务名称:{};\n程序名称:{};\n任务状态:{};\n初始化,QueHui!".format(
                QhJabId, QhJobKwargs["QhName"], QhJobKwargs["QhJabFuncName"], QhJobKwargs["QhJabStatus"]))
            self.QhAddJob(QhJabId, QhIsFor=QhIsFor, **QhJobKwargs)
        QhJobCount = self.QhCheduPoolJobDf.shape[0]
        logger.info("【初始化任务03】批量({})-任务更新保存到CSV-{},QueHui!".format(QhJobCount, QhIsFor))
        if QhIsFor:
            self.QhSavePoolToCsv()

    def QhTiggers(self, QhTimesType,
                  QhYear=None,
                  QhMonth=None,
                  QhDay=None,
                  QhWeek=None,
                  QhDayOfWeek=None,
                  QhHour=None,
                  QhMinute=None,
                  QhSecond=None,
                  QhStartDate=None,
                  QhEndDate=None,
                  QhTimezone=None,
                  QhJitter=None
                  ):
        """Build the APScheduler trigger for the given trigger type.

        Args:
            QhTimesType: "重复" (CronTrigger), "间隔" (IntervalTrigger) or
                "一次" (DateTrigger).
            QhYear ... QhSecond: time fields; ``None`` means "not specified".
            QhStartDate, QhEndDate, QhTimezone, QhJitter: forwarded to the
                trigger where it supports them.

        Returns:
            The trigger instance.

        Raises:
            ValueError: for an unknown trigger type, or for a "一次" job
                without a year.
        """
        if QhTimesType == "重复":
            return CronTrigger(year=QhYear,
                               month=QhMonth,
                               day=QhDay,
                               week=QhWeek,
                               day_of_week=QhDayOfWeek,
                               hour=QhHour,
                               minute=QhMinute,
                               second=QhSecond,
                               start_date=QhStartDate,
                               end_date=QhEndDate,
                               timezone=QhTimezone,
                               jitter=QhJitter)
        if QhTimesType == "间隔":
            return IntervalTrigger(weeks=0 if QhWeek is None else QhWeek,
                                   days=0 if QhDay is None else QhDay,
                                   hours=0 if QhHour is None else QhHour,
                                   minutes=0 if QhMinute is None else QhMinute,
                                   seconds=0 if QhSecond is None else QhSecond,
                                   start_date=QhStartDate,
                                   end_date=QhEndDate,
                                   timezone=QhTimezone,
                                   jitter=QhJitter)
        if QhTimesType == "一次":
            if QhYear is None:
                raise ValueError("QhYear is required for a one-off (一次) job")
            # datetime() rejects 0 for month and day, so missing values fall
            # back to 1; missing time-of-day fields fall back to 0.
            QhRunDate = datetime(QhYear,
                                 1 if QhMonth is None else QhMonth,
                                 1 if QhDay is None else QhDay,
                                 0 if QhHour is None else QhHour,
                                 0 if QhMinute is None else QhMinute,
                                 0 if QhSecond is None else QhSecond)
            return DateTrigger(run_date=QhRunDate,
                               timezone=QhTimezone)
        raise ValueError("Unknown QhTimesType: {!r}".format(QhTimesType))

    @staticmethod
    def _QhParseJobParams(QhJabArgs, QhJabKwargs):
        """Turn stored job parameters into ``(args, kwargs)`` for a call.

        Positional arguments take precedence: when ``QhJabArgs`` is given,
        ``QhJabKwargs`` is ignored. Either element of the result may be None.
        """
        if QhJabArgs is not None:
            if isinstance(QhJabArgs, (list, tuple)):
                return tuple(QhJabArgs), None
            # str(): QhGeShiZH() may already have turned "5" into the int 5.
            return tuple(str(QhJabArgs).split("+")), None
        if QhJabKwargs is not None:
            if isinstance(QhJabKwargs, dict):
                return None, dict(QhJabKwargs)
            QhText = str(QhJabKwargs).replace("+", ",").replace("'", '"')
            return None, json.loads(QhText)
        return None, None

    def QhAddJob(self,
                 QhJabId,
                 QhTimesType,  # 重复 / 间隔 / 一次
                 QhJabFuncName,
                 QhJabArgs=None,
                 QhJabKwargs=None,
                 QhName=None,
                 QhYear=None,
                 QhMonth=None,
                 QhDay=None,
                 QhWeek=None,
                 QhDayOfWeek=None,
                 QhHour=None,
                 QhMinute=None,
                 QhSecond=None,
                 QhStartDate=None,
                 QhEndDate=None,
                 QhTimezone=None,
                 QhJitter=None,
                 misfire_grace_time=undefined,
                 next_run_time=undefined,
                 jobstore='default',
                 executor='default',
                 coalesce=undefined,
                 max_instances=undefined,
                 replace_existing=False,
                 QhIsFor=False,
                 QhJabStatus=None,
                 QhJobIsOpen=False,
                 ):
        """Add a job to the scheduler and record it in the job pool.

        Nothing happens when the job id is already scheduled, or when
        ``QhJabStatus`` is "已关闭" and ``QhJobIsOpen`` is false.

        Args:
            QhJabId: job id.
            QhTimesType: trigger type, see QhTiggers().
            QhJabFuncName: name of a function in this module's globals.
            QhJabArgs: positional arguments as ``"a+b"`` (or a list/tuple).
            QhJabKwargs: keyword arguments as JSON with ``+`` instead of
                ``,`` (or a dict). Ignored for the call when QhJabArgs is set.
            QhName: job display name.
            QhYear ... QhJitter: forwarded to QhTiggers().
            misfire_grace_time ... replace_existing: forwarded to
                ``scheduler.add_job``.
            QhIsFor: false saves the pool to CSV right away (single add);
                true leaves saving to the batch caller.
            QhJabStatus: status recorded in the pool for this job, if any.
            QhJobIsOpen: true when re-opening a closed job.

        Raises:
            KeyError: if ``QhJabFuncName`` is not defined in this module.
            ValueError: from QhTiggers() for invalid trigger settings.
        """
        # 1. Already scheduled: nothing to do.
        if self.Qhscheduler.get_job(QhJabId):
            logger.info("【任务添加01】{} is exist,return,QueHui!".format(QhJabId))
            return
        # 2. Closed jobs are only re-created on an explicit re-open.
        if QhJabStatus == "已关闭" and (not QhJobIsOpen):
            logger.info("【任务添加02】{} is close,return,QueHui!".format(QhJabId))
            return
        # 3. Trigger.
        logger.info("【任务添加03】设置触发器({}),QueHui!".format(QhTimesType))
        QhTrigger = self.QhTiggers(QhTimesType=QhTimesType,
                                   QhYear=QhYear,
                                   QhMonth=QhMonth,
                                   QhDay=QhDay,
                                   QhWeek=QhWeek,
                                   QhDayOfWeek=QhDayOfWeek,
                                   QhHour=QhHour,
                                   QhMinute=QhMinute,
                                   QhSecond=QhSecond,
                                   QhStartDate=QhStartDate,
                                   QhEndDate=QhEndDate,
                                   QhTimezone=QhTimezone,
                                   QhJitter=QhJitter)
        # 4. Call parameters.
        logger.info("【任务添加04】程序参数格式化(Args&Kwargs,QueHui!")
        if QhJabArgs is not None:
            logger.info("【任务添加04.01】程序参数格式化(Args)-({}),QueHui!".format(QhJabArgs))
        elif QhJabKwargs is not None:
            logger.info("【任务添加04.01】程序参数格式化(Kwargs)-({}),QueHui!".format(QhJabKwargs))
        QhRunArgs, QhRunKwargs = self._QhParseJobParams(QhJabArgs, QhJabKwargs)
        # 5. Schedule. NOTE(review): the function is resolved by a name that
        # comes from the CSV / caller; only module-level callables can match.
        logger.info("【任务添加05】添加程序({})到定时任务,QueHui!".format(QhJabFuncName))
        QhJabFunc = globals().get(QhJabFuncName)
        if QhJabFunc is None:
            raise KeyError("job function {!r} is not defined in this module".format(QhJabFuncName))
        self.Qhscheduler.add_job(func=QhJabFunc,
                                 trigger=QhTrigger,
                                 args=QhRunArgs,
                                 kwargs=QhRunKwargs,
                                 id=QhJabId,
                                 name=QhName,
                                 misfire_grace_time=misfire_grace_time,
                                 next_run_time=next_run_time,
                                 jobstore=jobstore,
                                 executor=executor,
                                 coalesce=coalesce,
                                 max_instances=max_instances,
                                 replace_existing=replace_existing)
        # 6. Back to the storage format. Joining the items avoids the trailing
        # "+" that string-mangling a one-element tuple produces; json.dumps
        # keeps non-string values loadable by json.loads.
        logger.info("【任务添加06】还原参数格式,用于存储,QueHui!")
        QhArgsStore = None if QhRunArgs is None else "+".join(str(QhItem) for QhItem in QhRunArgs)
        QhKwargsStore = QhRunKwargs if QhRunKwargs is not None else QhJabKwargs
        if isinstance(QhKwargsStore, dict):
            QhKwargsStore = json.dumps(QhKwargsStore, ensure_ascii=False).replace(",", "+")

        def QhOrNone(QhValue):
            # Pool rows use the string "None" as their "no value" marker.
            return "None" if QhValue is None else QhValue

        QhAddJoblDic = dict(QhApscheduler.QhFiled)
        QhAddJoblDic["QhJabId"] = QhJabId
        QhAddJoblDic["QhJabName"] = QhOrNone(QhName)
        QhAddJoblDic["QhJabFuncName"] = QhOrNone(QhJabFuncName)
        QhAddJoblDic["QhTimesType"] = QhTimesType
        QhAddJoblDic["QhYear"] = QhOrNone(QhYear)
        QhAddJoblDic["QhMonth"] = QhOrNone(QhMonth)
        QhAddJoblDic["QhDay"] = QhOrNone(QhDay)
        QhAddJoblDic["QhWeek"] = QhOrNone(QhWeek)
        QhAddJoblDic["QhDayOfWeek"] = QhOrNone(QhDayOfWeek)
        QhAddJoblDic["QhHour"] = QhOrNone(QhHour)
        QhAddJoblDic["QhMinute"] = QhOrNone(QhMinute)
        QhAddJoblDic["QhSecond"] = QhOrNone(QhSecond)
        QhAddJoblDic["QhJabArgs"] = QhOrNone(QhArgsStore)
        QhAddJoblDic["QhJabKwargs"] = QhOrNone(QhKwargsStore)
        QhAddJoblDic["QhJabStatus"] = "已运行"
        # 7. Update the pool row if the id is known, otherwise append one.
        logger.info("【任务添加07】判断任务id存在,则更新任务,不存在则新增,QueHui!")
        # Compare against the column VALUES; ``id in series`` would test the
        # index labels and re-opened jobs would be appended a second time.
        QhMask = self.QhCheduPoolJobDf["QhJabId"] == QhJabId
        if QhMask.any():
            logger.info("【任务添加07.01】{}任务存在,更新任务数据,QueHui!".format(QhJabId))
            for QhKey, QhValue in QhAddJoblDic.items():
                if QhKey == "QhJabId":
                    continue
                self.QhCheduPoolJobDf.loc[QhMask, QhKey] = QhValue
        else:
            logger.info("【任务添加07.01】{}任务不存在,新增任务到任务池,QueHui!".format(QhJabId))
            QhCheduPoolJobDfRow = pd.DataFrame([QhAddJoblDic])
            self.QhCheduPoolJobDf = pd.concat(
                [self.QhCheduPoolJobDf, QhCheduPoolJobDfRow], ignore_index=True)
        # 8. Single add: persist immediately.
        if not QhIsFor:
            logger.info("【任务添加08】单个-{}任务更新保存到CSV-{},QueHui!".format(QhJabId, QhIsFor))
            self.QhSavePoolToCsv()

    def _QhChangeJobState(self, QhJabId, QhIsFor, QhLabel, QhAction, QhStatus):
        """Apply a scheduler action to one job and record the new status.

        Args:
            QhJabId: job id.
            QhIsFor: false saves the pool to CSV right away.
            QhLabel: verb used in the log messages (暂停 / 启动 / 关闭).
            QhAction: scheduler method called with the job id.
            QhStatus: status written to the pool on success.
        """
        try:
            if QhJabId == "":
                logger.warning("【{}任务01】任务ID不能空,请输入任务ID,QueHui!".format(QhLabel))
                return
            if not self.Qhscheduler.get_job(QhJabId):
                logger.info("【{0}任务05】{0}[{1}]任务可能不存在,QueHui!".format(QhLabel, QhJabId))
                return
            logger.info("【{0}任务02】{0}[{1}]任务,QueHui!".format(QhLabel, QhJabId))
            QhAction(QhJabId)
            logger.info("【{}任务03】设置任务[{}]状态--{},QueHui!".format(QhLabel, QhJabId, QhStatus))
            self.QhCheduPoolJobDf.loc[self.QhCheduPoolJobDf['QhJabId'] == QhJabId, 'QhJabStatus'] = QhStatus
            if not QhIsFor:
                logger.info("【{}任务04】单个-{}任务更新保存到CSV-{},QueHui!".format(QhLabel, QhJabId, QhIsFor))
                self.QhSavePoolToCsv()
        except Exception as e:
            logger.error("【{0}任务00】{0}[{1}]任务报错,QueHui!\n报错消息如下:\n{2}".format(QhLabel, QhJabId, e))

    def _QhRunForAllJobs(self, QhLabel, QhIsFor, QhSingleOp):
        """Call ``QhSingleOp(job_id, QhIsFor=...)`` for every job of the pool.

        The pool is saved once at the end when ``QhIsFor`` is true; otherwise
        each single operation saves on its own.
        """
        QhJobCount = self.QhCheduPoolJobDf.shape[0]
        if QhJobCount == 0:
            logger.info("【{}所有任务01】任务数量为({}),QueHui!".format(QhLabel, QhJobCount))
            return
        logger.info("【{0}所有任务01】{0}所有任务({1}),QueHui!".format(QhLabel, QhJobCount))
        # Snapshot of the ids: the single operations may modify the pool.
        for QhJabId in list(self.QhCheduPoolJobDf['QhJabId']):
            logger.info("【{0}所有任务01.01】{0}任务-{1},QueHui!".format(QhLabel, QhJabId))
            QhSingleOp(QhJabId, QhIsFor=QhIsFor)
        logger.info("【{}所有任务02】批量({})-任务更新保存到CSV-{},QueHui!".format(QhLabel, QhJobCount, QhIsFor))
        if QhIsFor:
            self.QhSavePoolToCsv()

    def QhPauseJob(self, QhJabId, QhIsFor=False):
        """Pause one job and mark it "已暂停"; saves the pool unless QhIsFor."""
        self._QhChangeJobState(QhJabId, QhIsFor, "暂停", self.Qhscheduler.pause_job, '已暂停')

    def QhPauseJobFor(self, QhIsFor=True):
        """Pause every job of the pool."""
        self._QhRunForAllJobs("暂停", QhIsFor, self.QhPauseJob)

    def QhResumeJob(self, QhJabId, QhIsFor=False):
        """Resume one job and mark it "已运行"; saves the pool unless QhIsFor."""
        self._QhChangeJobState(QhJabId, QhIsFor, "启动", self.Qhscheduler.resume_job, '已运行')

    def QhResumeJobFor(self, QhIsFor=True):
        """Resume every job of the pool."""
        self._QhRunForAllJobs("启动", QhIsFor, self.QhResumeJob)

    def QhRemoveJob(self, QhJabId, QhIsFor=False):
        """Remove one job from the scheduler and mark it "已关闭" in the pool."""
        self._QhChangeJobState(QhJabId, QhIsFor, "关闭", self.Qhscheduler.remove_job, '已关闭')

    def QhRemoveJobFor(self, QhIsFor=True):
        """Remove every job of the pool from the scheduler."""
        self._QhRunForAllJobs("关闭", QhIsFor, self.QhRemoveJob)

    def QhReopenJob(self, QhJabId, QhIsFor=False):
        """Re-create a closed job from its pool row.

        Only acts when the job is not scheduled and its pool status is
        "已关闭"; QhAddJob() then sets the status back to "已运行".

        Args:
            QhJabId: job id.
            QhIsFor: false saves the pool to CSV right away.
        """
        if QhJabId == "":
            logger.warning("【打开任务01】任务ID不能空,请输入任务ID,QueHui!")
            return
        QhRows = self.QhCheduPoolJobDf.loc[self.QhCheduPoolJobDf['QhJabId'] == QhJabId]
        if QhRows.empty:
            # Unknown id: report it instead of failing on an empty selection.
            logger.warning("【打开任务02】[{}]任务可能不存在,QueHui!".format(QhJabId))
            return
        QhJobKwargs = self._QhRowToJobKwargs(QhRows.iloc[0])
        if self.Qhscheduler.get_job(QhJabId) or QhJobKwargs["QhJabStatus"] != "已关闭":
            return
        logger.info("【打开任务01】\n任务ID:{};\n任务名称:{};\n程序名称:{};\n任务状态:{};\n初始化,QueHui!".format(
            QhJabId, QhJobKwargs["QhName"], QhJobKwargs["QhJabFuncName"], QhJobKwargs["QhJabStatus"]))
        self.QhAddJob(QhJabId, QhIsFor=QhIsFor, QhJobIsOpen=True, **QhJobKwargs)

    def QhReopenJobFor(self, QhIsFor=True):
        """Re-open every closed job of the pool."""
        self._QhRunForAllJobs("打开", QhIsFor, self.QhReopenJob)

    def QhShouDongRunJob(self, QhJabId):
        """Run a job's function once, right now, in the calling thread.

        The function name and parameters are read from the pool row, so this
        also works for jobs that are not currently scheduled. Errors are
        logged, not raised.
        """
        QhJabFuncName = None  # keeps the error log below safe on early failures
        try:
            if QhJabId == "":
                logger.warning("【手动执行任务01】任务ID不能空,请输入任务ID,QueHui!")
                return
            QhRows = self.QhCheduPoolJobDf.loc[self.QhCheduPoolJobDf['QhJabId'] == QhJabId]
            if QhRows.empty:
                logger.warning("【手动执行任务01】[{}]任务可能不存在,QueHui!".format(QhJabId))
                return
            QhRow = QhRows.iloc[0]
            QhJabFuncName = QhRow['QhJabFuncName']
            QhJabArgs = self.QhGeShiZH(QhRow['QhJabArgs'])
            QhJabKwargs = self.QhGeShiZH(QhRow['QhJabKwargs'])
            QhJabFunc = globals().get(QhJabFuncName)
            logger.info("【手动执行任务01】\n任务ID:{};\n程序名称:{};\n程序参数Args:{};\n程序参数Kwargs:{};\n初始化,QueHui!".format(
                QhJabId, QhJabFuncName, QhJabArgs, QhJabKwargs))
            if not QhJabFunc:
                logger.warning("【手动执行任务02】{}-程序不存在,QueHui!".format(QhJabFuncName))
                return
            QhRunArgs, QhRunKwargs = self._QhParseJobParams(QhJabArgs, QhJabKwargs)
            # A job without stored parameters is called with no arguments,
            # exactly as the scheduler would call it.
            QhJabFunc(*(QhRunArgs or ()), **(QhRunKwargs or {}))
            logger.info("【手动执行任务02】{}-程序执行完成,QueHui!".format(QhJabFuncName))
        except Exception as e:
            logger.error("【手动执行任务00】程序[{}-{}]执行报错,QueHui!\n报错消息如下:\n{}".format(QhJabId, QhJabFuncName, e))

    def QhShouDongRunJobFor(self):
        """Manually run every job of the pool once."""
        QhJobCount = self.QhCheduPoolJobDf.shape[0]
        if QhJobCount == 0:
            logger.info("【手动执行所有任务01】任务数量为({}),QueHui!".format(QhJobCount))
            return
        logger.info("【手动执行所有任务01】启动所有任务({}),QueHui!".format(QhJobCount))
        for QhJabId in list(self.QhCheduPoolJobDf['QhJabId']):
            logger.info("【手动执行所有任务01.01】手动执行任务-{},QueHui!".format(QhJabId))
            self.QhShouDongRunJob(QhJabId)

    def QhXiuGaiJob(self, QhJabId,
                    QhIsFor=True):
        """Modify a job. Not implemented yet.

        Planned approach: remove the job, change it, then add it again.
        """
        pass

    def QhGetJobsState(self):
        """Print next run time and pending flag of every scheduled job."""
        for job in self.Qhscheduler.get_jobs():
            print(f"Job ID: {job.id}, 任务的下一次运行时间: {self.QhGetNextJobTime(job.id)}")
            print(f"Job ID: {job.id}, 任务是否有待执行: {job.pending}")

    def QhGetNextJobTime(self, QhJabId):
        """Return a job's next run time as ``"%Y-%m-%d %H:%M:%S"``.

        Returns None when the id is empty, the job is not scheduled, the job
        has no next run time, or an error occurs (which is logged).
        """
        try:
            if QhJabId == "":
                logger.warning("【任务下一次运行时间01】任务ID不能空,请输入任务ID,QueHui!")
                return None
            QhJob = self.Qhscheduler.get_job(QhJabId)
            if not QhJob:
                logger.info("【任务下一次运行时间03】[{}]任务可能不存在,QueHui!".format(QhJabId))
                return None
            logger.info("【任务下一次运行时间02】[{}]获取下一次运行时间,QueHui!".format(QhJabId))
            QhNextRunTime = QhJob.next_run_time
            if QhNextRunTime is None:
                # No run pending (e.g. the job is paused): nothing to format.
                return None
            return QhNextRunTime.strftime("%Y-%m-%d %H:%M:%S")
        except Exception as e:
            logger.error("【任务下一次运行时间00】[{}]获取下一次运行时间报错,QueHui!\n报错消息如下:\n{}".format(QhJabId, e))
            return None

    def QhGeShiZH(self, QhValue):
        """Normalise a value read from the job pool.

        * the strings "None", "" and "0" become ``None``;
        * anything ``int()`` accepts becomes an int (floats are truncated);
        * every other value is returned unchanged.

        NOTE: because "0" maps to None, a stored field whose whole text is
        "0" cannot be told apart from an empty one.
        """
        if isinstance(QhValue, str) and QhValue in ("None", "", "0"):
            return None
        try:
            return int(QhValue)
        except (TypeError, ValueError, OverflowError):
            # Not integer-like (text, None, NaN, infinity, ...): keep as is.
            return QhValue

    def QhSavePoolToCsv(self):
        """Write the job pool to its CSV file (GBK); failures are logged."""
        try:
            self.QhCheduPoolJobDf.to_csv(self.QhCheduPoolJobCsv, index=False, encoding='gbk')
            logger.info("【任务保存】,任务保存成功,QueHui!")
        except Exception as e:
            logger.error("【任务保存】,任务保存失败,QueHui!\n错误消息:\n{}".format(e))
# UI代码 (PyQt5 form generated by pyuic5)
# -*- coding: utf-8 -*-
# Form implementation generated from reading ui file 'QhTestJob.ui'
#
# Created by: PyQt5 UI code generator 5.15.7
#
# WARNING: Any manual changes made to this file will be lost when pyuic5 is
# run again. Do not edit this file unless you know what you are doing.
from PyQt5 import QtCore, QtGui, QtWidgets
class Ui_Form(object):
    """Widget tree of the scheduler test form (originally pyuic5 output)."""

    def setupUi(self, Form):
        """Create all layouts and widgets on ``Form`` and set their texts.

        The form is a grid holding one vertical layout. That layout contains
        seven rows (a push button followed by a line edit) and then six
        stand-alone push buttons. Every widget and layout is also stored as
        an attribute of ``self`` under its object name.
        """
        Form.setObjectName("Form")
        Form.resize(632, 610)
        self.gridLayout = QtWidgets.QGridLayout(Form)
        self.gridLayout.setObjectName("gridLayout")
        self.verticalLayout = QtWidgets.QVBoxLayout()
        self.verticalLayout.setObjectName("verticalLayout")

        # (row layout, push button, line edit, minimum width of the line edit)
        button_edit_rows = (
            ("horizontalLayout_5", "pushButton", "lineEdit_5", 500),
            ("horizontalLayout_4", "pushButton_2", "lineEdit", 230),
            ("horizontalLayout_18", "pushButton_18", "lineEdit_18", 230),
            ("horizontalLayout_3", "pushButton_3", "lineEdit_2", 230),
            ("horizontalLayout_6", "pushButton_6", "lineEdit_6", 230),
            ("horizontalLayout_2", "pushButton_4", "lineEdit_3", 230),
            ("horizontalLayout", "pushButton_5", "lineEdit_4", 230),
        )
        for layout_name, button_name, edit_name, min_width in button_edit_rows:
            row_layout = QtWidgets.QHBoxLayout()
            row_layout.setObjectName(layout_name)
            setattr(self, layout_name, row_layout)

            button = QtWidgets.QPushButton(Form)
            button.setObjectName(button_name)
            setattr(self, button_name, button)
            row_layout.addWidget(button)

            line_edit = QtWidgets.QLineEdit(Form)
            line_edit.setMinimumSize(QtCore.QSize(min_width, 28))
            line_edit.setMaximumSize(QtCore.QSize(500, 16777215))
            line_edit.setObjectName(edit_name)
            setattr(self, edit_name, line_edit)
            row_layout.addWidget(line_edit)

            self.verticalLayout.addLayout(row_layout)

        # Buttons without a line edit, added below the rows in this order.
        standalone_buttons = (
            "pushButton_19",
            "pushButton_24",
            "pushButton_20",
            "pushButton_21",
            "pushButton_22",
            "pushButton_23",
        )
        for button_name in standalone_buttons:
            button = QtWidgets.QPushButton(Form)
            button.setObjectName(button_name)
            setattr(self, button_name, button)
            self.verticalLayout.addWidget(button)

        self.gridLayout.addLayout(self.verticalLayout, 0, 0, 1, 1)
        self.retranslateUi(Form)
        QtCore.QMetaObject.connectSlotsByName(Form)

    def retranslateUi(self, Form):
        """Set the window title and the caption of every push button."""
        _translate = QtCore.QCoreApplication.translate
        Form.setWindowTitle(_translate("Form", "Form"))
        button_captions = (
            ("pushButton", "添加任务"),
            ("pushButton_2", "暂停任务"),
            ("pushButton_18", "启动任务"),
            ("pushButton_3", "删除任务"),
            ("pushButton_6", "打开任务"),
            ("pushButton_4", "手动执行"),
            ("pushButton_5", "获取任务状态"),
            ("pushButton_19", "获取所有任务的状态"),
            ("pushButton_24", "手动执行所有任务"),
            ("pushButton_20", "暂停所有任务"),
            ("pushButton_21", "恢复所有任务"),
            ("pushButton_22", "关闭所有任务"),
            ("pushButton_23", "打开所有任务"),
        )
        for button_name, caption in button_captions:
            getattr(self, button_name).setText(_translate("Form", caption))