【Easylive】视频在线人数统计系统实现详解 & WebSocket 及其在在线人数统计中的应用

发布于:2025-04-03 ⋅ 阅读:(17) ⋅ 点赞:(0)

【Easylive】项目常见问题解答(自用&持续更新中…) 汇总版

视频在线人数统计系统实现详解

1. 系统架构概述

您实现的是一个基于Redis的视频在线人数统计系统,主要包含以下组件:

  1. 心跳上报接口:客户端定期调用以维持在线状态
  2. Redis存储结构:使用两种键存储在线信息
  3. 过期监听机制:通过Redis的键过期事件自动减少在线人数
  4. 计数维护逻辑:确保在线人数的准确性

2. 核心实现细节

2.1 数据结构设计

系统使用了两种Redis键:

  1. 用户播放键 (userPlayOnlineKey)
    • 格式:video:play:user:{fileId}:{deviceId}
    • 作用:标记特定设备是否在线
    • 过期时间:8秒

  2. 在线计数键 (playOnlineCountKey)
    • 格式:video:play:online:{fileId}
    • 作用:存储当前视频的在线人数
    • 过期时间:10秒

2.2 心跳上报流程 (reportVideoPlayOnline)

public Integer reportVideoPlayOnline(String fileId, String deviceId) {
    // 构造Redis键
    String userPlayOnlineKey = String.format(Constants.REDIS_KEY_VIDEO_PLAY_COUNT_USER, fileId, deviceId);
    String playOnlineCountKey = String.format(Constants.REDIS_KEY_VIDEO_PLAY_COUNT_ONLINE, fileId);
    
    // 新用户上线处理
    if (!redisUtils.keyExists(userPlayOnlineKey)) {
        // 设置用户键(8秒过期)
        redisUtils.setex(userPlayOnlineKey, fileId, Constants.REDIS_KEY_EXPIRES_ONE_SECONDS * 8);
        // 增加在线计数(10秒过期)
        return redisUtils.incrementex(playOnlineCountKey, Constants.REDIS_KEY_EXPIRES_ONE_SECONDS * 10).intValue();
    }
    
    // 已有用户续期处理
    redisUtils.expire(playOnlineCountKey, Constants.REDIS_KEY_EXPIRES_ONE_SECONDS * 10);
    redisUtils.expire(userPlayOnlineKey, Constants.REDIS_KEY_EXPIRES_ONE_SECONDS * 8);
    
    // 返回当前在线人数
    Integer count = (Integer) redisUtils.get(playOnlineCountKey);
    return count == null ? 1 : count;
}

工作流程

  1. 客户端每5-7秒调用一次/reportVideoPlayOnline接口
  2. 服务端检查用户键是否存在:
    • 不存在:创建用户键(8秒过期),增加计数键(10秒过期)
    • 存在:续期两个键的过期时间
  3. 返回当前在线人数

2.3 过期监听机制 (RedisKeyExpirationListener)

@Override
public void onMessage(Message message, byte[] pattern) {
    String key = message.toString();
    // 只处理用户播放键的过期事件
    if (!key.startsWith(Constants.REDIS_KEY_VIDEO_PLAY_COUNT_ONLINE_PREIFX + Constants.REDIS_KEY_VIDEO_PLAY_COUNT_USER_PREFIX)) {
        return;
    }
    
    // 从key中提取fileId
    Integer userKeyIndex = key.indexOf(Constants.REDIS_KEY_VIDEO_PLAY_COUNT_USER_PREFIX) + Constants.REDIS_KEY_VIDEO_PLAY_COUNT_USER_PREFIX.length();
    String fileId = key.substring(userKeyIndex, userKeyIndex + Constants.LENGTH_20);
    
    // 减少对应视频的在线计数
    redisComponent.decrementPlayOnlineCount(String.format(Constants.REDIS_KEY_VIDEO_PLAY_COUNT_ONLINE, fileId));
}

工作流程

  1. Redis在用户键(8秒)过期时发送通知
  2. 监听器收到通知后:
    • 验证是否为用户播放键
    • 从键名中提取视频ID(fileId)
    • 减少对应视频的在线计数

2.4 计数递减逻辑 (decrementPlayOnlineCount)

public void decrementPlayOnlineCount(String key) {
    redisUtils.decrement(key);
}

作用:简单地减少指定键的计数值

3. 关键设计原理

3.1 双键设计的意义

  1. 用户播放键
    • 作为"心跳"存在的证据
    • 过期时间(8秒)短于计数键(10秒),确保先检测到用户离线

  2. 在线计数键
    • 集中存储当前在线人数
    • 稍长的过期时间防止误删

3.2 时间参数设计

8秒用户键过期:假设客户端每5-7秒上报一次,8秒确保能检测到中断
10秒计数键过期:比用户键多2秒,防止竞态条件
客户端上报频率:建议5-7秒一次,平衡准确性和服务器压力

3.3 容错机制

  1. 计数键续期:每次心跳都会延长计数键的过期时间
  2. 空值处理:当计数键不存在时返回1作为默认值
  3. 精确递减:只在用户键过期时才减少计数,避免重复递减

4. 工作流程图

Client Server Redis Listener 心跳上报(fileId, deviceId) 检查user:{fileId}:{deviceId}存在? 不存在 设置user键(8s) 增加online计数(10s) 存在 续期user键(8s) 续期online键(10s) alt [新用户] [已有用户] 返回当前在线人数 user键过期通知 减少对应online计数 loop [过期监听] Client Server Redis Listener

5. 系统优势

  1. 实时性高:秒级检测用户离线
  2. 性能优异:完全基于Redis内存操作
  3. 扩展性强:轻松支持大量并发用户
  4. 准确可靠:双重验证机制防止误计数
  5. 资源节约:自动清理不活跃用户的记录

6. 潜在优化方向

  1. 批量上报:允许客户端一次上报多个视频的状态
  2. 分布式锁:在极高并发下保证计数准确
  3. 异常处理:增加Redis操作失败的重试机制
  4. 监控指标:添加在线人数变化的监控和报警
  5. 动态过期:根据系统负载动态调整过期时间

这个实现很好地平衡了准确性、实时性和性能要求,是一个非常典型的在线人数统计解决方案。

WebSocket 及其在在线人数统计中的应用

WebSocket 基础介绍

什么是 WebSocket?

WebSocket 是一种在单个 TCP 连接上进行全双工通信的协议,它使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。

与传统 HTTP 轮询的区别

特性 WebSocket HTTP 轮询
连接方式 持久化连接 每次请求新建连接
通信方向 全双工 半双工
实时性 毫秒级 依赖轮询间隔(通常秒级)
服务器推送 支持 不支持
资源消耗 连接初期开销大,后期开销小 每次请求都有完整HTTP开销
适用场景 高实时性应用 实时性要求不高的应用

基于 WebSocket 的在线人数统计实现

系统架构设计

客户端A ──┐
          ├─── WebSocket 服务器 ─── Redis 集群
客户端B ──┘           │
                      │
                数据库(持久化)

核心实现代码

1. WebSocket 服务端实现 (Spring Boot)
@ServerEndpoint("/online/{videoId}")
@Component
public class VideoOnlineEndpoint {
    
    private static ConcurrentMap<String, Set<Session>> videoSessions = new ConcurrentHashMap<>();
    private static RedisTemplate<String, String> redisTemplate;
    
    @Autowired
    public void setRedisTemplate(RedisTemplate<String, String> redisTemplate) {
        VideoOnlineEndpoint.redisTemplate = redisTemplate;
    }
    
    @OnOpen
    public void onOpen(Session session, @PathParam("videoId") String videoId) {
        // 添加会话到视频组
        videoSessions.computeIfAbsent(videoId, k -> ConcurrentHashMap.newKeySet()).add(session);
        
        // 更新Redis计数
        String redisKey = "video:online:" + videoId;
        redisTemplate.opsForValue().increment(redisKey);
        redisTemplate.expire(redisKey, 10, TimeUnit.MINUTES);
        
        // 广播更新后的在线人数
        broadcastOnlineCount(videoId);
    }
    
    @OnClose
    public void onClose(Session session, @PathParam("videoId") String videoId) {
        // 从视频组移除会话
        Set<Session> sessions = videoSessions.get(videoId);
        if (sessions != null) {
            sessions.remove(session);
            
            // 更新Redis计数
            String redisKey = "video:online:" + videoId;
            redisTemplate.opsForValue().decrement(redisKey);
            
            // 广播更新后的在线人数
            broadcastOnlineCount(videoId);
        }
    }
    
    @OnError
    public void onError(Session session, Throwable error) {
        error.printStackTrace();
    }
    
    private void broadcastOnlineCount(String videoId) {
        String count = redisTemplate.opsForValue().get("video:online:" + videoId);
        String message = "ONLINE_COUNT:" + (count != null ? count : "0");
        
        Set<Session> sessions = videoSessions.get(videoId);
        if (sessions != null) {
            sessions.forEach(session -> {
                try {
                    session.getBasicRemote().sendText(message);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
        }
    }
}
2. 客户端实现 (JavaScript)
const videoId = '12345'; // 当前观看的视频ID
const socket = new WebSocket(`wss://yourdomain.com/online/${videoId}`);

// 连接建立时
socket.onopen = function(e) {
    console.log("WebSocket连接已建立");
};

// 接收消息
socket.onmessage = function(event) {
    if(event.data.startsWith("ONLINE_COUNT:")) {
        const count = event.data.split(":")[1];
        updateOnlineCountDisplay(count);
    }
};

// 连接关闭时
socket.onclose = function(event) {
    if (event.wasClean) {
        console.log(`连接正常关闭,code=${event.code} reason=${event.reason}`);
    } else {
        console.log('连接异常断开');
        // 尝试重新连接
        setTimeout(() => connectWebSocket(), 5000);
    }
};

// 错误处理
socket.onerror = function(error) {
    console.log(`WebSocket错误: ${error.message}`);
};

function updateOnlineCountDisplay(count) {
    document.getElementById('online-count').innerText = count;
}
3. 心跳机制实现
// 客户端心跳
setInterval(() => {
    if(socket.readyState === WebSocket.OPEN) {
        socket.send("HEARTBEAT");
    }
}, 30000); // 30秒发送一次心跳

// 服务端心跳检测 (Java)
@ServerEndpoint配置中添加:
@OnMessage
public void onMessage(Session session, String message) {
    if("HEARTBEAT".equals(message)) {
        session.getAsyncRemote().sendText("HEARTBEAT_ACK");
    }
}

方案优势分析

  1. 实时性极佳
    • 在线人数变化可实时推送到所有客户端
    • 无轮询延迟,通常达到毫秒级更新

  2. 精确计数
    • 基于实际连接状态计数
    • 避免Redis过期时间的估算误差

  3. 扩展功能容易
    • 可轻松扩展实现弹幕、实时评论等功能
    • 支持复杂的互动场景

  4. 减少无效请求
    • 相比HTTP轮询减少90%以上的请求量
    • 显著降低服务器压力

潜在挑战与解决方案

1. 连接保持问题

问题:移动网络不稳定导致频繁断开

解决方案
• 实现自动重连机制
• 使用心跳包检测连接状态
• 设置合理的超时时间

2. 大规模并发问题

问题:单视频热点导致连接数激增

解决方案
• 使用WebSocket集群
• 引入负载均衡(如Nginx)
• 实现连接分片策略

3. 状态同步问题

问题:集群环境下状态同步

解决方案
• 使用Redis Pub/Sub同步各节点状态
• 采用一致性哈希分配连接
• 实现分布式会话管理

性能优化建议

  1. 协议优化
    • 启用WebSocket压缩扩展
    • 使用二进制协议替代文本协议

  2. 资源控制
    • 实现连接数限制
    • 设置单个IP连接限制

  3. 监控体系
    • 建立连接数监控
    • 实现异常连接报警

  4. 优雅降级
    • WebSocket不可用时自动降级为长轮询
    • 提供兼容性方案

与传统方案的对比

指标 WebSocket方案 Redis键过期方案
实时性 毫秒级 秒级(依赖过期时间)
精确度 100%准确 有1-2秒延迟
实现复杂度 较高 较低
服务器负载 连接初期高,维持期低 持续中等负载
扩展性 容易扩展其他实时功能 仅限于计数
客户端兼容性 需现代浏览器支持 所有环境兼容
移动端表现 可能因网络切换断开 不受影响

适用场景建议

推荐使用WebSocket方案当:
• 需要实时显示精确在线人数
• 已经使用或计划使用WebSocket实现其他功能(如弹幕、聊天)
• 客户端环境可控(如自己的APP或现代浏览器)
• 有足够资源维护WebSocket基础设施

推荐保持Redis方案当:
• 实时性要求不是极高(秒级可接受)
• 需要支持老旧客户端
• 系统规模较小,希望简单维护
• 主要关注计数而非实时交互

混合方案设计

结合两种方案优势的折中实现:

// WebSocket连接时更新精确计数
@OnOpen
public void onOpen(Session session, @PathParam("videoId") String videoId) {
    // 更新内存中的精确计数
    incrementLocalCount(videoId);
    
    // 每10秒同步到Redis一次
    if(needSyncToRedis(videoId)) {
        redisTemplate.opsForValue().set(
            "video:online:" + videoId, 
            getLocalCount(videoId).toString()
        );
    }
}

// 对外提供查询接口
@GetMapping("/online/{videoId}")
public int getOnlineCount(@PathVariable String videoId) {
    // 优先返回本地精确计数
    Integer localCount = getLocalCount(videoId);
    if(localCount != null) {
        return localCount;
    }
    
    // 回退到Redis计数
    String count = redisTemplate.opsForValue().get("video:online:" + videoId);
    return count != null ? Integer.parseInt(count) : 0;
}

这种混合方案:
• 对WebSocket客户端提供精确计数
• 对非WebSocket客户端提供近似的Redis计数
• 平衡了精确性和兼容性

查看在线观看人数

通过轮询上报心跳,在服务端记录设备有没有不停地上报心跳,如果没有上报心跳,通过 Redis 的 key 的失效,会有一个通知没有再上报心跳,就会把在线人数 -1。

Redis在线人数统计实现详解

以下是带有详细注释的代码实现,解释了基于Redis的在线人数统计系统的工作原理:

/**
 * 客户端上报心跳接口
 * @param fileId 视频文件ID
 * @param deviceId 设备唯一标识
 * @return 当前在线人数
 */
@RequestMapping("/reportVideoPlayOnline")
public ResponseVO reportVideoPlayOnline(@NotEmpty String fileId, @NotEmpty String deviceId){
    // 调用Redis组件处理心跳上报,并返回成功响应
    return getSuccessResponseVO(redisComponent.reportVideoPlayOnline(fileId, deviceId));
}

/**
 * 处理视频在线人数统计的核心方法
 * @param fileId 视频文件ID
 * @param deviceId 设备唯一标识
 * @return 当前在线人数
 */
public Integer reportVideoPlayOnline(String fileId, String deviceId){
    // 构建Redis键:用户级别的键,用于标记特定设备是否在线
    String userPlayOnlineKey = String.format(Constants.REDIS_KEY_VIDEO_PLAY_COUNT_USER, fileId, deviceId);
    // 构建Redis键:视频级别的键,用于存储当前视频的总在线人数
    String playOnlineCountKey = String.format(Constants.REDIS_KEY_VIDEO_PLAY_COUNT_ONLINE, fileId);

    // 检查是否是新的观看用户(该设备首次上报或已过期)
    if (!redisUtils.keyExists(userPlayOnlineKey)) {
        // 设置用户键,8秒后过期(如果8秒内没有下次心跳,则认为离线)
        redisUtils.setex(userPlayOnlineKey, fileId, Constants.REDIS_KEY_EXPIRES_ONE_SECONDS * 8);
        // 增加视频的总在线人数计数,并设置10秒过期时间
        return redisUtils.incrementex(playOnlineCountKey, Constants.REDIS_KEY_EXPIRES_ONE_SECONDS * 10).intValue();
    }
    
    // 以下是已有用户的处理逻辑:
    
    // 续期视频的总在线人数键(10秒)
    redisUtils.expire(playOnlineCountKey, Constants.REDIS_KEY_EXPIRES_ONE_SECONDS * 10);
    // 续期用户级别的键(8秒)
    redisUtils.expire(userPlayOnlineKey, Constants.REDIS_KEY_EXPIRES_ONE_SECONDS * 8);
    
    // 获取当前在线人数(防止并发问题导致的计数不准确)
    Integer count = (Integer) redisUtils.get(playOnlineCountKey);
    // 如果获取不到计数(极端情况),默认返回1
    return count == null ? 1 : count;
}

/**
 * 减少在线人数计数
 * @param key 需要减少计数的Redis键
 */
public void decrementPlayOnlineCount(String key) {
    // 对指定键的值进行原子递减
    redisUtils.decrement(key);
}

/**
 * Redis键过期监听器,用于处理用户离线情况
 */
@Component
@Slf4j
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {
    @Resource
    private RedisComponent redisComponent;

    public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
        super(listenerContainer);
    }

    /**
     * 处理Redis键过期事件
     * @param message 过期消息
     * @param pattern 模式
     */
    @Override
    public void onMessage(Message message, byte[] pattern) {
        // 获取过期的键名
        String key = message.toString();
        
        // 只处理用户级别的在线状态键过期事件
        if (!key.startsWith(Constants.REDIS_KEY_VIDEO_PLAY_COUNT_ONLINE_PREIFX + Constants.REDIS_KEY_VIDEO_PLAY_COUNT_USER_PREFIX)) {
            return;
        }
        
        // 从键名中提取视频ID
        // 计算用户键前缀的长度
        Integer userKeyIndex = key.indexOf(Constants.REDIS_KEY_VIDEO_PLAY_COUNT_USER_PREFIX) 
            + Constants.REDIS_KEY_VIDEO_PLAY_COUNT_USER_PREFIX.length();
        // 截取视频ID(假设ID长度为20)
        String fileId = key.substring(userKeyIndex, userKeyIndex + Constants.LENGTH_20);
        
        // 减少对应视频的在线人数计数
        redisComponent.decrementPlayOnlineCount(String.format(Constants.REDIS_KEY_VIDEO_PLAY_COUNT_ONLINE, fileId));
    }
}

系统工作流程详解

  1. 心跳上报机制
    • 客户端每隔5-7秒调用/reportVideoPlayOnline接口上报心跳
    • 服务端通过Redis记录设备最后一次活跃时间

  2. 双键设计原理
    用户键(userPlayOnlineKey)
    ◦ 格式:video:play:user:{fileId}:{deviceId}
    ◦ 作用:标记特定设备是否在线
    ◦ 过期时间:8秒(如果8秒内没有心跳则认为离线)
    计数键(playOnlineCountKey)
    ◦ 格式:video:play:online:{fileId}
    ◦ 作用:存储当前视频的总在线人数
    ◦ 过期时间:10秒(比用户键稍长,防止竞态条件)

  3. 新用户上线处理

    if (!redisUtils.keyExists(userPlayOnlineKey)) {
        redisUtils.setex(userPlayOnlineKey, fileId, 8);
        return redisUtils.incrementex(playOnlineCountKey, 10);
    }
    

    • 当用户键不存在时,创建用户键并增加总计数

  4. 已有用户续期处理

    redisUtils.expire(playOnlineCountKey, 10);
    redisUtils.expire(userPlayOnlineKey, 8);
    

    • 续期两个键的过期时间,保持活跃状态

  5. 离线检测机制
    • 当用户键8秒过期时,触发RedisKeyExpirationListener
    • 监听器从键名提取videoId,减少对应视频的在线计数

  6. 容错处理

    Integer count = (Integer) redisUtils.get(playOnlineCountKey);
    return count == null ? 1 : count;
    

    • 防止极端情况下计数键丢失,返回默认值1

设计优势分析

  1. 精确计数:基于实际心跳而非估算,结果准确
  2. 自动清理:通过Redis过期机制自动清理不活跃用户
  3. 低延迟:键过期通知机制实现秒级离线检测
  4. 高性能:完全基于内存操作,无数据库IO
  5. 可扩展:Redis集群支持横向扩展

关键参数说明

参数 说明
用户键过期时间 8秒 客户端应每5-7秒上报一次心跳
计数键过期时间 10秒 比用户键稍长,防止竞态条件
视频ID长度 20 需与业务系统保持一致

这个实现方案在保证准确性的同时,具有优秀的性能和可扩展性,非常适合中小规模的实时在线人数统计场景。

自看

通过Redis计数器来给视频的在线观看人数进行增加和减少,也就是通过心跳来不停上报当前用户是否正在观看,当浏览器关闭时,该用户就不会再持续上报心跳,此时该用户的Redis Key则会失效,Redis Key失效的时候会发送消息通知,根据这个消息通知得知失效,再去减少在线观看人数。

Netty与视频在线人数统计的结合

Netty基础介绍

Netty是一个异步事件驱动的网络应用框架,用于快速开发高性能、高可靠性的网络服务器和客户端程序。它基于Java NIO(Non-blocking I/O)构建,主要特点包括:

  1. 高性能:支持百万级并发连接
  2. 低延迟:非阻塞I/O模型减少等待时间
  3. 高扩展性:模块化设计,可灵活扩展
  4. 协议支持:内置HTTP、WebSocket、TCP/UDP等协议支持

为什么考虑用Netty实现在线人数统计?

当前基于HTTP轮询+Redis的实现存在以下可优化点:
HTTP开销大:每次轮询都需要完整的HTTP请求/响应头
实时性有限:依赖轮询间隔(通常秒级)
服务器压力:高并发时大量无效轮询请求

Netty可以解决这些问题,提供真正的实时通信能力

基于Netty的在线人数统计设计

系统架构

客户端App/Web ──▶ Netty服务器集群 ──▶ Redis集群
    │
    │ (WebSocket/TCP长连接)
    ▼
用户行为数据(心跳、上下线)

核心组件实现

1. Netty服务器初始化
public class VideoOnlineServer {
    public void start(int port) {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 protected void initChannel(SocketChannel ch) {
                     ChannelPipeline pipeline = ch.pipeline();
                     // 心跳检测(15秒无读写则关闭连接)
                     pipeline.addLast("idleStateHandler", 
                         new IdleStateHandler(15, 0, 0, TimeUnit.SECONDS));
                     // 自定义协议解码/编码
                     pipeline.addLast("decoder", new OnlineMessageDecoder());
                     pipeline.addLast("encoder", new OnlineMessageEncoder());
                     // 业务逻辑处理器
                     pipeline.addLast("handler", new OnlineMessageHandler());
                 }
             });
            
            ChannelFuture f = b.bind(port).sync();
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}
2. 消息处理器实现
public class OnlineMessageHandler extends SimpleChannelInboundHandler<OnlineMessage> {
    // 视频ID到Channel组的映射
    private static Map<String, ChannelGroup> videoGroups = new ConcurrentHashMap<>();
    
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, OnlineMessage msg) {
        switch (msg.getType()) {
            case CONNECT: // 连接初始化
                handleConnect(ctx, msg.getVideoId(), msg.getDeviceId());
                break;
            case HEARTBEAT: // 心跳
                handleHeartbeat(ctx, msg.getVideoId(), msg.getDeviceId());
                break;
            case DISCONNECT: // 主动断开
                handleDisconnect(ctx, msg.getVideoId(), msg.getDeviceId());
                break;
        }
    }
    
    private void handleConnect(ChannelHandlerContext ctx, String videoId, String deviceId) {
        // 加入视频频道组
        ChannelGroup group = videoGroups.computeIfAbsent(videoId, 
            k -> new DefaultChannelGroup(GlobalEventExecutor.INSTANCE));
        group.add(ctx.channel());
        
        // 更新Redis计数
        long count = RedisUtils.increment("video:online:" + videoId);
        
        // 广播新在线人数
        broadcastCount(videoId, count);
    }
    
    private void handleHeartbeat(ChannelHandlerContext ctx, String videoId, String deviceId) {
        // 更新设备最后活跃时间(Redis)
        RedisUtils.setex("device:active:" + videoId + ":" + deviceId, 
            "1", 15); // 15秒过期
        
        // 可选择性返回当前人数
        ctx.writeAndFlush(new OnlineMessage(HEARTBEAT_ACK, getOnlineCount(videoId)));
    }
}
3. 客户端断连处理
@Override
public void channelInactive(ChannelHandlerContext ctx) {
    // 从所有视频组中移除该Channel
    videoGroups.values().forEach(group -> group.remove(ctx.channel()));
    
    // 更新Redis计数(需要维护设备到视频ID的映射)
    String deviceId = getDeviceId(ctx.channel());
    String videoId = getVideoId(ctx.channel());
    long count = RedisUtils.decrement("video:online:" + videoId);
    
    // 广播新人数
    broadcastCount(videoId, count);
}

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
    // 处理空闲连接
    if (evt instanceof IdleStateEvent) {
        ctx.close(); // 关闭超时未心跳的连接
    }
}

与传统方案的对比

特性 Netty实现方案 HTTP轮询+Redis方案
实时性 毫秒级 依赖轮询间隔(通常秒级)
协议开销 仅心跳数据(几十字节) 完整HTTP头(通常几百字节)
服务器压力 长连接维护,无重复握手 每次轮询都新建连接
并发能力 单机支持10万+连接 受限于HTTP服务器性能
实现复杂度 较高 简单
移动网络适应性 需处理频繁重连 天然适应

关键设计考虑

  1. 连接管理
    • 使用ChannelGroup管理同视频的用户连接
    IdleStateHandler自动检测空闲连接

  2. 状态同步
    • Redis存储全局计数,避免Netty单点问题
    • 定期同步内存与Redis的数据

  3. 消息协议设计

    message OnlineMessage {
      enum Type {
        CONNECT = 0;
        HEARTBEAT = 1;
        DISCONNECT = 2;
      }
      Type type = 1;
      string videoId = 2;
      string deviceId = 3;
      int64 count = 4; // 用于服务端返回当前人数
    }
    
  4. 弹性设计
    • 客户端实现自动重连
    • 服务端优雅降级机制

性能优化技巧

  1. 对象池化:重用消息对象减少GC
  2. 零拷贝:使用CompositeByteBuf合并小数据包
  3. 事件循环:业务逻辑放入单独线程池
  4. 批量操作:合并Redis操作减少网络往返

适用场景建议

推荐使用Netty当:
• 需要真正的实时互动(如直播弹幕)
• 预期有超高并发(万级同时在线)
• 已经需要维护长连接(如游戏、IM)

保持当前方案当:
• 实时性要求不高
• 开发资源有限
• 客户端环境复杂(如需要支持老旧浏览器)

Netty方案虽然实现复杂度较高,但能为视频平台提供更实时、更高效的在线人数统计能力,并为未来扩展实时互动功能奠定基础。

Netty与WebSocket的关系及在实时统计中的应用

Netty和WebSocket是不同层次的技术,但它们可以紧密结合来构建高性能的实时通信系统。以下是它们的核心关系和在视频在线人数统计中的应用分析:

1. Netty与WebSocket的基础关系

维度 Netty WebSocket 二者关系
定位 网络应用框架 通信协议 Netty是实现WebSocket协议的底层框架之一
层级 传输层/应用层框架 应用层协议 Netty提供了对WebSocket协议的支持
功能 处理TCP/UDP连接、编解码、并发等 提供全双工通信能力 Netty帮助高效实现WebSocket的通信能力
典型使用 可作为WebSocket服务器的基础实现 运行在Netty等框架之上 开发者通过Netty API构建WebSocket服务

2. 技术栈组合原理

[WebSocket客户端] ←WebSocket协议→ [Netty WebSocket服务端] ←TCP→ [操作系统网络栈]
  1. 协议支持
    • Netty内置WebSocketServerProtocolHandler等组件
    • 自动处理WebSocket握手、帧编解码等底层细节

  2. 性能优势
    • Netty的Reactor线程模型优化WebSocket连接管理
    • 零拷贝技术提升WebSocket数据传输效率

  3. 扩展能力
    • 在WebSocket之上可添加自定义协议
    • 方便集成SSL/TLS等安全层

3. 在视频在线统计中的联合实现

基于Netty的WebSocket服务端示例

public class VideoWebSocketServer {
    public void start(int port) {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 protected void initChannel(SocketChannel ch) {
                     ChannelPipeline pipeline = ch.pipeline();
                     
                     // HTTP编解码器(用于WebSocket握手)
                     pipeline.addLast(new HttpServerCodec());
                     // 聚合HTTP请求
                     pipeline.addLast(new HttpObjectAggregator(65536));
                     // WebSocket协议处理器
                     pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
                     // 自定义业务处理器
                     pipeline.addLast(new OnlineStatsHandler());
                 }
             });
            
            ChannelFuture f = b.bind(port).sync();
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

在线统计业务处理器

public class OnlineStatsHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    
    // 视频频道组映射
    private static Map<String, ChannelGroup> videoGroups = new ConcurrentHashMap<>();
    
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) {
        // 解析JSON消息:{"action":"heartbeat","videoId":"123"}
        JsonObject json = parseJson(msg.text());
        
        String videoId = json.getString("videoId");
        ChannelGroup group = videoGroups.computeIfAbsent(videoId, 
            k -> new DefaultChannelGroup(ctx.executor()));
            
        switch (json.getString("action")) {
            case "join":
                group.add(ctx.channel());
                broadcastCount(videoId, group.size());
                break;
            case "heartbeat":
                // 更新Redis活跃记录
                redis.incr("active:" + videoId + ":" + ctx.channel().id());
                break;
        }
    }
    
    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        // 从所有组中移除并更新计数
        videoGroups.values().forEach(group -> {
            if (group.remove(ctx.channel())) {
                broadcastCount(getVideoId(ctx), group.size());
            }
        });
    }
}

4. 与传统HTTP轮询方案的对比

特性 Netty+WebSocket HTTP轮询
连接方式 1个持久连接 频繁新建连接
头部开销 握手后无冗余头 每次请求都带完整HTTP头
实时性 毫秒级 依赖轮询间隔(通常秒级)
服务器压力 连接数×心跳频率 请求数×轮询频率
移动网络适应 需处理网络切换 天然适应
实现复杂度 较高 简单

5. 典型消息流程

  1. 连接建立

    客户端 → HTTP Upgrade请求 → Netty(完成WebSocket握手) → 建立持久连接
    
  2. 心跳维持

    // 客户端每10秒发送
    {"action":"heartbeat","videoId":"123","timestamp":1620000000}
    
    // 服务端响应
    {"type":"ack","online":1524}
    
  3. 人数推送

    // 服务端主动推送
    {"type":"stats","videoId":"123","online":1525,"change":1}
    

6. 性能优化关键点

  1. 连接管理
    • 使用ChannelGroup管理视频房间的订阅者
    • 配置合理的IdleStateHandler检测死连接

  2. 序列化优化

    // 使用二进制协议代替JSON
    pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
    pipeline.addLast(new ProtobufEncoder());
    
  3. 集群扩展

    // 使用Redis Pub/Sub同步各节点状态
    redis.subscribe("video:123", (channel, message) -> {
        broadcastToLocalClients(message);
    });
    
  4. 监控指标
    • 跟踪每个视频频道的连接数
    • 监控消息吞吐量和延迟

Netty与WebSocket的结合为实时统计提供了高并发、低延迟的解决方案,特别适合需要精确到毫秒级的在线人数统计场景,同时为未来扩展实时弹幕、即时消息等功能奠定了基础。


网站公告

今日签到

点亮在社区的每一天
去签到