前言
tcp 是服务端和客户端的常见交互协议,是全双工的,即客户端和服务端可以随时对发消息。那么golang要如何使用tcp服务呢?
使用
实例地址 : golang使用tcp官方库示例
这个示例,展示了客户端和服务端对发序列,在使用官方提供的tcp库时,需要考虑以下问题:
拆包
当客户端在短时间内,或者是并发下,发送几条序列时[0 1 2 3] [2 8 7] [9 88],在服务端有可能接收到的是[0 1 2 3 2 8 7 9 88],但很显然,这是3条消息,需要在服务端进行拆包,分离成三条有效的消息才可以。心跳
当客户端和服务端连接后,客户端因为某些原因崩溃了,没有走正常的离线操作,那么这时,服务端无法感知到客户端的结束,会继续把持着连接,耗费资源,所以需要设定心跳。由客户端每隔一段时间发送一条消息,让服务端知道他还活着。消息分离
所有的客户端消息,都从同一个conn中留出,而不同的消息有不同的处理方法,请求参数,需要对拆分出来的消息进行分离,适用予不同的handler。维护成本
维护成本体现在两点。第一,从官方的库示例可以看出,底层的写法很多问题没有考虑到,仅仅是涉及到连接的建立和对发,在开发时,不同的使用者写法群魔乱舞,很难做到统一,人数越多越难维护。第二,仅使用基本包在需求迭代时,很容易让代码变得越来越臃肿,比如临时给你一个需求,要求像http那样,支持某一个handler的中间件。
TCPX
TCPX 是纯golang开发的tcp框架,具备上述所有问题的解决方案,并且速度很快。
仓库地址: https://github.com/fwhezfwhez/tcpx
TCPX仿制http框架gin,几乎具备了大多数gin的原生api,使用起来极其简便。可以感受一下!
发起tcp服务
package main
import (
"github.com/fwhezfwhez/tcpx"
)
func main() {
srv := tcpx.NewTcpX(nil)
srv.ListenAndServe("tcp", ":8102")
}
直接针对协议和端口号就可以启动了,太简单了。
中间件
中间件模式是tcpx最大的特色,它自带传输序列,内置messageID,可以根据messageID如同http请求一般,使用中间件,比如统计服务被请求的次数:
package main
import (
"fmt"
"github.com/fwhezfwhez/tcpx"
"sync/atomic"
)
var requestTimes int32
func main() {
srv := tcpx.NewTcpX(nil)
srv.UseGlobal(countRequestTime)
srv.AddHandler(3, getRequestTime)
fmt.Println("tcp listen on :8080")
if e := srv.ListenAndServe("tcp", ":8080"); e != nil {
panic(e)
}
}
func countRequestTime(c *tcpx.Context) {
atomic.AddInt32(&requestTimes, 1)
}
func getRequestTime(c *tcpx.Context) {
c.JSON(4, tcpx.H{"message": "success", "request_times": requestTimes})
}
上述代码制定了一个messageID
为3的路由,所有客户端发送的messageID
为3的消息,都会经过该路由处理,并且,它确保每个消息都会经过全局中间件countRequestTime
关于中间件,tcpx支持了以下三种
- 锚类型
srv.Use("before-login", beforeLogin)
srv.AddHandler(1, login)
调用Use()后,添加的路由会经过该中间件处理。
- 全局类型
srv.UseGlobal(countRequestTime)
srv.AddHandler(3, getRequestTime)
所有路由,都会经过全局中间件处理。
- 路由类型
srv.AddHandler(3, beforeLogin, login)
仅仅该路由会经过路由中间件处理。
优雅关闭,重启
package main
import (
"fmt"
"runtime/debug"
"time"
"github.com/fwhezfwhez/tcpx"
"log"
//"tcpx"
)
func main() {
srv := tcpx.NewTcpX(nil)
// start server
go func() {
fmt.Println("tcp listen on :8080")
srv.ListenAndServe("tcp", ":8080")
}()
// after 10 seconds and stop it
go func() {
time.Sleep(10 * time.Second)
if e := srv.Stop(false); e != nil {
log.Println(fmt.Sprintf("%s \n %s", e.Error(), debug.Stack()))
return
}
// operate between stop and start
// do something
fmt.Println("before start, print ok")
// after 10 seconds start again
time.Sleep(10 * time.Second)
if e := srv.Start(); e != nil {
log.Println(fmt.Sprintf("%s \n %s", e.Error(), debug.Stack()))
return
}
// or call `Restart()` equals to above `Close` and `Start`
//if e := srv.Restart(false, func() {
// fmt.Println("before start, print ok")
//}); e != nil {
// fmt.Println(e.Error())
//}
}()
select {}
}
自带心跳
package main
import (
"github.com/fwhezfwhez/tcpx"
//"tcpx"
"time"
)
func main() {
srv := tcpx.NewTcpX(nil)
srv.HeartBeatModeDetail(true, 10*time.Second, false, tcpx.DEFAULT_HEARTBEAT_MESSAGEID)
//srv.RewriteHeartBeatHandler(1300, func(c *tcpx.Context) {
// fmt.Println("rewrite heartbeat handler")
// c.RecvHeartBeat()
//})
tcpx.SetLogMode(tcpx.DEBUG)
srv.ListenAndServe("tcp", ":8101")
}
自带用户池,与上线下线,池内用户对发
package main
import (
"fmt"
"github.com/fwhezfwhez/errorx"
"github.com/fwhezfwhez/tcpx"
//"tcpx"
)
func main() {
srv := tcpx.NewTcpX(tcpx.JsonMarshaller{})
srv.WithBuiltInPool(true)
srv.AddHandler(1, online)
srv.AddHandler(3, offline)
srv.ListenAndServe("tcp", ":8102")
}
func online(c *tcpx.Context) {
type Login struct {
Username string `json:"username"`
}
var login Login
if _, e := c.Bind(&login); e != nil {
fmt.Println(errorx.Wrap(e).Error())
return
}
c.Online(login.Username)
fmt.Println("online success")
}
func offline(c *tcpx.Context) {
fmt.Println("offline success")
c.Offline()
}
有疑问加站长微信联系(非本文作者)