Golang 轻量级-高并发socket框架——chitchat

Ovenvan · · 807 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

这是基于golang socket 一个轻量级,支持高并发操作的开发框架chitchat。本文将介绍chitchat的基本使用方法;通过源码分析该框架的具体工作流程;简要讲解作者留下的Demo文件和该框架的使用技巧;下载链接。通过该框架,我们可以方便建立起Server-Client长连接并通信。

使用chitchat

chitchat得以支持高并发连接的关键在于其能够快速响应客户端发起的链接并及时开启goroutine确保一对一的通信。对于使用者而言,只需负责向框架注册正确的IP socket(ipAddr:ipPort)(注:除非特别说明,否则后续提到的地址Addr均指addr:port)并正确编写用于处理接收数据异常处理函数即可正常运行。

开启一个Server

仅需创建一个Server实例并调用其Listen()方法即可使一个Server开始正常工作。一个Server通常只用于监听一个端口,负责一类事物的调度处理。我们看一下具体调度的API:

func NewServer(
    ipaddrsocket string,
    delim byte, 
    readfunc ServerReadFunc, 
    additional interface{}) Server

可以看到,创建一个Server实例需要提供四个参数,分别为监听对象分隔符处理函数附加数据。其中附加数据可置为空(nil)
监听对象即可供Client连接的IPsocket;当Server读到一连串数据后,将通过delim分隔符将数据切片并交予readfunc处理,多片数据将调用多次readfunc。delim可置为0,此时Server将持续读到EOF后才会交付数据。当delim置为‘\n’时,Server会默认换行交付,此时会根据Windows‘\r\n’作出对应调整;处理函数将处理Server交付的数据流;附加数据是为了配合readfunc更好的完成对数据的处理。后续在讲解如何编写readfunc时会提及如何使用additional给出的数据。
Server实则是一个可供调用的对外API接口interface,其中包含Listen()方法启动该Server开始监听。

func (t *server) Listen() error

Listen是一个异步方法,如果发现配置参数有误或端口被占用等错误将会直接返回,否则就在后台拉起新的goroutine处理具体事务。Listen()方法不阻塞进程,也不会等到后台goroutine全部正常工作后再返回。
后台goroutine在运转处理的过程中若遇到错误将通过Err channel告知使用者,因此使用者需要显式地接收并处理error。注意即使不需要这些error信息,我们也需要有一个接收的过程,否则会导致后台进程堵塞。通过ErrChan()获取该Channel:

type Errsocket struct {
    Err        error
    RemoteAddr string
}

func (t *server) ErrChan() <-chan Errsocket

发送的错误消息包含两部分,error对端ip(addr:port)
当我们想关闭该Server,只需调用其Cut函数:

func (t *server) Cut() error

Cut()方法会使Server停止监听Socket,同时释放所有已连接的Connection。该方法和Listen()一样,也不会等待所有Connection全部关闭后再返回。倘若希望关闭某特定的Connection(当然在我们已经知道该Connection对端连接IPaddr的前提下),我们可以使用CloseRemote方法:

func (t *server) CloseRemote(remoteAddr string) error

至此较为重要的Server API已经简单介绍完成了,另外有些较为简单的API根据名字便可知道其作用,不再简单赘述。之后我们会通过一个简单的例子演示这些API的用法。

type Server interface {
    Listen() error
    Cut() error
    CloseRemote(string) error
    RangeRemoteAddr() []string
    GetLocalAddr() string
    SetDeadLine(time.Duration, time.Duration)
    ErrChan() <-chan Errsocket
    Write(interface{}) error
}

开启一个Client

通过NewClient函数创建一个Client实例,并通过调用其API方法向服务端发起连接。

func NewClient(
    ipremotesocket string,
    delim byte, 
    readfunc ClientReadFunc, 
    additional interface{}) Client {

可以发现,创建Client实例的函数参数与创建Server实例NewServer的函参形式和意义基本相同。再次便不再多加解释。注意的是,v1.0.0版本Client还未能指定自己的ipaddr,只能连接成功后随机分配;另外,readfunc对于Server而言是不可置为空(nil)的,但对于Client而言可以置为nil,即忽略所有Server发送的消息。再有,对于一对Server/Client而言,其分隔符delim应该约定好是相同的,否则可能会出现消息切分错误的情况。
Client通过调用Dial()方法向Server发起连接。

func (t *client) Dial() error 

若连接错误,则会返回具体错误原因,否则拉起相应goroutine执行后续操作并返回。
关闭连接可使用API提供的Close()命令。

func (t *client) Close()

该函数的作用仅仅是向Client发送了退出的信号,若此时还有业务处于运行状态(如readfunc)则会等待业务正常关闭后再退出。
以下是Client的全部对外API:

type Client interface {
    Dial() error
    Close()
    SetDeadLine(time.Duration)
    ErrChan() <-chan Errsocket
    Write(interface{}) error
    GetRemoteAddr() string
    GetLocalAddr() string
}

最后我们讲解Write()方法。从函数签名中可以看到,无论什么类型都可以传递给Write方法进行发送。但事实上,目前库自带的Write()方法只能够很好的处理以下几种类型:string;[]byte;不包含任何pointer-value(包括string)的struct(也就是说,如果struct中某数据类型是string,那么接收方读到的数据将会出现偏差)。正如框架作者所言,当前的Write()方法不是一个最好的方法。因此,框架提供了一个函数使得使用者可以自定义Write()方法:

type wf func(net.Conn, interface{}, byte) error
func SetWriteFunc(f wf)

readfunc与APIs:

该框架最为重要的核心部分即readfunc的编写,它的作用是处理由Server/Client递交的数据片。我们先看一下readfunc的函数签名:

type ClientReadFunc func([]byte, ReadFuncer) error
type ServerReadFunc func([]byte, ReadFuncer) error

无论是Client的readfunc或Server的readfunc,其函数签名都是相同的。ReadFuncer是一个接口interface,它提供了一系列在readfunc函数中可用的API。稍后我们会对其中部分方法进行讲解。

type ReadFuncer interface {
    GetRemoteAddr() string
    GetLocalAddr() string
    GetConn() net.Conn
    Close()
    Write(interface{}) error
    Addon() interface{}
}

由于socket只允许传递[]byte类型的数据,因此我们要做的第一步就是将[]byte类型转变为我们希望的数据类型。如果写入的是一个数据类型,我们想从[]byte转为struct可使用:

var t = *(**yourStruct)(unsafe.Pointer(&str))

这里将yourStruct替换为你自己的结构体别名即可。若是string类型,则简单使用类型转换即可。
readfunc提供了够用的API,包括获得本地/远程IP socket与Conn,发送数据,关闭连接,获得附加数据。还记得附加数据吗,这是我们在最初创建Server/Client实例时传入的一个参数,现在可以通过Addon()将其取出来使用了。一般建议传入的是一个指针类型的Addon,这样readfunc可对其进行修改。
关于Close()函数:不用担心在readfunc中使用Close()方法会提前终止readfunc业务,导致数据无法正常交付。正如前文所言,Close()只是向框架传递一个关闭的信号。框架会等待readfunc全部执行完毕后再关闭这个连接。

源码分析

在分析Demo之前,我们先简单探究一下约600多行的源码,看一下其内部各goroutine的支配运行情况。


Chitchat - goroutine

Server调用Listen()方法时,Server内部会拉起一个hL goroutine(handleListen);当成功响应Client的Dial方法时,hL拉起新的goroutinehC4s(handleConnforServer);hC4s通过拉起read读取DATA并负责将DATA交付给Readfunc。一个hC4s对应一个连接,多个连接将开启多个hC4sreadClient向服务端发起连接成功后也将拉起一个goroutinehC4c(handleConnforClient)和readeD是一个较为特殊的goroutine,他负责用户监听的errChannel是否处于关闭状态并将goroutine产生的错误数据传递给用户。本文将不详细分析eD,详情可以参考这篇文章 多goroutine异步通知error的一种方法
Server调用Cut()方法关闭监听后,它将关闭hL与所有的hC4s和read,以及负责错误转发的eD,同时关闭errChannel;调用Close()/CloseRemote(...)方法时,仅关闭当前连接对应的hC4s和read,不关闭errChannel;Client调用Close()关闭连接后,将关闭开启的所有goroutine和errChannel。

hC4s和hC4c大同小异,我们着重分析一下hC4s源码:

func handleConnServer(h *hConnerServer, eC chan Errsocket, ctx context.Context, s *server) {...}
type hConnerServer struct {
    conn     net.Conn
    d        byte
    mu       *sync.Mutex
    readfunc ServerReadFunc
}

hConnerServer结构体主要包含了以下内容:连接实例conn,分隔符d,普通锁mureadfunc,其中mutex主要用于维护eD的正常工作;eC是上游传递下来的错误发送通道;监听ctx.Done()保证与上游一起收到退出信号,但不保证退出的顺序;server提供readfunc使用的API。defer()语句保证了当hC4s退出时,将安全关闭conn和eD goroutine。
拉起的readgoroutine将读到的DATA分片通过channel发送给hC4s,hC4s将该DATA交给readfunc处理:

//hC4s
        case strReq, ok := <-strReqChan: //read a data slice successfully
            if !ok {
                return //EOF && d!=0
            }
            err := h.readfunc(strReq, &server{
                currentConn: h.conn,
                delimiter:   h.d,
                remoteMap:   s.remoteMap,
                additional:  s.additional,
            })
            if err != nil {
                h.mu.Lock()
                eC <- Errsocket{err, h.conn.RemoteAddr().String()}
            }
        }

一个server struct同时实现了Server interface和ReadFuncer interface中的所有方法,并通过接口的方式将特定的方法暴露给框架的使用者,这样设计使一些重复的方法在代码上得到复用。
hC4c在这段代码上稍有不同:

//hC4c
        case strReq, ok := <-strReqChan: //read a data slice successfully
            if !ok {
                return //EOF && d!=0
            }
            if h.readfunc != nil {
                h.rcmu.Lock()
                err := h.readfunc(strReq, client)
                if err != nil {
                    h.eD.mu.Lock()
                    eC <- Errsocket{err, h.conn.RemoteAddr().String()}
                }
                h.rcmu.Unlock()
            }
        }

区别在于:

  1. 对于Client而言,其readfunc是可为nil的,这样正常读数据但不会被处理;
  2. 与Server相比,多了一个rcmu的锁。该锁是防止在readfunc中调用了Close()方法后error Channel被提前关闭,导致readfunc的错误信息无法被正确送达。我们可以看一下Client的Close()方法:
func (t *client) Close() {
    go func() {
        t.rcmu.Lock()
        t.mu.Lock()
        t.closed = true
        close(t.eU)
        t.mu.Unlock()
        t.rcmu.Unlock()
        t.cancelfunc()
    }()
}

可以看到,Close()方法会等待rcmu锁被释放后再执行后续操作。而为什么Server不需要为之加锁呢。因为Server在readfunc调用的Close()方法不会关闭上游的error Channel。

Server通过并发安全的map存储每个Conn对应的ip socket与cancelFunc,保证能够独立关闭任意Conn。

Demo分析

与上文一样,首先将各goroutine运作与调度的流程关系通过图的形式表现出来,并简要解释各goroutine的作用:


demo - goroutine.jpg

Master提供的Listen()方法将注册一个名为registerNodereadfunc;当Node节点向Master注册成功后,Node节点拉起一个dHBL(daemon-HeartBeatListener) goroutine,在7939端口发起监听并注册hb4node readfunc,用于接收ping报文并发送pong回应;Master会拉起一个dHBC(daemon-HeartBeatChecker),定时向Node端发起连接并发送ping报文,并注册hb4masterreadfunc,当成功接收到pong报文后主动关闭连接。若在接收报文消息过程中出现错误,将发送错误消息至HBC/L error 错误处理器,供作进一步处理。
dHBC连续接收到三次以上错误消息后,判定对端Node失去连接;当HBL error十秒以上未收到Master发来的消息后,判定Master已丢失自己。

Demo Tricks

在hb4master/node readfunc中,无论结果成功与否,都会发送一个error("succeed"或 具体错误),这样在HBC/L error便可根据error得知此次消息传递的结果,并作进一步操作。

Github

Github:chitchat
或者也可以通过

go get github.com/ovenvan/chitchat

下载并使用。希望大家多pr并提issue,帮助这个框架更加完善。


有疑问加站长微信联系(非本文作者)

本文来自:简书

感谢作者:Ovenvan

查看原文:Golang 轻量级-高并发socket框架——chitchat

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

807 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传