实现代码如下:
import (
"errors"
"github.com/gorilla/websocket"
"sync"
)
// Connection wraps a websocket connection together with buffered inbound and
// outbound message queues and a shared close signal.
type Connection struct {
	// wsConn is the underlying websocket connection.
	wsConn *websocket.Conn

	// inChan holds messages that readLoop has read from wsConn; outChan holds
	// messages queued by WriteMessage for writeLoop to send.
	inChan, outChan chan []byte

	// closeChan is closed by Close so that every select waiting on it wakes up.
	closeChan chan byte

	// mutex guards isClosed, which records whether closeChan was already closed.
	mutex    sync.Mutex
	isClosed bool
}
// InitConnection wraps wsConn in a Connection and starts its background read
// and write goroutines.
//
// The inbound and outbound queues are each buffered to 1000 messages. A nil
// wsConn is rejected with a nil Connection and a non-nil error; otherwise the
// background loops would call methods on a nil connection from goroutines the
// caller cannot recover from.
func InitConnection(wsConn *websocket.Conn) (conn *Connection, err error) {
	if wsConn == nil {
		return nil, errors.New("websocket connection is nil")
	}

	conn = &Connection{
		wsConn:    wsConn,
		inChan:    make(chan []byte, 1000),
		outChan:   make(chan []byte, 1000),
		closeChan: make(chan byte, 1),
	}

	// Each loop calls Close when it exits, which in turn stops the other loop.
	go conn.readLoop()
	go conn.writeLoop()

	return conn, nil
}
// ReadMessage blocks until a message is available on the inbound queue or the
// connection's close signal fires. In the latter case it returns a nil slice
// and a "connection is closed" error.
func (conn *Connection) ReadMessage() (data []byte, err error) {
	select {
	case msg := <-conn.inChan:
		return msg, nil
	case <-conn.closeChan:
		return nil, errors.New("connection is closed")
	}
}
// WriteMessage queues data on the outbound channel for writeLoop to send.
// It blocks while the queue is full and returns a "connection is closed"
// error if the close signal fires instead.
func (conn *Connection) WriteMessage(data []byte) (err error) {
	select {
	case conn.outChan <- data:
		return nil
	case <-conn.closeChan:
		return errors.New("connection is closed")
	}
}
// Close shuts the connection down. It is intended to be safe to call from
// several goroutines and to be called more than once.
func (conn *Connection) Close() {
	// The underlying socket is closed on every call, before the lock is taken.
	conn.wsConn.Close()

	conn.mutex.Lock()
	defer conn.mutex.Unlock()

	if conn.isClosed {
		return
	}
	// Closing closeChan wakes every select waiting on it; the flag makes sure
	// the channel is closed only once.
	close(conn.closeChan)
	conn.isClosed = true
}
// readLoop keeps reading messages from the websocket and pushing them onto
// the inbound queue. It stops on the first read error or once the close
// signal fires, and always calls Close on the way out.
func (conn *Connection) readLoop() {
pump:
	for {
		_, payload, readErr := conn.wsConn.ReadMessage()
		if readErr != nil {
			break pump
		}

		select {
		case conn.inChan <- payload:
			// Delivered; go read the next message.
		case <-conn.closeChan:
			break pump
		}
	}

	conn.Close()
}
// writeLoop takes messages off the outbound queue and writes each one to the
// websocket as a text message. It stops on the first write error or once the
// close signal fires, and always calls Close on the way out.
func (conn *Connection) writeLoop() {
pump:
	for {
		var payload []byte

		select {
		case payload = <-conn.outChan:
			// Got a message to send.
		case <-conn.closeChan:
			break pump
		}

		if writeErr := conn.wsConn.WriteMessage(websocket.TextMessage, payload); writeErr != nil {
			break pump
		}
	}

	conn.Close()
}
首先定义一个结构体
// Connection wraps a websocket connection together with buffered inbound and
// outbound message queues and a shared close signal.
type Connection struct {
	// wsConn is the underlying websocket connection.
	wsConn *websocket.Conn

	// inChan holds messages that readLoop has read from wsConn; outChan holds
	// messages queued by WriteMessage for writeLoop to send.
	inChan, outChan chan []byte

	// closeChan is closed by Close so that every select waiting on it wakes up.
	closeChan chan byte

	// mutex guards isClosed, which records whether closeChan was already closed.
	mutex    sync.Mutex
	isClosed bool
}
wsConn websocket的长链接的实体
inChan 读数据的channel
outChan 写数据的channel
closeChan 链接关闭的channel
mutex 互斥锁
isClosed 链接关闭标识符
// InitConnection wraps wsConn in a Connection and starts its background read
// and write goroutines.
//
// The inbound and outbound queues are each buffered to 1000 messages. A nil
// wsConn is rejected with a nil Connection and a non-nil error; otherwise the
// background loops would call methods on a nil connection from goroutines the
// caller cannot recover from.
func InitConnection(wsConn *websocket.Conn) (conn *Connection, err error) {
	if wsConn == nil {
		return nil, errors.New("websocket connection is nil")
	}

	conn = &Connection{
		wsConn:    wsConn,
		inChan:    make(chan []byte, 1000),
		outChan:   make(chan []byte, 1000),
		closeChan: make(chan byte, 1),
	}

	// Each loop calls Close when it exits, which in turn stops the other loop.
	go conn.readLoop()
	go conn.writeLoop()

	return conn, nil
}
初始化链接
readLoop 和writeLoop 循环从websocket中读取数据和写入数据
// ReadMessage blocks until a message is available on the inbound queue or the
// connection's close signal fires. In the latter case it returns a nil slice
// and a "connection is closed" error.
func (conn *Connection) ReadMessage() (data []byte, err error) {
	select {
	case msg := <-conn.inChan:
		return msg, nil
	case <-conn.closeChan:
		return nil, errors.New("connection is closed")
	}
}
ReadMessage() 从inChan中读取数据
// WriteMessage queues data on the outbound channel for writeLoop to send.
// It blocks while the queue is full and returns a "connection is closed"
// error if the close signal fires instead.
func (conn *Connection) WriteMessage(data []byte) (err error) {
	select {
	case conn.outChan <- data:
		return nil
	case <-conn.closeChan:
		return errors.New("connection is closed")
	}
}
WriteMessage(data []byte) 写入数据传递给outChan ,writeLoop 监听outChan并写入数据
// Close shuts the connection down. It is intended to be safe to call from
// several goroutines and to be called more than once.
func (conn *Connection) Close() {
	// The underlying socket is closed on every call, before the lock is taken.
	conn.wsConn.Close()

	conn.mutex.Lock()
	defer conn.mutex.Unlock()

	if conn.isClosed {
		return
	}
	// Closing closeChan wakes every select waiting on it; the flag makes sure
	// the channel is closed only once.
	close(conn.closeChan)
	conn.isClosed = true
}
mutex 锁住关闭操作,并配合 isClosed 标识,保证 closeChan 只会被关闭一次(重复 close 同一个 channel 会 panic),因此 Close 可以被多次调用。
关闭链接时会关闭 closeChan,readLoop 和 writeLoop 监听到 closeChan 被关闭后随之退出。
// readLoop keeps reading messages from the websocket and pushing them onto
// the inbound queue. It stops on the first read error or once the close
// signal fires, and always calls Close on the way out.
func (conn *Connection) readLoop() {
pump:
	for {
		_, payload, readErr := conn.wsConn.ReadMessage()
		if readErr != nil {
			break pump
		}

		select {
		case conn.inChan <- payload:
			// Delivered; go read the next message.
		case <-conn.closeChan:
			break pump
		}
	}

	conn.Close()
}
// writeLoop takes messages off the outbound queue and writes each one to the
// websocket as a text message. It stops on the first write error or once the
// close signal fires, and always calls Close on the way out.
func (conn *Connection) writeLoop() {
pump:
	for {
		var payload []byte

		select {
		case payload = <-conn.outChan:
			// Got a message to send.
		case <-conn.closeChan:
			break pump
		}

		if writeErr := conn.wsConn.WriteMessage(websocket.TextMessage, payload); writeErr != nil {
			break pump
		}
	}

	conn.Close()
}
通过 outChan 和 inChan 传递消息:对 websocket 的读、写分别只在 readLoop、writeLoop 两个协程中进行,业务代码可以在多个协程中并发调用 ReadMessage 和 WriteMessage,从而保证线程安全。
有疑问加站长微信联系(非本文作者)