Go语言WebSocket实时通信实战
# Go语言WebSocket实时通信实战

## 引言

WebSocket是一种在单个TCP连接上提供全双工通信的协议，广泛应用于实时聊天、实时通知等场景。本文将深入探讨Go语言中WebSocket的实现方式和最佳实践。

## 一、WebSocket基础

### 1.1 WebSocket握手过程

```go
// WebSocket握手请求头示例
// GET /chat HTTP/1.1
// Host: example.com
// Upgrade: websocket
// Connection: Upgrade
// Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
// Sec-WebSocket-Version: 13

// WebSocket握手响应头示例
// HTTP/1.1 101 Switching Protocols
// Upgrade: websocket
// Connection: Upgrade
// Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
```

### 1.2 WebSocket帧结构

```
 0                   1                   2                   3
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-------+-+-------------+-------------------------------+
|F|R|R|R| opcode|M| Payload len |    Extended payload length    |
|I|S|S|S|  (4)  |A|     (7)     |             (16/64)           |
|N|V|V|V|       |S|             |   (if payload len==126/127)   |
| |1|2|3|       |K|             |                               |
+-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
|     Extended payload length continued, if payload len == 127  |
+ - - - - - - - - - - - - - - - +-------------------------------+
|                               |Masking-key, if MASK set to 1  |
+-------------------------------+-------------------------------+
| Masking-key (continued)       |          Payload Data         |
+-------------------------------- - - - - - - - - - - - - - - - +
:                     Payload Data continued ...                :
+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
|                     Payload Data (continued)                  |
+---------------------------------------------------------------+
```

## 二、Go语言WebSocket实现

### 2.1 使用gorilla/websocket库

```go
import (
	"github.com/gorilla/websocket"
)

var upgrader = websocket.Upgrader{
	ReadBufferSize:  1024,
	WriteBufferSize: 1024,
	CheckOrigin: func(r *http.Request) bool {
		// 允许所有来源（生产环境需要限制）
		return true
	},
}

func WebSocketHandler(w http.ResponseWriter, r *http.Request) {
	// 升级HTTP连接为WebSocket连接
	conn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		log.Println("Upgrade error:", err)
		return
	}
	defer conn.Close()

	// 循环读取消息
	for {
		messageType, p, err := conn.ReadMessage()
		if err != nil {
			log.Println("Read error:", err)
			break
		}

		// 处理消息
		log.Printf("Received: %s", p)

		// 发送响应
		err = conn.WriteMessage(messageType, p)
		if err != nil {
			log.Println("Write error:", err)
			break
		}
	}
}
```

### 2.2 消息类型

```go
// WebSocket消息类型常量
const (
	TextMessage   = 1  // 文本消息
	BinaryMessage = 2  // 二进制消息
	CloseMessage  = 8  // 关闭连接
	PingMessage   = 9  // Ping消息
	PongMessage   = 10 // Pong消息
)
```

## 三、聊天室实现

### 3.1 Hub-Client模型

```go
type Hub struct {
	clients    map[*Client]bool
	broadcast  chan []byte
	register   chan *Client
	unregister chan *Client
}

func NewHub() *Hub {
	return &Hub{
		broadcast:  make(chan []byte),
		register:   make(chan *Client),
		unregister: make(chan *Client),
		clients:    make(map[*Client]bool),
	}
}

func (h *Hub) Run() {
	for {
		select {
		case client := <-h.register:
			h.clients[client] = true
		case client := <-h.unregister:
			if _, ok := h.clients[client]; ok {
				delete(h.clients, client)
				close(client.send)
			}
		case message := <-h.broadcast:
			for client := range h.clients {
				select {
				case client.send <- message:
				default:
					close(client.send)
					delete(h.clients, client)
				}
			}
		}
	}
}
```

### 3.2 Client结构

```go
type Client struct {
	hub  *Hub
	conn *websocket.Conn
	send chan []byte
}

func (c *Client) readPump() {
	defer func() {
		c.hub.unregister <- c
		c.conn.Close()
	}()

	c.conn.SetReadLimit(512)
	c.conn.SetReadDeadline(time.Now().Add(60 * time.Second))
	c.conn.SetPongHandler(func(string) error {
		c.conn.SetReadDeadline(time.Now().Add(60 * time.Second))
		return nil
	})

	for {
		_, message, err := c.conn.ReadMessage()
		if err != nil {
			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
				log.Printf("error: %v", err)
			}
			break
		}

		// 广播消息
		c.hub.broadcast <- message
	}
}

func (c *Client) writePump() {
	ticker := time.NewTicker(60 * time.Second)
	defer func() {
		ticker.Stop()
		c.conn.Close()
	}()

	for {
		select {
		case message, ok := <-c.send:
			c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
			if !ok {
				// Hub关闭了通道
				c.conn.WriteMessage(websocket.CloseMessage, []byte{})
				return
			}

			w, err := c.conn.NextWriter(websocket.TextMessage)
			if err != nil {
				return
			}
			w.Write(message)

			// 添加待发送消息队列中的所有消息
			n := len(c.send)
			for i := 0; i < n; i++ {
				w.Write([]byte("\n"))
				w.Write(<-c.send)
			}

			if err := w.Close(); err != nil {
				return
			}
		case <-ticker.C:
			c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
			if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
				return
			}
		}
	}
}
```

### 3.3 连接处理

```go
func serveWs(hub *Hub, w http.ResponseWriter, r *http.Request) {
	conn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		log.Println(err)
		return
	}

	client := &Client{hub: hub, conn: conn, send: make(chan []byte, 256)}
	client.hub.register <- client

	// 启动读写goroutine
	go client.writePump()
	go client.readPump()
}

func main() {
	hub := NewHub()
	go hub.Run()

	http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
		serveWs(hub, w, r)
	})

	log.Fatal(http.ListenAndServe(":8080", nil))
}
```

## 四、房间分组功能

### 4.1 带房间的Hub

```go
type RoomHub struct {
	rooms      map[string]*Room
	register   chan *Client
	unregister chan *Client
	mu         sync.RWMutex
}

type Room struct {
	name    string
	clients map[*Client]bool
}

func NewRoomHub() *RoomHub {
	return &RoomHub{
		rooms:      make(map[string]*Room),
		register:   make(chan *Client),
		unregister: make(chan *Client),
	}
}

func (rh *RoomHub) GetRoom(name string) *Room {
	rh.mu.RLock()
	room, ok := rh.rooms[name]
	rh.mu.RUnlock()

	if ok {
		return room
	}

	// 创建新房间
	rh.mu.Lock()
	defer rh.mu.Unlock()

	room, ok = rh.rooms[name]
	if !ok {
		room = &Room{
			name:    name,
			clients: make(map[*Client]bool),
		}
		rh.rooms[name] = room
	}
	return room
}
```

### 4.2 加入/离开房间

```go
func (rh *RoomHub) Run() {
	for {
		select {
		case client := <-rh.register:
			room := rh.GetRoom(client.roomName)
			room.clients[client] = true
		case client := <-rh.unregister:
			room := rh.GetRoom(client.roomName)
			if _, ok := room.clients[client]; ok {
				delete(room.clients, client)
				close(client.send)
			}
		}
	}
}

func (r *Room) Broadcast(message []byte) {
	for client := range r.clients {
		select {
		case client.send <- message:
		default:
			close(client.send)
			delete(r.clients, client)
		}
	}
}
```

## 五、消息协议设计

### 5.1 JSON消息格式

```go
type Message struct {
	Type    string          `json:"type"`
	Payload json.RawMessage `json:"payload"`
}

type TextMessage struct {
	UserID    string `json:"user_id"`
	Username  string `json:"username"`
	Content   string `json:"content"`
	Timestamp int64  `json:"timestamp"`
}

type JoinMessage struct {
	RoomName string `json:"room_name"`
	UserID   string `json:"user_id"`
	Username string `json:"username"`
}

type LeaveMessage struct {
	RoomName string `json:"room_name"`
	UserID   string `json:"user_id"`
}
```

### 5.2 消息处理器

```go
func handleMessage(client *Client, message []byte) {
	var msg Message
	if err := json.Unmarshal(message, &msg); err != nil {
		log.Println("Invalid message format")
		return
	}

	switch msg.Type {
	case "join":
		handleJoin(client, msg.Payload)
	case "leave":
		handleLeave(client, msg.Payload)
	case "text":
		handleText(client, msg.Payload)
	default:
		log.Println("Unknown message type:", msg.Type)
	}
}

func handleText(client *Client, payload json.RawMessage) {
	var textMsg TextMessage
	if err := json.Unmarshal(payload, &textMsg); err != nil {
		return
	}

	textMsg.Timestamp = time.Now().Unix()

	response, _ := json.Marshal(Message{
		Type:    "text",
		Payload: payload,
	})

	client.room.Broadcast(response)
}
```

## 六、心跳检测与断线重连

### 6.1 服务端心跳

```go
func (c *Client) startHeartbeat() {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			err := c.conn.WriteMessage(websocket.PingMessage, nil)
			if err != nil {
				log.Println("Heartbeat failed:", err)
				c.hub.unregister <- c
				return
			}
		case <-c.done:
			return
		}
	}
}
```

### 6.2 客户端重连

```javascript
// JavaScript客户端示例
class WebSocketClient {
  constructor(url) {
    this.url = url;
    this.connect();
  }

  connect() {
    this.ws = new WebSocket(this.url);

    this.ws.onopen = () => {
      console.log('Connected');
    };

    this.ws.onclose = (event) => {
      console.log('Disconnected, reconnecting...');
      setTimeout(() => this.connect(), 5000);
    };

    this.ws.onerror = (error) => {
      console.error('WebSocket error:', error);
    };

    this.ws.onmessage = (event) => {
      this.handleMessage(event.data);
    };
  }
}
```

## 七、性能优化

### 7.1 消息批量发送

```go
func (c *Client) writePump() {
	defer c.conn.Close()

	var messages []byte

	for {
		select {
		case message, ok := <-c.send:
			if !ok {
				return
			}

			if len(messages) == 0 {
				messages = message
			} else {
				messages = append(messages, message...)
			}

			// 批量发送
			if len(messages) >= 1024 {
				c.conn.WriteMessage(websocket.BinaryMessage, messages)
				messages = nil
			}
		case <-time.After(100 * time.Millisecond):
			if len(messages) > 0 {
				c.conn.WriteMessage(websocket.BinaryMessage, messages)
				messages = nil
			}
		}
	}
}
```

### 7.2 连接池管理

```go
type ConnectionPool struct {
	connections map[string]*websocket.Conn
	mu          sync.RWMutex
}

func (p *ConnectionPool) Add(id string, conn *websocket.Conn) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.connections[id] = conn
}

func (p *ConnectionPool) Get(id string) (*websocket.Conn, bool) {
	p.mu.RLock()
	defer p.mu.RUnlock()
	conn, ok := p.connections[id]
	return conn, ok
}
```

## 八、安全考虑

### 8.1 认证与授权

```go
func AuthMiddleware(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// 验证用户认证
		token := r.Header.Get("Authorization")
		if token == "" {
			http.Error(w, "Unauthorized", http.StatusUnauthorized)
			return
		}

		// 验证Token
		claims, err := validateToken(token)
		if err != nil {
			http.Error(w, "Invalid token", http.StatusUnauthorized)
			return
		}

		// 将用户信息存入Context
		ctx := context.WithValue(r.Context(), "user_id", claims.UserID)
		next.ServeHTTP(w, r.WithContext(ctx))
	})
}
```

### 8.2 消息大小限制

```go
func WebSocketHandler(w http.ResponseWriter, r *http.Request) {
	conn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		return
	}
	defer conn.Close()

	// 设置最大消息大小
	conn.SetReadLimit(1024 * 1024) // 1MB
}
```

## 九、实战案例：实时通知系统

### 9.1 服务器端

```go
func SendNotification(userID string, message string) error {
	conn, ok := connectionPool.Get(userID)
	if !ok {
		return fmt.Errorf("user not connected")
	}

	notification, _ := json.Marshal(map[string]interface{}{
		"type":    "notification",
		"message": message,
		"time":    time.Now().Unix(),
	})

	return conn.WriteMessage(websocket.TextMessage, notification)
}

func OrderCreated(orderID string, userID string) {
	message := fmt.Sprintf("订单 %s 已创建", orderID)
	go SendNotification(userID, message)
}
```

### 9.2 客户端集成

```go
// Go客户端示例
func NewWebSocketClient(url string) (*websocket.Conn, error) {
	conn, _, err := websocket.DefaultDialer.Dial(url, nil)
	if err != nil {
		return nil, err
	}

	go func() {
		for {
			_, message, err := conn.ReadMessage()
			if err != nil {
				log.Println("Read error:", err)
				break
			}
			handleNotification(message)
		}
	}()

	return conn, nil
}
```

## 结论

WebSocket为实时Web应用提供了高效的双向通信能力。通过Hub-Client模型、房间分组、心跳检测等机制，可以构建出稳定可靠的实时通信系统。在实际项目中，需要关注性能优化和安全性问题，确保系统能够支持大规模并发连接并保护用户数据安全。