1、问题描述:
用 ollama 调用千问2.5-VL 识别图片内容时,只用命令行工具不太方便,于是做了一个 Python UI 界面与大家分享。需要提前安装 ollama,并下载千问 qwen2.5vl:7b 模型(在 ollama 官网即可下载,6G-8G 显卡可用),用于识别图片信息。之前还下载了 qwen3:8b 版,发现也可以用此程序调用,比图片识别更快:用 qwen3:8b 直接文字提问,随便选一张图片即可(此时图片不参与推理)。不知为何 qwen2.5vl:7b 默认只用 CPU 预处理图片,所以图片推理的过程非常慢;qwen3:8b 默认支持 GPU,速度快 100 倍,反应迅速,几乎秒回复。这就是 GPU 与 CPU 推理的天壤之别吧,哈哈。南无阿弥陀佛。
如下图:
使用方法:很简单,
2、图片推理:
在模型管理列表栏,选择相应的qwen2.5vl:7b模型,点击选择模型按钮,之后,直接在最下面,点击选择图片按钮,支持三张,太多图片推理太慢了。单张最快,cpu推理就是这样,之后,在提示词栏,输入要对图片做的推理要求,默认是描述图片内容,也可以问图片里的特殊的人事物,等等。也可以指定要求推理输出的文字字数,1000字以内没啥问题。
3、文字推理:
同理,在模型管理列表栏,选择相应的qwen3:8b模型,点击选择模型按钮,之后,直接在最下面,点击选择图片按钮,随便选张图,如果之前已经选了,就忽略此步。之后,在提示词栏,输入要提问的提示词即可,几千字以内似乎没啥问题。南无阿弥陀佛。
4、程序免费下载地址:
程序下载地址:https://download.csdn.net/download/tian0000hai/90856287
南无阿弥陀佛。
5、源码分享:ollama_qwen25vl_gui.py
# Standard-library imports (consolidated at the top per PEP 8; the original
# interleaved imports with executable code).
import os
import sys
import base64
import json
import platform
import subprocess
import threading
import queue
import time
import concurrent.futures

# Third-party imports
import requests
import numpy as np
import cv2  # requires OpenCV (opencv-python)

import tkinter as tk
from tkinter import filedialog, ttk, messagebox, scrolledtext
from PIL import Image, ImageTk, PngImagePlugin

# When running as a PyInstaller-frozen executable, point requests at the
# bundled certifi CA file so HTTPS certificate verification still works.
if getattr(sys, 'frozen', False):
    requests.utils.DEFAULT_CA_BUNDLE_PATH = os.path.join(sys._MEIPASS, 'certifi', 'cacert.pem')
class OllamaQwenGUI:
    """Tkinter front-end for recognising images via a local Ollama Qwen2.5-VL model."""

    def __init__(self, root):
        self.root = root
        self.root.title("Ollama Qwen2.5-VL 图片识别工具 - 南无阿弥陀佛")
        self.root.geometry("1000x750")
        # NOTE(review): the minimum height (900) exceeds the initial height
        # (750), so the window grows immediately — confirm this is intended.
        self.root.minsize(900, 900)

        # Pick a CJK-capable font for each platform.
        self.font_family = "SimHei" if sys.platform.startswith("win") else "WenQuanYi Micro Hei"

        # Default Ollama endpoint and model.
        self.api_url = "http://localhost:11434/api"
        self.model_name = "qwen2.5vl:7b"

        # Handle of the `ollama serve` subprocess, if this app started one.
        self.api_process = None
        self.console_output_queue = queue.Queue()

        # Recognition progress state shared with worker threads.
        self.is_recognizing = False
        self.progress_queue = queue.Queue()

        # Build the widget tree.
        self.create_widgets()

        # Queue for worker-thread -> UI messages (drained by check_queue).
        self.queue = queue.Queue()

        # Probe the server and populate the model list once at startup.
        self.check_api_status()
        self.refresh_model_list()

        # Background monitors for console output and progress messages.
        self.monitor_console_output = True
        threading.Thread(target=self.read_console_output, daemon=True).start()
        threading.Thread(target=self.update_progress, daemon=True).start()
def create_widgets(self):
    """Build the full widget tree: API config, service controls, model manager,
    preview/result panes, and the bottom prompt bar."""
    main_frame = ttk.Frame(self.root, padding="10")
    main_frame.pack(fill=tk.BOTH, expand=True)

    # --- Ollama API configuration --------------------------------------
    config_frame = ttk.LabelFrame(main_frame, text="Ollama API 配置", padding="10")
    config_frame.pack(fill=tk.X, pady=5)
    ttk.Label(config_frame, text="API 地址:", font=(self.font_family, 10)).grid(row=0, column=0, sticky=tk.W, padx=5, pady=5)
    self.api_entry = ttk.Entry(config_frame, width=40, font=(self.font_family, 10))
    self.api_entry.insert(0, self.api_url)
    self.api_entry.grid(row=0, column=1, sticky=tk.W, padx=5, pady=5)
    ttk.Label(config_frame, text="模型名称:", font=(self.font_family, 10)).grid(row=0, column=2, sticky=tk.W, padx=5, pady=5)
    self.model_entry = ttk.Entry(config_frame, width=20, font=(self.font_family, 10))
    self.model_entry.insert(0, self.model_name)
    self.model_entry.grid(row=0, column=3, sticky=tk.W, padx=5, pady=5)

    # --- API service controls ------------------------------------------
    api_frame = ttk.LabelFrame(main_frame, text="API 服务控制", padding="10")
    api_frame.pack(fill=tk.X, pady=5)
    self.api_status_var = tk.StringVar(value="API 状态: 未知")
    self.api_status_label = ttk.Label(api_frame, textvariable=self.api_status_var, font=(self.font_family, 10))
    self.api_status_label.grid(row=0, column=0, sticky=tk.W, padx=5, pady=5)
    self.start_api_button = ttk.Button(api_frame, text="启动 API", command=self.start_api, width=12)
    self.start_api_button.grid(row=0, column=1, padx=5, pady=5)
    self.stop_api_button = ttk.Button(api_frame, text="停止 API", command=self.stop_api, width=12)
    self.stop_api_button.grid(row=0, column=2, padx=5, pady=5)
    self.check_api_button = ttk.Button(api_frame, text="检查状态", command=self.check_api_status, width=12)
    self.check_api_button.grid(row=0, column=3, padx=5, pady=5)

    # --- Model management ----------------------------------------------
    model_frame = ttk.LabelFrame(main_frame, text="模型管理 -(鼠标中键滚轮可查看更多内容)", padding="10")
    model_frame.pack(fill=tk.X, pady=5)
    self.model_listbox = tk.Listbox(model_frame, width=40, height=5, font=(self.font_family, 10))
    self.model_listbox.grid(row=0, column=0, rowspan=2, sticky=tk.W+tk.E, padx=5, pady=5)
    self.select_button = ttk.Button(model_frame, text="选择模型", command=self.select_model, width=12)
    self.select_button.grid(row=1, column=1, padx=5, pady=5)
    self.download_button = ttk.Button(model_frame, text="下载模型", command=self.download_model, width=12)
    self.download_button.grid(row=0, column=1, padx=5, pady=5)
    self.start_button = ttk.Button(model_frame, text="启动模型", command=self.start_model, width=12)
    self.start_button.grid(row=0, column=2, padx=5, pady=5)
    self.stop_button = ttk.Button(model_frame, text="停止模型", command=self.stop_model, width=12)
    self.stop_button.grid(row=0, column=3, padx=5, pady=5)
    self.refresh_button = ttk.Button(model_frame, text="刷新列表", command=self.refresh_model_list, width=12)
    self.refresh_button.grid(row=1, column=2, padx=5, pady=5)
    self.delete_button = ttk.Button(model_frame, text="删除模型", command=self.delete_model, width=12)
    self.delete_button.grid(row=1, column=3, padx=5, pady=5)
    self.model_status_var = tk.StringVar(value="模型状态: 未知")
    self.model_status_label = ttk.Label(model_frame, textvariable=self.model_status_var, font=(self.font_family, 10))
    self.model_status_label.grid(row=1, column=4, sticky=tk.W, padx=5, pady=5)

    # --- Middle pane: image preview (left) + results (right) -----------
    middle_frame = ttk.Frame(main_frame, padding="5")
    middle_frame.pack(fill=tk.BOTH, expand=True)
    image_frame = ttk.LabelFrame(middle_frame, text="图片预览", padding="10")
    image_frame.pack(side=tk.LEFT, fill=tk.BOTH, expand=True, padx=5, pady=5)
    self.image_container = ttk.Frame(image_frame)
    self.image_container.pack(fill=tk.BOTH, expand=True)
    # BUG FIX: the hint text contained a doubled character ("不超超过").
    self.image_label = ttk.Label(self.image_container, text="请选择一张或多张图片(建议:不超过三张)", font=(self.font_family, 12))
    self.image_label.pack(expand=True)
    result_frame = ttk.LabelFrame(middle_frame, text="识别结果", padding="10")
    result_frame.pack(side=tk.RIGHT, fill=tk.BOTH, expand=True, padx=5, pady=5)

    # Two tabs: recognition output and console output.
    self.result_notebook = ttk.Notebook(result_frame)
    self.result_notebook.pack(fill=tk.BOTH, expand=True)
    self.result_tab = ttk.Frame(self.result_notebook)
    self.result_notebook.add(self.result_tab, text="识别结果")
    self.progress_frame = ttk.Frame(self.result_tab)
    self.progress_frame.pack(fill=tk.X, pady=5)
    self.progress_var = tk.StringVar(value="准备就绪")
    self.progress_label = ttk.Label(self.progress_frame, textvariable=self.progress_var, font=(self.font_family, 10))
    self.progress_label.pack(side=tk.LEFT, padx=5)
    self.progress_bar = ttk.Progressbar(self.progress_frame, orient=tk.HORIZONTAL, length=100, mode='indeterminate')
    self.progress_bar.pack(side=tk.LEFT, padx=5)
    self.result_text = scrolledtext.ScrolledText(self.result_tab, wrap=tk.WORD, font=(self.font_family, 11), height=18)
    self.result_text.pack(fill=tk.BOTH, expand=True)
    self.console_tab = ttk.Frame(self.result_notebook)
    self.result_notebook.add(self.console_tab, text="控制台输出")
    self.console_text = scrolledtext.ScrolledText(self.console_tab, wrap=tk.WORD, font=(self.font_family, 10), height=20)
    self.console_text.pack(fill=tk.BOTH, expand=True)

    # --- Bottom bar: prompt, image chooser, run button -----------------
    control_frame = ttk.Frame(main_frame, padding="10")
    control_frame.pack(fill=tk.X, pady=5)
    ttk.Label(control_frame, text="提示词:", font=(self.font_family, 10)).grid(row=0, column=0, sticky=tk.W, padx=5, pady=5)
    self.prompt_var = tk.StringVar(value="描述这些图片的内容")
    self.prompt_entry = ttk.Entry(control_frame, textvariable=self.prompt_var, width=60, font=(self.font_family, 10))
    self.prompt_entry.grid(row=0, column=1, sticky=tk.W, padx=5, pady=5)
    self.browse_button = ttk.Button(control_frame, text="选择图片", command=self.browse_image, width=12)
    self.browse_button.grid(row=0, column=2, padx=5, pady=5)
    self.recognize_button = ttk.Button(control_frame, text="开始识别", command=self.start_recognition, width=12)
    self.recognize_button.grid(row=0, column=3, padx=5, pady=5)

    # Status bar pinned to the bottom of the window.
    self.status_var = tk.StringVar(value="就绪")
    self.status_bar = ttk.Label(self.root, textvariable=self.status_var, relief=tk.SUNKEN, anchor=tk.W)
    self.status_bar.pack(side=tk.BOTTOM, fill=tk.X)
def browse_image(self):
    """Let the user pick one or more image files and preview them."""
    paths = filedialog.askopenfilenames(
        title="选择图片",
        filetypes=[("选择图片", "*.jpg;*.jpeg;*.png;*.bmp;*.webp;*.tiff;*.tif;*.gif")],
    )
    if not paths:
        # Dialog cancelled — keep the previous selection untouched.
        return
    self.image_paths = paths
    self.display_images(paths)
    self.status_var.set(f"已选择 {len(paths)} 张图片")
def display_images(self, file_paths):
    """Render thumbnails (max 200x200 each) of *file_paths* in the preview area."""
    # Remove previous previews. Note this destroys self.image_label too.
    for widget in self.image_container.winfo_children():
        widget.destroy()
    try:
        max_width = 200
        max_height = 200
        # Raise PIL's PNG text-chunk limit once to avoid iCCP warnings on
        # images with large embedded metadata. (The original re-assigned
        # this module-level constant inside the loop; hoisted here.)
        PngImagePlugin.MAX_TEXT_CHUNK = 100 * (1024 ** 2)  # 100 MB
        for file_path in file_paths:
            image = Image.open(file_path)
            if image.format == 'PNG':
                # Converting to RGB drops the problematic iCCP chunk.
                try:
                    image = image.convert('RGB')
                except Exception as e:
                    print(f"处理PNG图片时出错: {e}")
            # Scale down proportionally to fit the thumbnail box.
            width, height = image.size
            if width > max_width or height > max_height:
                ratio = min(max_width / width, max_height / height)
                image = image.resize((int(width * ratio), int(height * ratio)), Image.LANCZOS)
            photo = ImageTk.PhotoImage(image)
            label = ttk.Label(self.image_container, image=photo)
            label.image = photo  # keep a reference so Tk doesn't GC the photo
            label.pack(side=tk.LEFT, padx=5, pady=5)
    except Exception as e:
        # BUG FIX: the original called self.image_label.config() here, but
        # that widget was destroyed above, so reporting an error would
        # itself crash. Recreate the label to show the message.
        self.image_label = ttk.Label(self.image_container, text=f"无法显示图片: {str(e)}", font=(self.font_family, 12))
        self.image_label.pack(expand=True)
        print(f"加载图片时出错: {e}")
def start_recognition(self):
    """Validate preconditions, lock the UI, and run recognition on a worker thread."""
    if not hasattr(self, 'image_paths'):
        messagebox.showwarning("警告", "请先选择一张或多张图片")
        return
    if not self.check_api_status(show_message=False):
        messagebox.showwarning("警告", "API服务未启动,请先启动API服务")
        return
    # Reset the output pane and progress indicator.
    self.result_text.delete(1.0, tk.END)
    self.is_recognizing = True
    self.progress_var.set("准备识别...")
    self.progress_bar.start()
    # Disable the buttons until the worker reports back via the queue.
    self.status_var.set("正在识别...")
    self.browse_button.config(state=tk.DISABLED)
    self.recognize_button.config(state=tk.DISABLED)
    threading.Thread(target=self.perform_recognition, daemon=True).start()
def encode_image_to_base64(self, image_path):
    """Return *image_path* as a base64 string, recompressing to JPEG (quality 80)
    via OpenCV when possible to shrink the payload sent to the API."""
    def _raw_b64(path):
        # Fallback: encode the file bytes untouched.
        with open(path, "rb") as fh:
            return base64.b64encode(fh.read()).decode('utf-8')

    try:
        img = cv2.imread(image_path)
        if img is None:
            # OpenCV could not decode this file (e.g. unsupported format).
            return _raw_b64(image_path)
        ok, buffer = cv2.imencode('.jpg', img, [cv2.IMWRITE_JPEG_QUALITY, 80])
        if not ok:
            raise Exception("图像编码失败")
        return base64.b64encode(buffer).decode('utf-8')
    except Exception as e:
        print(f"使用OpenCV编码失败: {e}")
        return _raw_b64(image_path)
def perform_recognition(self):
    """Worker thread: encode the selected images, stream a /generate request,
    and forward progress/results back to the UI."""
    try:
        # Encode all selected images to base64 in parallel.
        with concurrent.futures.ThreadPoolExecutor() as executor:
            future_to_path = {executor.submit(self.encode_image_to_base64, path): path for path in self.image_paths}
            base64_images = []
            for future in concurrent.futures.as_completed(future_to_path):
                try:
                    base64_images.append(future.result())
                except Exception as e:
                    print(f"编码图像时出错: {e}")
        # Build the request for Ollama's /generate endpoint.
        api_url = f"{self.api_entry.get().strip()}/generate"
        model_name = self.model_entry.get().strip()
        prompt = self.prompt_var.get().strip()
        self.progress_queue.put("正在初始化模型...")
        payload = {
            "model": model_name,
            "prompt": prompt,
            "stream": True,  # stream partial responses line by line
            "images": base64_images
        }
        self.progress_queue.put("正在发送请求...")
        response = requests.post(api_url, json=payload, stream=True)
        if response.status_code == 200:
            self.progress_queue.put("开始接收识别结果...")
            # NOTE(review): the Tk text widget is updated directly from this
            # worker thread; Tkinter is not generally thread-safe — confirm.
            self.result_text.insert(tk.END, "识别结果:\n\n")
            # Each streamed line is a standalone JSON object with a
            # "response" fragment; append fragments as they arrive.
            full_response = ""
            for line in response.iter_lines():
                if line:
                    try:
                        chunk = json.loads(line)
                        if 'response' in chunk:
                            text_chunk = chunk['response']
                            full_response += text_chunk
                            self.result_text.insert(tk.END, text_chunk)
                            self.result_text.see(tk.END)  # keep scrolled to the bottom
                    except json.JSONDecodeError as e:
                        self.progress_queue.put(f"解析响应错误: {str(e)}")
            self.progress_queue.put("识别完成")
            self.queue.put(("success", "识别完成"))
        else:
            self.queue.put(("error", f"API请求失败: HTTP {response.status_code} - {response.text}"))
    except Exception as e:
        self.queue.put(("error", f"发生错误: {str(e)}"))
    finally:
        # Always restore the UI state, whether recognition succeeded or not.
        self.is_recognizing = False
        self.progress_bar.stop()
        self.queue.put(("update_ui",))
    # Schedule the next queue drain on the Tk main loop.
    self.root.after(100, self.check_queue)
def download_model(self):
    """Validate inputs, then pull the model named in the entry on a background thread."""
    model_name = self.model_entry.get().strip()
    if not model_name:
        messagebox.showwarning("警告", "请输入要下载的模型名称")
        return
    if not self.check_api_status(show_message=False):
        messagebox.showwarning("警告", "API服务未启动,请先启动API服务")
        return
    # Lock the button until the worker re-enables it via the queue.
    self.status_var.set(f"正在下载模型: {model_name}")
    self.download_button.config(state=tk.DISABLED)
    threading.Thread(target=self.perform_model_download, args=(model_name,), daemon=True).start()
def perform_model_download(self, model_name):
    """Worker thread: stream `POST /api/pull` for *model_name*, reporting
    status and progress to the UI via the message queue."""
    try:
        api_url = f"{self.api_entry.get().strip()}/pull"
        payload = {
            "name": model_name,
            "stream": True  # stream pull progress events
        }
        # Each streamed response line is a JSON status event from Ollama.
        response = requests.post(api_url, json=payload, stream=True)
        if response.status_code == 200:
            self.queue.put(("success", f"开始下载模型: {model_name}"))
            for line in response.iter_lines():
                if line:
                    try:
                        chunk = json.loads(line)
                        if 'status' in chunk:
                            status = chunk['status']
                            self.queue.put(("success", f"下载状态: {status}"))
                        # Events with byte counters let us compute a percentage.
                        if 'completed' in chunk and 'total' in chunk:
                            progress = chunk['completed'] / chunk['total'] * 100
                            self.queue.put(("success", f"下载进度: {progress:.1f}%"))
                    except json.JSONDecodeError:
                        # Ignore malformed progress lines.
                        pass
            self.queue.put(("success", f"模型 {model_name} 下载成功"))
            self.refresh_model_list()
        else:
            self.queue.put(("error", f"模型下载失败: HTTP {response.status_code} - {response.text}"))
    except Exception as e:
        self.queue.put(("error", f"发生错误: {str(e)}"))
    finally:
        # Re-enable the download button regardless of outcome.
        self.queue.put(("update_download_ui",))
    # Schedule a queue drain on the Tk main loop.
    self.root.after(100, self.check_queue)
def start_model(self):
    """Validate inputs, then warm up the named model on a background thread."""
    model_name = self.model_entry.get().strip()
    if not model_name:
        messagebox.showwarning("警告", "请选择要启动的模型")
        return
    if not self.check_api_status(show_message=False):
        messagebox.showwarning("警告", "API服务未启动,请先启动API服务")
        return
    # Lock the button until the worker re-enables it via the queue.
    self.status_var.set(f"正在启动模型: {model_name}")
    self.start_button.config(state=tk.DISABLED)
    threading.Thread(target=self.perform_model_start, args=(model_name,), daemon=True).start()
def perform_model_start(self, model_name):
    """Worker thread: trigger Ollama to load *model_name* by issuing a tiny
    non-streaming generate request (Ollama loads models lazily on first use)."""
    try:
        self.queue.put(("success", f"正在加载模型: {model_name}"))
        endpoint = f"{self.api_entry.get().strip()}/generate"
        payload = {
            "model": model_name,
            "prompt": "加载模型",
            "stream": False
        }
        resp = requests.post(endpoint, json=payload)
        if resp.status_code == 200:
            self.queue.put(("success", f"模型 {model_name} 已启动"))
        else:
            self.queue.put(("error", f"启动模型失败: HTTP {resp.status_code}"))
        self.update_model_status(model_name)
    except Exception as e:
        self.queue.put(("error", f"启动模型失败: {str(e)}"))
    finally:
        # Always re-enable the start button.
        self.queue.put(("update_start_ui",))
    # Schedule a queue drain on the Tk main loop.
    self.root.after(100, self.check_queue)
def stop_model(self):
    """Validate inputs, then unload the named model on a background thread."""
    model_name = self.model_entry.get().strip()
    if not model_name:
        messagebox.showwarning("警告", "请选择要停止的模型")
        return
    if not self.check_api_status(show_message=False):
        messagebox.showwarning("警告", "API服务未启动,模型可能已停止")
        return
    # Lock the button until the worker re-enables it via the queue.
    self.status_var.set(f"正在停止模型: {model_name} 并清空显存...")
    self.stop_button.config(state=tk.DISABLED)
    threading.Thread(target=self.perform_model_stop, args=(model_name,), daemon=True).start()
def perform_model_stop(self, model_name):
    """Worker thread: run `ollama stop <model>` to unload the model and free VRAM."""
    try:
        # Use an argument list (no shell) so the model name cannot inject
        # shell syntax; the original used os.system with string concatenation.
        return_code = subprocess.run(["ollama", "stop", model_name]).returncode
        if return_code == 0:
            self.queue.put(("success", f"模型 {model_name} 已停止"))
            self.refresh_model_list()
        else:
            # BUG FIX: the original formatted an undefined `response` object
            # here, raising NameError instead of reporting the failure.
            self.queue.put(("error", f"模型停止失败: 返回码 {return_code}"))
    except Exception as e:
        self.queue.put(("error", f"停止模型失败: {str(e)}"))
    finally:
        # Always re-enable the stop button.
        self.queue.put(("update_stop_ui",))
    # Schedule a queue drain on the Tk main loop.
    self.root.after(100, self.check_queue)
def refresh_model_list(self):
    """Clear the model listbox and repopulate it from the API on a background thread."""
    self.model_listbox.delete(0, tk.END)
    if not self.check_api_status(show_message=False):
        # Cannot query /tags without a running server.
        self.queue.put(("error", "无法获取模型列表:API服务未启动"))
        return
    threading.Thread(target=self.fetch_model_list, daemon=True).start()
def fetch_model_list(self):
    """Worker thread: query /api/tags and forward the model names to the UI queue."""
    try:
        endpoint = f"{self.api_entry.get().strip()}/tags"
        resp = requests.get(endpoint)
        if resp.status_code == 200:
            names = [m.get("name", "") for m in resp.json().get("models", [])]
            self.queue.put(("update_model_list", names))
            # Refresh the installed/not-installed status for the current entry.
            self.update_model_status(self.model_entry.get().strip())
        else:
            self.queue.put(("error", f"获取模型列表失败: HTTP {resp.status_code} - {resp.text}"))
    except Exception as e:
        self.queue.put(("error", f"发生错误: {str(e)}"))
    # Schedule a queue drain on the Tk main loop.
    self.root.after(100, self.check_queue)
def update_model_status(self, model_name):
    """Show whether *model_name* appears in the listbox.

    Presence in the list is used as a simple stand-in for "installed";
    a real implementation would query the API for the model's state.
    """
    installed = model_name in (self.model_listbox.get(i) for i in range(self.model_listbox.size()))
    status = "已安装" if installed else "未安装"
    self.model_status_var.set(f"模型状态: {status}")
def select_model(self):
    """Copy the current listbox selection into the model-name entry."""
    selection = self.model_listbox.curselection()
    if not selection:
        messagebox.showwarning("警告", "请先从列表中选择一个模型")
        return
    chosen = self.model_listbox.get(selection[0])
    self.model_entry.delete(0, tk.END)
    self.model_entry.insert(0, chosen)
    self.status_var.set(f"已选择模型: {chosen}")
    self.update_model_status(chosen)
def start_api(self):
    """Start `ollama serve` in the background unless it is already running."""
    already_running = self.api_process is not None and self.api_process.poll() is None
    if already_running:
        messagebox.showwarning("警告", "API服务已在运行中")
        return
    self.console_text.delete(1.0, tk.END)  # fresh console for the new process
    # Lock the button until the worker re-enables it via the queue.
    self.status_var.set("正在启动API服务...")
    self.start_api_button.config(state=tk.DISABLED)
    threading.Thread(target=self.perform_api_start, daemon=True).start()
def perform_api_start(self):
    """Worker thread: spawn `ollama serve` and poll /tags until it responds
    or roughly 10 seconds have passed."""
    try:
        # Ask Ollama to use CUDA. NOTE(review): OLLAMA_CUDA is not a
        # documented Ollama variable — confirm it actually has an effect.
        os.environ["OLLAMA_CUDA"] = "1"
        # Derive host and port from the configured API URL
        # (e.g. "http://localhost:11434/api" -> "localhost", "11434").
        api_full_url = self.api_entry.get().strip()
        base_url = api_full_url.rsplit('/', 1)[0]
        host, port = base_url.replace('http://', '').split(':')
        # NOTE(review): `ollama serve` reads its bind address from the
        # OLLAMA_HOST env var; the -h/-p flags used here may be ignored.
        if platform.system() == "Windows":
            self.api_process = subprocess.Popen(
                f"ollama serve -h {host} -p {port}",
                shell=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                bufsize=1
            )
        else:
            # POSIX: start in its own session/process group so stop_api can
            # terminate the whole group (shell + children) with killpg.
            self.api_process = subprocess.Popen(
                f"ollama serve -h {host} -p {port}",
                shell=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                preexec_fn=os.setsid,
                text=True,
                bufsize=1
            )
        # Poll the /tags endpoint until the server answers.
        max_attempts = 10
        for attempt in range(max_attempts):
            try:
                response = requests.get(f"{api_full_url}/tags", timeout=2)
                if response.status_code == 200:
                    self.queue.put(("success", "API服务已启动"))
                    self.check_api_status()
                    return
            except:
                # Not up yet — wait and retry.
                time.sleep(1)
        self.queue.put(("error", "API服务启动超时"))
    except Exception as e:
        self.queue.put(("error", f"启动API服务失败: {str(e)}"))
    finally:
        # Always re-enable the start button.
        self.queue.put(("update_api_start_ui",))
    # Schedule a queue drain on the Tk main loop.
    self.root.after(100, self.check_queue)
def stop_api(self):
    """Stop the managed `ollama serve` process on a background thread."""
    if self.api_process is None or self.api_process.poll() is not None:
        # Nothing to stop; normalise the recorded state anyway.
        messagebox.showwarning("警告", "API服务未在运行中")
        self.api_process = None
        self.api_status_var.set("API 状态: 未运行")
        return
    # Lock the button until the worker re-enables it via the queue.
    self.status_var.set("正在停止API服务...")
    self.stop_api_button.config(state=tk.DISABLED)
    threading.Thread(target=self.perform_api_stop, daemon=True).start()
def perform_api_stop(self):
    """Worker thread: terminate the `ollama serve` process (whole process
    group on POSIX), force-killing after a 5-second grace period."""
    try:
        if self.api_process is not None:
            if platform.system() == "Windows":
                self.api_process.terminate()
            else:
                import os
                import signal
                # Kill the whole process group created via preexec_fn=os.setsid.
                os.killpg(os.getpgid(self.api_process.pid), signal.SIGTERM)
            # Give it 5 s to exit gracefully, then force-kill.
            try:
                self.api_process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                self.api_process.kill()
                self.api_process.wait()
            self.api_process = None
            self.queue.put(("success", "API服务已停止"))
            # NOTE(review): this touches a Tk variable from a worker thread;
            # Tkinter thread-safety should be confirmed.
            self.api_status_var.set("API 状态: 未运行")
    except Exception as e:
        self.queue.put(("error", f"停止API服务失败: {str(e)}"))
    finally:
        # Always re-enable the stop button.
        self.queue.put(("update_api_stop_ui",))
    # Schedule a queue drain on the Tk main loop.
    self.root.after(100, self.check_queue)
def check_api_status(self, show_message=True):
    """Probe /api/tags; update the status label and return True iff the
    server answered with HTTP 200."""
    try:
        resp = requests.get(f"{self.api_entry.get().strip()}/tags", timeout=2)
    except Exception as e:
        # Connection failed entirely (server down, bad URL, timeout...).
        self.api_status_var.set("API 状态: 未运行")
        if show_message:
            self.queue.put(("error", f"无法连接到API服务: {str(e)}"))
        return False
    if resp.status_code == 200:
        self.api_status_var.set("API 状态: 运行中")
        if show_message:
            self.queue.put(("success", "API服务正在运行"))
        return True
    self.api_status_var.set("API 状态: 未运行")
    if show_message:
        self.queue.put(("error", f"API服务未响应: HTTP {resp.status_code}"))
    return False
def read_console_output(self):
    """Background loop: forward the API process's stdout/stderr lines to the
    console queue while self.monitor_console_output stays True."""
    while self.monitor_console_output:
        if self.api_process is not None and self.api_process.poll() is None:
            # NOTE(review): iter(readline, '') blocks until the stream hits
            # EOF, so this loop effectively stalls here for as long as the
            # server runs — confirm whether incremental reads were intended.
            if self.api_process.stdout:
                for line in iter(self.api_process.stdout.readline, ''):
                    if line:
                        self.console_output_queue.put(line.strip())
            if self.api_process.stderr:
                for line in iter(self.api_process.stderr.readline, ''):
                    if line:
                        # Tag stderr lines so they stand out in the console tab.
                        self.console_output_queue.put(f"[错误] {line.strip()}")
        # NOTE(review): root.after is invoked from this worker thread;
        # Tkinter thread-safety should be confirmed.
        self.root.after(100, self.update_console_text)
        time.sleep(0.1)
def update_console_text(self):
    """Drain the console-output queue into the console text widget."""
    while not self.console_output_queue.empty():
        try:
            entry = self.console_output_queue.get()
        except queue.Empty:
            # Defensive: only possible if another consumer drained the
            # queue between the empty() check and get().
            continue
        self.console_text.insert(tk.END, entry + "\n")
        self.console_text.see(tk.END)  # keep scrolled to the bottom
def update_progress(self):
    """Background loop: mirror messages from the progress queue into the
    progress label, polling ten times per second."""
    # NOTE(review): sets a Tk variable from a non-main thread; Tkinter
    # thread-safety should be confirmed.
    while True:
        if not self.progress_queue.empty():
            try:
                self.progress_var.set(self.progress_queue.get())
            except queue.Empty:
                # Only possible if another consumer drained the queue first.
                pass
        time.sleep(0.1)
def check_queue(self):
while not self.queue.empty():
try:
msg = self.queue.get()
if msg[0] == "success":
self.result_text.insert(tk.END, msg[1] + "\n")
self.status_var.set(msg[1])
elif msg[0] == "error":
self.result_text.insert(tk.END, f"错误: {msg[1]}\n")
self.status_var.set(f"错误: {msg[1]}")
elif msg[0] == "update_ui":
self.browse_button.config(state=tk.NORMAL)
self.recognize_button.config(state=tk.NORMAL)
elif msg[0] == "update_download_ui":
self.download_button.config(state=tk.NORMAL)
elif msg[0] == "update_start_ui":
self.start_button.config(state=tk.NORMAL)
elif msg[0] == "update_stop_ui":
self.stop_button.config(state=tk.NORMAL)
elif msg[0] == "update_model_list":
for model in msg[1]:
self.model_listbox.insert(tk.END, model)
elif msg[0] == "update_api_start_ui":
self.start_api_button.config(state=tk.NORMAL)
elif msg[0] == "update_api_stop_ui":
self.stop_api_button.config(state=tk.NORMAL)
elif msg[0] == "update_delete_ui":
self.delete_button.config(state=tk.NORMAL)
except queue.Empty:
pass
# 继续检查队列
self.root.after(100, self.check_queue)
def delete_model(self):
    """Validate inputs, then remove the named model on a background thread."""
    model_name = self.model_entry.get().strip()
    if not model_name:
        messagebox.showwarning("警告", "请输入要删除的模型名称")
        return
    if not self.check_api_status(show_message=False):
        messagebox.showwarning("警告", "API服务未启动,请先启动API服务")
        return
    # Lock the button until the worker re-enables it via the queue.
    self.status_var.set(f"正在删除模型: {model_name}")
    self.delete_button.config(state=tk.DISABLED)
    threading.Thread(target=self.perform_model_delete, args=(model_name,), daemon=True).start()
def perform_model_delete(self, model_name):
    """Worker thread: run `ollama rm <model>` and refresh the list on success."""
    try:
        # Use an argument list (no shell) so the model name cannot inject
        # shell syntax; the original used os.system with string concatenation.
        return_code = subprocess.run(["ollama", "rm", model_name]).returncode
        if return_code == 0:
            self.queue.put(("success", f"模型 {model_name} 已删除"))
            self.refresh_model_list()
        else:
            # BUG FIX: the original formatted an undefined `response` object
            # here, raising NameError instead of reporting the failure.
            self.queue.put(("error", f"模型删除失败: 返回码 {return_code}"))
    except Exception as e:
        self.queue.put(("error", f"发生错误: {str(e)}"))
    finally:
        # Always re-enable the delete button.
        self.queue.put(("update_delete_ui",))
    # Schedule a queue drain on the Tk main loop.
    self.root.after(100, self.check_queue)
if __name__ == "__main__":
    root = tk.Tk()
    app = OllamaQwenGUI(root)

    def on_closing():
        # Stop the console monitor thread and make sure any `ollama serve`
        # child process is terminated before the window is destroyed.
        app.monitor_console_output = False
        if app.api_process is not None and app.api_process.poll() is None:
            app.perform_api_stop()
        root.destroy()

    root.protocol("WM_DELETE_WINDOW", on_closing)
    root.mainloop()
南无阿弥陀佛,哈哈。