缘起
最近阅读<<Go微服务实战>> (刘金亮, 2021.1)
本系列笔记拟采用golang练习之
案例需求(聊天服务器)
- 用户可以连接到服务器。
- 用户可以设定自己的用户名。
- 用户可以向服务器发送消息,同时服务器也会向其他用户广播该消息。
目标
- 实现聊天服务端, 支持端口监听, 多个客户端的连入, 消息收发, 广播, 断开, 并采集日志
- 改造已有的聊天客户端, 使之能同时适配客户端和服务端的通信, 并设置写缓冲以防止死锁
- 测试多个客户端的连入, 收发和断开, 并诊断服务端日志
设计
- IMsg: 定义消息接口, 以及相关消息的实现. 为方便任意消息内容的解码, 消息传输时, 采用base64转码
- IMsgDecoder: 定义消息解码器及其实现
- IChatClient: 定义聊天客户端接口. 本次添加关闭通知方法, 以适配服务端.
- tChatClient: 聊天客户端, 实现IChatClient接口. 本次添加关闭通知, 写缓冲和读超时控制, 修复写循环细节问题.
- IChatServer: 定义聊天服务器接口, 为方便测试, 提供日志采集方法
- tChatServer: 实现聊天服务器IChatServer
单元测试
ChatServer_test.go
package chat_server
import (
"fmt"
cs "learning/gooop/chat_server"
"strings"
"testing"
"time"
)
func Test_ChatServer(t *testing.T) {
fnAssertTrue := func(b bool, msg string) {
if !b {
t.Fatal(msg)
}
}
port := 3333
server := cs.NewChatServer()
err := server.Open(port)
if err != nil {
t.Fatal(err)
}
clientCount := 3
address := fmt.Sprintf("localhost:%v", port)
for i := 0;i < clientCount;i++ {
err, client := cs.DialChatClient(address)
if err != nil {
t.Fatal(err)
}
id := fmt.Sprintf("c%02d", i)
client.RecvHandler(func(client cs.IChatClient, msg cs.IMsg) {
t.Logf("%v recv: %v\n", id, msg)
})
go func() {
client.SetName(id)
client.Send(&cs.NameMsg{id })
n := 0
for range time.Tick(time.Duration(1) * time.Second) {
client.Send(&cs.ChatMsg{id, fmt.Sprintf("msg %02d from %v", n, id) })
n++
if n >= 3 {
break
}
}
client.Close()
}()
}
passedSeconds := 0
for range time.Tick(time.Second) {
passedSeconds++
t.Logf("%v seconds passed", passedSeconds)
if passedSeconds >= 5 {
break
}
}
server.Close()
logs := server.GetLogs()
fnHasLog := func(log string) bool {
for _,it := range logs {
if strings.Contains(it, log) {
return true
}
}
return false
}
for i := 0;i < clientCount;i++ {
msg := fmt.Sprintf("tChatServer.handleIncomingConn, clientCount=%v", i + 1)
fnAssertTrue(fnHasLog(msg), "expecting log: " + msg)
msg = fmt.Sprintf("tChatServer.handleClientClosed, c%02d", i)
fnAssertTrue(fnHasLog(msg), "expecting log: " + msg)
}
}
测试输出
$ go test -v ChatServer_test.go
=== RUN Test_ChatServer
tChatServer.handleIncomingConn, clientCount=1
tChatServer.handleIncomingConn, clientCount=2
tChatServer.handleIncomingConn, clientCount=3
ChatServer_test.go:59: 1 seconds passed
ChatServer_test.go:35: c00 recv: &{c00 msg 00 from c00}
ChatServer_test.go:35: c02 recv: &{c00 msg 00 from c00}
ChatServer_test.go:35: c02 recv: &{c01 msg 00 from c01}
ChatServer_test.go:35: c01 recv: &{c00 msg 00 from c00}
ChatServer_test.go:35: c01 recv: &{c01 msg 00 from c01}
ChatServer_test.go:35: c01 recv: &{c02 msg 00 from c02}
ChatServer_test.go:35: c00 recv: &{c01 msg 00 from c01}
ChatServer_test.go:35: c00 recv: &{c02 msg 00 from c02}
ChatServer_test.go:35: c02 recv: &{c02 msg 00 from c02}
ChatServer_test.go:35: c00 recv: &{c01 msg 01 from c01}
ChatServer_test.go:35: c01 recv: &{c00 msg 01 from c00}
ChatServer_test.go:35: c01 recv: &{c02 msg 01 from c02}
ChatServer_test.go:35: c02 recv: &{c01 msg 01 from c01}
ChatServer_test.go:35: c02 recv: &{c00 msg 01 from c00}
ChatServer_test.go:59: 2 seconds passed
ChatServer_test.go:35: c00 recv: &{c00 msg 01 from c00}
ChatServer_test.go:35: c02 recv: &{c02 msg 01 from c02}
ChatServer_test.go:35: c00 recv: &{c02 msg 01 from c02}
tChatClient.postConnClosed, c00, serverFlag=false
tChatClient.postConnClosed, c02, serverFlag=false
tChatClient.postConnClosed, c01, serverFlag=false
tChatClient.postConnClosed, c02, serverFlag=true
tChatServer.handleClientClosed, c02
tChatServer.handleClientClosed, c02, clientCount=2
tChatClient.postConnClosed, c01, serverFlag=true
tChatServer.handleClientClosed, c01
tChatServer.handleClientClosed, c01, clientCount=1
ChatServer_test.go:59: 3 seconds passed
tChatClient.postConnClosed, c00, serverFlag=true
tChatServer.handleClientClosed, c00
tChatServer.handleClientClosed, c00, clientCount=0
ChatServer_test.go:59: 4 seconds passed
ChatServer_test.go:59: 5 seconds passed
--- PASS: Test_ChatServer (5.00s)
PASS
ok command-line-arguments 5.003s
IMsg.go
定义消息接口, 以及相关消息的实现. 为方便任意消息内容的解码, 消息传输时, 采用base64转码
package chat_server
import (
"encoding/base64"
"fmt"
)
type IMsg interface {
Encode() string
}
type NameMsg struct {
Name string
}
func (me *NameMsg) Encode() string {
return fmt.Sprintf("NAME %s\n", base64.StdEncoding.EncodeToString([]byte(me.Name)))
}
type ChatMsg struct {
Name string
Words string
}
func (me *ChatMsg) Encode() string {
return fmt.Sprintf("CHAT %s %s\n",
base64.StdEncoding.EncodeToString([]byte(me.Name)),
base64.StdEncoding.EncodeToString([]byte(me.Words)),
)
}
IMsgDecoder.go
定义消息解码器及其实现
package chat_server
import (
"encoding/base64"
"strings"
)
type IMsgDecoder interface {
Decode(line string) (bool, IMsg)
}
type tMsgDecoder struct {
}
func (me *tMsgDecoder) Decode(line string) (bool, IMsg) {
items := strings.Split(line, " ")
size := len(items)
if items[0] == "NAME" && size == 2 {
name, err := base64.StdEncoding.DecodeString(items[1])
if err != nil {
return false, nil
}
return true, &NameMsg{
Name: string(name),
}
}
if items[0] == "CHAT" && size == 3 {
name, err := base64.StdEncoding.DecodeString(items[1])
if err != nil {
return false, nil
}
words, err := base64.StdEncoding.DecodeString(items[2])
if err != nil {
return false, nil
}
return true, &ChatMsg{
Name: string(name),
Words: string(words),
}
}
return false, nil
}
var MsgDecoder = &tMsgDecoder{}
IChatClient.go
定义聊天客户端接口. 本次添加关闭通知方法, 以适配服务端.
package chat_server
type IChatClient interface {
GetName() string
SetName(name string)
Send(msg IMsg)
RecvHandler(handler ClientRecvFunc)
CloseHandler(handler ClientCloseFunc)
Close()
}
type ClientRecvFunc func(client IChatClient, msg IMsg)
type ClientCloseFunc func(client IChatClient)
tChatClient.go
聊天客户端, 实现IChatClient接口. 本次添加关闭通知, 写缓冲和读超时控制, 修复写循环细节问题.
package chat_server
import (
"bufio"
"fmt"
"io"
"net"
"sync/atomic"
"time"
)
type tChatClient struct {
conn net.Conn
name string
openFlag int32
closeFlag int32
serverFlag bool
closeChan chan bool
sendChan chan IMsg
sendLogs []IMsg
dropLogs []IMsg
recvLogs []IMsg
pendingSend int32
recvHandler ClientRecvFunc
closeHandler ClientCloseFunc
}
var gMaxPendingSend int32 = 100
func DialChatClient(address string) (error, IChatClient) {
conn, err := net.Dial("tcp", address)
if err != nil {
return err, nil
}
return nil, openChatClient(conn, false)
}
func openChatClient(conn net.Conn, serverFlag bool) IChatClient {
it := &tChatClient{
conn: conn,
openFlag: 0,
closeFlag: 0,
serverFlag: serverFlag,
closeChan: make(chan bool),
sendChan: make(chan IMsg, gMaxPendingSend),
name: "anonymous",
sendLogs: []IMsg{},
dropLogs: []IMsg{},
recvLogs: []IMsg{},
}
it.open()
return it
}
func (me *tChatClient) GetName() string {
return me.name
}
func (me *tChatClient) SetName(name string) {
me.name = name
}
func (me *tChatClient) open(){
if !atomic.CompareAndSwapInt32(&me.openFlag, 0, 1) {
return
}
go me.beginWrite()
go me.beginRead()
}
func (me *tChatClient) isClosed() bool {
return me.closeFlag != 0
}
func (me *tChatClient) isNotClosed() bool {
return !me.isClosed()
}
func (me *tChatClient) Send(msg IMsg) {
if me.isClosed() {
return
}
if me.pendingSend < gMaxPendingSend {
atomic.AddInt32(&me.pendingSend, 1)
me.sendChan <- msg
} else {
me.dropLogs = append(me.dropLogs, msg)
}
}
func (me *tChatClient) RecvHandler(handler ClientRecvFunc) {
if me.isNotClosed() {
me.recvHandler = handler
}
}
func (me *tChatClient) CloseHandler(handler ClientCloseFunc) {
if me.isNotClosed() {
me.closeHandler = handler
}
}
func (me *tChatClient) Close() {
if me.isNotClosed() {
me.closeConn()
}
}
func (me *tChatClient) beginWrite() {
writer := io.Writer(me.conn)
for {
select {
case <- me.closeChan:
_ = me.conn.Close()
me.closeFlag = 2
me.postConnClosed()
return
case msg := <- me.sendChan:
atomic.AddInt32(&me.pendingSend, -1)
_,e := writer.Write([]byte(msg.Encode()))
if e != nil {
me.closeConn()
break
} else {
me.sendLogs = append(me.sendLogs, msg)
}
case <- time.After(time.Duration(10) * time.Second):
me.postRecvTimeout()
break
}
}
}
func (me *tChatClient) postRecvTimeout() {
fmt.Printf("tChatClient.postRecvTimeout, %v, serverFlag=%v\n", me.name, me.serverFlag)
me.closeConn()
}
func (me *tChatClient) beginRead() {
reader := bufio.NewReader(me.conn)
for {
line, err := reader.ReadString('\n')
if err != nil {
me.closeConn()
break
}
ok, msg := MsgDecoder.Decode(line)
if ok {
fn := me.recvHandler
if fn != nil {
fn(me, msg)
}
me.recvLogs = append(me.recvLogs, msg)
}
}
}
func (me *tChatClient) closeConn() {
if !atomic.CompareAndSwapInt32(&me.closeFlag, 0, 1) {
return
}
me.closeChan <- true
}
func (me *tChatClient) postConnClosed() {
fmt.Printf("tChatClient.postConnClosed, %v, serverFlag=%v\n", me.name, me.serverFlag)
handler := me.closeHandler
if handler != nil {
handler(me)
}
me.closeHandler = nil
me.recvHandler = nil
}
IChatServer.go
定义聊天服务器接口, 为方便测试, 提供日志采集方法
package chat_server
type IChatServer interface {
Open(port int) error
Broadcast(msg IMsg)
Close()
GetLogs() []string
}
tChatServer.go
实现聊天服务器IChatServer
package chat_server
import (
"errors"
"fmt"
"net"
"sync"
"sync/atomic"
)
type tChatServer struct {
openFlag int32
closeFlag int32
clients []IChatClient
clientCount int
clientLock *sync.RWMutex
listener net.Listener
recvLogs []IMsg
logs []string
}
func NewChatServer() IChatServer {
it := &tChatServer{
openFlag: 0,
closeFlag: 0,
clients: []IChatClient{},
clientCount: 0,
clientLock: new(sync.RWMutex),
listener: nil,
recvLogs: []IMsg{},
}
return it
}
func (me *tChatServer) Open(port int) error {
if !atomic.CompareAndSwapInt32(&me.openFlag, 0, 1) {
return errors.New("server already opened")
}
listener, err := net.Listen("tcp", fmt.Sprintf(":%v", port))
if err != nil {
return err
}
me.listener = listener
go me.beginListening()
return nil
}
func (me *tChatServer) logf(f string, args... interface{}) {
msg := fmt.Sprintf(f, args...)
me.logs = append(me.logs, msg)
fmt.Println(msg)
}
func (me *tChatServer) GetLogs() []string {
return me.logs
}
func (me *tChatServer) isClosed() bool {
return me.closeFlag != 0
}
func (me *tChatServer) isNotClosed() bool {
return !me.isClosed()
}
func (me *tChatServer) beginListening() {
for !me.isClosed() {
conn, err := me.listener.Accept()
if err != nil {
me.Close()
break
}
me.handleIncomingConn(conn)
}
}
func (me *tChatServer) Close() {
if !atomic.CompareAndSwapInt32(&me.closeFlag, 0, 1) {
return
}
_ = me.listener.Close()
me.closeAllClients()
}
func (me *tChatServer) closeAllClients() {
me.clientLock.Lock()
defer me.clientLock.Unlock()
for i,it := range me.clients {
if it != nil {
it.Close()
me.clients[i] = nil
}
}
me.clientCount = 0
}
func (me *tChatServer) handleIncomingConn(conn net.Conn) {
// init client
client := openChatClient(conn, true)
client.RecvHandler(me.handleClientMsg)
client.CloseHandler(me.handleClientClosed)
// lock me.clients
me.clientLock.Lock()
defer me.clientLock.Unlock()
// append to me.clients
if len(me.clients) > me.clientCount {
me.clients[me.clientCount] = client
} else {
me.clients = append(me.clients, client)
}
me.clientCount++
me.logf("tChatServer.handleIncomingConn, clientCount=%v", me.clientCount)
}
func (me *tChatServer) handleClientMsg(client IChatClient, msg IMsg) {
me.recvLogs = append(me.recvLogs, msg)
if nameMsg,ok := msg.(*NameMsg);ok {
client.SetName(nameMsg.Name)
} else if _, ok := msg.(*ChatMsg);ok {
me.Broadcast(msg)
}
}
func (me *tChatServer) handleClientClosed(client IChatClient) {
me.logf("tChatServer.handleClientClosed, %s", client.GetName())
me.clientLock.Lock()
defer me.clientLock.Unlock()
if me.clientCount <= 0 {
return
}
lastI := me.clientCount - 1
for i,it := range me.clients {
if it == client {
if i == lastI {
me.clients[i] = nil
} else {
me.clients[i], me.clients[lastI] = me.clients[lastI], nil
}
me.clientCount--
break
}
}
me.logf("tChatServer.handleClientClosed, %s, clientCount=%v", client.GetName(), me.clientCount)
}
func (me *tChatServer) Broadcast(msg IMsg) {
me.clientLock.RLock()
defer me.clientLock.RUnlock()
for _,it := range me.clients {
if it != nil {
it.Send(msg)
}
}
}
(未完待续)
有疑问加站长微信联系(非本文作者)