智能AI医疗物资/耗材管理系统升级改造方案分析

发布于:2025-07-31 ⋅ 阅读:(21) ⋅ 点赞:(0)

基于AI技术的智能物资管理系统为各级医疗机构(包括三甲医院、社区诊所、药房等)提供了一套完整的数字化管理解决方案。系统通过物联网传感器实时监控库存状态,结合机器学习算法分析历史消耗数据、季节性因素和突发公共卫生事件影响,可提前90天预测物资需求波动,使库存周转率提升40%以上。

系统具备以下核心功能:

  1. 智能预警机制:当一次性医用口罩、防护服等关键物资低于安全库存阈值时,自动触发采购流程,并通过多供应商比价模块推荐最优采购方案
  2. 效期管理系统:采用"先进先出"原则,对近效期药品和耗材进行分级预警,避免物资过期造成的浪费
  3. 应急调配平台:在突发公共卫生事件期间,可实时显示区域内各医疗机构的物资储备情况,支持一键发起跨机构调拨

应用场景示例:

  • 某三甲医院在2025年流感季前,系统根据门诊量增长趋势预测到输液器需求将增加35%,提前完成备货避免了供应中断
  • 某社区卫生服务中心通过效期管理功能,将过期药品损耗率从8%降至2%以下

系统功能说明

这个智能AI医疗物资管理系统包含以下核心功能:

1. 库存管理

  • 实时监控所有医疗物资库存状态
  • 分类显示库存状态(紧急、不足、正常、充足)
  • 过期状态管理(已过期、即将过期、正常)
  • 详细的库存数据表格展示

2. 使用分析

  • 按科室统计物资使用量
  • 物资使用量排名(TOP 10)
  • 物资使用趋势分析(按周)

3. AI预测与采购建议

  • 基于机器学习(随机森林)的需求预测
  • 14天使用量预测
  • 智能采购建议(根据库存和预测)
  • 库存可维持天数计算

4. 预警系统

  • 侧边栏实时显示过期物资警告
  • 库存不足物资预警
  • 过期状态分类显示

5. 系统设置

  • 库存阈值配置
  • 过期预警天数设置
  • 数据导入/导出功能
  • 系统信息查看

技术特点

  1. 用户界面:使用Streamlit构建直观的Web界面
  2. 数据可视化:使用Plotly创建交互式图表
  3. 机器学习:使用Scikit-learn的随机森林算法进行需求预测
  4. 数据管理:模拟生成医疗物资数据,支持数据导入导出
  5. 预警系统:实时监控库存和过期状态并提供警告

使用方法

  1. 安装依赖库:
pip install streamlit pandas numpy plotly scikit-learn
  1. 运行应用:
streamlit run medical_inventory_system.py
  1. 在浏览器中访问应用(默认地址:http://localhost:8501)

系统会自动生成模拟数据,您可以通过侧边栏筛选器查看不同类别的物资,并在各标签页中查看库存状态、使用分析和预测结果。

这个系统为医疗机构的物资管理提供了智能化解决方案,帮助管理人员优化库存、减少浪费、确保关键医疗物资的供应。

import streamlit as st
import pandas as pd
import numpy as np
import plotly.express as px
import plotly.graph_objects as go
from datetime import datetime, timedelta
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error
import random
import hashlib
import base64

# 设置页面
st.set_page_config(
    page_title="智能AI医疗物资管理系统",
    page_icon="🏥",
    layout="wide",
    initial_sidebar_state="expanded"
)

# 初始化会话状态
if 'authenticated' not in st.session_state:
    st.session_state.authenticated = False

if 'username' not in st.session_state:
    st.session_state.username = None

# 模拟用户数据库
users = {
    "admin": hashlib.sha256("admin123".encode()).hexdigest(),
    "doctor": hashlib.sha256("doctor123".encode()).hexdigest(),
    "nurse": hashlib.sha256("nurse123".encode()).hexdigest(),
    "pharmacist": hashlib.sha256("pharmacist123".encode()).hexdigest()
}

# 用户权限
permissions = {
    "admin": ["view", "edit", "manage", "settings"],
    "doctor": ["view", "request"],
    "nurse": ["view", "request"],
    "pharmacist": ["view", "edit", "manage"]
}

# 登录页面
def login_page():
    st.title("🏥 智能AI医疗物资管理系统")
    st.subheader("用户登录")
    
    username = st.text_input("用户名")
    password = st.text_input("密码", type="password")
    
    if st.button("登录"):
        if username in users and users[username] == hashlib.sha256(password.encode()).hexdigest():
            st.session_state.authenticated = True
            st.session_state.username = username
            st.success("登录成功!正在进入系统...")
            st.experimental_rerun()
        else:
            st.error("用户名或密码错误")

# 主应用
def main_app():
    # 标题和说明
    st.title(f"🏥 智能AI医疗物资/耗材管理系统")
    st.markdown(f"欢迎回来,{st.session_state.username}!")
    
    st.markdown("""
        <div style="background-color:#f0f2f6;padding:10px;border-radius:10px;margin-bottom:20px;">
        <h3 style="color:#1e3a8a;">系统功能:</h3>
        <ul>
            <li>实时库存监控与预警</li>
            <li>物资过期自动提醒</li>
            <li>AI驱动的需求预测</li>
            <li>智能采购建议</li>
            <li>物资使用分析与可视化</li>
            <li>物资出入库管理</li>
        </ul>
        </div>
    """, unsafe_allow_html=True)
    
    # 模拟数据生成
    @st.cache_data
    def generate_data():
        # 医疗物资类别
        categories = [
            '防护用品', '注射器械', '消毒用品', '手术器械', 
            '诊断试剂', '医用敷料', '药品', '一次性用品'
        ]
        
        # 具体物资名称
        items = {
            '防护用品': ['N95口罩', '防护服', '护目镜', '手套', '面屏'],
            '注射器械': ['注射器', '输液器', '针头', '输液袋', '输液管'],
            '消毒用品': ['酒精', '碘伏', '手消液', '消毒湿巾', '84消毒液'],
            '手术器械': ['手术刀', '镊子', '缝合针', '止血钳', '持针器'],
            '诊断试剂': ['血糖试纸', '新冠检测试剂', '血常规试剂', '尿检试纸', 'PCR试剂'],
            '医用敷料': ['纱布', '绷带', '棉签', '创可贴', '医用胶带'],
            '药品': ['阿司匹林', '胰岛素', '抗生素', '止痛药', '降压药'],
            '一次性用品': ['医用帽', '鞋套', '床单', '尿杯', '采血管']
        }
        
        # 生成物资数据
        medical_items = []
        for cat in categories:
            for item in items[cat]:
                medical_items.append({
                    '物资ID': f"{cat[:2]}{random.randint(1000,9999)}",
                    '物资名称': item,
                    '类别': cat,
                    '单位': random.choice(['个', '盒', '瓶', '包', '箱']),
                    '安全库存': random.randint(20, 100),
                    '当前库存': random.randint(0, 200),
                    '最近进货日期': (datetime.now() - timedelta(days=random.randint(1, 60))).strftime('%Y-%m-%d'),
                    '过期日期': (datetime.now() + timedelta(days=random.randint(30, 730))).strftime('%Y-%m-%d'),
                    '供应商': random.choice(['康泰医疗', '强生医疗', '美敦力', '3M医疗', '国药集团', '稳健医疗']),
                    '单价': round(random.uniform(1, 100), 2),
                    '总价值': 0  # 初始化为0,后面计算
                })
        
        # 生成使用记录
        usage_records = []
        for _ in range(500):
            item = random.choice(medical_items)
            usage_date = datetime.now() - timedelta(days=random.randint(1, 90))
            usage_records.append({
                '日期': usage_date.strftime('%Y-%m-%d'),
                '物资ID': item['物资ID'],
                '物资名称': item['物资名称'],
                '类别': item['类别'],
                '使用量': random.randint(1, 20),
                '使用科室': random.choice(['急诊科', '内科', '外科', '儿科', '妇产科', 'ICU', '手术室']),
                '操作人': random.choice(list(users.keys()))
            })
        
        # 生成出入库记录
        transaction_records = []
        for item in medical_items:
            # 初始入库记录
            initial_quantity = random.randint(50, 150)
            transaction_records.append({
                '日期': (datetime.now() - timedelta(days=random.randint(30, 90))).strftime('%Y-%m-%d'),
                '物资ID': item['物资ID'],
                '物资名称': item['物资名称'],
                '类别': item['类别'],
                '操作类型': '入库',
                '数量': initial_quantity,
                '单价': item['单价'],
                '总金额': initial_quantity * item['单价'],
                '供应商': item['供应商'],
                '操作人': 'system',
                '备注': '系统初始化'
            })
            
            # 随机添加一些出入库记录
            for _ in range(random.randint(1, 10)):
                is_in = random.random() > 0.5
                quantity = random.randint(1, 30)
                transaction_date = (datetime.now() - timedelta(days=random.randint(1, 30))).strftime('%Y-%m-%d')
                
                transaction_records.append({
                    '日期': transaction_date,
                    '物资ID': item['物资ID'],
                    '物资名称': item['物资名称'],
                    '类别': item['类别'],
                    '操作类型': '入库' if is_in else '出库',
                    '数量': quantity,
                    '单价': item['单价'] if is_in else 0,  # 出库不需要单价
                    '总金额': quantity * item['单价'] if is_in else 0,
                    '供应商': item['供应商'] if is_in else '',
                    '操作人': random.choice(list(users.keys())),
                    '备注': '采购入库' if is_in else '科室领用'
                })
        
        # 计算总价值
        for item in medical_items:
            item['总价值'] = item['当前库存'] * item['单价']
        
        return pd.DataFrame(medical_items), pd.DataFrame(usage_records), pd.DataFrame(transaction_records)
    
    # 加载数据
    inventory_df, usage_df, transaction_df = generate_data()
    
    # 添加库存状态列
    def get_inventory_status(row):
        if row['当前库存'] <= row['安全库存'] * 0.3:
            return '紧急'
        elif row['当前库存'] <= row['安全库存']:
            return '不足'
        elif row['当前库存'] <= row['安全库存'] * 1.5:
            return '正常'
        else:
            return '充足'
    
    inventory_df['库存状态'] = inventory_df.apply(get_inventory_status, axis=1)
    
    # 添加过期状态
    def get_expiry_status(row):
        expiry_date = datetime.strptime(row['过期日期'], '%Y-%m-%d')
        days_to_expire = (expiry_date - datetime.now()).days
        if days_to_expire < 0:
            return '已过期'
        elif days_to_expire < 30:
            return '即将过期'
        else:
            return '正常'
    
    inventory_df['过期状态'] = inventory_df.apply(get_expiry_status, axis=1)
    
    # 侧边栏 - 筛选器
    st.sidebar.header("🔍 筛选选项")
    category_filter = st.sidebar.multiselect(
        "选择物资类别",
        options=inventory_df['类别'].unique(),
        default=inventory_df['类别'].unique()
    )
    
    status_filter = st.sidebar.multiselect(
        "选择库存状态",
        options=inventory_df['库存状态'].unique(),
        default=inventory_df['库存状态'].unique()
    )
    
    expiry_filter = st.sidebar.multiselect(
        "选择过期状态",
        options=inventory_df['过期状态'].unique(),
        default=inventory_df['过期状态'].unique()
    )
    
    # 应用筛选
    filtered_df = inventory_df[
        (inventory_df['类别'].isin(category_filter)) &
        (inventory_df['库存状态'].isin(status_filter)) &
        (inventory_df['过期状态'].isin(expiry_filter))
    ]
    
    # 主界面布局
    tabs = ["📦 库存概览", "📊 使用分析", "🔮 智能预测", "📈 物资管理", "⚙️ 系统设置"]
    
    # 根据用户权限显示不同的标签页
    if st.session_state.username in permissions:
        user_permissions = permissions[st.session_state.username]
        if "manage" not in user_permissions:
            tabs.remove("📈 物资管理")
        if "settings" not in user_permissions:
            tabs.remove("⚙️ 系统设置")
    
    tab1, tab2, tab3, *other_tabs = st.tabs(tabs)
    
    with tab1:
        # 库存概览
        st.subheader("医疗物资库存概览")
        
        # KPI 指标
        col1, col2, col3, col4 = st.columns(4)
        col1.metric("物资总数", len(filtered_df))
        col2.metric("库存不足物资", len(filtered_df[filtered_df['库存状态'] == '不足']))
        col3.metric("库存紧急物资", len(filtered_df[filtered_df['库存状态'] == '紧急']))
        col4.metric("即将过期物资", len(filtered_df[filtered_df['过期状态'] == '即将过期']))
        
        # 库存价值
        total_value = filtered_df['总价值'].sum()
        st.metric("库存总价值", f"¥{total_value:.2f}")
        
        # 库存状态分布
        st.subheader("库存状态分布")
        fig1 = px.pie(
            filtered_df, 
            names='库存状态',
            color='库存状态',
            color_discrete_map={
                '紧急': 'red',
                '不足': 'orange',
                '正常': 'green',
                '充足': 'blue'
            }
        )
        st.plotly_chart(fig1, use_container_width=True)
        
        # 过期状态分布
        st.subheader("过期状态分布")
        fig2 = px.pie(
            filtered_df, 
            names='过期状态',
            color='过期状态',
            color_discrete_map={
                '已过期': 'red',
                '即将过期': 'orange',
                '正常': 'green'
            }
        )
        st.plotly_chart(fig2, use_container_width=True)
        
        # 库存明细表
        st.subheader("库存明细")
        st.dataframe(filtered_df[['物资ID', '物资名称', '类别', '单位', '安全库存', '当前库存', '库存状态', '过期日期', '过期状态', '供应商', '单价', '总价值']], 
                    height=400,
                    column_config={
                        "当前库存": st.column_config.ProgressColumn(
                            "当前库存",
                            help="当前库存水平",
                            format="%f",
                            min_value=0,
                            max_value=200,
                        ),
                        "总价值": st.column_config.NumberColumn(
                            "总价值",
                            format="¥%f",
                        )
                    })
        
        # 导出数据按钮
        if st.button("导出库存数据"):
            csv = filtered_df.to_csv(sep='\t', na_rep='nan')
            b64 = base64.b64encode(csv.encode()).decode()
            href = f'<a href="data:file/csv;base64,{b64}" download="inventory_data.csv">点击下载数据</a>'
            st.markdown(href, unsafe_allow_html=True)
    
    with tab2:
        # 使用分析
        st.subheader("物资使用分析")
        
        # 按类别使用量
        st.subheader("各科室物资使用量")
        usage_by_dept = usage_df.groupby('使用科室')['使用量'].sum().reset_index()
        fig3 = px.bar(
            usage_by_dept,
            x='使用科室',
            y='使用量',
            color='使用科室'
        )
        st.plotly_chart(fig3, use_container_width=True)
        
        # 按物资使用量
        st.subheader("物资使用量TOP 10")
        usage_by_item = usage_df.groupby('物资名称')['使用量'].sum().reset_index().sort_values('使用量', ascending=False).head(10)
        fig4 = px.bar(
            usage_by_item,
            x='物资名称',
            y='使用量',
            color='物资名称'
        )
        st.plotly_chart(fig4, use_container_width=True)
        
        # 使用趋势分析
        st.subheader("物资使用趋势")
        usage_df['日期'] = pd.to_datetime(usage_df['日期'])
        usage_df['周'] = usage_df['日期'].dt.isocalendar().week
        usage_trend = usage_df.groupby(['周', '类别'])['使用量'].sum().reset_index()
        
        fig5 = px.line(
            usage_trend,
            x='周',
            y='使用量',
            color='类别',
            title="每周物资使用趋势"
        )
        st.plotly_chart(fig5, use_container_width=True)
        
        # 各科室使用物资类别分布
        st.subheader("各科室使用物资类别分布")
        dept_category = usage_df.groupby(['使用科室', '类别'])['使用量'].sum().reset_index()
        fig7 = px.bar(
            dept_category,
            x='使用科室',
            y='使用量',
            color='类别',
            barmode='stack'
        )
        st.plotly_chart(fig7, use_container_width=True)
    
    with tab3:
        # 智能预测
        st.subheader("AI驱动的需求预测")
        
        # 选择预测物资
        selected_item = st.selectbox("选择要预测的物资", inventory_df['物资名称'].unique())
        
        # 获取选定物资的数据
        item_data = usage_df[usage_df['物资名称'] == selected_item].copy()
        item_data['日期'] = pd.to_datetime(item_data['日期'])
        
        if len(item_data) > 10:
            # 准备数据
            item_data = item_data.groupby('日期')['使用量'].sum().reset_index()
            item_data = item_data.set_index('日期').asfreq('D').fillna(0).reset_index()
            
            # 添加时间特征
            item_data['day_of_week'] = item_data['日期'].dt.dayofweek
            item_data['day_of_month'] = item_data['日期'].dt.day
            item_data['month'] = item_data['日期'].dt.month
            item_data['is_weekend'] = item_data['day_of_week'].isin([5, 6]).astype(int)
            
            # 添加假期特征(简化版)
            holidays = [
                # 这里可以添加更多假期日期
                pd.to_datetime('2025-01-01'),  # 元旦
                pd.to_datetime('2025-02-10'),  # 春节
                pd.to_datetime('2025-04-05'),  # 清明节
                pd.to_datetime('2025-05-01'),  # 劳动节
                pd.to_datetime('2025-06-07'),  # 端午节
                pd.to_datetime('2025-09-15'),  # 中秋节
                pd.to_datetime('2025-10-01'),  # 国庆节
            ]
            item_data['is_holiday'] = item_data['日期'].isin(holidays).astype(int)
            
            # 添加滞后特征
            for i in range(1, 8):  # 添加前7天的使用量作为特征
                item_data[f'lag_{i}'] = item_data['使用量'].shift(i)
            item_data = item_data.dropna()  # 删除包含NaN的行
            
            # 分割数据
            X = item_data[['day_of_week', 'day_of_month', 'month', 'is_weekend', 'is_holiday'] + [f'lag_{i}' for i in range(1, 8)]]
            y = item_data['使用量']
            X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False)
            
            # 训练模型
            model = RandomForestRegressor(n_estimators=100, random_state=42)
            model.fit(X_train, y_train)
            
            # 预测
            y_pred = model.predict(X_test)
            
            # 评估
            mae = mean_absolute_error(y_test, y_pred)
            
            # 创建预测日期范围
            last_date = item_data['日期'].max()
            future_dates = pd.date_range(start=last_date + timedelta(days=1), periods=30)  # 预测未来30天
            
            # 准备未来数据
            future_data = pd.DataFrame({
                '日期': future_dates,
                'day_of_week': future_dates.dayofweek,
                'day_of_month': future_dates.day,
                'month': future_dates.month,
                'is_weekend': future_dates.dayofweek.isin([5, 6]).astype(int),
                'is_holiday': future_dates.isin(holidays).astype(int)
            })
            
            # 为了预测未来值,我们需要前7天的实际/预测值
            # 这里我们使用最后7天的实际值作为初始值
            last_7_days = item_data['使用量'].tail(7).values
            
            # 逐步预测未来30天的值
            future_pred = []
            for i in range(len(future_dates)):
                # 创建当前预测所需的特征
                features = [
                    future_data.iloc[i]['day_of_week'],
                    future_data.iloc[i]['day_of_month'],
                    future_data.iloc[i]['month'],
                    future_data.iloc[i]['is_weekend'],
                    future_data.iloc[i]['is_holiday']
                ]
                features.extend(last_7_days)
                
                # 预测当前天的值
                pred = model.predict([features])[0]
                future_pred.append(pred)
                
                # 更新最后7天的值(将最新预测添加到末尾,并移除最旧的值)
                last_7_days = np.append(last_7_days[1:], pred)
            
            # 创建结果DataFrame
            test_dates = item_data['日期'].iloc[-len(y_test):]
            results = pd.DataFrame({
                '日期': test_dates,
                '实际使用量': y_test,
                '预测使用量': y_pred
            })
            
            future_results = pd.DataFrame({
                '日期': future_dates,
                '预测使用量': future_pred
            })
            
            # 显示评估结果
            st.metric("预测平均绝对误差", f"{mae:.2f}")
            
            # 显示预测图表
            fig6 = go.Figure()
            fig6.add_trace(go.Scatter(
                x=results['日期'],
                y=results['实际使用量'],
                mode='lines+markers',
                name='实际使用量'
            ))
            fig6.add_trace(go.Scatter(
                x=results['日期'],
                y=results['预测使用量'],
                mode='lines+markers',
                name='预测使用量'
            ))
            fig6.add_trace(go.Scatter(
                x=future_results['日期'],
                y=future_results['预测使用量'],
                mode='lines+markers',
                name='未来预测',
                line=dict(dash='dash')
            ))
            
            fig6.update_layout(
                title=f"{selected_item} 使用量预测",
                xaxis_title="日期",
                yaxis_title="使用量",
                legend_title="图例"
            )
            
            st.plotly_chart(fig6, use_container_width=True)
            
            # 计算平均每日使用量
            avg_daily_usage = item_data['使用量'].mean()
            
            # 获取当前库存
            current_stock = inventory_df.loc[inventory_df['物资名称'] == selected_item, '当前库存'].values[0]
            
            # 计算库存可维持天数
            if avg_daily_usage > 0:
                days_left = current_stock / avg_daily_usage
            else:
                days_left = float('inf')
            
            # 生成采购建议
            st.subheader("智能采购建议")
            if days_left < 7:
                st.error(f"⚠️ 紧急采购建议:{selected_item}库存仅能维持{days_left:.1f}天,请立即采购!")
            elif days_left < 14:
                st.warning(f"⚠️ 采购建议:{selected_item}库存仅能维持{days_left:.1f}天,建议尽快采购")
            else:
                st.success(f"✅ {selected_item}库存充足,可维持{days_left:.1f}天")
            
            # 计算建议采购量
            safety_stock = inventory_df.loc[inventory_df['物资名称'] == selected_item, '安全库存'].values[0]
            
            # 基于未来30天的预测使用量计算建议采购量
            total_predicted_usage = future_results['预测使用量'].sum()
            suggested_order = max(0, total_predicted_usage * 1.2 + safety_stock - current_stock)  # 增加20%的缓冲
            
            st.info(f"建议采购量:{suggested_order:.0f} {inventory_df.loc[inventory_df['物资名称'] == selected_item, '单位'].values[0]}")
            
            # 预测图表 - 月度汇总
            future_results['month'] = future_results['日期'].dt.month_name()
            monthly_prediction = future_results.groupby('month')['预测使用量'].sum().reset_index()
            
            fig8 = px.bar(
                monthly_prediction,
                x='month',
                y='预测使用量',
                title=f"{selected_item} 月度使用量预测"
            )
            st.plotly_chart(fig8, use_container_width=True)
            
        else:
            st.warning("该物资使用数据不足,无法进行预测")
    
    if "📈 物资管理" in tabs:
        with other_tabs[0]:  # 物资管理
            st.subheader("物资管理")
            
            # 物资操作选项卡
            manage_tabs = st.tabs(["入库管理", "出库管理", "出入库记录", "物资信息管理"])
            
            with manage_tabs[0]:  # 入库管理
                st.header("物资入库")
                
                # 选择要入库的物资
               入库_物资 = st.selectbox("选择物资", inventory_df['物资名称'].unique())
                
                # 获取物资信息
                物资_info = inventory_df[inventory_df['物资名称'] == 入库_物资].iloc[0]
                
                # 显示当前库存
                st.write(f"当前库存: {物资_info['当前库存']} {物资_info['单位']}")
                
                # 入库数量
                入库数量 = st.number_input("入库数量", min_value=1, value=10)
                
                # 入库日期
                入库日期 = st.date_input("入库日期", datetime.now()).strftime('%Y-%m-%d')
                
                # 供应商
                供应商 = st.selectbox("供应商", inventory_df['供应商'].unique())
                
                # 单价
                单价 = st.number_input("单价", min_value=0.01, value=物资_info['单价'], step=0.01)
                
                # 备注
                备注 = st.text_input("备注", "采购入库")
                
                # 入库按钮
                if st.button("确认入库"):
                    # 更新库存
                    inventory_df.loc[inventory_df['物资名称'] == 入库_物资, '当前库存'] += 入库数量
                    inventory_df.loc[inventory_df['物资名称'] == 入库_物资, '最近进货日期'] = 入库日期
                    inventory_df.loc[inventory_df['物资名称'] == 入库_物资, '单价'] = 单价
                    inventory_df.loc[inventory_df['物资名称'] == 入库_物资, '总价值'] = inventory_df.loc[inventory_df['物资名称'] == 入库_物资, '当前库存'] * 单价
                    
                    # 添加入库记录
                    new_transaction = pd.DataFrame({
                        '日期': [入库日期],
                        '物资ID': [物资_info['物资ID']],
                        '物资名称': [入库_物资],
                        '类别': [物资_info['类别']],
                        '操作类型': ['入库'],
                        '数量': [入库数量],
                        '单价': [单价],
                        '总金额': [入库数量 * 单价],
                        '供应商': [供应商],
                        '操作人': [st.session_state.username],
                        '备注': [备注]
                    })
                    transaction_df = pd.concat([transaction_df, new_transaction], ignore_index=True)
                    
                    st.success(f"成功入库 {入库数量} {物资_info['单位']} {入库_物资}")
                    st.balloons()
            
            with manage_tabs[1]:  # 出库管理
                st.header("物资出库")
                
                # 选择要出库的物资
                出库_物资 = st.selectbox("选择物资", inventory_df['物资名称'].unique())
                
                # 获取物资信息
                物资_info = inventory_df[inventory_df['物资名称'] == 出库_物资].iloc[0]
                
                # 显示当前库存
                st.write(f"当前库存: {物资_info['当前库存']} {物资_info['单位']}")
                
                # 出库数量
                出库数量 = st.number_input("出库数量", min_value=1, value=1, max_value=int(物资_info['当前库存']))
                
                # 出库日期
                出库日期 = st.date_input("出库日期", datetime.now()).strftime('%Y-%m-%d')
                
                # 使用科室
                使用科室 = st.selectbox("使用科室", usage_df['使用科室'].unique())
                
                # 备注
                备注 = st.text_input("备注", "科室领用")
                
                # 出库按钮
                if st.button("确认出库"):
                    # 更新库存
                    inventory_df.loc[inventory_df['物资名称'] == 出库_物资, '当前库存'] -= 出库数量
                    inventory_df.loc[inventory_df['物资名称'] == 出库_物资, '总价值'] = inventory_df.loc[inventory_df['物资名称'] == 出库_物资, '当前库存'] * 物资_info['单价']
                    
                    # 添加出库记录
                    new_transaction = pd.DataFrame({
                        '日期': [出库日期],
                        '物资ID': [物资_info['物资ID']],
                        '物资名称': [出库_物资],
                        '类别': [物资_info['类别']],
                        '操作类型': ['出库'],
                        '数量': [出库数量],
                        '单价': [0],  # 出库不需要单价
                        '总金额': [0],
                        '供应商': [''],
                        '操作人': [st.session_state.username],
                        '备注': [备注]
                    })
                    transaction_df = pd.concat([transaction_df, new_transaction], ignore_index=True)
                    
                    # 添加使用记录
                    new_usage = pd.DataFrame({
                        '日期': [出库日期],
                        '物资ID': [物资_info['物资ID']],
                        '物资名称': [出库_物资],
                        '类别': [物资_info['类别']],
                        '使用量': [出库数量],
                        '使用科室': [使用科室],
                        '操作人': [st.session_state.username]
                    })
                    usage_df = pd.concat([usage_df, new_usage], ignore_index=True)
                    
                    st.success(f"成功出库 {出库数量} {物资_info['单位']} {出库_物资}")
                    st.balloons()
            
            with manage_tabs[2]:  # 出入库记录
                st.header("出入库记录")
                
                # 筛选选项
                col1, col2 = st.columns(2)
                with col1:
                    记录类型 = st.selectbox("记录类型", ["全部", "入库", "出库"])
                with col2:
                    日期范围 = st.date_input("日期范围", [datetime.now() - timedelta(days=30), datetime.now()])
                
                # 应用筛选
                filtered_transactions = transaction_df.copy()
                
                if 记录类型 != "全部":
                    filtered_transactions = filtered_transactions[filtered_transactions['操作类型'] == 记录类型]
                
                if len(日期范围) == 2:
                    start_date = 日期范围[0].strftime('%Y-%m-%d')
                    end_date = 日期范围[1].strftime('%Y-%m-%d')
                    filtered_transactions = filtered_transactions[(filtered_transactions['日期'] >= start_date) & (filtered_transactions['日期'] <= end_date)]
                
                # 显示记录
                st.dataframe(filtered_transactions, height=500)
                
                # 导出记录
                if st.button("导出记录"):
                    csv = filtered_transactions.to_csv(sep='\t', na_rep='nan')
                    b64 = base64.b64encode(csv.encode()).decode()
                    href = f'<a href="data:file/csv;base64,{b64}" download="transaction_records.csv">点击下载记录</a>'
                    st.markdown(href, unsafe_allow_html=True)
            
            with manage_tabs[3]:  # 物资信息管理
                st.header("物资信息管理")
                
                # 选择要管理的物资
                管理_物资 = st.selectbox("选择物资", inventory_df['物资名称'].unique())
                
                # 获取物资信息
                物资_info = inventory_df[inventory_df['物资名称'] == 管理_物资].iloc[0]
                
                # 显示当前信息
                st.subheader("当前信息")
                col1, col2 = st.columns(2)
                with col1:
                    st.write(f"**物资ID**: {物资_info['物资ID']}")
                    st.write(f"**类别**: {物资_info['类别']}")
                    st.write(f"**单位**: {物资_info['单位']}")
                    st.write(f"**安全库存**: {物资_info['安全库存']}")
                with col2:
                    st.write(f"**当前库存**: {物资_info['当前库存']}")
                    st.write(f"**最近进货日期**: {物资_info['最近进货日期']}")
                    st.write(f"**过期日期**: {物资_info['过期日期']}")
                    st.write(f"**供应商**: {物资_info['供应商']}")
                
                # 修改信息
                st.subheader("修改信息")
                
                # 安全库存
                新安全库存 = st.number_input("安全库存", min_value=0, value=int(物资_info['安全库存']))
                
                # 供应商
                新供应商 = st.selectbox("供应商", inventory_df['供应商'].unique(), index=list(inventory_df['供应商'].unique()).index(物资_info['供应商']))
                
                # 过期日期
                新过期日期 = st.date_input("过期日期", datetime.strptime(物资_info['过期日期'], '%Y-%m-%d')).strftime('%Y-%m-%d')
                
                # 单价
                新单价 = st.number_input("单价", min_value=0.01, value=float(物资_info['单价']), step=0.01)
                
                # 备注
                修改备注 = st.text_input("修改备注", "系统更新")
                
                # 修改按钮
                if st.button("确认修改"):
                    # 更新物资信息
                    inventory_df.loc[inventory_df['物资名称'] == 管理_物资, '安全库存'] = 新安全库存
                    inventory_df.loc[inventory_df['物资名称'] == 管理_物资, '供应商'] = 新供应商
                    inventory_df.loc[inventory_df['物资名称'] == 管理_物资, '过期日期'] = 新过期日期
                    inventory_df.loc[inventory_df['物资名称'] == 管理_物资, '单价'] = 新单价
                    inventory_df.loc[inventory_df['物资名称'] == 管理_物资, '总价值'] = inventory_df.loc[inventory_df['物资名称'] == 管理_物资, '当前库存'] * 新单价
                    
                    # 更新库存状态
                    inventory_df['库存状态'] = inventory_df.apply(get_inventory_status, axis=1)
                    inventory_df['过期状态'] = inventory_df.apply(get_expiry_status, axis=1)
                    
                    st.success(f"成功更新 {管理_物资} 的信息")
                    st.balloons()
    
    if "⚙️ 系统设置" in tabs:
        with other_tabs[-1]:  # 系统设置
            st.subheader("系统配置")
            
            with st.expander("库存阈值设置"):
                st.write("设置库存预警阈值")
                
                # 获取当前设置
                current_low_threshold = 70
                current_critical_threshold = 30
                
                low_threshold = st.slider("库存不足阈值(%)", 30, 100, current_low_threshold)
                critical_threshold = st.slider("库存紧急阈值(%)", 0, 50, current_critical_threshold)
                
                if st.button("保存库存阈值设置"):
                    # 这里可以添加保存设置的逻辑
                    st.success(f"已更新库存阈值设置:不足({low_threshold}%),紧急({critical_threshold}%)")
            
            with st.expander("过期预警设置"):
                st.write("设置过期预警天数")
                
                # 获取当前设置
                current_expiry_warning = 30
                
                expiry_warning = st.slider("即将过期预警(天)", 1, 90, current_expiry_warning)
                
                if st.button("保存过期预警设置"):
                    # 这里可以添加保存设置的逻辑
                    st.success(f"已更新过期预警设置:{expiry_warning}天")
            
            with st.expander("数据管理"):
                st.write("导入/导出数据")
                
                col1, col2 = st.columns(2)
                with col1:
                    uploaded_file = st.file_uploader("上传库存数据(CSV格式)")
                    if uploaded_file is not None:
                        try:
                            new_inventory = pd.read_csv(uploaded_file)
                            # 这里可以添加数据合并的逻辑
                            st.success("文件上传成功!数据已合并")
                        except Exception as e:
                            st.error(f"文件上传失败:{str(e)}")
                
                with col2:
                    st.write("导出数据")
                    if st.button("导出库存数据"):
                        csv = inventory_df.to_csv(sep='\t', na_rep='nan')
                        b64 = base64.b64encode(csv.encode()).decode()
                        href = f'<a href="data:file/csv;base64,{b64}" download="inventory_data.csv">点击下载库存数据</a>'
                        st.markdown(href, unsafe_allow_html=True)
                    
                    if st.button("导出使用记录"):
                        csv = usage_df.to_csv(sep='\t', na_rep='nan')
                        b64 = base64.b64encode(csv.encode()).decode()
                        href = f'<a href="data:file/csv;base64,{b64}" download="usage_records.csv">点击下载使用记录</a>'
                        st.markdown(href, unsafe_allow_html=True)
            
            with st.expander("用户管理"):
                st.write("添加/管理用户")
                
                # 仅管理员可以管理用户
                if st.session_state.username == "admin":
                    new_username = st.text_input("新用户名")
                    new_password = st.text_input("新密码", type="password")
                    new_role = st.selectbox("用户角色", ["doctor", "nurse", "pharmacist", "admin"])
                    
                    if st.button("添加用户"):
                        if new_username and new_password:
                            # 这里可以添加用户添加的逻辑
                            st.success(f"成功添加用户:{new_username},角色:{new_role}")
                        else:
                            st.error("用户名和密码不能为空")
                
                # 显示当前用户列表
                st.write("当前用户列表")
                user_list = pd.DataFrame({
                    '用户名': list(users.keys()),
                    '角色': [permissions.get(user, ["无权限"])[0] for user in users.keys()]
                })
                st.dataframe(user_list)
            
            with st.expander("系统信息"):
                st.write("**版本信息**: v4.3.2")
                st.write("**最后")
                st.write("**最后更新**: 2025-07-30")
                st.write("**开发者**: 医疗AI团队")
                st.write("**技术支持**: tech-support@medai.com")
    
    # 过期物资警告
    expiring_items = inventory_df[inventory_df['过期状态'].isin(['已过期', '即将过期'])]
    if not expiring_items.empty:
        st.sidebar.warning("⚠️ 有过期风险物资")
        for _, row in expiring_items.iterrows():
            days_left = (datetime.strptime(row['过期日期'], '%Y-%m-%d') - datetime.now()).days
            status = "已过期" if days_left < 0 else f"还剩{days_left}天"
            st.sidebar.error(f"{row['物资名称']} - {status}")
    
    # 库存不足警告
    low_stock_items = inventory_df[inventory_df['库存状态'].isin(['不足', '紧急'])]
    if not low_stock_items.empty:
        st.sidebar.warning("⚠️ 有库存不足物资")
        for _, row in low_stock_items.iterrows():
            st.sidebar.error(f"{row['物资名称']} - 当前库存: {row['当前库存']} (安全库存: {row['安全库存']})")
    
    # 登出按钮
    if st.sidebar.button("登出"):
        st.session_state.authenticated = False
        st.session_state.username = None
        st.success("已成功登出")
        st.experimental_rerun()
    
    # 页脚
    st.markdown("---")
    st.caption("© 2025 智能医疗物资管理系统 | 基于AI的医疗物资管理解决方案")

运行应用

if name == “main”:
if st.session_state.authenticated:
main_app()
else:
login_page()

“增量升级”方案:

────────────────────

附录一、实时库存同步(对接 HIS/ERP)

────────────────────

  1. 目标
    让系统不再依赖“模拟数据”,而是实时读取 HIS/ERP 的出入库流水,并自动更新库存。

  2. 关键思路
    • 在 generate_data() 之前增加一个“数据源选择”开关:

    • “模拟数据”(默认)
    • “HIS/ERP 接口”
      • 新增 fetch_realtime_inventory()fetch_realtime_usage() 两个函数,利用 RESTful API 或数据库直连拉取数据。
      • 用 Streamlit 的 st.cache_data(ttl=300) 把结果缓存 5 分钟,既减轻源系统压力,又保证页面流畅。
  3. 代码片段

def fetch_realtime_inventory():
    # 示例:调用 REST 接口
    r = requests.get(
        st.secrets["his"]["base_url"] + "/api/v1/inventory",
        headers={"Authorization": f"Bearer {st.secrets['his']['token']}"}
    )
    r.raise_for_status()
    return pd.DataFrame(r.json())

def fetch_realtime_usage():
    # 示例:直连 SQL Server
    conn = pymssql.connect(
        server=st.secrets["erp"]["host"],
        user=st.secrets["erp"]["user"],
        password=st.secrets["erp"]["pwd"],
        database="ERP"
    )
    sql = """
        SELECT CONVERT(date, CreateTime) AS 日期,
               ItemID         AS 物资ID,
               ItemName       AS 物资名称,
               Category       AS 类别,
               Quantity       AS 使用量,
               Dept           AS 使用科室
        FROM   t_Outbound
        WHERE  CreateTime >= DATEADD(day, -90, GETDATE())
    """
    return pd.read_sql(sql, conn)
  1. 落地注意
    • 用 st.secrets 保存接口密钥,避免硬编码。
    • 若源系统无“过期日期”字段,可在本地维护一张映射表(物资ID→过期日期),再与实时库存 merge
    • 建议把“同步日志”写入侧边栏,异常时给出提示,方便运维。

────────────────────

附录二、移动端 PDA 扫码盘点

────────────────────

  1. 目标
    护士/库管员用手机或 PDA 扫条码即可快速盘点并回写库存。

  2. 关键思路
    • 用 Streamlit 的“camera_input”组件 + pyzbar/opencv 解析二维码/条形码。
    • 扫码后弹窗显示当前库存,可直接输入“盘点数量”,点击“确认”即回写后台(支持写回 HIS 或本地 SQLite)。
    • 盘点过程离线缓存盘点单,网络恢复后批量提交。

  3. 代码片段

st.subheader("PDA 扫码盘点")
img_file = st.camera_input("请对准物资条码")
if img_file:
    img = Image.open(img_file)
    barcodes = decode(img)
    if barcodes:
        code = barcodes[0].data.decode("utf-8")
        item = inventory_df[inventory_df["物资ID"] == code]
        if not item.empty:
            st.write("当前库存:", item.iloc[0]["当前库存"])
            new_qty = st.number_input("盘点后库存", min_value=0, value=int(item.iloc[0]["当前库存"]))
            if st.button("确认更新"):
                update_inventory(code, new_qty)   # 调用接口或写本地
                st.success("库存已更新")
        else:
            st.error("未找到该物资")
  1. 落地注意
    • 手机端部署:可用 Streamlit Cloud + 手机浏览器,或打包成 PWA。
    • 条码标准:若院内用 GS1-128,则解析时注意分隔符。
    • 并发冲突:盘点时给记录加乐观锁(版本号或时间戳字段)。

────────────────────

附录三、AI 预测再升级:加入“节假日/疫情”外生特征

────────────────────

  1. 目标
    让 AI 预测更准,尤其在春节、疫情爆发等特殊时期。

  2. 关键思路
    • 收集外部特征:

    • 中国法定节假日(可用 ChineseCalendar 库)
    • 本地新冠新增病例(卫健委公开接口)
    • 气温、湿度(气象局 API)
      • 把上述特征拼接到训练集,再训练 RandomForest 或 LightGBM。
      • 预测时,未来 14 天的外部特征可简单用“最近 7 天均值”填充,或调用同样接口拉取。
  3. 代码片段

from chinese_calendar import is_holiday

def add_external_features(df):
    df["是否节假日"] = df["日期"].apply(lambda x: int(is_holiday(x)))
    # 例:获取新增病例
    df["新增病例"] = fetch_covid_cases(df["日期"])
    return df

item_data = add_external_features(item_data)
X = item_data[["day_of_week", "是否节假日", "新增病例", "气温"]]

网站公告

今日签到

点亮在社区的每一天
去签到