## 调度实现
> 调度通过滑动窗口得方式处理.
> 1. 将销毁的F放入空闲列表.
> 2. 将要执行的F追加到等待列表.
> 3. 如果执行数量未大于滑窗执行大小.直接执行.
> 4. 如果等待执行为空挂起滑窗执行.
> 5. 如果空闲列表为空且没有将销毁的F情况下追加新的块到空闲列表.
> 6. 如果等待执行的列表过多且系统冗余运行情况下增加新的执行.
> 7. 在系统冗余为空且空闲列表为空,情况下申请扩容(其他实现).
### Group 接口
> 定义接口 GroupPort 并实现方法
>
> 1. Go 方法添加一个执行
>
> 3. GOMAXPROCS 设置并发数量
>
> > 用于改变并发数量
>
> 3. New 初始化
>
> > 初始化用
>
> 4. Wait 等待,如果调度出错将返回
>
> > 如果申请扩容的话也会引起函数返回.
##### 接口代码
```go
// GroupPort 调度接口
type GroupPort interface {
Go(f func() error) // 添加一个处理
GOMAXPROCS (n int) // 设置并发数量
New(n int) error // 初始化
Wait() // 等待
Close() // 关闭
}
```
### Group 实现
```go
// GroupCall 群组回调
type GroupCall interface{}
// GroupCallGOMAXPROCS 调度类型
type GroupCallGOMAXPROCS struct{}
// GroupCallError 调度类型
type GroupCallError struct {
Name string // 函数名
File string // 函数所在文件
Line int // 行数
Error error
}
// Group 调度
type Group struct {
init bool // 初始化标记
call chan GroupCall // 回调通道
calllook bool // 回调标记
// 调度
ch chan func() error // 调度通道
cl [2]*[]func() error // 调度列表
clc chan bool
clcut bool // 当前调度列表
clmutex sync.Mutex // 调度锁
callmutex sync.RWMutex // 调度读写锁
}
func (g *Group) remain() {
// 切换运行态
g.clmutex.Lock()
g.clcut = !g.clcut
g.clmutex.Unlock()
// 得到列表
var f *[]func() error
var fc func() error
if g.clcut {
f = g.cl[1]
} else {
f = g.cl[0]
}
// 调度
g.callmutex.RLock()
for _, fc = range *f {
g.ch <- fc
}
g.callmutex.RUnlock()
*f = (*f)[0:0] // 清空
// 判定缓存区是否有值
if (len(*g.cl[0])+len(*g.cl[1])) > 0 && g.calllook {
select {
case g.call <- GroupCallGOMAXPROCS{}:
default:
}
}
}
func (g *Group) do(f func() error) {
var err error
defer func() {
// 拦截错误
if r := recover(); r != nil {
err = fmt.Errorf("[Group.do.panic] 调度执行失败: %s", r)
}
if err != nil {
// 记录错误
pc, _, _, _ := runtime.Caller(1)
f := runtime.FuncForPC(reflect.ValueOf(main).Pointer())
ff, fl := f.FileLine(pc)
// 发送错误
if g.calllook {
g.call <- GroupCallError{
Name: f.Name(),
File: ff,
Line: fl,
Error: err,
}
} else {
// 错误输出到命令行
fmt.Println(f.Name(), ff, fl, err)
}
}
}()
// 执行函数
err = f()
}
// New 初始化
func (g *Group) New(n int) error {
if n <= 0 {
return fmt.Errorf("%s\n", "[Group.GOMAXPROCS] 并发限制必须大于 0")
}
g.clc = make(chan bool)
// 初始化链表
cla := make([]func() error, 0)
clb := make([]func() error, 0)
g.cl[0] = &cla
g.cl[1] = &clb
// 启动调度
func() {
// 调度携程
go func() {
// 处理调度
for range g.clc {
g.remain() // 调度函数
}
}()
}()
g.GOMAXPROCS(n)
g.init = true
return nil
}
// GOMAXPROCS 设置并发数量
func (g *Group) GOMAXPROCS(n int) {
// 锁
g.callmutex.Lock()
defer g.callmutex.Unlock()
// 是否需要关闭
if g.init {
close(g.ch)
close(g.call)
}
// 设置值
g.ch = make(chan func() error, n)
g.call = make(chan GroupCall, n)
// 构建并发
for i := 0; i < n; i++ {
go func() {
var f func() error
for f = range g.ch {
g.do(f)
// 通知调度
g.clc <- true
}
}()
}
}
// Go 处理过程
func (g *Group) Go(f func() error) {
if g.init {
g.callmutex.RLock()
defer g.callmutex.RUnlock()
select {
case g.ch <- f:
default:
g.clmutex.Lock()
if g.clcut {
*g.cl[0] = append(*g.cl[0], f)
} else {
*g.cl[1] = append(*g.cl[1], f)
}
g.clmutex.Unlock()
}
} else {
// 在没有初始化情况下不进行调度
go g.do(f)
}
}
// Wait 等待
func (g *Group) Wait() chan GroupCall {
if !g.init {
return nil
}
g.calllook = true
return g.call
}
// Close 关闭
func (g *Group) Close() {
if g.init {
g.init = false
g.calllook = false
g.remain()
close(g.clc)
close(g.ch)
close(g.call)
}
}
```
有疑问加站长微信联系(非本文作者))