[CatServer 服务器框架] Group 调度实现

RuiCat · · 871 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

## 调度实现 > 调度通过滑动窗口得方式处理. > 1. 将销毁的F放入空闲列表. > 2. 将要执行的F追加到等待列表. > 3. 如果执行数量未大于滑窗执行大小.直接执行. > 4. 如果等待执行为空挂起滑窗执行. > 5. 如果空闲列表为空且没有将销毁的F情况下追加新的块到空闲列表. > 6. 如果等待执行的列表过多且系统冗余运行情况下增加新的执行. > 7. 在系统冗余为空且空闲列表为空,情况下申请扩容(其他实现). ### Group 接口 > 定义接口 GroupPort 并实现方法 > > 1. Go 方法添加一个执行 > > 3. GOMAXPROCS 设置并发数量 > > > 用于改变并发数量 > > 3. New 初始化 > > > 初始化用 > > 4. Wait 等待,如果调度出错将返回 > > > 如果申请扩容的话也会引起函数返回. ##### 接口代码 ```go // GroupPort 调度接口 type GroupPort interface { Go(f func() error) // 添加一个处理 GOMAXPROCS (n int) // 设置并发数量 New(n int) error // 初始化 Wait() // 等待 Close() // 关闭 } ``` ### Group 实现 ```go // GroupCall 群组回调 type GroupCall interface{} // GroupCallGOMAXPROCS 调度类型 type GroupCallGOMAXPROCS struct{} // GroupCallError 调度类型 type GroupCallError struct { Name string // 函数名 File string // 函数所在文件 Line int // 行数 Error error } // Group 调度 type Group struct { init bool // 初始化标记 call chan GroupCall // 回调通道 calllook bool // 回调标记 // 调度 ch chan func() error // 调度通道 cl [2]*[]func() error // 调度列表 clc chan bool clcut bool // 当前调度列表 clmutex sync.Mutex // 调度锁 callmutex sync.RWMutex // 调度读写锁 } func (g *Group) remain() { // 切换运行态 g.clmutex.Lock() g.clcut = !g.clcut g.clmutex.Unlock() // 得到列表 var f *[]func() error var fc func() error if g.clcut { f = g.cl[1] } else { f = g.cl[0] } // 调度 g.callmutex.RLock() for _, fc = range *f { g.ch <- fc } g.callmutex.RUnlock() *f = (*f)[0:0] // 清空 // 判定缓存区是否有值 if (len(*g.cl[0])+len(*g.cl[1])) > 0 && g.calllook { select { case g.call <- GroupCallGOMAXPROCS{}: default: } } } func (g *Group) do(f func() error) { var err error defer func() { // 拦截错误 if r := recover(); r != nil { err = fmt.Errorf("[Group.do.panic] 调度执行失败: %s", r) } if err != nil { // 记录错误 pc, _, _, _ := runtime.Caller(1) f := runtime.FuncForPC(reflect.ValueOf(main).Pointer()) ff, fl := f.FileLine(pc) // 发送错误 if g.calllook { g.call <- GroupCallError{ Name: f.Name(), File: ff, Line: fl, Error: err, } } else { // 错误输出到命令行 fmt.Println(f.Name(), ff, fl, err) } } }() // 执行函数 err = f() } // New 初始化 func (g *Group) New(n int) error { if n <= 0 { return fmt.Errorf("%s\n", "[Group.GOMAXPROCS] 并发限制必须大于 0") } g.clc = make(chan bool) // 初始化链表 cla := make([]func() error, 0) clb := make([]func() error, 0) g.cl[0] = &cla g.cl[1] = &clb // 启动调度 func() { // 调度携程 go func() { // 处理调度 for range g.clc { g.remain() // 调度函数 } }() }() g.GOMAXPROCS(n) g.init = true return nil } // GOMAXPROCS 设置并发数量 func (g *Group) GOMAXPROCS(n int) { // 锁 g.callmutex.Lock() defer g.callmutex.Unlock() // 是否需要关闭 if g.init { close(g.ch) close(g.call) } // 设置值 g.ch = make(chan func() error, n) g.call = make(chan GroupCall, n) // 构建并发 for i := 0; i < n; i++ { go func() { var f func() error for f = range g.ch { g.do(f) // 通知调度 g.clc <- true } }() } } // Go 处理过程 func (g *Group) Go(f func() error) { if g.init { g.callmutex.RLock() defer g.callmutex.RUnlock() select { case g.ch <- f: default: g.clmutex.Lock() if g.clcut { *g.cl[0] = append(*g.cl[0], f) } else { *g.cl[1] = append(*g.cl[1], f) } g.clmutex.Unlock() } } else { // 在没有初始化情况下不进行调度 go g.do(f) } } // Wait 等待 func (g *Group) Wait() chan GroupCall { if !g.init { return nil } g.calllook = true return g.call } // Close 关闭 func (g *Group) Close() { if g.init { g.init = false g.calllook = false g.remain() close(g.clc) close(g.ch) close(g.call) } } ```

有疑问加站长微信联系(非本文作者))

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

871 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传