PyQt5 K线图实现与性能优化详解

发布于:2025-09-01 ⋅ 阅读:(20) ⋅ 点赞:(0)

下面我将展示一个完整的PyQt5 K线图实现方案,包含详细说明和多项性能优化技术。

完整示例代码

import sys
import random
from PyQt5.QtWidgets import (QApplication, QMainWindow, QVBoxLayout, 
                            QWidget, QPushButton, QComboBox, QLabel)
from PyQt5.QtCore import Qt, QTimer, QRectF
from PyQt5.QtGui import QPainter, QColor, QPen, QFont
import numpy as np
from collections import deque

class KLineItem:
    """
    K线数据项,存储单个K线的所有信息
    """
    def __init__(self, timestamp, open_price, close_price, high_price, low_price, volume):
        self.timestamp = timestamp
        self.open = open_price
        self.close = close_price
        self.high = high_price
        self.low = low_price
        self.volume = volume
        
    def is_rise(self):
        """判断是阳线还是阴线"""
        return self.close >= self.open


class KLineWidget(QWidget):
    """
    自定义K线图绘制控件
    """
    def __init__(self, parent=None):
        super().__init__(parent)
        self.setBackgroundRole(Qt.white)
        self.setAutoFillBackground(True)
        
        # 数据相关
        self.kline_data = deque(maxlen=200)  # 使用双端队列限制最大数据量
        self.current_data = []  # 当前显示的数据
        self.price_min = 0
        self.price_max = 0
        self.volume_max = 0
        
        # 显示设置
        self.candle_width = 8  # K线宽度
        self.margin = 10       # 边距
        self.show_volume = True  # 是否显示成交量
        self.theme = "dark"    # 主题
        
        # 性能优化相关
        self.cache_valid = False
        self.cache_image = None
        self.last_draw_rect = QRectF()
        
        # 模拟数据定时器
        self.timer = QTimer(self)
        self.timer.timeout.connect(self.add_random_data)
        
    def add_data(self, kline_item):
        """
        添加K线数据
        """
        self.kline_data.append(kline_item)
        self.update_price_range()
        self.cache_valid = False  # 数据变化,缓存失效
        self.update()
        
    def add_random_data(self):
        """
        添加随机数据(用于演示)
        """
        if not self.kline_data:
            # 初始数据
            for _ in range(50):
                self.add_random_data()
            return
            
        last = self.kline_data[-1]
        # 基于最后一条数据生成新数据
        change_percent = random.uniform(-0.03, 0.03)
        new_close = last.close * (1 + change_percent)
        
        # 确保新价格在合理范围内
        new_close = max(new_close, self.price_min * 0.9)
        new_close = min(new_close, self.price_max * 1.1)
        
        # 生成新高低
        noise = random.uniform(-0.01, 0.01) * new_close
        new_high = new_close * (1 + abs(noise))
        new_low = new_close * (1 - abs(noise))
        
        # 确保高低顺序正确
        new_high = max(new_high, new_low, new_close)
        new_low = min(new_high, new_low, new_close)
        
        # 生成成交量(随机波动)
        new_volume = max(100, int(last.volume * (1 + random.uniform(-0.2, 0.2))))
        
        self.add_data(KLineItem(
            timestamp=last.timestamp + 60,  # 假设每分钟一条数据
            open_price=last.close,
            close_price=new_close,
            high_price=new_high,
            low_price=new_low,
            volume=new_volume
        ))
        
    def update_price_range(self):
        """
        更新价格范围
        """
        if not self.kline_data:
            return
            
        # 计算价格范围(留出10%的边距)
        all_prices = []
        for item in self.kline_data:
            all_prices.extend([item.open, item.close, item.high, item.low])
            
        price_range = max(all_prices) - min(all_prices)
        self.price_min = min(all_prices) - price_range * 0.1
        self.price_max = max(all_prices) + price_range * 0.1
        
        # 计算成交量最大值
        self.volume_max = max(item.volume for item in self.kline_data) * 1.1
        
    def paintEvent(self, event):
        """
        绘制K线图(核心性能优化点)
        """
        # 1. 使用双缓冲减少闪烁
        painter = QPainter(self)
        painter.setRenderHint(QPainter.Antialiasing)
        
        # 2. 只重绘需要更新的区域
        draw_rect = event.rect()
        if self.cache_valid and self.last_draw_rect == draw_rect and self.cache_image:
            painter.drawImage(draw_rect, self.cache_image, draw_rect)
            return
            
        # 3. 创建缓存图像
        self.cache_image = QImage(self.size(), QImage.Format_ARGB32)
        cache_painter = QPainter(self.cache_image)
        cache_painter.setRenderHint(QPainter.Antialiasing)
        
        # 4. 绘制背景
        self.draw_background(cache_painter)
        
        # 5. 绘制坐标轴和网格
        self.draw_grid(cache_painter)
        
        # 6. 绘制K线(性能关键部分)
        self.draw_klines(cache_painter)
        
        # 7. 绘制成交量(可选)
        if self.show_volume:
            self.draw_volume(cache_painter)
            
        # 8. 完成缓存绘制
        cache_painter.end()
        
        # 9. 将缓存绘制到屏幕
        painter.drawImage(draw_rect, self.cache_image, draw_rect)
        
        # 10. 标记缓存有效
        self.cache_valid = True
        self.last_draw_rect = draw_rect
        
    def draw_background(self, painter):
        """
        绘制背景
        """
        if self.theme == "dark":
            painter.fillRect(self.rect(), QColor(30, 30, 30))
            painter.setPen(QPen(QColor(50, 50, 50), 1))
        else:
            painter.fillRect(self.rect(), Qt.white)
            painter.setPen(QPen(QColor(220, 220, 220), 1))
            
        # 绘制背景网格(稍后在draw_grid中处理)
        
    def draw_grid(self, painter):
        """
        绘制网格和坐标轴
        """
        width = self.width()
        height = self.height()
        
        if self.show_volume:
            chart_height = height * 0.7  # 主图占70%
            volume_height = height * 0.2  # 成交量占20%
        else:
            chart_height = height
            volume_height = 0
            
        # 绘制主图网格
        painter.save()
        if self.theme == "dark":
            painter.setPen(QPen(QColor(50, 50, 50), 1))
        else:
            painter.setPen(QPen(QColor(230, 230, 230), 1))
            
        # 水平网格线(价格)
        price_steps = 5
        price_step = (self.price_max - self.price_min) / price_steps
        for i in range(price_steps + 1):
            y = chart_height - (i * chart_height / price_steps)
            painter.drawLine(0, y, width, y)
            
            # 绘制价格标签
            price = self.price_min + i * price_step
            painter.drawText(5, y - 5, f"{price:.2f}")
            
        # 垂直网格线(时间) - 简化版,实际应根据数据量动态计算
        data_count = len(self.kline_data)
        if data_count > 1:
            time_steps = min(10, data_count // 2)  # 最多显示10个时间标签
            for i in range(time_steps + 1):
                x = self.margin + i * (width - 2 * self.margin) / time_steps
                painter.drawLine(x, 0, x, chart_height)
                
                # 绘制时间标签(简化版,实际应使用真实时间)
                if data_count > 0:
                    idx = int(i * (data_count - 1) / time_steps)
                    if idx < data_count:
                        painter.drawText(x - 15, chart_height + 15, str(idx))
        
        painter.restore()
        
        # 绘制成交量区域网格(如果显示成交量)
        if self.show_volume and self.volume_max > 0:
            painter.save()
            if self.theme == "dark":
                painter.setPen(QPen(QColor(50, 50, 50), 1))
            else:
                painter.setPen(QPen(QColor(230, 230, 230), 1))
                
            volume_y_start = chart_height + 5
            # 成交量网格线(简化版)
            for i in range(1, 3):
                y = volume_y_start + i * volume_height / 3
                painter.drawLine(0, y, width, y)
            
            painter.restore()
        
    def draw_klines(self, painter):
        """
        绘制K线(性能关键部分)
        """
        width = self.width()
        height = self.height()
        
        if self.show_volume:
            chart_height = height * 0.7
        else:
            chart_height = height
            
        data_count = len(self.kline_data)
        if data_count == 0:
            return
            
        # 计算每个K线的宽度和间距
        total_width = width - 2 * self.margin
        available_width = total_width - (data_count + 1) * 2  # 2像素间距
        if available_width <= 0:
            return
            
        self.candle_width = max(2, available_width / data_count)  # 最小宽度2像素
        
        # 准备当前显示的数据范围(性能优化:只绘制可见区域)
        # 这里简化处理,实际应根据视口范围动态计算
        start_idx = max(0, data_count - int(total_width / (self.candle_width + 2)))
        self.current_data = list(self.kline_data)[start_idx:]
        data_count = len(self.current_data)
        
        # 绘制K线
        painter.save()
        
        for i, item in enumerate(self.current_data):
            x = width - self.margin - (i + 0.5) * (self.candle_width + 2)
            
            # 计算价格坐标
            high_y = chart_height - (item.high - self.price_min) / (self.price_max - self.price_min) * chart_height
            low_y = chart_height - (item.low - self.price_min) / (self.price_max - self.price_min) * chart_height
            open_y = chart_height - (item.open - self.price_min) / (self.price_max - self.price_min) * chart_height
            close_y = chart_height - (item.close - self.price_min) / (self.price_max - self.price_min) * chart_height
            
            # 绘制上下影线
            if self.theme == "dark":
                pen = QPen(QColor(100, 100, 100), 1)
            else:
                pen = QPen(QColor(150, 150, 150), 1)
                
            painter.setPen(pen)
            painter.drawLine(x, high_y, x, low_y)
            
            # 绘制实体
            candle_height = abs(open_y - close_y)
            if candle_height < 1:  # 确保最小高度
                candle_height = 1
                
            candle_rect = QRectF(
                x - self.candle_width / 2,
                min(open_y, close_y),
                self.candle_width,
                candle_height
            )
            
            if item.is_rise():
                # 阳线(红色或绿色取决于主题)
                if self.theme == "dark":
                    painter.setBrush(QColor(200, 50, 50))
                else:
                    painter.setBrush(QColor(255, 50, 50))
                painter.setPen(QPen(QColor(200, 50, 50), 1))
            else:
                # 阴线
                if self.theme == "dark":
                    painter.setBrush(QColor(50, 150, 50))
                else:
                    painter.setBrush(QColor(50, 200, 50))
                painter.setPen(QPen(QColor(50, 150, 50), 1))
                
            painter.drawRect(candle_rect)
            
        painter.restore()
        
    def draw_volume(self, painter):
        """
        绘制成交量
        """
        if not self.current_data or self.volume_max <= 0:
            return
            
        width = self.width()
        height = self.height()
        chart_height = height * 0.7
        volume_height = height * 0.2
        volume_y_start = chart_height + 5
        
        painter.save()
        
        for i, item in enumerate(self.current_data):
            x = width - self.margin - (i + 0.5) * (self.candle_width + 2)
            
            # 计算成交量高度
            vol_height = (item.volume / self.volume_max) * volume_height
            if vol_height < 1:
                vol_height = 1
                
            vol_rect = QRectF(
                x - self.candle_width / 2,
                volume_y_start + volume_height - vol_height,
                self.candle_width,
                vol_height
            )
            
            # 根据涨跌设置不同颜色
            if item.is_rise():
                if self.theme == "dark":
                    painter.setBrush(QColor(200, 50, 50))
                else:
                    painter.setBrush(QColor(255, 100, 100))
            else:
                if self.theme == "dark":
                    painter.setBrush(QColor(50, 150, 50))
                else:
                    painter.setBrush(QColor(100, 200, 100))
                    
            painter.setPen(Qt.NoPen)
            painter.drawRect(vol_rect)
            
        painter.restore()
        
    def resizeEvent(self, event):
        """
        窗口大小变化时使缓存失效
        """
        self.cache_valid = False
        super().resizeEvent(event)
        
    def clear_data(self):
        """
        清除所有数据
        """
        self.kline_data.clear()
        self.current_data.clear()
        self.cache_valid = False
        self.update()


class KLineDemo(QMainWindow):
    """
    主窗口类
    """
    def __init__(self):
        super().__init__()
        self.setWindowTitle("PyQt5 K线图演示")
        self.setGeometry(100, 100, 1000, 600)
        
        # 创建UI
        self.init_ui()
        
    def init_ui(self):
        """
        初始化用户界面
        """
        # 主窗口部件
        central_widget = QWidget()
        self.setCentralWidget(central_widget)
        
        # 布局
        layout = QVBoxLayout()
        central_widget.setLayout(layout)
        
        # 标题
        title_label = QLabel("PyQt5 K线图演示")
        title_label.setAlignment(Qt.AlignCenter)
        title_label.setFont(QFont("Arial", 16))
        layout.addWidget(title_label)
        
        # K线图控件
        self.kline_widget = KLineWidget()
        layout.addWidget(self.kline_widget)
        
        # 控制面板
        control_panel = QWidget()
        control_layout = QVBoxLayout()
        control_panel.setLayout(control_layout)
        
        # 按钮区域
        button_layout = QHBoxLayout()
        
        # 开始按钮
        self.start_btn = QPushButton("开始模拟数据")
        self.start_btn.clicked.connect(self.start_simulation)
        button_layout.addWidget(self.start_btn)
        
        # 停止按钮
        self.stop_btn = QPushButton("停止")
        self.stop_btn.clicked.connect(self.stop_simulation)
        self.stop_btn.setEnabled(False)
        button_layout.addWidget(self.stop_btn)
        
        # 清除按钮
        self.clear_btn = QPushButton("清除数据")
        self.clear_btn.clicked.connect(self.clear_data)
        button_layout.addWidget(self.clear_btn)
        
        # 主题选择
        self.theme_combo = QComboBox()
        self.theme_combo.addItems(["暗色主题", "亮色主题"])
        self.theme_combo.currentTextChanged.connect(self.change_theme)
        button_layout.addWidget(self.theme_combo)
        
        # 成交量开关
        self.volume_check = QPushButton("隐藏成交量")
        self.volume_check.setCheckable(True)
        self.volume_check.setChecked(True)
        self.volume_check.clicked.connect(self.toggle_volume)
        button_layout.addWidget(self.volume_check)
        
        control_layout.addLayout(button_layout)
        layout.addWidget(control_panel)
        
    def start_simulation(self):
        """
        开始模拟数据
        """
        self.kline_widget.timer.start(500)  # 每500ms添加一条数据
        self.start_btn.setEnabled(False)
        self.stop_btn.setEnabled(True)
        
    def stop_simulation(self):
        """
        停止模拟数据
        """
        self.kline_widget.timer.stop()
        self.start_btn.setEnabled(True)
        self.stop_btn.setEnabled(False)
        
    def clear_data(self):
        """
        清除数据
        """
        self.kline_widget.clear_data()
        
    def change_theme(self, theme):
        """
        切换主题
        """
        self.kline_widget.theme = "dark" if theme == "暗色主题" else "light"
        self.kline_widget.cache_valid = False
        self.kline_widget.update()
        
    def toggle_volume(self, checked):
        """
        切换成交量显示
        """
        self.kline_widget.show_volume = checked
        self.volume_check.setText("隐藏成交量" if checked else "显示成交量")
        self.kline_widget.cache_valid = False
        self.kline_widget.update()


if __name__ == "__main__":
    app = QApplication(sys.argv)
    
    # 性能优化:设置应用程序样式
    app.setStyle("Fusion")
    
    window = KLineDemo()
    window.show()
    sys.exit(app.exec_())

代码说明与性能优化技术

1. 数据结构优化

使用双端队列


python

self.kline_data = deque(maxlen=200) # 限制最大数据量
  • 自动淘汰旧数据,防止内存无限增长
  • 比普通列表更高效的追加和弹出操作

K线数据项类


python

class KLineItem:
def __init__(self, timestamp, open_price, close_price, high_price, low_price, volume):
self.timestamp = timestamp
self.open = open_price
self.close = close_price
self.high = high_price
self.low = low_price
self.volume = volume
  • 结构化存储K线数据,便于管理和访问

2. 绘制性能优化

双缓冲技术


python

# 在paintEvent中
self.cache_image = QImage(self.size(), QImage.Format_ARGB32)
cache_painter = QPainter(self.cache_image)
# ...绘制到缓存...
painter.drawImage(draw_rect, self.cache_image, draw_rect)
  • 减少屏幕闪烁
  • 避免重复计算

局部重绘


python

draw_rect = event.rect()
if self.cache_valid and self.last_draw_rect == draw_rect and self.cache_image:
painter.drawImage(draw_rect, self.cache_image, draw_rect)
return
  • 只重绘需要更新的区域
  • 显著提高性能,特别是在窗口调整大小时

动态K线宽度计算


python

available_width = total_width - (data_count + 1) * 2 # 2像素间距
self.candle_width = max(2, available_width / data_count) # 最小宽度2像素
  • 根据显示区域自动调整K线宽度
  • 确保在不同窗口大小下都能正常显示

3. 数据可视化优化

价格范围动态计算


python

def update_price_range(self):
# 计算价格范围(留出10%的边距)
price_range = max(all_prices) - min(all_prices)
self.price_min = min(all_prices) - price_range * 0.1
self.price_max = max(all_prices) + price_range * 0.1
  • 自动适应数据范围
  • 留出适当边距使图表更美观

智能数据裁剪


python

start_idx = max(0, data_count - int(total_width / (self.candle_width + 2)))
self.current_data = list(self.kline_data)[start_idx:]
  • 只绘制当前视口可见的数据
  • 大幅减少绘制量,提高性能

4. 主题与样式优化

主题支持


python

self.theme = "dark" # 或 "light"
  • 提供暗色和亮色两种主题
  • 减少不必要的样式计算

条件渲染


python

if self.theme == "dark":
painter.setPen(QPen(QColor(50, 50, 50), 1))
else:
painter.setPen(QPen(QColor(230, 230, 230), 1))
  • 根据主题选择不同颜色
  • 避免运行时重复计算

5. 内存管理优化

缓存失效机制


python

self.cache_valid = False # 数据变化时标记缓存失效
  • 数据变化时自动使缓存失效
  • 确保显示内容始终最新

资源清理


python

def clear_data(self):
self.kline_data.clear()
self.current_data.clear()
self.cache_valid = False
self.update()
  • 提供明确的资源清理方法
  • 防止内存泄漏

扩展性能优化建议

  1. 使用OpenGL加速
    • 对于极大量数据,可以考虑使用QOpenGLWidget进行硬件加速绘制
    • 需要安装PyQt5的OpenGL支持
  2. 数据分页加载
    • 实现数据分页,只加载当前视口需要的数据
    • 特别适合历史数据回放场景
  3. WebWorker模式
    • 对于特别复杂的数据处理,可以考虑使用QThread在后台处理数据
    • 通过信号槽机制与主线程通信
  4. 简化复杂图形
    • 在极小的K线宽度下,可以简化影线绘制
    • 使用位图或预渲染技术优化极大量数据的显示
  5. 使用QCharts
    • PyQt5的QtCharts模块提供了 candlestick 图表类型
    • 适合快速实现基本K线图,但自定义程度较低

实际应用建议

  1. 真实数据接入
    • 替换add_random_data方法为真实数据源
    • 可以使用WebSocket接入实时行情数据
  2. 技术指标计算
    • 在数据添加时计算MA、MACD、KDJ等指标
    • 可以单独绘制指标曲线
  3. 交互功能增强
    • 添加十字光标功能
    • 实现缩放和平移操作
    • 添加K线选择和详细信息显示
  4. 多周期支持
    • 实现不同时间周期(1分钟、5分钟、日K等)的切换
    • 可以缓存不同周期的数据

这个实现提供了良好的基础框架和多项性能优化技术,可以根据实际需求进一步扩展和完善。