PyQt6实例_批量下载pdf工具_线程池

发布于:2025-03-31 ⋅ 阅读:(22) ⋅ 点赞:(0)

目录

前置:

代码:

视频:


前置:

1 本系列将以 “PyQt6实例_批量下载pdf工具”开头,放在 【PyQt6实例】 专栏
2 本系列涉及到的PyQt6知识点:
线程池:QThreadPool,QRunnable;
信号与槽:pyqtSignal,pyqtSlot;
界面:QTextEdit,QLabel,QLineText,QPushButton,QMainWindow,QWidget;
布局:QHBoxLayout,QVBoxLayout;
弹框:QFileDialog,QMessageBox。
3 本系列后续会在B站录制视频,到时会在文末贴出链接。本人还是建议先看博文,不懂的再看视频,这样效率高,节约时间。

代码:

basedir = os.path.dirname(__file__)

class WorkerSignals(QObject):
    finished = pyqtSignal(int)
    error = pyqtSignal(tuple)
    result = pyqtSignal(tuple)
    pass

class Worker(QRunnable):
    def __init__(self,thread_num:int,task_data:dict):
        super().__init__()
        self.thread_num = thread_num
        self.task_data = task_data
        self.signals = WorkerSignals()
        self.is_stop = False
        pass
    @pyqtSlot()
    def run(self):
        try:
            '''
            每一个线程领一部分股票,执行下载任务
            1 下载完一个股票,记录这个股票
            2 当下载某个股票,下载一个pdf,记录pdfurl
            
            断点信息 temp_dict {ticker:[pdfurl,pdfurl,...]}
            股票列表  txt_list
            txt所在目录  txt_dir
            pdf存储位置  pdf_dir
            '''
            result = None
            if self.task_data['temp_dict'] is not None:
                temp_ticker_list = list(self.task_data['temp_dict'].keys())
                pass
            else:
                temp_ticker_list = []
            txt_list = self.task_data['txt_list']
            txt_dir = self.task_data['txt_dir'] + os.path.sep
            pdf_dir = self.task_data['pdf_dir'] + os.path.sep
            executed_ticker_list = []
            for txt_file in txt_list:
                ticker = txt_file[0:6]
                txt_file_path = txt_dir + txt_file
                one_ticker_executed_url_list = []
                with open(txt_file_path,'r',encoding='utf-8') as fr:
                    res_str = fr.read()
                url_list = res_str.split('\n')
                if ticker in temp_ticker_list:
                    one_ticker_executed_url_list = self.task_data['temp_dict'][ticker]
                    for one_url in one_ticker_executed_url_list:
                        url_list.remove(one_url)
                tar_pdf_dir = pdf_dir + ticker + os.path.sep
                if not os.path.exists(tar_pdf_dir):
                    os.mkdir(tar_pdf_dir)
                for one_url in url_list:
                    if self.is_stop:
                        # 强制停止退出
                        result = (self.thread_num,'stoped',ticker,one_ticker_executed_url_list,executed_ticker_list)
                        break
                        pass
                    tar_file_name00 = one_url.split('/')
                    tar_file_name = f"{tar_file_name00[-2]}_{tar_file_name00[-1]}.pdf"
                    try:
                        res = requests.get(one_url)
                        if res.status_code == 200:
                            with open(tar_pdf_dir + tar_file_name,'wb') as fw:
                                fw.write(res.content)
                            pass
                        pass
                    except:
                        pass
                    one_ticker_executed_url_list.append(one_url)
                    pass
                if self.is_stop:
                    if result is None:
                        result = (self.thread_num,'stoped',None,None,executed_ticker_list)
                    break
                executed_ticker_list.append(ticker)
            if not self.is_stop:
                result = (self.thread_num,'success',None,None,executed_ticker_list)
            pass
        except Exception:
            traceback.print_exc()
            exctype,value = sys.exc_info()[:2]
            self.signals.error.emit((self.thread_num,exctype,value,traceback.format_exc()))
            pass
        else:
            self.signals.result.emit(result)
            pass
        finally:
            self.signals.finished.emit(self.thread_num)

    def stop_run(self):
        self.is_stop = True
        pass
    pass

视频:

https://www.bilibili.com/video/BV1k8ZcYsEFG/
https://www.bilibili.com/video/BV1wtZcYoEny/
https://www.bilibili.com/video/BV1HtZcYdECV/
https://www.bilibili.com/video/BV1ptZcYdEgL/