[CatServer 服务器框架] Group 新调度实现

RuiCat · · 531 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

# 调度包实现 ## 实现 ### 调度定义 > 采用 生产消费 的处理方式 > > 1. 创建多个消费者 > 2. 将生产的值发送给消费者 > 3. 在关闭消费者的情况下挂起生产者 > 4. 在无法消费的情况下储存到缓冲队列,并通知回调函数. > 5. 关闭调度时等待缓冲队列与剩余调度执行完成. > > 注: > > >1. 超时回调会挂起调度 > >2. 在回调中调用 Close/SetWorker 需要携程运行 #### 1. 创建消费者 > 创建一个缓冲为 n 的消费者通道 ```go var consumer chan interface{} // 消费者 consumer = make(chan interface{}, n) ``` #### 2. 创建生产者 ```go var producer chan interface{} // 生产者 producer = make(chan interface{}) ``` #### 3. 生产消费 > 将生产的值发送到消费者 ``` go consumer <- <- producer ``` #### 4. 对消费添加超时处理 > 单纯的将生产的值发送给消费者没有任何意义,所以添加 t 超时控制. ```go for v = range producer { // 退出生产者调度 if v == nil { break } // 处理生产 select { case consumer <- v: // 转发消费 case <-t.C: // 消费超时 // 超时处理 } } ``` #### 5. 创建缓冲队列 > 创建缓冲队列 ```go var value []interface{} ``` > 尝试重新发超时值,无法重新发情况下添加到超时列表,并通知 CallTime 回调函数. ```go select { case consumer <- v: default: value = append(value, v) go CallTime() } ``` #### 6. 退出处理 > 在调度队列退出的情况下,处理缓冲队列值 ``` go // 处理剩余 if len(value) > 0 { for v = range value { Call(v) } value = value[0:0] } ``` > 退出调度 ``` go ticker.Stop() // 处理超时过程 producer <- nil // 关闭生产者 close(g.consumer) // 关闭消费者 ``` ## 完整实现 ``` go package main import ( "fmt" "sync" "time" ) // Group 调度器 type Group struct { consumer chan interface{} // 消费者 producer chan interface{} // 生产者 value []interface{} // 溢出表 wait sync.WaitGroup // 退出等待 ticker *time.Ticker // 消费超时 // 用户参数 Interval int64 // 超时时间 Worker int // 消费者数量 Call func(v interface{}) // 处理函数 CallTime func() // 超时回调 IsStart bool // 启动标记 } // Init 初始化 func (g *Group) Init() { g.producer = make(chan interface{}) } // SetWorker 设置消费者数量 func (g *Group) SetWorker(Worker int) { // 直接设置值 g.Worker = Worker // 如果在启动的情况下再次启动 if g.IsStart { g.Start() } } // SetInterval 设置超时 func (g *Group) SetInterval(t int64) { // 设置超时 g.Interval = t // 是否启动 if g.IsStart { // 关闭定时器 g.ticker.Stop() } } // Start 启动 func (g *Group) Start() { // 是否启动 if g.IsStart { g.Close() } g.IsStart = true // 重新设置超时 g.ticker = time.NewTicker(time.Second * time.Duration(g.Interval)) g.consumer = make(chan interface{}, g.Worker) // 调度过程 for i := 0; i < g.Worker; i++ { go func() { g.wait.Add(1) defer g.wait.Done() var v interface{} for v = range g.consumer { g.Call(v) } }() } // 转发过程 go func() { g.wait.Add(1) defer g.wait.Done() // 处理 var v interface{} var ok bool for v = range g.producer { // 退出生产者调度 if v == nil { break } // 处理生产 select { case g.consumer <- v: // 转发消费 case _, ok = <-g.ticker.C: // 消费超时 // 关闭定时 if !ok { // 重新启动定时 g.ticker = time.NewTicker(time.Second * time.Duration(g.Interval)) } select { case g.consumer <- v: // 重新尝试写 default: // 无法写进行记录 g.value = append(g.value, v) g.CallTime() } } } // 处理剩余 if len(g.value) > 0 { for v = range g.value { g.Call(v) } g.value = g.value[0:0] } }() } // Close 退出 func (g *Group) Close() { g.IsStart = false // 是否启动 g.ticker.Stop() // 处理超时过程 g.producer <- nil // 关闭生产者 close(g.consumer) // 关闭消费者 g.wait.Wait() // 等待消费完成 } // Go 添加一个值 func (g *Group) Go(v interface{}) { if v != nil { g.producer <- v } } func main() { var g *Group n := 2 g = &Group{ Call: func(v interface{}) { <-time.NewTicker(time.Second).C // 模拟任务延时 fmt.Println("处理值: ", v) }, CallTime: func() { n = n + 2 fmt.Println("重新设置调度:", n) go g.SetWorker(n) // 调度数量 fmt.Println("重新设置完成") }, } g.Init() // 初始化 g.SetWorker(n) // 调度数量 g.SetInterval(1) // 超时一秒 // 开始处理 g.Start() // 添加值 for i := 0; i < 10; i++ { g.Go(i) } // 等待关闭 g.Close() } ```

有疑问加站长微信联系(非本文作者))

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

531 次点击  
加入收藏 微博
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传