本项目地址:gof 一个支持百万连接的websocket框架
本文提及的内容包含在:epoll.go
关于Linux Epoll模型的原理,这里就不在做过多介绍了,在Nginx、Redis等的网络并发模型方面,有诸多的详细解释。
Epoll模型中,应用是监听Epoll的Wait接口,来等待系统的推送,因此,比之select/poll等的实现方式略微复杂。
首先我们需要定义一个epoll对象的结构体,在其中记录一下我们需要的内容,包括监听端口的句柄、epoll对象以及监听的ip,端口等一些其他内容:
type EpollObj struct {
socket int //socket连接
epId int //epoll 创建的唯一描述符
ip string //socket监听的地址
port int //socket监听的端口
eventPool *sync.Pool //接收epoll消息的缓存池,在初始化epoll对象的时候创建,不用每次接收消息都频繁的去创建。
}
然后我们需要需要创建一个全局的句柄,并且指定一个端口,绑定到这个句柄上,这样我们的应用就可以进行端口来进行消息的收发了。
fd, err := syscall.Socket(syscall.AF_INET, syscall.SOCK_STREAM, syscall.IPPROTO_TCP)
if err != nil {
Log.Error("getScoket err:%v", err.Error())
os.Exit(1)
}
syscall是golang的标准库,用来调用操作系统的接口。
syscall.Socket的作用是创建一个socket连接,接收三个参数,分别为:
第一个参数 domain
syscall.AF_INET,表示服务器之间的网络通信
syscall.AF_UNIX 表示同一台机器上的进程通信
syscall.AF_INET6 表示以IPv6的方式进行服务器之间的网络通信
第二个参数 typ
syscall.SOCK_RAW,表示使用原始套接字,可以构建传输层的协议头部,启用IP_HDRINCL的话,IP层的协议头部也可以构造,就是上面区分的传输层socket和网络层socket。
syscall.SOCK_STREAM, 基于TCP的socket通信,应用层socket。
syscall.SOCK_DGRAM, 基于UDP的socket通信,应用层socket。
第三个参数 proto
IPPROTO_TCP 接收TCP协议的数据
IPPROTO_IP 接收任何的IP数据包
IPPROTO_UDP 接收UDP协议的数据
IPPROTO_ICMP 接收ICMP协议的数据
IPPROTO_RAW 只能用来发送IP数据包,不能接收数据。
该接口会返回两个参数,句柄fd和错误信息,如果错误信息为空,那么我就可以通过这个句柄去绑定IP。
//监听
addr := syscall.SockaddrInet4{Port: e.port}
ip := "0.0.0.0"
if e.ip != "" {
ip = e.ip
}
copy(addr.Addr[:], net.ParseIP(ip).To4())
if err := syscall.Bind(e.socket, &addr); err != nil {
Log.Error("bind err:%v", err.Error())
os.Exit(1)
}
if err := syscall.Listen(e.socket, 10); err != nil {
Log.Error("listen err:%v", err.Error())
os.Exit(1)
}
在绑定完成之后,我们就可以通过这个fd来进行各种消息的处理了。接下来我们需要进行Epoll对象的创建:
//创建epfd
epfd, err := syscall.EpollCreate1(0)
Log.Info("getGlobalFd 创建的epfd为:%+v,e.fd:%d", epfd, e.socket)
if err != nil {
Log.Error("epoll_create1 err:%+v", err)
os.Exit(1)
}
通过EpollCreate函数可以创建一个Epoll对象。在golang中对于epoll对象的创建有两个函数:EpollCreate 和EpollCreate1,在使用中推荐第二个,因为EpollCreate函数需要传入一个size,也就是手动分配可承载句柄的大小,而现在这些内容已经随着linux的更新,变得不再有限制。
epoll对象创建完成之后,我们需要为其分配三个方法,来操作连接句柄,分别为eAdd,eDel,eWait
//EpollADD方法,添加、删除监听的fd
//fd 需要监听的fd对象
//status syscall.EPOLL_CTL_ADD添加
func (e *EpollObj) eAdd(fd int) {
//通过EpollCtl将epfd加入到Epoll中,去监听
if err := syscall.EpollCtl(e.epId, syscall.EPOLL_CTL_ADD, fd, &syscall.EpollEvent{Events: EPOLLLISTENER, Fd: int32(fd)}); err != nil {
Log.Error("epoll_ctl add err:%+v,fd:%+v", err, fd)
os.Exit(1)
}
}
// syscall.EPOLL_CTL_DEL删除
func (e *EpollObj) eDel(fd int) {
//通过EpollCtl将epfd加入到Epoll中,去监听
if err := syscall.EpollCtl(e.epId, syscall.EPOLL_CTL_DEL, fd, &syscall.EpollEvent{Events: EPOLLLISTENER, Fd: int32(fd)}); err != nil {
Log.Error("epoll_ctl del err:%+v,fd:%+v", err, fd)
os.Exit(1)
}
}
func (e *EpollObj) eWait(handle func(fd int, connType ConnStatus)) error {
events := e.eventPool.Get().([]syscall.EpollEvent)
defer func() {
events := make([]syscall.EpollEvent, 1024)
e.eventPool.Put(events)
}()
n, err := syscall.EpollWait(e.epId, events[:], -1)
if err != nil {
Log.Error("epoll_wait err:%+v", err)
return err
}
if n > 0 {
fmt.Printf("events fds :%+v\n", events[:5])
}
for i := 0; i < n; i++ {
//如果是系统描述符,就建立一个新的连接
connType := CONN_MESSAGE //默认是读内容
if int(events[i].Fd) == e.socket {
connType = CONN_NEW
}
handle(int(events[i].Fd), connType)
}
return nil
}
通过这三个函数,我们可以实现对于句柄的添加、删除还有接收消息的操作。
有疑问加站长微信联系(非本文作者)