go-disruptor 高性能的消息框架 go-disruptor

agolangf • 3681 次点击    
这是一个分享于 的项目,其中的信息可能已经有所发展或是发生改变。
这是Go编程语言里 LMAX Disruptor的接口。 它保留了Disruptor的本质和原理,并利用了很多相同的抽象概念和理论,但不会保持同样的API。 ** 简述: ** 在我的 MacBook Pro (Intel Core i7-4960HQ CPU @ 2.60GHz) 中,我使用了 Go 1.4.2, 此版本使我能在一秒内发送9亿多份邮件(是的,你没有听错), 从一个goroutine到另一个goroutine. 讯息在两台CPU间的传递很简单。 请注意,您的里程可能会有所不同,通过控制CPU并清除其缓存,不同的操作系统可以添加特定的“jitter”到App中。Linux和Windows系统有给定的进程分配给特定的CPU内核它通过将所有的CPU缓存热显著降低“jitter”的能力。 顺便,当Disruptor代码被编译并在Nexus 5上运行,它可以每秒可推送约15-20万条信息。 一旦被初始化,在运行时,Disruptor杰出设计的考虑因素之一,就是以一个恒定的速率来处理消息。为此,它使用两个主要技术: 1. 它避免了在所有costs上使用锁,costs通常会引起CPU内核间的排斥,影响可测量性。 2. 它允许应用程序预先在一个环形缓冲区分配连续的空间,不产生垃圾。  通过避免垃圾,垃圾清理站和应用程序暂停的功能可以免去。 **示例代码:** Wireup **** <pre class="brush:cpp ;toolbar: true; auto-links: false;">runtime.GOMAXPROCS(2) // make sure we have enough cores available to execute const RingBufferCapacity = 1024 // must be a power of 2 const RingBufferMask = RingBufferCapacity - 1 // this instance will be shared among producers and consumers of this application var ringBuffer = [RingBufferCapacity]MyStruct{} myDisruptor := disruptor.     Configure(RingBufferCapacity).     WithConsumerGroup(MyConsumer{}). // we can have a set of concurrent consumers run first     // WithConsumerGroup(MyConsumer{}). // and then run this/these consumers after the first set of consumers     BuildShared() // Build() = single producer vs BuildShared() = multiple producers myDisruptor.Start() defer myDisruptor.Stop() // clean shutdown which stops all idling consumers after all published items have been consumed // application code here, e.g. listen to HTTP, read from a network socket, etc.</pre> 生产者 <pre class="brush:cpp ;toolbar: true; auto-links: false;">Producer writer := myDisruptor.Writer() // for each item received from a network socket, e.g. UDP packets, HTTP request, etc. etc. sequence := writer.Reserve(1) // reserve 1 slot on the ring buffer and give me the upper-most sequence of the reservation // this could be written like this: ringBuffer[sequence%RingBufferCapacity] but the Mask and &amp; operator is faster. ringBuffer[sequence&amp;RingBufferMask].MyImportStructData = ... // data from network stream writer.Commit(sequence, sequence) // the item is ready to be consumed</pre> 消费者 <pre class="brush:cpp ;toolbar: true; auto-links: false;">type MyConsumer struct{} func (m MyConsumer) Consume(lowerSequence, upperSequence int64) {     for sequence := lowerSequence; sequence &lt;= upperSequence; sequence++ {         message := ringBuffer[sequence&amp;RingBufferMask] // see performance note on producer sample above         // handle the incoming message with your application code     } }</pre>
授权协议:
Apache
开发语言:
Google Go 查看源码»
操作系统:
跨平台
3681 次点击  
加入收藏 微博
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传