# 调度包实现
## 实现
### 调度定义
> 采用 生产消费 的处理方式
>
> 1. 创建多个消费者
> 2. 将生产的值发送给消费者
> 3. 在关闭消费者的情况下挂起生产者
> 4. 在无法消费的情况下储存到缓冲队列,并通知回调函数.
> 5. 关闭调度时等待缓冲队列与剩余调度执行完成.
>
> 注:
>
> >1. 超时回调会挂起调度
> >2. 在回调中调用 Close/SetWorker 需要携程运行
#### 1. 创建消费者
> 创建一个缓冲为 n 的消费者通道
```go
var consumer chan interface{} // 消费者
consumer = make(chan interface{}, n)
```
#### 2. 创建生产者
```go
var producer chan interface{} // 生产者
producer = make(chan interface{})
```
#### 3. 生产消费
> 将生产的值发送到消费者
``` go
consumer <- <- producer
```
#### 4. 对消费添加超时处理
> 单纯的将生产的值发送给消费者没有任何意义,所以添加 t 超时控制.
```go
for v = range producer {
// 退出生产者调度
if v == nil {
break
}
// 处理生产
select {
case consumer <- v: // 转发消费
case <-t.C: // 消费超时
// 超时处理
}
}
```
#### 5. 创建缓冲队列
> 创建缓冲队列
```go
var value []interface{}
```
> 尝试重新发超时值,无法重新发情况下添加到超时列表,并通知 CallTime 回调函数.
```go
select {
case consumer <- v:
default:
value = append(value, v)
go CallTime()
}
```
#### 6. 退出处理
> 在调度队列退出的情况下,处理缓冲队列值
``` go
// 处理剩余
if len(value) > 0 {
for v = range value {
Call(v)
}
value = value[0:0]
}
```
> 退出调度
``` go
ticker.Stop() // 处理超时过程
producer <- nil // 关闭生产者
close(g.consumer) // 关闭消费者
```
## 完整实现
``` go
package main
import (
"fmt"
"sync"
"time"
)
// Group 调度器
type Group struct {
consumer chan interface{} // 消费者
producer chan interface{} // 生产者
value []interface{} // 溢出表
wait sync.WaitGroup // 退出等待
ticker *time.Ticker // 消费超时
// 用户参数
Interval int64 // 超时时间
Worker int // 消费者数量
Call func(v interface{}) // 处理函数
CallTime func() // 超时回调
IsStart bool // 启动标记
}
// Init 初始化
func (g *Group) Init() {
g.producer = make(chan interface{})
}
// SetWorker 设置消费者数量
func (g *Group) SetWorker(Worker int) {
// 直接设置值
g.Worker = Worker
// 如果在启动的情况下再次启动
if g.IsStart {
g.Start()
}
}
// SetInterval 设置超时
func (g *Group) SetInterval(t int64) {
// 设置超时
g.Interval = t
// 是否启动
if g.IsStart {
// 关闭定时器
g.ticker.Stop()
}
}
// Start 启动
func (g *Group) Start() {
// 是否启动
if g.IsStart {
g.Close()
}
g.IsStart = true
// 重新设置超时
g.ticker = time.NewTicker(time.Second * time.Duration(g.Interval))
g.consumer = make(chan interface{}, g.Worker)
// 调度过程
for i := 0; i < g.Worker; i++ {
go func() {
g.wait.Add(1)
defer g.wait.Done()
var v interface{}
for v = range g.consumer {
g.Call(v)
}
}()
}
// 转发过程
go func() {
g.wait.Add(1)
defer g.wait.Done()
// 处理
var v interface{}
var ok bool
for v = range g.producer {
// 退出生产者调度
if v == nil {
break
}
// 处理生产
select {
case g.consumer <- v: // 转发消费
case _, ok = <-g.ticker.C: // 消费超时
// 关闭定时
if !ok {
// 重新启动定时
g.ticker = time.NewTicker(time.Second * time.Duration(g.Interval))
}
select {
case g.consumer <- v: // 重新尝试写
default: // 无法写进行记录
g.value = append(g.value, v)
g.CallTime()
}
}
}
// 处理剩余
if len(g.value) > 0 {
for v = range g.value {
g.Call(v)
}
g.value = g.value[0:0]
}
}()
}
// Close 退出
func (g *Group) Close() {
g.IsStart = false // 是否启动
g.ticker.Stop() // 处理超时过程
g.producer <- nil // 关闭生产者
close(g.consumer) // 关闭消费者
g.wait.Wait() // 等待消费完成
}
// Go 添加一个值
func (g *Group) Go(v interface{}) {
if v != nil {
g.producer <- v
}
}
func main() {
var g *Group
n := 2
g = &Group{
Call: func(v interface{}) {
<-time.NewTicker(time.Second).C // 模拟任务延时
fmt.Println("处理值: ", v)
},
CallTime: func() {
n = n + 2
fmt.Println("重新设置调度:", n)
go g.SetWorker(n) // 调度数量
fmt.Println("重新设置完成")
},
}
g.Init() // 初始化
g.SetWorker(n) // 调度数量
g.SetInterval(1) // 超时一秒
// 开始处理
g.Start()
// 添加值
for i := 0; i < 10; i++ {
g.Go(i)
}
// 等待关闭
g.Close()
}
```
有疑问加站长微信联系(非本文作者))