[CatServer 服务器框架] Group 调度实现

RuiCat · 2019-12-29 01:38:48 · 1039 次点击 · 预计阅读时间 4 分钟 · 大约8小时之前 开始浏览    
这是一个创建于 2019-12-29 01:38:48 的文章,其中的信息可能已经有所发展或是发生改变。

调度实现

调度通过滑动窗口得方式处理.

  1. 将销毁的F放入空闲列表.
  2. 将要执行的F追加到等待列表.
  3. 如果执行数量未大于滑窗执行大小.直接执行.
  4. 如果等待执行为空挂起滑窗执行.
  5. 如果空闲列表为空且没有将销毁的F情况下追加新的块到空闲列表.
  6. 如果等待执行的列表过多且系统冗余运行情况下增加新的执行.
  7. 在系统冗余为空且空闲列表为空,情况下申请扩容(其他实现).

Group 接口

定义接口 GroupPort 并实现方法

  1. Go 方法添加一个执行

  2. GOMAXPROCS 设置并发数量

    用于改变并发数量

  3. New 初始化

    初始化用

  4. Wait 等待,如果调度出错将返回

    如果申请扩容的话也会引起函数返回.

接口代码
// GroupPort 调度接口
type GroupPort interface {
    Go(f func() error) // 添加一个处理
    GOMAXPROCS (n int) // 设置并发数量
    New(n int) error   // 初始化
    Wait()             // 等待
    Close()            // 关闭
}

Group 实现

// GroupCall 群组回调
type GroupCall interface{}

// GroupCallGOMAXPROCS 调度类型
type GroupCallGOMAXPROCS struct{}

// GroupCallError 调度类型
type GroupCallError struct {
    Name  string // 函数名
    File  string // 函数所在文件
    Line  int    // 行数
    Error error
}

// Group 调度
type Group struct {
    init     bool           // 初始化标记
    call     chan GroupCall // 回调通道
    calllook bool           // 回调标记
    // 调度
    ch        chan func() error  // 调度通道
    cl        [2]*[]func() error // 调度列表
    clc       chan bool
    clcut     bool         // 当前调度列表
    clmutex   sync.Mutex   // 调度锁
    callmutex sync.RWMutex // 调度读写锁
}

func (g *Group) remain() {
    // 切换运行态
    g.clmutex.Lock()
    g.clcut = !g.clcut
    g.clmutex.Unlock()
    // 得到列表
    var f *[]func() error
    var fc func() error
    if g.clcut {
        f = g.cl[1]
    } else {
        f = g.cl[0]
    }
    // 调度
    g.callmutex.RLock()
    for _, fc = range *f {
        g.ch <- fc
    }
    g.callmutex.RUnlock()
    *f = (*f)[0:0] // 清空
    // 判定缓存区是否有值
    if (len(*g.cl[0])+len(*g.cl[1])) > 0 && g.calllook {
        select {
        case g.call <- GroupCallGOMAXPROCS{}:
        default:
        }
    }
}

func (g *Group) do(f func() error) {
    var err error
    defer func() {
        // 拦截错误
        if r := recover(); r != nil {
            err = fmt.Errorf("[Group.do.panic] 调度执行失败: %s", r)
        }
        if err != nil {
            // 记录错误
            pc, _, _, _ := runtime.Caller(1)
            f := runtime.FuncForPC(reflect.ValueOf(main).Pointer())
            ff, fl := f.FileLine(pc)
            // 发送错误
            if g.calllook {
                g.call <- GroupCallError{
                    Name:  f.Name(),
                    File:  ff,
                    Line:  fl,
                    Error: err,
                }
            } else {
                // 错误输出到命令行
                fmt.Println(f.Name(), ff, fl, err)
            }
        }
    }()
    // 执行函数
    err = f()
}

// New 初始化
func (g *Group) New(n int) error {
    if n <= 0 {
        return fmt.Errorf("%s\n", "[Group.GOMAXPROCS] 并发限制必须大于 0")
    }
    g.clc = make(chan bool)
    // 初始化链表
    cla := make([]func() error, 0)
    clb := make([]func() error, 0)
    g.cl[0] = &cla
    g.cl[1] = &clb
    // 启动调度
    func() {
        // 调度携程
        go func() {
            // 处理调度
            for range g.clc {
                g.remain() // 调度函数
            }
        }()
    }()
    g.GOMAXPROCS(n)
    g.init = true
    return nil
}

// GOMAXPROCS 设置并发数量
func (g *Group) GOMAXPROCS(n int) {
    // 锁
    g.callmutex.Lock()
    defer g.callmutex.Unlock()
    // 是否需要关闭
    if g.init {
        close(g.ch)
        close(g.call)
    }
    // 设置值
    g.ch = make(chan func() error, n)
    g.call = make(chan GroupCall, n)
    // 构建并发
    for i := 0; i < n; i++ {
        go func() {
            var f func() error
            for f = range g.ch {
                g.do(f)
                // 通知调度
                g.clc <- true
            }
        }()
    }
}

// Go 处理过程
func (g *Group) Go(f func() error) {
    if g.init {
        g.callmutex.RLock()
        defer g.callmutex.RUnlock()
        select {
        case g.ch <- f:
        default:
            g.clmutex.Lock()
            if g.clcut {
                *g.cl[0] = append(*g.cl[0], f)
            } else {
                *g.cl[1] = append(*g.cl[1], f)
            }
            g.clmutex.Unlock()
        }
    } else {
        // 在没有初始化情况下不进行调度
        go g.do(f)
    }
}

// Wait 等待
func (g *Group) Wait() chan GroupCall {
    if !g.init {
        return nil
    }
    g.calllook = true
    return g.call
}

// Close 关闭
func (g *Group) Close() {
    if g.init {
        g.init = false
        g.calllook = false
        g.remain()
        close(g.clc)
        close(g.ch)
        close(g.call)
    }
}

有疑问加站长微信联系(非本文作者))

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

1039 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传