一个基于数组的环状队列的实现
队列概念上很简单,就是一个FIFO的数据结构,通常使用链表或者数组来实现,队列在使用方式上只允许从头里面插入,然后从尾部读取,插入和读取的时间复杂度都是0(1)
队列的数据结构
type RingGrowing struct {
data []interface{}
n int // 初始队列的初始长度
beg int // First available element
readable int // Number of data items available
}
队列的读取和写入操作
func (r *RingGrowing) ReadOne() (data interface{}, ok bool) {
if r.readable == 0 {
return nil, false //如果当前没有通知消息, 则返回nil
}
r.readable--
element := r.data[r.beg]
r.data[r.beg] = nil // Remove reference to the object to help GC
if r.beg == r.n -1 {
r.beg = 0
}else {
r.beg ++
}
return element, true
}
func (r *RingGrowing) WriteOne (data interface{}) {
if r.readable == r.n { // 如果已经满了就自动扩容
newN := r.n * 2
newData := make([]interface{}, newN)
to := r.beg + r.readable
if to <= r.n { // 如果当前并没有写满过, 就直接拷贝数据
copy(newData, r.data[r.beg:to])
} else {
// 当数据超过r.n就会分为两个区间: [:to%r.n][] [r.beg:]
copied := copy(newData, r.data[r.beg:])
copy(newData[copied:], r.data[:to%r.n])
}
r.beg = 0
r.data = newData
r.n = newN
}
r.data[(r.readable+r.beg)%r.n] = data
r.readable++
}
应用场景
listener首先注册消息的回调
生产者调用listener写入消息
接受到消息后调用handler进行处理
listener定义与实现
type processorListener struct {
nextCh chan interface{}
addCh chan interface{}
handler *ResourceHandler
pendingNotifications *RingGrowing
}
func newProcessLitener (bufferSize int) *processorListener {
ret := &processorListener{
nextCh: make(chan interface{}),
addCh: make(chan interface{}),
handler: &ResourceHandler{},
pendingNotifications: NewRingGrowing(bufferSize),
}
return ret
}
func (p *processorListener) pop() {
defer close(p.nextCh)
var nextCh chan <- interface{}
var notification interface{}
for {
select {
case nextCh <- notification:
// 如果notification不为空, 则就吧数据放入到nextCh中
// 同时再次从pendingNotifications中读取, 如果没有数据则notification设置为nil
var ok bool
notification, ok = p.pendingNotifications.ReadOne()
println("Pop next notification", notification, ok, nextCh, p.nextCh)
if !ok {
nextCh = nil
}
case notificationAdd, ok := <- p.addCh:
println("Pop:Get notification from addCh", notificationAdd, ok, notification == nil)
if !ok {
return
}
// 当从pendingNotifications中读取不到数据后,notification会设置为空,
// 则当添加事件的时候, 就会重新设置notification
if notification == nil {
notification = notificationAdd
nextCh = p.nextCh
} else {
p.pendingNotifications.WriteOne(notificationAdd)
}
}
}
}
// run函数负责从环状队列里面获取数据,然后调用注册的回调handler来进行处理
func (p *processorListener) run () {
println("Start listener run")
stopCh := make(chan struct{})
wait.Until(func () {
for next := range p.nextCh {
switch notification := next.(type) {
case updateNotification:
p.handler.onUpdate(notification.oldObj, notification.newObj)
case addNotification:
p.handler.onAdd(notification.newObj)
case deleteNotification:
p.handler.onDel(notification.oldObj)
}
}
close(stopCh)
}, 1* time.Second, stopCh)
}
hanlder的简单定义
// 实际上handler可以抽象成一个接口,然后具体实现呦真正的操作来进行实现
type ResourceHandler struct {
}
func (r *ResourceHandler) onUpdate (newObj, oldObj interface{}) {
println("Update Handler", newObj, oldObj)
}
func (r *ResourceHandler) onAdd (newObj interface{}) {
println("Add Handler", newObj)
}
func (r *ResourceHandler) onDel (oldObj interface{}) {
println("Del Handler", oldObj)
}
简单生产者的实现
// 定义三类不同的通知类型,然后回调的时候分别调用对应的实现
type updateNotification struct {
newObj interface{}
oldObj interface{}
}
type addNotification struct {
newObj interface{}
}
type deleteNotification struct {
oldObj interface{}
}
go wait.Until(func() {
number := rand.Intn(100)
println(number)
var notification interface{}
if number < 20 {
notification = addNotification{newObj: number}
} else if number < 50 {
notification = updateNotification{newObj:number, oldObj:number}
} else {
notification = deleteNotification{oldObj: number}
}
listener.add(notification)
}, 1*time.Second, stopCh)
总结
上面的代码取自k8s里面的share_informer里面,这部分逻辑主要是实现,当watch到事件变化后加入到FIFO队列后,sharedIndexInformer里面的HandleDeltas发现变换后,生成对应的事件,然后吧事件传回到之前定义的processlitener里面,然后就由上面的processorListener进行后续的处理
学习到的地方,一个是环状队列的生产实现,事件、事件处理函数、环状队列之间的解耦与组合,嗨呦select拿精妙的用法
有疑问加站长微信联系(非本文作者)