Golang websocket结合一致性哈希算法构建高并发推送服务
1 场景介绍
web应用中,常有业务状态需要实时更新的场景。如一个较长的后台任务,从浏览器用户触发执行到执行完成可能需几十秒的时间,这时前端需隔几秒请求一次后台,查询任务执行进度。此种方式是长轮询的方式,是存在一定弊端的,增加了后台服务的负载,若并发操作量太大,后台压力会成倍激增。业界常采用http1.1的websocket扩展协议与浏览器建立长连接来实现实时业务状态更新。
2 实现方案
本文采用golang实现一个长连接服务,对外提供两个接口,一个是基于http的rest消息发送接口,一个是基于websocket的cient接入接口,如下图所示。
为使前端的接入更简单,从建立连接到用户关闭浏览器,中间前端无须发送消息来告知服务器client是否下线。我们将检测放在后台,后台采用定时心跳方式保持对client的监听,若心跳失败,则将该client剔除。如下图所示。
3 golang实现代码
comet服务内有两个模块,http server负责接收消息,comet server负责维护websocket client,每个client启用一个go routine对客户端保持心跳检测。
3.1 核心模块
package comet import ( "encoding/json" "log" "time" "golang.org/x/net/websocket" ) type HttpServer struct { wsServer *WsServer } type WsServer struct { Clients map[string][]*Client AddCli chan *Client DelCli chan *Client Message chan *Message } type Client struct { UserId string Timestamp int64 conn *websocket.Conn wsServer *WsServer } type Message struct { UserId string `json:"user_id"` Message string `json:"message"` } func NewWsServer() *WsServer { return &WsServer{ make(map[string][]*Client), make(chan *Client), make(chan *Client), make(chan *Message, 1000), } } func NewHttpServer(wsServer *WsServer) *HttpServer { return &HttpServer{wsServer} } func (httpServer *HttpServer) SendMessage(userId, message string) { log.Printf("message reveived, user_id: %s, message: %s", userId, message) httpServer.wsServer.Message <- &Message{userId, message} } func (wsServer *WsServer) SendMessage(userId, message string) { clients := wsServer.Clients[userId] if len(clients) > 0 { for _, c := range clients { c.conn.Write([]byte(message)) } log.Printf("message success sent to client, user_id: %s", userId) } else { log.Printf("client not found, user_id: %s", userId) } } func (wsServer *WsServer) addClient(c *Client) { clients := wsServer.Clients[c.UserId] wsServer.Clients[c.UserId] = append(clients, c) log.Printf("a client added, userId: %s, timestamp: %d", c.UserId, c.Timestamp) } func (wsServer *WsServer) delClient(c *Client) { clients := wsServer.Clients[c.UserId] if len(clients) > 0 { for i, client := range clients { if client.Timestamp == c.Timestamp { wsServer.Clients[c.UserId] = append(clients[:i], clients[i+1:]...) break } } } if 0 == len(clients) { delete(wsServer.Clients, c.UserId) } log.Printf("a client deleted, user_id: %s, timestamp: %d", c.UserId, c.Timestamp) } func (wsServer *WsServer) Start() { for { select { case msg := <-wsServer.Message: wsServer.SendMessage(msg.UserId, msg.Message) case c := <-wsServer.AddCli: wsServer.addClient(c) case c := <-wsServer.DelCli: wsServer.delClient(c) } } } func (c *Client) heartbeat() error { millis := time.Now().UnixNano() / 1000000 heartbeat := struct { Heartbeat int64 `json:"heartbeat"` }{millis} bytes, _ := json.Marshal(heartbeat) _, err := c.conn.Write(bytes) return err } func (c *Client) Listen() { for { err := c.heartbeat() if nil != err { log.Printf("client heartbeat error, user_id: %v, timestamp: %d, err: %s", c.UserId, c.Timestamp, err) c.wsServer.DelCli <- c return } time.Sleep(time.Second * 5) } }
3.2 完整代码
https://github.com/olzhy/comet
4 一致性哈希包装
考虑到单服务的同时在线人数支持是有限的,所以在其上层用一致性哈希算法包装。这样同一user_id建立连接会打到同一台后台服务器,给此user_id发送消息也会打到同样的服务器。这样后台部署多个comet服务形成一个集群即可支撑高并发消息推送场景。如下图所示,最外层nginx挂接公网域名,对外提供基于wss的消息接收接口及基于http的消息发送接口。中间采用haproxy对user_id参数作一致性哈希转发,对同一user_id的操作会打到同一台comet server。底层扩展为多台comet server即可构建一个高并发的消息推送服务。
2018.09.02
大连
有疑问加站长微信联系(非本文作者)