redis 是cs架构,网络采用epoll 模型,单线程处理每个请求。
很多同学对单线程有些疑问,简单的解释一下 redis 单线程的意思,redis 服务端虽说是单线程,但是可以同时 持有很多connection,每个connection 都可以同时发请求,只不过在 redis 服务端,一个一个的处理每个connection 发过来的request, 通俗点说就是,很多请求都能发过来,redis 会存下来(其实是存在每个connection socket 内核缓冲区),一个一个处理。
为什么单线程处理效率如此之高?
- 几乎所有的操作全部是内存操作,内存操作非常快(如果有一些系统调用,磁盘操作,单线程不会快的)
- 单线程避免了使用锁(memcache 使用了多线程,因为多了锁之类的,也没比redis快多少)
EPOLL 介绍
如果想读懂 redis 网络相关的代码,必须先搞清楚 epoll 的使用,epoll 说白了就是监听 fd(file descriptor,操作 fd 其实就是操作socket),每当 fd 上面有消息的时候(比如 可读,可写 消息等),就会得到通知,这样就可以处理了。epoll 主要好处是可以同时监听多个 fd(可以持有多个 client 连接)
epoll 使用只需要三步:
- int epoll_create(int size);
创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大。这个参数不同于select()中的第一个参数,给出最大监听的fd+1的值。需要注意的是,当创建好epoll句柄后,它就是会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽 - int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
epoll的事件注册函数,它不同与select()是在监听事件时告诉内核要监听什么类型的事件,而是在这里先注册要监听的事件类型。第一个参数是epoll_create()的返回值,第二个参数表示动作,用三个宏来表示:
EPOLL_CTL_ADD:注册新的fd到epfd中;
EPOLL_CTL_MOD:修改已经注册的fd的监听事件
EPOLL_CTL_DEL:从epfd中删除一个fd;
第三个参数是需要监听的fd,第四个参数是告诉内核需要监听什么事,struct epoll_event结构如下:
typedef union epoll_data {
void *ptr;
int fd;
__uint32_t u32;
__uint64_t u64;
} epoll_data_t;
struct epoll_event {
__uint32_t events; /* Epoll events /
epoll_data_t data; / User data variable */
};
events可以是以下几个宏的集合:
EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
EPOLLOUT:表示对应的文件描述符可以写; EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来); EPOLLERR:表示对应的文件描述符发生错误; EPOLLHUP:表示对应的文件描述符被挂断;
EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里
- int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
等待事件的产生,类似于select()调用。参数events用来从内核得到事件的集合,maxevents告之内核这个events有多大,这个 maxevents的值不能大于创建epoll_create()时的size,参数timeout是超时时间(毫秒,0会立即返回,-1将不确定,也有说法说是永久阻塞)。该函数返回需要处理的事件数目,如返回0表示已超时。
Redis 中 epoll 的使用
Redis epoll 封装介绍
redis 跟 网络相关的代码写的比较简洁,主要就两处
不同操作系统的 epoll 代码,都在 ae_epoll.c ae_evport.c ae_kqueue.c ae_select.c 中, linux 使用 ae_epoll.c , mac 使用 ae_kqueue.c
对 epoll 代码的封装在 ae.c 中
aeCreateEventLoop 是对 epoll_create 的封装
aeCreateFileEvent 是对 epoll_ctl 的封装,同时会将rfileProc, wfileProc 两个处理消息的回调函数一起封装
aeProcessEvents 是对 epoll_wait 的封装
aeMain 是一个死循环,不停的调用 aeProcessEvents, redis 就是在这里不停的收到 client 的 request, 并且一个一个处理
aeCreateEventLoop:
创建 aeEventLoop 结构体
/* File event structure */
typedef struct aeFileEvent {
int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
aeFileProc *rfileProc;
aeFileProc *wfileProc;
void *clientData;
} aeFileEvent;
/* State of an event based program */
typedef struct aeEventLoop {
int maxfd; /* highest file descriptor currently registered */
int setsize; /* max number of file descriptors tracked */
long long timeEventNextId;
time_t lastTime; /* Used to detect system clock skew */
aeFileEvent *events; /* Registered events */
aeFiredEvent *fired; /* Fired events */
aeTimeEvent *timeEventHead;
int stop;
void *apidata; /* This is used for polling API specific data */
aeBeforeSleepProc *beforesleep;
aeBeforeSleepProc *aftersleep;
} aeEventLoop;
这个结构体中主要就是 events, fired 两个aeFileEvent类型变量,aeFileEvent 中 rfileProc, wfileProc 是两个回调函数, 分别处理读时间, 写时间, events 是aeCreateFileEvent 函数调用时 为其赋值,fird 是 监听到有消息来的时候 为其赋值,在 ae_epoll.c 中 aeApiPoll 函数。
总结一下, 服务启动 aeCreateEventLoop 创建 aeEventLoop 类型的变量, 将需要监控的 fd, 通过 aeCreateFileEvent 监听(同时将 赋值 rfileProc, wfileProc 回调函数), aeProcessEvents 监听到有消息需要处理的时候, 会使用 rfileProc, wfileProc 回调函数处理消息。所以,读 Redis 网络相关代码 ,其实只是看 aeCreateFileEvent(监听fd,设置对fd的回调函数) 在哪些地方被调用就可以了。
Redis 关键代码
- initServer(server.c )无关代码删除:
void initServer(void) {
server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
listenToPort(server.port,server.ipfd,&server.ipfd_count);
for (j = 0; j < server.ipfd_count; j++) {
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR)
{
serverPanic(
"Unrecoverable error creating server.ipfd file event.");
}
}
这段代码在默认开启redis-server 的情况下,server.ipfd 代表的fd 是 6379 打开的socket, 在6379监听到的消息,都调用 acceptTcpHandler 函数
- acceptTcpHandler(networking.c)无关代码删除
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
while(max--) {
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
acceptCommonHandler(cfd,0,cip);
}
}
static void acceptCommonHandler(int fd, int flags, char *ip) {
client *c;
c = createClient(fd)
}
client *createClient(int fd) {
aeCreateFileEvent(server.el,fd,AE_READABLE,
readQueryFromClient, c) == AE_ERR)
}
这段代码很清晰的表明了, 对于6379 过来的请求,全部 使用acceptTcpHandler 函数生成一个新的fd, 在同时将这个fd 放在 eventloop 中监听,并且 使用 readQueryFromClient 来处理
- readQueryFromClient(networking.c) 无关代码删除
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
nread = read(fd, c->querybuf+qblen, readlen);
processInputBuffer(c);
}
readQueryFromClient 就是把请求内容读出来, 在调用 processInputBuffer 处理, processInputBuffer 就是 redis 里面各种业务逻辑了
redis 网络相关代码其实就是一句话 使用 epoll(linux上) 处理每一个请求
Redis 如何处理TCP 粘包,拆包
set a 1 这条指令,按照 redis 协议,会翻译成
*3
$3
set
$1
a
$1
1
*3 表示有3行数据, $3 表示 有3个字符
在我没读redis代码的时候,我一直认为 从缓冲区 读出来自己需要的长度,处理好以后,在从缓冲区里继续读,看了redis 代码以后,我才发现自己 too yong, redis 是这样做的(networking.c readQueryFromClient 函数, 代码有删减)
readlen = PROTO_IOBUF_LEN;
qblen = sdslen(c->querybuf);
if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
nread = read(fd, c->querybuf+qblen, readlen);
redis 每次都读缓冲区的大小,如果最后一条消息不完整,下次计算一下长度,继续读,因为这个骚操作(知道了其实就是常规操作),让效率大大提升了,不然每次使用 read 系统调用,非常影响性能,特别对于 redis 这种单线程模型程序影响就更大了。
Redis如何处理 half connection
client -> server, 虽然我们都说connection,其实 就是client 开着一个 fd, server 开着一个 fd,两个fd之间可以互相通信,关闭的时候 一个 fd 跟另外一个 fd 说我准备关闭了(tcp 四次挥手), 不过如果有的极端情况(在大规模server端是常规情况),比如拔网线,关机,网络异常等原因, 可能发不出任何消息 就断了,另外一个 fd 就在那里傻傻的等着,这就出现了 half connection 的情况。
如何处理 half connection
一般的处理办法就是心跳检查,服务端会 定时的 ping 客户端,如果连续几次 都 ping 不通,那么就会主动断开链接
为什么不使用 keep_alive 处理
Host Requirements RFC罗列有不使用它的三个理由:
在短暂的故障期间,它们可能引起一个良好连接(good connection)被释放(dropped)
它们消费了不必要的宽带
在以数据包计费的互联网上它们(额外)花费金钱。然而,在许多的实现中提供了存活定时器。
这种说法有它的道理,但是并不能说服我不使用 keep alive,最能说服我的是在知乎上看过的一句话, keep_alive 只能保证 tcp 是正常的,但是不能保证 用户程序是正常的,
Redis 如何处理
server.c clientsCronHandleTimeout 函数
if (server.maxidletime &&
!(c->flags & CLIENT_SLAVE) && /* no timeout for slaves */
!(c->flags & CLIENT_MASTER) && /* no timeout for masters */
!(c->flags & CLIENT_BLOCKED) && /* no timeout for BLPOP */
!(c->flags & CLIENT_PUBSUB) && /* no timeout for Pub/Sub clients */
(now - c->lastinteraction > server.maxidletime))
{
serverLog(LL_VERBOSE,"Closing idle client");
freeClient(c);
return 1;
} else if (c->flags & CLIENT_BLOCKED) {
......
通过代码可以看出来,redis 根本就既没有用 keep_alive , 也没有用 ping, 而是简单粗暴的通过 client 最后一次访问server 的时间 条件来判断,不管 这条连接是不是正常的,这样同样可以解决 half connection 问题。
当我们在读写Socket时,我们究竟在读写什么?
套接字socket是大多数程序员都非常熟悉的概念,它是计算机网络编程的基础,TCP/UDP收发消息都靠它。我们熟悉的web服务器底层依赖它,我们用到的MySQL关系数据库、Redis内存数据库底层依赖它。我们用微信和别人聊天也依赖它,我们玩网络游戏时依赖它,读者们能够阅读这篇文章也是因为有它在背后默默地支持着网络通信。
简单过程
当客户端和服务器使用TCP协议进行通信时,客户端封装一个请求对象req,将请求对象req序列化成字节数组,然后通过套接字socket将字节数组发送到服务器,服务器通过套接字socket读取到字节数组,再反序列化成请求对象req,进行处理,处理完毕后,生成一个响应对应res,将响应对象res序列化成字节数组,然后通过套接字将自己数组发送给客户端,客户端通过套接字socket读取到自己数组,再反序列化成响应对象。
通信框架往往可以将序列化的过程隐藏起来,我们所看到的现象就是上图所示,请求对象req和响应对象res在客户端和服务器之间跑来跑去。
也许你觉得这个过程还是挺简单的,很好理解,但是实际上背后发生的一系列事件超出了你们中大多数人的想象。通信的真实过程要比上面的这张图复杂太多
为了方便大家对通信底层的理解,我花了些时间做了下面这个动画,它并不能完全覆盖底层细节的全貌,但是对于理解套接字的工作机制已经足够了。请读者仔细观察这个动画,后面的讲解将围绕着这个动画展开。
我们平时用到的套接字其实只是一个引用(一个对象ID),这个套接字对象实际上是放在操作系统内核中。这个套接字对象内部有两个重要的缓冲结构,一个是读缓冲(read buffer),一个是写缓冲(write buffer),它们都是有限大小的数组结构。
当我们对客户端的socket写入字节数组时(序列化后的请求消息对象req),是将字节数组拷贝到内核区套接字对象的write buffer中,内核网络模块会有单独的线程负责不停地将write buffer的数据拷贝到网卡硬件,网卡硬件再将数据送到网线,经过一些列路由器交换机,最终送达服务器的网卡硬件中。
同样,服务器内核的网络模块也会有单独的线程不停地将收到的数据拷贝到套接字的read buffer中等待用户层来读取。最终服务器的用户进程通过socket引用的read方法将read buffer中的数据拷贝到用户程序内存中进行反序列化成请求对象进行处理。然后服务器将处理后的响应对象走一个相反的流程发送给客户端,这里就不再具体描述。
阻塞
我们注意到write buffer空间都是有限的,所以如果应用程序往套接字里写的太快,这个空间是会满的。一旦满了,写操作就会阻塞,直到这个空间有足够的位置腾出来。不过有了NIO(非阻塞IO),写操作也可以不阻塞,能写多少是多少,通过返回值来确定到底写进去多少,那些没有写进去的内容用户程序会缓存起来,后续会继续重试写入。
同样我们也注意到read buffer的内容可能会是空的。这样套接字的读操作(一般是读一个定长的字节数组)也会阻塞,直到read buffer中有了足够的内容(填充满字节数组)才会返回。有了NIO,就可以有多少读多少,无须阻塞了。读不够的,后续会继续尝试读取。
ack
那上面这张图就展现了套接字的全部过程么?显然不是,数据的确认过程(ack)就完全没有展现。比如当写缓冲的内容拷贝到网卡后,是不会立即从写缓冲中将这些拷贝的内容移除的,而要等待对方的ack过来之后才会移除。如果网络状况不好,ack迟迟不过来,写缓冲很快就会满的。
包头
细心的同学可能注意到图中的消息req被拷贝到网卡的时候变成了大写的REQ,这是为什么呢?因为这两个东西已经不是完全一样的了。内核的网络模块会将缓冲区的消息进行分块传输,如果缓冲区的内容太大,是会被拆分成多个独立的小消息包的。并且还要在每个消息包上附加上一些额外的头信息,比如源网卡地址和目标网卡地址、消息的序号等信息,到了接收端需要对这些消息包进行重新排序组装去头后才会扔进读缓冲中。这些复杂的细节过程就非常难以在动画上予以呈现了。
速率
还有个问题那就是如果读缓冲满了怎么办,网卡收到了对方的消息要怎么处理?一般的做法就是丢弃掉不给对方ack,对方如果发现ack迟迟没有来,就会重发消息。那缓冲为什么会满?是因为消息接收方处理的慢而发送方生产的消息太快了,这时候tcp协议就会有个动态窗口调整算法来限制发送方的发送速率,使得收发效率趋于匹配。如果是udp协议的话,消息一丢那就彻底丢了。
TCP协议的简单介绍
TCP是面向连接的运输层协议
简单来说,在使用TCP协议之前,必须先建立TCP连接,就是我们常说的三次握手。在数据传输完毕之后,必须是释放已经建立的TCP连接,否则会发生不可预知的问题,造成服务的不可用状态。
每一条TCP连接都是可靠连接,且只有两个端点
TCP连接是从Server端到Client端的点对点的,通过TCP传输数据,无差错,不重复不丢失。
TCP协议的通信是全双工的
TCP协议允许通信双方的应用程序在任何时候都能发送数据。TCP 连接的两端都设有发送缓冲区和接收缓冲区,用来临时存放双向通信的数据。发送数据时,应用程序把数据传送给TCP的缓冲后,就可以做自己的事情,而TCP在合适的时候将数据发送出去。在接收的时候,TCP把收到的数据放入接收缓冲区,上层应用在合适的时候读取数据。
TCP协议是面向字节流的
TCP中的流是指流入进程或者从进程中流出的字节序列。所以向Java,golang等高级语言在进行TCP通信是都需要将相应的实体序列化才能进行传输。还有就是在我们使用Redis做缓存的时候,都需要将放入Redis的数据序列化才可以,原因就是Redis底层就是实现的TCP协议。
TCP并不知道所传输的字节流的含义,TCP并不能保证接收方应用程序和发送方应用程序所发出的数据块具有对应大小的关系(这就是TCP传输过程中产生的粘包问题)。但是应用程序接收方最终受到的字节流与发送方发送的字节流是一定相同的。因此,我们在使用TCP协议的时候应该制定合理的粘包拆包策略。
下图是TCP的协议传输的整个过程:
TCP粘包问题复现
如下图所示,出现的粘包问题一共有三种情况
第一种情况:
如上图中的第一根bar所示,服务端一共读到两个数据包,每个数据包都是完成的,并没有发生粘包的问题,这种情况比较好处理,服务器只需要简单的从网络缓冲区去读就好了,每次服务端读取到的消息都是完成的,并不会出现数据不正确的情况。
第二种情况:
服务端仅收到一个数据包,这个数据包包含客户端发出的两条消息的完整信息,这个时候基于第一种情况的逻辑实现的服务端就蒙了,因为服务端并不能很好的处理这个数据包,甚至不能处理,这种情况其实就是TCP的粘包问题。
第三种情况:
服务端收到了两个数据包,第一个数据包只包含了第一条消息的一部分,第一条消息的后半部分和第二条消息都在第二个数据包中,或者是第一个数据包包含了第一条消息的完整信息和第二条消息的一部分信息,第二个数据包包含了第二条消息的剩下部分,这种情况其实是发送了TCP拆包问题,因为发生了一条消息被拆分在两个包里面发送了,同样上面的服务器逻辑对于这种情况是不好处理的。
为什么会发生TCP粘包、拆包
应用程序写入的数据大于套接字缓冲区大小,这将会发生拆包。
应用程序写入数据小于套接字缓冲区大小,网卡将应用多次写入的数据发送到网络上,这将会发生粘包。
进行MSS(最大报文长度)大小的TCP分段,当TCP报文长度-TCP头部长度>MSS的时候将发生拆包。
接收方法不及时读取套接字缓冲区数据,这将发生粘包。
如何处理粘包、拆包
通常会有以下一些常用的方法:
使用带消息头的协议、消息头存储消息开始标识及消息长度信息,服务端获取消息头的时候解析出消息长度,然后向后读取该长度的内容。
设置定长消息,服务端每次读取既定长度的内容作为一条完整消息,当消息不够长时,空位补上固定字符。
设置消息边界,服务端从网络流中按消息编辑分离出消息内容,一般使用‘\n’。
更为复杂的协议,例如楼主最近接触比较多的车联网协议808,809协议。
TCP粘包拆包的代码实践
下面代码楼主主要演示了使用规定消息头,消息体的方式来解决TCP的粘包,拆包问题。
server端代码: server端代码的主要逻辑是接收客户端发送过来的消息,重新组装出消息,并打印出来。
package net.package_together;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
public class TestSocketServer {
public static void main(String args[]) {
ServerSocket serverSocket;
try {
serverSocket = new ServerSocket();
serverSocket.bind(new InetSocketAddress(8089));
while (true) {
Socket socket = serverSocket.accept();
new ReceiveThread(socket).start();
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
static class ReceiveThread extends Thread {
public static final int PACKET_HEAD_LENGTH = 2;//包头长度
private Socket socket;
private volatile byte[] bytes = new byte[0];
public ReceiveThread(Socket socket) {
this.socket = socket;
}
public byte[] mergebyte(byte[] a, byte[] b, int begin, int end) {
byte[] add = new byte[a.length + end - begin];
int i = 0;
for (i = 0; i < a.length; i++) {
add[i] = a[i];
}
for (int k = begin; k < end; k++, i++) {
add[i] = b[k];
}
return add;
}
@Override
public void run() {
int count = 0;
while (true) {
try {
InputStream reader = socket.getInputStream();
if (bytes.length < PACKET_HEAD_LENGTH) {
byte[] head = new byte[PACKET_HEAD_LENGTH - bytes.length];
int couter = reader.read(head);
if (couter < 0) {
continue;
}
bytes = mergebyte(bytes, head, 0, couter);
if (couter < PACKET_HEAD_LENGTH) {
continue;
}
}
// 下面这个值请注意,一定要取2长度的字节子数组作为报文长度,你懂得
byte[] temp = new byte[0];
temp = mergebyte(temp, bytes, 0, PACKET_HEAD_LENGTH);
String templength = new String(temp);
int bodylength = Integer.parseInt(templength);//包体长度
if (bytes.length - PACKET_HEAD_LENGTH < bodylength) {//不够一个包
byte[] body = new byte[bodylength + PACKET_HEAD_LENGTH - bytes.length];//剩下应该读的字节(凑一个包)
int couter = reader.read(body);
if (couter < 0) {
continue;
}
bytes = mergebyte(bytes, body, 0, couter);
if (couter < body.length) {
continue;
}
}
byte[] body = new byte[0];
body = mergebyte(body, bytes, PACKET_HEAD_LENGTH, bytes.length);
count++;
System.out.println("server receive body: " + count + new String(body));
bytes = new byte[0];
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
client端代码:客户端代码主要逻辑是组装要发送的消息,确定消息头,消息体,然后发送到服务端。
package net.package_together;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.net.Socket;
public class TestSocketClient {
public static void main(String args[]) throws IOException {
Socket clientSocket = new Socket();
clientSocket.connect(new InetSocketAddress(8089));
new SendThread(clientSocket).start();
}
static class SendThread extends Thread {
Socket socket;
PrintWriter printWriter = null;
public SendThread(Socket socket) {
this.socket = socket;
try {
printWriter = new PrintWriter(new OutputStreamWriter(socket.getOutputStream()));
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Override
public void run() {
String reqMessage = "HelloWorld! from clientsocket this is test half packages!";
for (int i = 0; i < 100; i++) {
sendPacket(reqMessage);
}
if (socket != null) {
try {
socket.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
public void sendPacket(String message) {
try {
OutputStream writer = socket.getOutputStream();
writer.write(message.getBytes());
writer.flush();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
有疑问加站长微信联系(非本文作者)