《AI大模型趣味实战 》第7集:多端适配 个人新闻头条 基于大模型和RSS聚合打造个人新闻电台(Flask WEB版) 1

发布于:2025-03-26 ⋅ 阅读:(24) ⋅ 点赞:(0)

AI大模型趣味实战 第7集:多端适配 个人新闻头条 基于大模型和RSS聚合打造个人新闻电台(Flask WEB版) 1

摘要

在信息爆炸的时代,如何高效获取和筛选感兴趣的新闻内容成为一个现实问题。本文将带领读者通过Python和Flask框架,结合大模型的强大能力,构建一个个性化的新闻聚合平台,不仅能够自动收集整理各类RSS源的新闻,还能以语音播报的形式提供"新闻电台"功能。我们将重点探讨如何利用AI大模型优化新闻内容提取、自动生成标签分类,以及如何通过语音合成技术实现新闻播报功能,打造一个真正实用的个人新闻助手。

项目代码仓库:https://github.com/wyg5208/rss_news_flask

在这里插入图片描述

系统运行截图如下:
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

核心概念和知识点

1. RSS技术与信息聚合

RSS(Really Simple Syndication)是一种用于发布频繁更新的网站内容的XML格式,它允许用户订阅网站的更新内容。我们的项目利用RSS技术,从多个新闻源自动获取最新内容,无需手动访问各个网站。

主要涉及知识点:

  • RSS格式解析与内容提取
  • Web爬虫技术与内容清洗
  • 增量式数据更新策略

2. Web应用开发与交互设计

采用Flask框架构建Web应用,实现用户友好的界面和交互体验。

主要涉及知识点:

  • Flask应用结构设计
  • 前后端交互与API设计
  • 用户认证与会话管理
  • 响应式界面设计

3. 大模型应用

项目中大模型的应用主要体现在两个方面:

  • 新闻内容优化:使用大模型智能提取文章核心内容,去除广告等干扰元素
  • 自动标签生成:分析文章内容,自动提取关键词作为标签

主要涉及知识点:

  • 大模型API调用方法
  • Prompt设计与优化
  • 文本分析与关键信息提取

4. 语音合成技术

将文本转换为语音,实现新闻播报功能。

主要涉及知识点:

  • 文本到语音(TTS)技术
  • 音频文件处理与管理
  • 浏览器语音API集成

5. 系统设计与优化

包括数据库设计、任务调度系统、资源管理等方面。

主要涉及知识点:

  • SQLite数据库设计与优化
  • 多线程任务处理
  • 定时任务调度系统
  • 日志系统设计与管理

实战案例

接下来,我们将通过详细的代码示例和实现步骤,展示如何从零开始构建这个新闻聚合平台。

1. 项目初始化与环境配置

首先,我们需要创建项目目录结构并安装必要的依赖包。

# 项目依赖
# requirements.txt
alembic==1.15.1
aniso8601==10.0.0
anyio==4.9.0
attrs==25.3.0
beautifulsoup4==4.13.3
blinker==1.9.0
bs4==0.0.2
certifi==2025.1.31
cffi==1.17.1
charset-normalizer==3.4.1
click==8.1.8
colorama==0.4.6
comtypes==1.4.10
feedparser==6.0.10
Flask==2.3.3
Flask-Login==0.6.3
Flask-Migrate==4.0.5
Flask-RESTful==0.3.10
Flask-SQLAlchemy==3.1.1
Flask-WTF==1.2.1
greenlet==3.1.1
gunicorn==21.2.0
h11==0.14.0
httpcore==1.0.7
httpx==0.25.2
idna==3.10
itsdangerous==2.2.0
Jinja2==3.1.2
lxml==4.9.3
Mako==1.3.9
MarkupSafe==3.0.2
ollama==0.1.5
outcome==1.3.0.post0
packaging==24.2
pycparser==2.22
pypiwin32==223
PySocks==1.7.1
python-dotenv==1.0.1
pyttsx3==2.98
pytz==2025.1
pywin32==310
requests==2.31.0
schedule==1.2.1
selenium==4.15.2
sgmllib3k==1.0.0
six==1.17.0
sniffio==1.3.1
sortedcontainers==2.4.0
soupsieve==2.6
SQLAlchemy==2.0.39
trio==0.29.0
trio-websocket==0.12.2
typing_extensions==4.12.2
urllib3==2.3.0
webdriver-manager==4.0.1
Werkzeug==2.3.7
wsproto==1.2.0
WTForms==3.2.1

使用如下命令安装依赖:

pip install -r requirements.txt

2. Flask应用框架搭建

创建基础的Flask应用结构:

# app.py (基础结构)
from flask import Flask, render_template, request, jsonify, redirect, url_for, flash
from flask_sqlalchemy import SQLAlchemy
from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user
import os

app = Flask(__name__)
app.config['SECRET_KEY'] = os.urandom(24)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///rss_news.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

db = SQLAlchemy(app)

# 配置登录管理器
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = 'login'

# 数据库模型定义
class User(UserMixin, db.Model):
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(100), unique=True)
    password = db.Column(db.String(100))
    
    def __repr__(self):
        return f'<User {self.username}>'

# 路由定义
@app.route('/')
def index():
    return render_template('index.html')

# 应用入口
if __name__ == '__main__':
    with app.app_context():
        db.create_all()
    app.run(debug=True)

3. 数据库模型设计

我们需要设计完整的数据库模型来存储新闻和相关信息:

# 数据库模型定义
class News(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(500))
    link = db.Column(db.String(500), unique=True, index=True)
    description = db.Column(db.Text)
    content = db.Column(db.Text)
    source = db.Column(db.String(100))
    pub_date = db.Column(db.DateTime)
    add_date = db.Column(db.DateTime, default=datetime.datetime.now)

class Tag(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(100))
    news_id = db.Column(db.Integer, db.ForeignKey('news.id'))
    news = db.relationship('News', backref=db.backref('tags', lazy=True))

class TagLibrary(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(100), unique=True)
    category = db.Column(db.String(50))
    frequency = db.Column(db.Integer, default=0)

class ScheduledTask(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    task_id = db.Column(db.String(100), unique=True)
    task_type = db.Column(db.String(20))  # fetch or broadcast
    schedule_type = db.Column(db.String(20))  # daily, weekly, monthly
    value = db.Column(db.Integer)  # day number for weekly/monthly
    time_value = db.Column(db.String(10))  # HH:MM
    extra_params = db.Column(db.String(100))  # JSON string for additional params

4. RSS内容抓取与处理

RSS内容抓取是整个系统的核心功能之一:

def fetch_rss_task(use_selenium=False, use_llm=True):
    logger.info("开始执行RSS抓取任务...")
    
    # 创建一个应用上下文对象
    app_ctx = app.app_context()
    # 推送上下文
    app_ctx.push()
    
    try:
        # 获取RSS源
        with open('rss_list.txt', 'r', encoding='utf-8') as f:
            rss_urls = [line.strip() for line in f.readlines() if line.strip()]
        
        logger.info(f"读取到{len(rss_urls)}个RSS源")
        
        if not rss_urls:
            logger.warning("RSS列表为空,没有要抓取的源")
            return
        
        total_fetched = 0
        newly_added = 0
        
        # 初始化WebDriver(如果需要)
        driver = None
        if use_selenium:
            try:
                driver = WebDriverManager.get_instance().get_driver()
            except Exception as e:
                logger.error(f"初始化WebDriver时出错: {e}")
                use_selenium = False
        
        try:
            for url in rss_urls:
                try:
                    logger.info(f"开始处理RSS源: {url}")
                    # 解析RSS Feed
                    feed = feedparser.parse(url)
                    if not feed.entries:
                        logger.warning(f"{url} 没有条目")
                        continue
                    
                    source = feed.feed.title if hasattr(feed.feed, 'title') else url
                    
                    for entry in feed.entries:
                        title = entry.title if hasattr(entry, 'title') else "无标题"
                        link = entry.link if hasattr(entry, 'link') else ""
                        description = entry.description if hasattr(entry, 'description') else ""
                        
                        # 清理描述中的HTML标签
                        clean_description = ""
                        if description:
                            soup = BeautifulSoup(description, 'html.parser')
                            clean_description = soup.get_text(separator=' ', strip=True)
                        
                        if not link:
                            logger.warning("跳过无链接的条目")
                            continue
                        
                        # 检查链接是否已存在
                        existing_news = News.query.filter_by(link=link).first()
                        if existing_news:
                            logger.warning(f"跳过已存在的新闻: {title}")
                            total_fetched += 1
                            continue
                        
                        # 获取正文内容
                        content = ""
                        if use_selenium:
                            try:
                                content = extract_content_with_selenium(link, driver)
                            except Exception as e:
                                logger.error(f"使用Selenium提取内容时出错: {e}")
                                content = extract_content(link)
                        else:
                            content = extract_content(link)
                        
                        # 使用大模型优化内容(如果启用)
                        if use_llm and content:
                            try:
                                content = optimize_content_with_llm(content)
                            except Exception as e:
                                logger.error(f"使用大模型优化内容时出错: {e}")
                        
                        # 创建新闻条目
                        news = News(
                            title=title,
                            link=link,
                            description=clean_description,
                            content=content,
                            source=source,
                            pub_date=pub_date
                        )
                        
                        db.session.add(news)
                        db.session.commit()
                        
                        # 生成并保存标签
                        if content:
                            generate_tags_for_news(news)
                        
                        total_fetched += 1
                        newly_added += 1
                        logger.info(f"成功添加新闻: {title}")
                        
                except Exception as e:
                    logger.error(f"处理RSS源 {url} 时出错: {e}")
                    continue
        finally:
            # 确保资源被释放
            if use_selenium:
                logger.info("抓取任务完成,资源将在应用上下文关闭时释放")
    finally:
        # 弹出上下文 - 确保在所有情况下都释放上下文
        app_ctx.pop()

5. 大模型内容优化

使用大模型进行内容优化,提取核心新闻内容:

def optimize_content_with_llm(content):
    """使用大模型优化内容"""
    try:
        prompt = f"""
你是一个智能的内容提取助手。请从以下HTML内容中提取出真正的新闻文章内容,
移除所有广告、导航、页脚、侧边栏等无关内容。
保留原始的段落结构,返回整洁的HTML格式。
只返回正文内容,不要添加任何解释。

内容:
{content[:10000]}  # 限制输入长度
"""
        # 调用Ollama API
        response = ollama.chat(model='glm4', messages=[
            {
                'role': 'user',
                'content': prompt
            }
        ])
        
        extracted_content = response['message']['content']
        
        # 确保返回的是HTML格式
        if not extracted_content.strip().startswith('<'):
            extracted_content = f"<p>{extracted_content}</p>"
        
        return extracted_content
        
    except Exception as e:
        logger.error(f"使用大模型优化内容时出错: {e}")
        return content  # 出错时返回原始内容

6. 自动标签生成

使用大模型自动为新闻生成标签:

def generate_tags_for_news(news):
    """为新闻生成标签"""
    try:
        # 使用大模型生成标签
        prompt = f"""
分析以下新闻文章,提取5个关键词作为标签。
标签应该是单个词或短语,不超过10个字符,用逗号分隔。
只返回标签列表,不要添加任何解释。

标题: {news.title}
描述: {news.description or ""}
内容: {news.content[:5000] if news.content else ""}
"""
        
        try:
            # 调用Ollama API
            response = ollama.chat(model='glm4', messages=[
                {
                    'role': 'user',
                    'content': prompt
                }
            ])
            
            tags_text = response['message']['content']
            
            # 解析返回的标签
            tags = [tag.strip() for tag in re.split(r'[,,、]', tags_text) if tag.strip()]
            
            # 过滤长度超过10个字符的标签
            tags = [tag for tag in tags if len(tag) <= 10]
            
            # 最多保留5个标签
            tags = tags[:5]
            
        except Exception as e:
            logger.error(f"使用大模型生成标签时出错: {e}")
            # 如果大模型失败,尝试使用简单的关键词提取
            words = re.findall(r'\b\w{3,15}\b', news.title + " " + (news.description or ""))
            word_count = {}
            for word in words:
                if word.lower() not in ['the', 'and', 'for', 'with', 'that', 'this']:
                    word_count[word] = word_count.get(word, 0) + 1
            
            tags = [word for word, count in sorted(word_count.items(), key=lambda x: x[1], reverse=True) if len(word) <= 10][:5]
        
        # 保存标签
        for tag_name in tags:
            # 检查标签库是否有该标签
            tag_in_library = TagLibrary.query.filter_by(name=tag_name).first()
            
            if not tag_in_library:
                # 创建新标签库条目
                tag_in_library = TagLibrary(name=tag_name, frequency=1)
                db.session.add(tag_in_library)
            else:
                # 更新使用频率
                tag_in_library.frequency += 1
            
            # 创建新标签关联
            tag = Tag(name=tag_name, news_id=news.id)
            db.session.add(tag)
        
        db.session.commit()
        
    except Exception as e:
        logger.error(f"生成标签时出错: {e}")
        db.session.rollback()

7. 语音合成与新闻播报

实现新闻语音播报功能:

@app.route('/api/text_to_speech', methods=['POST'])
@login_required
def text_to_speech():
    """将文本转换为语音文件并返回URL"""
    try:
        # 获取请求数据
        data = request.get_json()
        if not data or 'text' not in data:
            return jsonify({'status': 'error', 'message': '缺少文本参数'})
        
        text = data['text']
        if not text or len(text) == 0:
            return jsonify({'status': 'error', 'message': '文本内容为空'})
        
        # 限制文本长度,避免处理过长的文本
        if len(text) > 10000:
            text = text[:10000] + "..."
        
        # 确保存储目录存在
        audio_dir = os.path.join(app.static_folder, 'audio')
        if not os.path.exists(audio_dir):
            os.makedirs(audio_dir)
        
        # 生成唯一文件名
        filename = f"tts_{uuid.uuid4().hex}.mp3"
        filepath = os.path.join(audio_dir, filename)
        
        # 启动后台线程生成语音文件
        tts_thread = threading.Thread(
            target=generate_tts_file,
            args=(text, filepath)
        )
        tts_thread.start()
        
        # 等待生成完成(最多30秒)
        tts_thread.join(timeout=30)
        
        # 检查文件是否生成成功
        if os.path.exists(filepath) and os.path.getsize(filepath) > 0:
            # 返回文件URL
            audio_url = url_for('static', filename=f'audio/{filename}')
            return jsonify({
                'status': 'success',
                'audio_url': audio_url
            })
        else:
            return jsonify({
                'status': 'error',
                'message': '语音生成失败或超时'
            })
            
    except Exception as e:
        logger.error(f"文本转语音出错: {e}")
        return jsonify({
            'status': 'error',
            'message': str(e)
        })

def generate_tts_file(text, output_file):
    """生成语音文件的后台任务"""
    try:
        # 初始化语音引擎
        engine = pyttsx3.init()
        
        # 设置语音属性
        engine.setProperty('rate', 160)     # 语速
        engine.setProperty('volume', 1.0)   # 音量
        
        # 选择中文语音(如果可用)
        voices = engine.getProperty('voices')
        for voice in voices:
            if 'chinese' in voice.id.lower() or 'zh' in voice.id.lower():
                engine.setProperty('voice', voice.id)
                break
        
        # 保存为音频文件
        engine.save_to_file(text, output_file)
        engine.runAndWait()
        
        logger.info(f"语音文件已生成: {output_file}")
        
    except Exception as e:
        logger.error(f"生成语音文件出错: {e}")

8. 定时任务调度系统

设计定时任务系统,自动执行新闻抓取和播报:

def init_scheduler():
    """初始化调度器任务"""
    with app.app_context():
        # 清空现有任务
        schedule.clear()
        
        # 加载数据库中的任务
        tasks = ScheduledTask.query.all()
        for task in tasks:
            if task.task_type == 'fetch':
                add_fetch_task(task.task_id, task.schedule_type, task.value, task.time_value)
            elif task.task_type == 'broadcast':
                extra_params = json.loads(task.extra_params) if task.extra_params else {}
                count = extra_params.get('count', 5)
                add_broadcast_task(task.task_id, task.schedule_type, task.value, task.time_value, count)
        
        # 添加定期清理音频文件的任务
        schedule.every(1).hours.do(cleanup_audio_files).tag('cleanup_audio')
        logger.info("已添加音频文件清理任务,每小时执行一次")
        
        # 添加定期清理过期日志文件的任务
        schedule.every(12).hours.do(cleanup_log_files).tag('cleanup_logs')
        logger.info("已添加日志文件清理任务,每12小时执行一次")

def add_broadcast_task(task_id, schedule_type, value, time_value, count=5):
    """添加新闻播报任务到调度器"""
    
    def task_func():
        logger.info(f"执行新闻播报任务: {task_id}")
        # 创建应用上下文
        ctx = app.app_context()
        ctx.push()
        try:
            # 获取最新的新闻
            news_list = db.session.query(News).order_by(News.add_date.desc()).limit(count).all()
            if news_list:
                # 初始化语音引擎
                try:
                    engine = pyttsx3.init()
                    # 设置语音参数
                    engine.setProperty('rate', 150)
                    engine.setProperty('volume', 0.9)
                    
                    # 播报开始提示
                    engine.say("开始播报最新新闻")
                    engine.runAndWait()
                    
                    # 逐条播报新闻
                    for i, news in enumerate(news_list):
                        # 播报标题
                        engine.say(f"第{i+1}条新闻:{news.title}")
                        engine.runAndWait()
                        
                        # 播报简短描述
                        if news.description and len(news.description) > 0:
                            short_desc = news.description[:200] + "..." if len(news.description) > 200 else news.description
                            engine.say(short_desc)
                            engine.runAndWait()
                        
                        # 短暂停顿,区分不同新闻
                        time.sleep(1)
                    
                    # 播报结束提示
                    engine.say("新闻播报结束")
                    engine.runAndWait()
                    
                except Exception as e:
                    logger.error(f"语音引擎初始化或播报过程出错: {e}")
        finally:
            # 释放上下文
            ctx.pop()
    
    # 根据不同的调度类型添加任务
    if schedule_type == 'daily':
        schedule.every().day.at(time_value).do(task_func).tag(task_id)
    elif schedule_type == 'weekly':
        days = {
            1: schedule.every().monday,
            2: schedule.every().tuesday,
            3: schedule.every().wednesday,
            4: schedule.every().thursday,
            5: schedule.every().friday,
            6: schedule.every().saturday,
            7: schedule.every().sunday
        }
        days[value].at(time_value).do(task_func).tag(task_id)
    elif schedule_type == 'monthly':
        # 设置每月指定日期执行
        job = schedule.every().day.at(time_value).do(task_func).tag(task_id)
        
        # 自定义月度任务的执行条件
        def monthly_condition():
            return datetime.datetime.now().day == value
        
        job.do_run = lambda: monthly_condition() and task_func()

9. 日志系统设计

为应用添加完善的日志系统,便于监控和调试:

# 日志系统配置
LOG_DIR = 'logs'
if not os.path.exists(LOG_DIR):
    os.makedirs(LOG_DIR)

# 内存缓冲区,用于在UI中显示最新日志
log_buffer = deque(maxlen=1000)

# 创建自定义的日志记录器
class MemoryHandler(logging.Handler):
    """将日志记录到内存缓冲区,用于Web界面显示"""
    def emit(self, record):
        log_entry = self.format(record)
        log_buffer.append({
            'time': datetime.datetime.fromtimestamp(record.created).strftime('%Y-%m-%d %H:%M:%S'),
            'level': record.levelname,
            'message': record.getMessage(),
            'formatted': log_entry
        })

# 配置日志记录器
logger = logging.getLogger('rss_app')
logger.setLevel(logging.INFO)

# 控制台处理器
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
console_format = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
console_handler.setFormatter(console_format)
logger.addHandler(console_handler)

# 内存处理器,用于UI显示
memory_handler = MemoryHandler()
memory_handler.setLevel(logging.INFO)
memory_handler.setFormatter(console_format)
logger.addHandler(memory_handler)

# 小时文件处理器,每小时自动创建一个新文件
hourly_handler = TimedRotatingFileHandler(
    filename=os.path.join(LOG_DIR, 'rss_app.log'),
    when='H',
    interval=1,
    backupCount=72,  # 保留3天的日志
    encoding='utf-8'
)
# 设置日志文件后缀格式为 年-月-日_小时
hourly_handler.suffix = "%Y-%m-%d_%H"
hourly_handler.setLevel(logging.INFO)
hourly_handler.setFormatter(console_format)
logger.addHandler(hourly_handler)

@app.route('/system_logs')
@login_required
def system_logs():
    """显示系统日志页面"""
    logger.info('访问系统日志页面')
    
    # 获取日志文件列表
    log_files = []
    try:
        # 获取所有日志文件并按修改时间排序
        log_pattern = os.path.join(LOG_DIR, 'rss_app.log*')
        all_log_files = glob.glob(log_pattern)
        all_log_files.sort(key=os.path.getmtime, reverse=True)
        
        for file_path in all_log_files:
            file_name = os.path.basename(file_path)
            # 获取文件大小和修改时间
            file_stats = os.stat(file_path)
            file_size = file_stats.st_size / 1024  # KB
            file_time = datetime.datetime.fromtimestamp(file_stats.st_mtime).strftime('%Y-%m-%d %H:%M:%S')
            
            # 添加文件信息
            if file_name == 'rss_app.log':
                display_name = f"当前日志 ({file_size:.1f} KB) - {file_time}"
                log_files.append({
                    'name': display_name,
                    'path': file_path
                })
            else:
                # 格式化时间戳
                timestamp = file_name.replace('rss_app.log.', '')
                try:
                    # 尝试解析时间戳
                    parsed_time = datetime.datetime.strptime(timestamp, '%Y-%m-%d_%H')
                    display_name = f"{parsed_time.strftime('%Y-%m-%d %H:00')} ({file_size:.1f} KB)"
                except:
                    display_name = f"{file_name} ({file_size:.1f} KB) - {file_time}"
                
                log_files.append({
                    'name': display_name,
                    'path': file_path
                })
    except Exception as e:
        logger.error(f"获取日志文件列表出错: {str(e)}")
        flash(f"获取日志文件列表出错: {str(e)}", 'danger')
    
    # 统计信息
    stats = {
        'total': len(log_buffer),
        'error': sum(1 for log in log_buffer if log['level'] == 'ERROR'),
        'warning': sum(1 for log in log_buffer if log['level'] == 'WARNING'),
        'files': len(log_files)
    }
    
    return render_template('system_logs.html', log_files=log_files, stats=stats)

网站公告

今日签到

点亮在社区的每一天
去签到