go-event是一个在Docker项目中使用到的一个事件分发组件,实现了常规的广播,队列等事件分发模型,代码简洁明了,也适合初学者对Go语言的入门,对channel用来同步,通信也会加深理解。
核心数据结构
Event
type Event interface{}
Event被封装为一个空接口,接受任意类型。在go-events表示一个可以被分发的事件。
interface{}的底层类似于c语言中的void*,但比void*强大很多,比如interface{}保存了指向对象的指针和类型,而c程序员使用void*时,必须自己去保证对象的类型是正确的)
Sink
type Sink interface {
Write(event Event) error
Close() error
}
Sink是一个用来分发事件(Event)的结构。可以当作事件的处理者,使用接口的方式声明。只要对象实现了这两个方法,就可以被当作一个Sink。
核型方法
Write(event Event) error
- 定义了事件如何被分发的策略。
Close() error
- 当Sink被关闭的处理策略。
go-event核心就是围绕Sink做文章,docker官方给出了一个http的例子,就是当调用Write时,发起一次post请求。:
func (h *httpSink) Write(event Event) error {
p, err := json.Marshal(event)
if err != nil {
return err
}
body := bytes.NewReader(p)
resp, err := h.client.Post(h.url, "application/json", body)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.Status != 200 {
return errors.New("unexpected status")
}
return nil
}
// implement (*httpSink).Close()
实现模型
到此为止,sink定义了事件分发的基本单位。在go-event中,封装了广播,消息队列两种消息分发的模型,具体来说,就是实现了Sink接口的两个结构体。
Boadcaster
type Broadcaster struct {
sinks []Sink //所包含的Sink
events chan Event// 同步Event的channel
adds chan configureRequest //adds和remove必须保证thread-safe,所以采用channel同步
removes chan configureRequest
shutdown chan struct{}
closed chan struct{}
once sync.Once
}
Boardcaster由多个Sink组成,当Boardcaster接收到一个事件时,会调用自身包含的所有Sink的Write()方法
go-events设计之初就实现协程之间的消息分发,需要保证thread-safe,所以对event的处理,添加,移除Sink都使用管道来通信。这也是Go的一个使用原则:
使用通信来共享内存,而不是通过共享内存来通信
在Broadcaster中所有的临界资源(sinks,event)都通过自身的run()函数统一管理,外界则通过相应的channel 同步给Broadcaster
例如Write()
func (b *Broadcaster) Write(event Event) error {
select {
case b.events <- event:
case <-b.closed:
return ErrSinkClosed
}
return nil
}
可以看到增减sink都是通过向对应的channel写入数据进行的。
func (b *Broadcaster) Add(sink Sink) error {
return b.configure(b.adds, sink) // will be block until ch can be writen
}
func (b *Broadcaster) configure(ch chan configureRequest, sink Sink) error {
response := make(chan error, 1)
for {
select {
case ch <- configureRequest{
sink: sink,
response: response}:
ch = nil // ?
case err := <-response:
return err
case <-b.closed:
return ErrSinkClosed
}
}
}
核心run函数的实现,监听Boardcast管道上的相应事件,并作出处理。
func (b *Broadcaster) run() {
defer close(b.closed)
//将remove封装了一下,因为下面两处都会用到
remove := func(target Sink) {
for i, sink := range b.sinks {
if sink == target {
b.sinks = append(b.sinks[:i], b.sinks[i+1:]...)
break
}
}
}
// 轮训处理channel上的事件
for {
select {
case event := <-b.events: //有事件到来,进行广播
for _, sink := range b.sinks {
if err := sink.Write(event); err != nil {
if err == ErrSinkClosed {
// remove closed sinks
remove(sink)
continue
}
logrus.WithField("event", event).WithField("events.sink", sink).WithError(err).
Errorf("broadcaster: dropping event")
}
}
case request := <-b.adds: //增加sink事件
// while we have to iterate for add/remove, common iteration for
// send is faster against slice.
var found bool
for _, sink := range b.sinks {
if request.sink == sink {
found = true
break
}
}
if !found {
b.sinks = append(b.sinks, request.sink)
}
// b.sinks[request.sink] = struct{}{}
request.response <- nil // 唤醒阻塞的configure()函数
case request := <-b.removes://删除sink事件
remove(request.sink)
request.response <- nil
case <-b.shutdown:
// close all the underlying sinks
for _, sink := range b.sinks {
if err := sink.Close(); err != nil && err != ErrSinkClosed {
logrus.WithField("events.sink", sink).WithError(err).
Errorf("broadcaster: closing sink failed")
}
}
return
}
}
}
queue
queue使用contaienr/list实现了典型的生产消费者模型
type Queue struct {
dst Sink
events *list.List
cond *sync.Cond
mu sync.Mutex
closed bool
}
核心函数run(),在队列中取出下一个event,交给自身的sink处理,在没有事件队列的情况下,eq.next()总是阻塞的(使用条件变量进行同步)
func (eq *Queue) run() {
for {
event := eq.next()
if event == nil {
return // nil block means event queue is closed.
}
if err := eq.dst.Write(event); err != nil {
logrus.WithFields(logrus.Fields{
"event": event,
"sink": eq.dst,
}).WithError(err).Debug("eventqueue: dropped event")
}
}
}
生产者:q.next()
消费者:write()
func (eq *Queue) Write(event Event) error {
eq.mu.Lock()
defer eq.mu.Unlock()
if eq.closed {
return ErrSinkClosed
}
eq.events.PushBack(event)
eq.cond.Signal() // signal waiters
return nil
}
func (eq *Queue) next() Event {
eq.mu.Lock()
defer eq.mu.Unlock()
for eq.events.Len() < 1 {
if eq.closed {
eq.cond.Broadcast()
return nil
}
eq.cond.Wait()
}
front := eq.events.Front()
block := front.Value.(Event)
eq.events.Remove(front)
return block
}
有疑问加站长微信联系(非本文作者)