概述
消息系统通常都会由生产者,消费者,Broker三大部分组成,生产者会将消息写入到Broker,消费者会从Broker中读取出消息,不同的MQ实现的Broker实现会有所不同,不过Broker的本质都是要负责将消息落地到服务端的存储系统中。不管是生产者还是消费者对于Broker而言都是客户端,只不过一个是生产消息一个是消费消息。图2-1中生产者和消费者都是通过客户端请求的方式发送给服务端去执行存储消息或者获取消息的流程,在客户端和服务端这一层都有一个连接对象专门负责发送请求和接收请求,具体步骤如下:
生产者客户端应用程序产生消息
客户端连接对象将消息包装到请求中发送到服务端
服务端的入口也有一个连接对象负责接收请求,并将消息以文件的形式存储起来
服务端返回响应结果给生产者客户端
消费者客户端应用程序消费消息
客户端连接对象将消费信息也包装到请求中发送给服务端
服务端从文件存储系统中取出消息
服务端返回响应结果给消费者客户端
客户端将响应结果还原成消息并开始处理消息
图2-1 客户端和服务端交互
Kafka作为一个分布式的消息存储系统,生产者客户端需要将消息传给Kafka集群完成消息存储,本章从Kafka的消费者实现为入口,在源码分析的过程中,思考以下几个问题是如何实现的:
生产者是如何确保将消息以分布式的方式存储到Kafka集群?
生产者客户端是如何组织消息,发送消息,并接收服务端的响应?
客户端和服务端的通信机制,如何有效运用线程模型更高效地通信
本章的着重点主要在于客户端和服务端的网络通信流程,暂时还没有涉及到Kafka的服务端具体实现。因为对于任何的分布式系统而言,必须有一套负责不同节点之间数据传输的网络层通信机制,这套底层的框架要能够处理协议的编解码,客户端和服务端的请求发送和接收等等。在Java中的网络编程中最早是Socket模式,后来进化出了Selector选择器模式,再结合上队列模型,缓冲区机制,就可以设计出一套适合自己系统的网络层通信协议框架。虽然通信模型和服务端的架构实现上没有太大的关联,不过可以在这最底层的框架上添加一些额外的功能比如超时重试,序列化等功能,那么服务端就可以更专注地处理主体业务逻辑,而不需要花太多的精力去关注网络层的各种异常情况。
在分布式的系统中,协议是由服务端定制的,客户端只要遵循这种协议发送请求,服务端就可以确保可以正常地接收并处理客户端的请求。所以实际上客户端的实现可以由不同的语言自己去实现,官方的wiki中列出了目前已经支持的绝大多数语言。因为对于不同语言都有自己的网络层编程API,比如Golang使用channel通信,Akka使用Actor方式传递消息,它们就可以充分利用自己的语言特性去实现不同的客户端。
Kafka初期使用Scala编写,因此早期scala版本的producer、consumer和服务端的实现都放在core包下,最新的客户端使用了Java重新实现,被放在了clients包下。本章我们主要分析如下几个部分的内容:
新版本的Producer客户端实现(Java)
旧版本的Producer客户端实现(Scala)
服务端的网络连接实现(SocketServer)
双端队列InFlightRequests
队列
图2-32是记录收集器的batches队列和NetworkClient的inFlightRequests队列的对比,记录收集器双端队列中的元素只保存数据,没有状态信息,所以针对这个队列的操作只是简单地追加到队列最后一个,取出时取的是队列第一个元素。而inFlightRequests队列中的元素是客户端请求对象,它是有状态的,比如这个请求是否已经发送完成就是一种状态。请求发送完成并不代表就可以从队列中移除,不过如果客户端不需要响应结果发送完成则是可以删除的。
图2-32 InFlightRequests双端队列
实际上如果客户端请求添加到队列尾部也是可以的,如图2-33只不过对应的peek和poll的顺序都要做出改变:
图2-33 添加最新元素到双端队列的两种方式
图2-34中以新请求添加到队列头部为例,模拟了多个请求是如何加入队列以及完成时如何从队列中移除,其中[r1,r2,r3]需要响应结果,而r4不需要响应结果,假设[r1-r4]四个请求都属于某个节点,所以客户端会按照顺序依次加入到队列中。不过后一个请求必须要保证前一个请求发送到服务端节点后才可以进入队列等待发送,当收到响应请求完成时,r4是从队列头部被删除,而其他请求则是从队列尾部删除。
图2-34 双端队列操作
客户端请求的生命周期
客户端在和服务端某个节点建立连接时,会根据客户端中目前的请求队列判断第一个请求是否已经完成来判断这个节点是否可以发送更多的请求canSendMore。那么客户端请求什么时候才算是completed?注意虽然队列中保存的是ClientRequest,不过在add和peek时都是取出ClientRequest里的RequestSend对象。RequestSend到Send的继承体系是RequestSend->NetworkSend->ByteBufferSend->Send。对于ByteBufferSend而言完成的条件是没有要发送的数据了,即缓冲区中的数据都写完了。所以这里请求完成指的是当前发送请求已经成功发送到服务端了,但并不需要等待这个请求收到响应结果。
即使在同一个目标节点的同一个队列中,多个不同ClientRequest请求也是有顺序的,在前面的分析中已经有两个地方限制了客户端请求并不是可以随便添加到队列中的:
在准备连接时,queue.peekFirst().request().completed()=true
可以连接发送请求后,KafkaChannelsetSend也要确保send!=null,一个KafkaChannel只允许同时运行一个Send
其中第二个条件也将直接影响第一个条件,如果第一个请求没有发送完毕,它还会存在于KafkaChannel中,此时来了第二个请求,如果不加以限制即使send!=null,也要将第二个请求设置到KafkaChannel中,这样第一个请求返回的时候却返回了第二个请求,因为Send已经被第二个请求更新了,所以这是有问题的。
不过ClientRequest.RequestSend完成,并不表示这个ClientRequest在NetworkClient中就完成了,客户端的请求被发送到服务端,还需要等待收到服务端的响应结果。所以inFlightRequests表示正在进行中还没有完成的请求,下面几种场景都表示还没有完成的ClientRequest:ClientRequest请求等待发送,ClientRequest请求正在发送,ClientRequest请求已经发送(这时RequestSend完成),ClientRequest对应的请求还未收到响应结果,图2-35是ClientRequest在InFlightRequests中的生命周期。
图2-35 客户端请求在队列中的生命周期
客户端请求发送和接收示例
我们从发送线程开始,举例多个请求的发送和接收,以及在队列中的操作。发送线程第一次运行在准备工作时选择readyNodes,然后为已经准备好的节点创建连接和客户端请求ClientRequest,调用NetworkClient.send会将请求先加入请求对应的目标节点的队列中,然后设置到KafkaChannel中,每个KafkaChannel只有一个正在进行中的Send,如果已经存在Send(比如正在进行中的客户端请求没有被发送完成就不会被重置为空)则不允许再次调用。当选择器轮询时会将选择到的KafkaChannel中的Send通过底层的SocketChannel发送给服务端。图2-36模拟了第一个请求加入队列后的工作过程。
图2-36 NetworkClient.send包括入队列和调用Selector.send
假设第一个请求还没有发送完成比如还在步骤2/3时,发送线程第二次运行准备发送第二批数据(假设这两个请求都是要发送到同一个目标节点),由于队列中的第一个请求还没有完成,canSendMore返回false,在准备工作时就会将其从readyNodes中移除,这样就不会为这个节点创建新的ClientRequest,即第二个请求根本就不会被生成。即使没有canSendMore这一层判断,假设创建了第二个请求,当准备调用NetworkClient.send时,可是又遇到了第二个拦路虎,因为KafkaChannel.setSend要求send不能为空时才可以设置,而现在send已经被第一个客户端请求占着不放,还没有重置,所以客户端请求还是无法被成功地设置。这样就存在一个问题,请求已经被添加到队列中,但是却没办法设置到KafkaChannel中,只能等下次再调用一次NetworkClient.send,不过这样请求队列针对同一个请求就被加入多次了,所以能够尽早在第一道门框拦下第二个请求就不要在放进来了。所以新的请求被创建的时机必须等到队列头部第一个请求已经完成才会创建,而且此时第一个请求在完成的时候就设置了send=null,新创建的请求也可以被成功地设置到KafkaChannel中,所以说如果第一个条件满足(canSendMore=true)后通常第二个条件也是满足(send=null),图2-37是请求[R2,R3]分别在每次允许加入队列时加入到队列头部,图2-38是不需要响应结果的请求R3从队列头部删除请求,图2-39是需要响应结果的请求[R1,R2,R4]分别收到响应结果后从队列尾部删除请求。
图2-37 往队列中添加一个新的请求必须确保上一个请求已经完成
图2-38 不需要响应结果的请求发送完成从队列头部删除
图2-39 需要响应结果的请求收到响应后从队列尾部删除
排队的示例
这里的双端队列和现实世界的排队方式是类似的,如图2-40以去银行办理业务为例,排队机给每个人一个号码表示ClientRequest请求的顺序,只有上一个号码的人办理完了业务,下一个人才能办理。为了和这里的NetworkClient语义相同,我们稍微修改下排队规则,假设办理业务分成三个步骤:告诉业务员要办理什么业务,业务员处理业务,业务员完成业务,这些步骤都是可以并行执行的,而且执行完一个小步骤都要回到自己的座位上继续等待,假设只有一个业务办理窗口时(不过其实你不用担心,假设这个业务员只是一个入口而已,他的后台即服务端是开着很多线程在处理的)。第一个人开始办理业务时首先加入到inFlightRequests,并告诉业务员要取钱,业务员收到指令后,记录了这个信息(可以把业务员看做专门负责接收业务指令,但是不办理具体的业务),第一个人回到自己座位,他还不能离开大厅,因为他只是传达了这个指定,但是钱还没取到;因为此时队列中的第一人已经完成了发送指令请求,第二个人可以办理了,同样先加入到inFlightRequests队列中,然后第二个人说要改密码,业务员收到指令后同样不真正执行改密码的命令,但是如果这个时候第三个人等不住了,还没等第二个人传达完指令就想强行插队,对不起,请稍等下!所以inFlightRequest表示已经发送完请求,或者正在发送请求的,但是他们都还不能离开大厅,因为还没有收到响应结果。因为每个请求发送给业务员都是有顺序的,所以加入到inFlightRequest中的ClientRequest也都是有顺序的,这个队列是个双端队列,队列头部是最近加入的请求,队列尾部是最早加入的请求,如果队列第一个元素的请求还没有发送完成,不允许下一个请求加入队列中,所以新加入队列的元素,在这之前的请求一定都是已经发送完成了,否则他就不可能被加入队列中了。
图2-40 银行办理业务与队列
图2-40中虽然符合新请求添加到队列头部(我们把尾部设置为面对业务员),按照排队的方式理解起来也比较直观,第一个请求先于第二个请求被处理,不过似乎业务员总是面对着第一个请求。为了更好地理解这个双端队列图2-41中分成两个队列,排队队列负责接收请求,处理队列负责处理收到的请求,请求按照发送顺序加入排队队列,一旦请求发送完毕,业务员就会把收到的请求放入另一个队列中,这样两种队列其实都满足了排队论。不过双端队列本来就可以在头尾同时操作,所以实际上只需要一个队列即可。
图2-41 排队队列和处理队列
现在如果从一个业务窗口推到多个窗口,如图2-42就类似于客户端可以向多个服务端节点同时发送请求,每个服务端目标节点都有一个双端队列,每个队列的处理方式和上面一个窗口都是类似的,只不过现在每个请求都携带了自己将会被排队到指定窗口。
图2-42 多个窗口的队列
假设第一个人的业务被成功受理,并且也成功取到钱了,他就可以拿着钱开开心心地离开银行大厅了,现在他的业务已经全部办理好了,就会从inFlightRequests中移除了,因为inFlightRequests中保存的是发送完或正在发送请求,但是没有收到响应结果,一旦收到响应结果就不应该继续在大厅里呆下去了,毕竟inFlightRequests的容量也是有限制的,如果银行大厅座位都做满了,说明请求量太大了,所以取完钱就赶紧回家。
对于需要响应的请求,请求在服务端慢吞吞地处理,返回也是有顺序的,也是说服务端是按照客户端请求的顺序处理的,只有第一个请求返回后才会接着返回第二个请求结果,并不会出现第二个请求先于第一个请求返回结果给客户端。所以对于不需要响应结果的客户端请求如果在handleCompletedSends中没有删除而是等到handleCompletedReceives才删除显然是不公平的,因为他本来可以立即返回,但是却要等到他前面的人都收到结果后才能轮到他。比如超市通常会设置无购物快速通道,如果顾客没有购买任何东西不需要在购物通道上排队就可以快速出去。
如果客户端请求不需要响应就会像上面那样,在发送完就被清理掉了,这是因为客户端既然不想要响应结果,那么就让请求越快完成越好。通过这种快速清理的方式确保了下一个请求进来之前,保存在队列中的一定都是需要响应的:因为上一个请求是不需要响应的,那么在下一个请求加入队列头部之前,上一个请求已经从队列头部移除掉了。以超市为例,进入购物通道排队的人一定知道排队的人都是有购物的,没有人那么傻没有买任何东西却还傻傻地在排队。同样以银行办理业务为例,假设有些人是来咨询业务的,业务员是立即可以回答的,不需要和后台的服务端交互(或者尽管有交互,但是客户端并不关心这个结果,在你出来结果之后,他可能已经都走了)。这样的客户端请求也要排队,在准备发送请求时加入队列头部第一个元素,完成时就可以立即从队列头部移除,不需要进入处理队列了。
现在Java版本的生产者客户端已经分析完毕,表2-4总结了客户端发送过程涉及到的主要组件和其用途:
表2-4 Java版本的生产者主要组件
本章总结
本章主要分析了两种版本的生产者客户端以及服务端的网络层实现,重点介绍了客户端的NetworkClient和服务端的SocketServer,Java版本的客户端和服务端的Processor都使用了Selector选择器模式和KafkaChannel,而Scala版本的客户端则使用比较原始的BlockingChannel。在客户端服务端的通信模型中,通常一个客户端会连接到多个服务端,一个服务端也会接受多个客户端的连接,所以使用Selector模式可以使得网络通信更加高效,在服务端还运用了Reactor模式将IO部分和业务处理部分的线程进行分离。除此之外,客户端和服务端在很多地方都运用了队列这种数据结构来对请求或者响应进行排队,队列是一种保证数据被有序地处理并且能够缓存的结构。表2-8总结了Scala版本的生产者客户端和服务端中使用队列的地方,这里并不包括Java版本的生产者使用更高级的双端队列。
在客户端要向服务端发送消息时我们会获取Cluster集群状态(Java版本)/集群元数据TopicMetadata(Scala版本),为消息选择Partition,选择Partition的Leader作为目标节点,在服务端SocketServer会接收客户端发送的请求交给Handler和KafkaApis处理,具体和消息相关的处理逻辑由KafkaApis以及KafkaServer中的其他组件一起完成。
图2-57是Kafka服务端的内部组件图,网络层包括一个Acceptor线程和多个Processor线程;API层的多个API线程指的是多个KafkaRequestHandler线程,网络层和API层中间有一个RequestChannel,它是请求和响应的数据交换中转站;API层和日志子系统有关联因为API层的请求要读取或写入日志文件,Replication子系统主要的管理类是ReplicaManager,而KafkaApis和它有直接的关联;一个KafkaBroker和其他Broker以及依赖的ZK也有关联,这些关联系统在后续的章节中都会分析到。
图2-57 KafkaBroker的内部组件
图片引自:https://cwiki.apache.org/confluence/display/KAFKA/Index
本章分析的Producer包括后面要分析的Consumer都不是作为Kafka的内置服务,而是一种客户端(所以它们都在clients包),客户端可以独立于Kafka集群,因此开发客户端应用程序时只需要提供一个Kafka集群的地址即可,说明客户端可以和Kafka集群独立开来,图2-58展示了一种典型的生产者、消费者和Kafka集群交互方式,其中Kafka集群还会和ZooKeeper互相通信。
图2-58 生产者、消费者、Kafka集群交互
客户端有发送和接收请求,服务端同样也有接收和发送的逻辑,因为对于I/O来说是双向的:客户端发送请求,就意味着服务端要接收请求,同样服务端对请求作出响应并发送响应结果给客户端,客户端就要接收响应。接下来我们会分析客户端发送的请求在服务端是怎么被KafkaApis处理的。
来源:zqhxuyuan.github.io
原文:http://zqhxuyuan.github.io/2016/05/26/2016-05-13-Kafka-Book-Sample/#%E7%AC%AC%E4%BA%8C%E7%AB%A0_%E7%94%9F%E4%BA%A7%E8%80%85
有疑问加站长微信联系(非本文作者)