完整版分享-C++从0实现百万并发Reactor服务器

biancheng1 · · 295 次点击 · · 开始浏览    

# C++从0实现百万并发Reactor服务器 //xia仔ke:[百度网盘](https://www.51xuebc.com/thread-655-1-1.html) 单线程Reactor形式流程: clipboard.png ①效劳器端的Reactor是一个线程对象,该线程会启动事情循环,并运用Selector(选择器)来完成IO的多路复用。channel注册一个Acceptor事情处置器到Reactor中,Acceptor事情处置器所关注的事情是ACCEPT事情,这样Reactor会监听客户端向效劳器端发起的衔接恳求事情(ACCEPT事情)。 ②客户端向效劳器端发起一个衔接恳求,Reactor监听到了该ACCEPT事情的发作并将该ACCEPT事情派发给相应的Acceptor处置器来停止处置。Acceptor处置器经过accept()办法得到与这个客户端对应的衔接(SocketChannel),然后将该衔接所关注的READ事情以及对应的READ事情处置器注册到Reactor中,这样一来Reactor就会监听该衔接的READ事情了。 ③当Reactor监听到有读或者写事情发作时,将相关的事情派发给对应的处置器停止处置。比方,读处置器会经过SocketChannel的read()办法读取数据,此时read()操作能够直接读取到数据,而不会梗塞与等候可读的数据到来。 ④每当处置完一切就绪的感兴味的I/O事情后,Reactor线程会再次执行select()阻塞等候新的事情就绪并将其分派给对应处置器停止处置。 留意,Reactor的单线程形式的单线程主要是针关于I/O操作而言,也就是一切的I/O的accept()、read()、write()以及connect()操作都在一个线程上完成的。 基于单线程反响器形式手写一个NIO通讯 先简单引见NIO中几个重要对象: Selector Selector的英文含义是“选择器”,也能够称为为“轮询代理器”、“事情订阅器”、“channel容器管理机”都行。 事情订阅和Channel管理: 应用程序将向Selector对象注册需求它关注的Channel,以及详细的某一个Channel会对哪些IO事情感兴味。Selector中也会维护一个“曾经注册的Channel”的容器。 Channels 通道,被树立的一个应用程序和操作系统交互事情、传送内容的渠道(留意是衔接到操作系统)。那么既然是和操作系统停止内容的传送,那么阐明应用程序能够经过通道读取数据,也能够经过通道向操作系统写数据。 一切被Selector(选择器)注册的通道,只能是继承了SelectableChannel类的子类。 ServerSocketChannel:应用效劳器程序的监听通道。只要经过这个通道,应用程序才干向操作系统注册支持“多路复用IO”的端口监听。同时支持UDP协议和TCP协议。 ScoketChannel:TCP Socket套接字的监听通道,一个Socket套接字对应了一个客户端IP:端口 到 效劳器IP:端口的通讯衔接。 DatagramChannel:UDP 数据报文的监听通道。 通道中的数据总是要先读到一个Buffer,或者总是要从一个Buffer中写入。 效劳端处置器: /** * 类阐明:nio通讯效劳端处置器 */ public class NioServerHandle implements Runnable{ private Selector selector; private ServerSocketChannel serverChannel; private volatile boolean started; /** * 结构办法 * @param port 指定要监听的端口号 */ public NioServerHandle(int port) { try { selector = Selector.open(); serverChannel = ServerSocketChannel.open(); serverChannel.configureBlocking(false); serverChannel.socket().bind(new InetSocketAddress(port)); serverChannel.register(selector,SelectionKey.OP_ACCEPT); started = true; System.out.println("效劳器已启动,端口号:"+port); } catch (IOException e) { e.printStackTrace(); } } public void stop(){ started = false; } @Override public void run() { //循环遍历selector while(started){ try{ //阻塞,只要当至少一个注册的事情发作的时分才会继续. selector.select(); Set keys = selector.selectedKeys(); Iterator it = keys.iterator(); SelectionKey key = null; while(it.hasNext()){ key = it.next(); it.remove(); try{ handleInput(key); }catch(Exception e){ if(key != null){ key.cancel(); if(key.channel() != null){ key.channel().close(); } } } } }catch(Throwable t){ t.printStackTrace(); } } //selector关闭后会自动释放里面管理的资源 if(selector != null) try{ selector.close(); }catch (Exception e) { e.printStackTrace(); } } private void handleInput(SelectionKey key) throws IOException{ if(key.isValid()){ //处置新接入的恳求音讯 if(key.isAcceptable()){ //取得关怀当前事情的channel ServerSocketChannel ssc = (ServerSocketChannel)key.channel(); //经过ServerSocketChannel的accept创立SocketChannel实例 //完成该操作意味着完成TCP三次握手,TCP物理链路正式树立 SocketChannel sc = ssc.accept(); System.out.println("======socket channel 树立衔接" ); //设置为非阻塞的 sc.configureBlocking(false); //衔接曾经完成了,能够开端关怀读事情了 sc.register(selector,SelectionKey.OP_READ); } //读音讯 if(key.isReadable()){ System.out.println("======socket channel 数据准备完成," + "能够去读==读取======="); SocketChannel sc = (SocketChannel) key.channel(); //创立ByteBuffer,并开拓一个1M的缓冲区 ByteBuffer buffer = ByteBuffer.allocate(1024); //读取恳求码流,返回读取到的字节数 int readBytes = sc.read(buffer); //读取到字节,对字节停止编解码 if(readBytes>0){ //将缓冲区当前的limit设置为position=0, // 用于后续对缓冲区的读取操作 buffer.flip(); //依据缓冲区可读字节数创立字节数组 byte[] bytes = new byte[buffer.remaining()]; //将缓冲区可读字节数组复制到新建的数组中 buffer.get(bytes); String message = new String(bytes,"UTF-8"); System.out.println("效劳器收到音讯:" + message); //处置数据 String result = response(message) ; //发送应对音讯 doWrite(sc,result); } //链路曾经关闭,释放资源 else if(readBytes<0){ key.cancel(); sc.close(); } } } } //发送应对音讯 private void doWrite(SocketChannel channel,String response) throws IOException { //将音讯编码为字节数组 byte[] bytes = response.getBytes(); //依据数组容量创立ByteBuffer ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); //将字节数组复制到缓冲区 writeBuffer.put(bytes); //flip操作 writeBuffer.flip(); //发送缓冲区的字节数组 channel.write(writeBuffer); } } public class NioServer { private static NioServerHandle nioServerHandle; public static void start(){ if(nioServerHandle !=null) nioServerHandle.stop(); nioServerHandle = new NioServerHandle(DEFAULT_PORT); new Thread(nioServerHandle,"Server").start(); } public static void main(String[] args){ start(); } } 客户端处置器: public class NioClientHandle implements Runnable{ private String host; private int port; private Selector selector; private SocketChannel socketChannel; private volatile boolean started; public NioClientHandle(String ip, int port) { this.host = ip; this.port = port; try { //创立选择器 selector = Selector.open(); //翻开通道 socketChannel = SocketChannel.open(); //假如为 true,则此通道将被置于阻塞形式; // 假如为 false,则此通道将被置于非阻塞形式 socketChannel.configureBlocking(false); started = true; } catch (IOException e) { e.printStackTrace(); } } public void stop(){ started = false; } @Override public void run() { try { doConnect(); } catch (IOException e) { e.printStackTrace(); System.exit(1); } //循环遍历selector while(started){ try { //阻塞,只要当至少一个注册的事情发作的时分才会继续 selector.select(); //获取当前有哪些事情能够运用 Set keys = selector.selectedKeys(); //转换为迭代器 Iterator it = keys.iterator(); SelectionKey key = null; while(it.hasNext()){ key = it.next(); it.remove(); try { handleInput(key); } catch (IOException e) { e.printStackTrace(); if(key!=null){ key.cancel(); if(key.channel()!=null){ key.channel().close(); } } } } } catch (IOException e) { e.printStackTrace(); } } //selector关闭后会自动释放里面管理的资源 if(selector!=null){ try { selector.close(); } catch (IOException e) { e.printStackTrace(); } } } //详细的事情处置办法 private void handleInput(SelectionKey key) throws IOException{ if(key.isValid()){ //取得关怀当前事情的channel SocketChannel sc = (SocketChannel)key.channel(); if(key.isConnectable()){//衔接事情 if(sc.finishConnect()){} else{System.exit(1);} } //有数据可读事情 if(key.isReadable()){ //创立ByteBuffer,并开拓一个1M的缓冲区 ByteBuffer buffer = ByteBuffer.allocate(1024); //读取恳求码流,返回读取到的字节数 int readBytes = sc.read(buffer); //读取到字节,对字节停止编解码 if(readBytes>0){ //将缓冲区当前的limit设置为position,position=0, // 用于后续对缓冲区的读取操作 buffer.flip(); //依据缓冲区可读字节数创立字节数组 byte[] bytes = new byte[buffer.remaining()]; //将缓冲区可读字节数组复制到新建的数组中 buffer.get(bytes); String result = new String(bytes,"UTF-8"); System.out.println("accept message:"+result); }else if(readBytes<0){ key.cancel(); sc.close(); } } } } //发送音讯 private void doWrite(SocketChannel channel,String request) throws IOException { //将音讯编码为字节数组 byte[] bytes = request.getBytes(); //依据数组容量创立ByteBuffer ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); //将字节数组复制到缓冲区 writeBuffer.put(bytes); //flip操作 writeBuffer.flip(); //发送缓冲区的字节数组 channel.write(writeBuffer); } private void doConnect() throws IOException { /*假如此通道处于非阻塞形式, 则调用此办法将启动非阻塞衔接操作。 假如立刻树立衔接,就像本地衔接可能发作的那样,则此办法返回true。 否则,此办法返回false, 稍后必需经过调用finishConnect办法完成衔接操作。*/ if(socketChannel.connect(new InetSocketAddress(host,port))){} else{ //衔接还未完成,所以注册衔接就绪事情,向selector表示关注这个事情 socketChannel.register(selector,SelectionKey.OP_CONNECT); } } //写数据对外暴露的API public void sendMsg(String msg) throws Exception{ socketChannel.register(selector,SelectionKey.OP_READ); doWrite(socketChannel,msg); } } public class NioClient { private static NioClientHandle nioClientHandle; public static void start(){ if(nioClientHandle !=null) nioClientHandle.stop(); nioClientHandle = new NioClientHandle(DEFAULT_SERVER_IP,DEFAULT_PORT); new Thread(nioClientHandle,"Client").start(); } //向效劳器发送音讯 public static boolean sendMsg(String msg) throws Exception{ nioClientHandle.sendMsg(msg); return true; } public static void main(String[] args) throws Exception { start(); System.out.println("请输入恳求信息:"); Scanner scanner = new Scanner(System.in); while(NioClient.sendMsg(scanner.next())); } } 效劳端过程: 启动效劳端,完成一些初始化工作,ServerSocketChannel绑定端口并且注册承受衔接事情. 循环里selector.select()阻塞,只要当至少一个注册的事情发作的时分才会继续,循环里面处置发作的注册事情 注册事情发作时交给处置器,若为承受衔接则accept取出socketChannel并完成衔接,然后就是关注read读取事情即注册,有数据读取了则处置器读取恳求数据并返回. 客户端过程: 启动客户端,完成一些初始化工作. 依据效劳端ip及端口发起衔接. 往效劳端发送数据,并注册read读取事情 循环里selector.select()阻塞,只要当至少一个注册的事情发作的时分才会继续,循环里面处置发作的注册事情. 注册事情发作时交给处置器,若为衔接事情并且衔接胜利则跳过即不予处置等候读取事情发送. 初始化工作如翻开selector,channel,设置通道形式能否阻塞. 单线程Reactor,工作者线程池 但在单线程Reactor形式中,不只I/O操作在该Reactor线程上,连非I/O的业务操作也在该线程上停止处置了,这可能会大大延迟I/O恳求的响应。所以我们应该将非I/O的业务逻辑操作从Reactor线程上卸载,以此来加速Reactor线程对I/O恳求的响应. clipboard.png 添加了一个工作者线程池,并将非I/O操作从Reactor线程中移出转交给工作者线程池来执行。这样可以进步Reactor线程的I/O响应,不至于由于一些耗时的业务逻辑而延迟对后面I/O恳求的处置。 改良的版本中,所以的I/O操作照旧由一个Reactor来完成,包括I/O的accept()、read()、write()以及connect()操作。 关于一些小容量应用场景,能够运用单线程模型。但是关于高负载、大并发或大数据量的应用场景却不适宜,主要缘由如下: ① 一个NIO线程同时处置成百上千的链路,性能上无法支撑,即使NIO线程的CPU负荷到达100%,也无法满足海量音讯的读取和发送; ②当NIO线程负载过重之后,处置速度将变慢,这会招致大量客户端衔接超时,超时之后常常会停止重发,这愈加重了NIO线程的负载,最终会招致大量音讯积压和处置超时,成为系统的性能瓶颈; 多Reactor线程形式 clipboard.png Reactor线程池中的每一Reactor线程都会有本人的Selector、线程和分发的事情循环逻辑。 mainReactor能够只要一个,但subReactor普通会有多个。mainReactor线程主要担任接纳客户端的衔接恳求,然后将接纳到的SocketChannel传送给subReactor,由subReactor来完成和客户端的通讯。 流程: ①注册一个Acceptor事情处置器到mainReactor中,Acceptor事情处置器所关注的事情是ACCEPT事情,这样mainReactor会监听客户端向效劳器端发起的衔接恳求事情(ACCEPT事情)。启动mainReactor的事情循环。 ②客户端向效劳器端发起一个衔接恳求,mainReactor监听到了该ACCEPT事情并将该ACCEPT事情派发给Acceptor处置器来停止处置。Acceptor处置器经过accept()办法得到与这个客户端对应的衔接(SocketChannel),然后将这个SocketChannel传送给subReactor线程池。 ③subReactor线程池分配一个subReactor线程给这个SocketChannel,即,将SocketChannel关注的READ事情以及对应的READ事情处置器注册到subReactor线程中。当然你也注册WRITE事情以及WRITE事情处置器到subReactor线程中以完成I/O写操作。Reactor线程池中的每一Reactor线程都会有本人的Selector、线程和分发的循环逻辑。 ④当有I/O事情就绪时,相关的subReactor就将事情派发给响应的处置器处置。留意,这里subReactor线程只担任完成I/O的read()操作,在读取到数据后将业务逻辑的处置放入到线程池中完成,若完成业务逻辑后需求返回数据给客户端,则相关的I/O的write操作还是会被提交回subReactor线程来完成。 留意,所以的I/O操作(包括,I/O的accept()、read()、write()以及connect()操作)照旧还是在Reactor线程(mainReactor线程 或 subReactor线程)中完成的。Thread Pool(线程池)仅用来处置非I/O操作的逻辑。 多Reactor线程形式将“承受客户端的衔接恳求”和“与该客户端的通讯”分在了两个Reactor线程来完成。mainReactor完成接纳客户端衔接恳求的操作,它不担任与客户端的通讯,而是将树立好的衔接转交给subReactor线程来完成与客户端的通讯,这样一来就不会由于read()数据量太大而招致后面的客户端衔接恳求得不到即时处置的状况。并且多Reactor线程形式在海量的客户端并发恳求的状况下,还能够经过完成subReactor线程池来将海量的衔接分发给多个subReactor线程,在多核的操作系统中这能大大提升应用的负载和吞吐量。

有疑问加站长微信联系(非本文作者))

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

295 次点击  
加入收藏 微博
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传