目录
前置:
1 本系列将以 “PyQt6实例_批量下载pdf工具”开头,放在 【PyQt6实例】 专栏
2 本系列涉及到的PyQt6知识点:
线程池:QThreadPool,QRunnable;
信号与槽:pyqtSignal,pyqtSlot;
界面:QTextEdit,QLabel,QLineText,QPushButton,QMainWindow,QWidget;
布局:QHBoxLayout,QVBoxLayout;
弹框:QFileDialog,QMessageBox。
3 本系列后续会在B站录制视频,到时会在文末贴出链接。本人还是建议先看博文,不懂的再看视频,这样效率高,节约时间。
代码:
basedir = os.path.dirname(__file__)
class WorkerSignals(QObject):
finished = pyqtSignal(int)
error = pyqtSignal(tuple)
result = pyqtSignal(tuple)
pass
class Worker(QRunnable):
def __init__(self,thread_num:int,task_data:dict):
super().__init__()
self.thread_num = thread_num
self.task_data = task_data
self.signals = WorkerSignals()
self.is_stop = False
pass
@pyqtSlot()
def run(self):
try:
'''
每一个线程领一部分股票,执行下载任务
1 下载完一个股票,记录这个股票
2 当下载某个股票,下载一个pdf,记录pdfurl
断点信息 temp_dict {ticker:[pdfurl,pdfurl,...]}
股票列表 txt_list
txt所在目录 txt_dir
pdf存储位置 pdf_dir
'''
result = None
if self.task_data['temp_dict'] is not None:
temp_ticker_list = list(self.task_data['temp_dict'].keys())
pass
else:
temp_ticker_list = []
txt_list = self.task_data['txt_list']
txt_dir = self.task_data['txt_dir'] + os.path.sep
pdf_dir = self.task_data['pdf_dir'] + os.path.sep
executed_ticker_list = []
for txt_file in txt_list:
ticker = txt_file[0:6]
txt_file_path = txt_dir + txt_file
one_ticker_executed_url_list = []
with open(txt_file_path,'r',encoding='utf-8') as fr:
res_str = fr.read()
url_list = res_str.split('\n')
if ticker in temp_ticker_list:
one_ticker_executed_url_list = self.task_data['temp_dict'][ticker]
for one_url in one_ticker_executed_url_list:
url_list.remove(one_url)
tar_pdf_dir = pdf_dir + ticker + os.path.sep
if not os.path.exists(tar_pdf_dir):
os.mkdir(tar_pdf_dir)
for one_url in url_list:
if self.is_stop:
# 强制停止退出
result = (self.thread_num,'stoped',ticker,one_ticker_executed_url_list,executed_ticker_list)
break
pass
tar_file_name00 = one_url.split('/')
tar_file_name = f"{tar_file_name00[-2]}_{tar_file_name00[-1]}.pdf"
try:
res = requests.get(one_url)
if res.status_code == 200:
with open(tar_pdf_dir + tar_file_name,'wb') as fw:
fw.write(res.content)
pass
pass
except:
pass
one_ticker_executed_url_list.append(one_url)
pass
if self.is_stop:
if result is None:
result = (self.thread_num,'stoped',None,None,executed_ticker_list)
break
executed_ticker_list.append(ticker)
if not self.is_stop:
result = (self.thread_num,'success',None,None,executed_ticker_list)
pass
except Exception:
traceback.print_exc()
exctype,value = sys.exc_info()[:2]
self.signals.error.emit((self.thread_num,exctype,value,traceback.format_exc()))
pass
else:
self.signals.result.emit(result)
pass
finally:
self.signals.finished.emit(self.thread_num)
def stop_run(self):
self.is_stop = True
pass
pass
视频:
https://www.bilibili.com/video/BV1k8ZcYsEFG/
https://www.bilibili.com/video/BV1wtZcYoEny/
https://www.bilibili.com/video/BV1HtZcYdECV/
https://www.bilibili.com/video/BV1ptZcYdEgL/