处理 NATS 订阅的消息
在 WebSocket 的管理模块中,我们之前已经处理了一些消息。这些消息通过 NATS 订阅过来,我们需要对这些消息进行进一步的处理。一旦消息到达,我们需要执行相应的操作,并将结果发送回去,包括之前的操作。
理论上,所有消息都应该能够到达这里进行处理。目前,我们还没有对这部分进行处理,接下来需要完善这部分逻辑。
3. 解析消息并处理
当收到消息后,我们需要解析消息内容。解析完成后,我们需要根据消息的类型(type
)进行不同的处理。
特殊类型处理:
如果消息类型是“三审”(假设的类型),我们需要进行特殊处理。因为这种类型的消息可能存储在 WebSocket 的连接中,而不是直接推送给客户端。
如果消息类型是“replace”或“response”,我们需要将消息类型改为“response”,以便正确地将消息发送回客户端。
普通类型处理:
如果消息类型是“pose”,我们需要将消息放入一个专门的通道(Channel)中进行处理。这样可以提高系统的承载能力,避免消息积压。
4. 消息推送逻辑
在处理完消息后,我们需要将消息推送给客户端。具体步骤如下:
获取客户端 ID:
从消息中获取当前客户端的 ID。如果客户端不存在,我们需要记录日志并提示客户端已下线。
编码消息:
对消息进行编码处理,确保消息格式正确。
发送消息:
将编码后的消息发送给客户端。如果消息类型是“pose”,我们需要循环处理所有相关客户端,并将消息推送给每个客户端。
5. 代码实现
以下是优化后的代码实现:
package websocket
import (
"encoding/json"
"fmt"
"log"
"sync"
"github.com/nats-io/nats.go"
"github.com/gorilla/websocket"
)
type WebSocketManager struct {
sync.RWMutex
clients map[*websocket.Conn]string
messageCh chan *Message
pushCh chan *Message
natsConn *nats.Conn
}
type Message struct {
Type string `json:"type"`
Data json.RawMessage `json:"data"`
ClientID string `json:"clientID"`
}
func NewWebSocketManager(natsConn *nats.Conn) *WebSocketManager {
return &WebSocketManager{
clients: make(map[*websocket.Conn]string),
messageCh: make(chan *Message, 1024),
pushCh: make(chan *Message, 1024),
natsConn: natsConn,
}
}
func (wm *WebSocketManager) handleMessage(msg *Message) {
log.Printf("Handling message: %+v", msg)
// 获取客户端连接
clientID := msg.ClientID
clientConn, exists := wm.clients[clientID]
if !exists {
log.Printf("Client not found: %s", clientID)
return
}
// 根据消息类型处理
switch msg.Type {
case "response":
// 处理响应消息
wm.sendToClient(clientConn, msg)
case "pose":
// 处理推送消息
wm.pushCh <- msg
default:
log.Printf("Unknown message type: %s", msg.Type)
}
}
func (wm *WebSocketManager) sendToClient(clientConn *websocket.Conn, msg *Message) {
encodedMsg, err := json.Marshal(msg)
if err != nil {
log.Printf("Failed to encode message: %v", err)
return
}
err = clientConn.WriteMessage(websocket.TextMessage, encodedMsg)
if err != nil {
log.Printf("Failed to send message to client: %v", err)
}
}
func (wm *WebSocketManager) processPushMessages() {
for msg := range wm.pushCh {
for clientConn := range wm.clients {
wm.sendToClient(clientConn, msg)
}
}
}
func (wm *WebSocketManager) start() {
go wm.processPushMessages()
// NATS 订阅消息
nc := wm.natsConn
nc.Subscribe("ws.messages", func(msg *nats.Msg) {
var wsMsg Message
err := json.Unmarshal(msg.Data, &wsMsg)
if err != nil {
log.Printf("Failed to unmarshal NATS message: %v", err)
return
}
wm.handleMessage(&wsMsg)
})
}