Docker组件go-event 源码学习

WayKwin · · 1169 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

go-event是一个在Docker项目中使用到的一个事件分发组件,实现了常规的广播,队列等事件分发模型,代码简洁明了,也适合初学者对Go语言的入门,对channel用来同步,通信也会加深理解。

核心数据结构

Event

type Event interface{}

Event被封装为一个空接口,接受任意类型。在go-events表示一个可以被分发的事件。

interface{}的底层类似于c语言中的void*,但比void*强大很多,比如interface{}保存了指向对象的指针和类型,而c程序员使用void*时,必须自己去保证对象的类型是正确的)

Sink

type Sink interface {
    Write(event Event) error
    Close() error
}

Sink是一个用来分发事件(Event)的结构。可以当作事件的处理者,使用接口的方式声明。只要对象实现了这两个方法,就可以被当作一个Sink。
核型方法

  • Write(event Event) error

    • 定义了事件如何被分发的策略。
  • Close() error

    • 当Sink被关闭的处理策略。

go-event核心就是围绕Sink做文章,docker官方给出了一个http的例子,就是当调用Write时,发起一次post请求。:

func (h *httpSink) Write(event Event) error {
    p, err := json.Marshal(event)
    if err != nil {
        return err
    }
    body := bytes.NewReader(p)
    resp, err := h.client.Post(h.url, "application/json", body)
    if err != nil {
        return err
    }
    defer resp.Body.Close()
    
    if resp.Status != 200 {
        return errors.New("unexpected status")
    }

    return nil
}

// implement (*httpSink).Close()

实现模型

到此为止,sink定义了事件分发的基本单位。在go-event中,封装了广播,消息队列两种消息分发的模型,具体来说,就是实现了Sink接口的两个结构体。

Boadcaster

type Broadcaster struct {
    sinks   []Sink //所包含的Sink
    events  chan Event// 同步Event的channel
    adds    chan configureRequest //adds和remove必须保证thread-safe,所以采用channel同步
    removes chan configureRequest

    shutdown chan struct{}
    closed   chan struct{}
    once     sync.Once
}

Boardcaster由多个Sink组成,当Boardcaster接收到一个事件时,会调用自身包含的所有Sink的Write()方法
go-events设计之初就实现协程之间的消息分发,需要保证thread-safe,所以对event的处理,添加,移除Sink都使用管道来通信。这也是Go的一个使用原则:

使用通信来共享内存,而不是通过共享内存来通信

在Broadcaster中所有的临界资源(sinks,event)都通过自身的run()函数统一管理,外界则通过相应的channel 同步给Broadcaster
例如Write()

func (b *Broadcaster) Write(event Event) error {
    select {
    case b.events <- event:
    case <-b.closed:
        return ErrSinkClosed
    }
    return nil
}

可以看到增减sink都是通过向对应的channel写入数据进行的。

func (b *Broadcaster) Add(sink Sink) error {
    return b.configure(b.adds, sink) //  will be block until ch can be writen
}

func (b *Broadcaster) configure(ch chan configureRequest, sink Sink) error {
    response := make(chan error, 1)

    for {
        select {
        case ch <- configureRequest{
            sink:     sink,
            response: response}:
            ch = nil // ?
        case err := <-response: 
            return err
        case <-b.closed:
            return ErrSinkClosed
        }
    }
}

核心run函数的实现,监听Boardcast管道上的相应事件,并作出处理。

func (b *Broadcaster) run() {
    defer close(b.closed)
    //将remove封装了一下,因为下面两处都会用到
    remove := func(target Sink) {
        for i, sink := range b.sinks {
            if sink == target {
                b.sinks = append(b.sinks[:i], b.sinks[i+1:]...)
                break
            }
        }
    }
    // 轮训处理channel上的事件
    for {
        select {
        case event := <-b.events: //有事件到来,进行广播
            for _, sink := range b.sinks {
                if err := sink.Write(event); err != nil {
                    if err == ErrSinkClosed {
                        // remove closed sinks
                        remove(sink)
                        continue
                    }
                    logrus.WithField("event", event).WithField("events.sink", sink).WithError(err).
                        Errorf("broadcaster: dropping event")
                }
            }
        case request := <-b.adds: //增加sink事件
            // while we have to iterate for add/remove, common iteration for
            // send is faster against slice.

            var found bool
            for _, sink := range b.sinks {
                if request.sink == sink {
                    found = true
                    break
                }
            }

            if !found {
                b.sinks = append(b.sinks, request.sink)
            }
            // b.sinks[request.sink] = struct{}{}
            request.response <- nil // 唤醒阻塞的configure()函数
            
        case request := <-b.removes://删除sink事件
            remove(request.sink)
            request.response <- nil
        case <-b.shutdown:
            // close all the underlying sinks
            for _, sink := range b.sinks {
                if err := sink.Close(); err != nil && err != ErrSinkClosed {
                    logrus.WithField("events.sink", sink).WithError(err).
                        Errorf("broadcaster: closing sink failed")
                }
            }
            return
        }
    }
}

queue

queue使用contaienr/list实现了典型的生产消费者模型

type Queue struct {
    dst    Sink
    events *list.List
    cond   *sync.Cond 
    mu     sync.Mutex
    closed bool
}

核心函数run(),在队列中取出下一个event,交给自身的sink处理,在没有事件队列的情况下,eq.next()总是阻塞的(使用条件变量进行同步)

func (eq *Queue) run() {
    for {
        event := eq.next()

        if event == nil {
            return // nil block means event queue is closed.
        }

        if err := eq.dst.Write(event); err != nil {
            logrus.WithFields(logrus.Fields{
                "event": event,
                "sink":  eq.dst,
            }).WithError(err).Debug("eventqueue: dropped event")
        }
    }
}

生产者:q.next()
消费者:write()

func (eq *Queue) Write(event Event) error {
    eq.mu.Lock()
    defer eq.mu.Unlock()

    if eq.closed {
        return ErrSinkClosed
    }

    eq.events.PushBack(event)
    eq.cond.Signal() // signal waiters

    return nil
}

func (eq *Queue) next() Event {
    eq.mu.Lock()
    defer eq.mu.Unlock()

    for eq.events.Len() < 1 {
        if eq.closed {
            eq.cond.Broadcast()
            return nil
        }

        eq.cond.Wait()
    }

    front := eq.events.Front()
    block := front.Value.(Event)
    eq.events.Remove(front)

    return block
}

有疑问加站长微信联系(非本文作者)

本文来自:Segmentfault

感谢作者:WayKwin

查看原文:Docker组件go-event 源码学习

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

1169 次点击  
加入收藏 微博
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传