Golang环状队列的实现与应用

路上阳光 · · 850 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

一个基于数组的环状队列的实现

队列概念上很简单,就是一个FIFO的数据结构,通常使用链表或者数组来实现,队列在使用方式上只允许从头里面插入,然后从尾部读取,插入和读取的时间复杂度都是0(1)

队列的数据结构

type RingGrowing struct {
    data        []interface{}
    n           int // 初始队列的初始长度
    beg         int // First available element
    readable    int // Number of data items available
}

队列的读取和写入操作

func (r *RingGrowing) ReadOne()  (data interface{}, ok bool) {
    if r.readable == 0 {
        return nil, false       //如果当前没有通知消息, 则返回nil
    }
    r.readable--
    element := r.data[r.beg]
    r.data[r.beg] = nil // Remove reference to the object to help GC
    if r.beg == r.n -1 {
        r.beg = 0
    }else {
        r.beg ++
    }
    return element, true
}


func (r *RingGrowing) WriteOne (data interface{}) {
    if r.readable == r.n {      // 如果已经满了就自动扩容
        newN := r.n * 2
        newData := make([]interface{}, newN)
        to := r.beg + r.readable
        if to <= r.n {                          // 如果当前并没有写满过, 就直接拷贝数据
            copy(newData, r.data[r.beg:to])
        } else {
            // 当数据超过r.n就会分为两个区间: [:to%r.n][] [r.beg:]
            copied := copy(newData, r.data[r.beg:])
            copy(newData[copied:], r.data[:to%r.n])
        }
        r.beg = 0
        r.data = newData
        r.n = newN
    }
    r.data[(r.readable+r.beg)%r.n] = data
    r.readable++
}

应用场景

listener首先注册消息的回调
生产者调用listener写入消息
接受到消息后调用handler进行处理

listener定义与实现

type processorListener struct {
    nextCh chan interface{}
    addCh  chan interface{}

    handler *ResourceHandler
    pendingNotifications *RingGrowing
}


func newProcessLitener (bufferSize int) *processorListener {
    ret := &processorListener{
        nextCh:                 make(chan interface{}),
        addCh:                  make(chan interface{}),
        handler:                &ResourceHandler{},
        pendingNotifications:   NewRingGrowing(bufferSize),
    }
    return ret
}


func (p *processorListener) pop() {
    defer close(p.nextCh)
    var nextCh chan <- interface{}
    var notification interface{}

    for {
        select {
        case nextCh <- notification:
            // 如果notification不为空, 则就吧数据放入到nextCh中
            // 同时再次从pendingNotifications中读取, 如果没有数据则notification设置为nil
            var ok bool
            notification, ok = p.pendingNotifications.ReadOne()
            println("Pop next notification", notification, ok, nextCh, p.nextCh)
            if !ok {
                nextCh = nil
            }
        case notificationAdd, ok :=  <- p.addCh:
            println("Pop:Get notification from addCh", notificationAdd, ok, notification == nil)
            if !ok {
                return
            }
            // 当从pendingNotifications中读取不到数据后,notification会设置为空,
            // 则当添加事件的时候, 就会重新设置notification
            if notification == nil {
                notification = notificationAdd
                nextCh = p.nextCh
            } else {
                p.pendingNotifications.WriteOne(notificationAdd)
            }
        }
    }
}

// run函数负责从环状队列里面获取数据,然后调用注册的回调handler来进行处理
func (p *processorListener) run () {
    println("Start listener run")
    stopCh := make(chan struct{})

    wait.Until(func () {
        for next := range p.nextCh {
            switch notification := next.(type) {
            case updateNotification:
                p.handler.onUpdate(notification.oldObj, notification.newObj)
            case addNotification:
                p.handler.onAdd(notification.newObj)
            case deleteNotification:
                p.handler.onDel(notification.oldObj)
            }
        }
        close(stopCh)
    }, 1* time.Second, stopCh)
}

hanlder的简单定义

// 实际上handler可以抽象成一个接口,然后具体实现呦真正的操作来进行实现
type ResourceHandler struct {
}

func (r *ResourceHandler) onUpdate (newObj, oldObj interface{}) {
    println("Update Handler", newObj, oldObj)
}

func (r *ResourceHandler) onAdd (newObj interface{}) {
    println("Add Handler", newObj)
}

func (r *ResourceHandler) onDel (oldObj interface{}) {
    println("Del Handler", oldObj)
}

简单生产者的实现

// 定义三类不同的通知类型,然后回调的时候分别调用对应的实现
type updateNotification struct {
    newObj  interface{}
    oldObj  interface{}
}

type addNotification struct {
    newObj  interface{}
}

type deleteNotification struct {
    oldObj  interface{}
}
go wait.Until(func() {
    number := rand.Intn(100)
    println(number)
    var notification interface{}
    if number < 20 {
        notification = addNotification{newObj: number}
    } else if number < 50 {
        notification = updateNotification{newObj:number, oldObj:number}
    } else {
        notification = deleteNotification{oldObj: number}
    }
    listener.add(notification)
}, 1*time.Second, stopCh)

总结

上面的代码取自k8s里面的share_informer里面,这部分逻辑主要是实现,当watch到事件变化后加入到FIFO队列后,sharedIndexInformer里面的HandleDeltas发现变换后,生成对应的事件,然后吧事件传回到之前定义的processlitener里面,然后就由上面的processorListener进行后续的处理

学习到的地方,一个是环状队列的生产实现,事件、事件处理函数、环状队列之间的解耦与组合,嗨呦select拿精妙的用法


有疑问加站长微信联系(非本文作者)

本文来自:简书

感谢作者:路上阳光

查看原文:Golang环状队列的实现与应用

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

850 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传