架构学习之路(二)-- 高并发实践

魔改谢馒头 · · 920 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

(转自:https://blog.csdn.net/Jeanphorn/article/details/79018205

方案演进

1.直接使用goroutine

在Go语言原生并发的支持下,我们可以直接使用一个goroutine(如下方式)去并行处理这个请求。但是,这种方法明显有些不好的地方,我们没法控制goroutine产生数量,如果处理程序稍微耗时,在单机万级十万级qps请求下,goroutine大规模爆发,内存暴涨,处理效率会很快下降甚至引发程序崩溃。

go handle(request)
goroutine协同带缓存的管道

我们定义一个带缓存的管道

var queue = make(chan job, MAX_QUEUE_SIZE)

然后起一个协程处理管道传来的请求

go func(){
   for {
    select {
        case job := <-queue:
            job.Do(request)
        case <- quit:
            return
    }

   }
}()

接收请求,发送job进行处理

job := &Job{request}

queue <- job

讲真,这种方法使用了缓冲队列一定程度上了提高了并发,但也是治标不治本,大规模并发只是推迟了问题的发生时间。当请求速度远大于队列的处理速度时,缓冲区很快被打满,后面的请求一样被堵塞了。

2.job队列+工作池

只用缓冲队列不能解决根本问题,这时候我们可以参考一下线程池的概念,定一个工作池(协程池),来限定最大goroutine数目。每次来新的job时,从工作池里取出一个可用的worker来执行job。这样一来即保障了goroutine的可控性,也尽可能大的提高了并发处理能力。


job队列+工作池.png
工作池实现

首先,我们定义一个job的接口, 具体内容由具体job实现

type Job interface {
    Do() error
}

然后定义一下job队列和work池类型,这里我们work池也用golang的channel实现。

// define job channel
type JobChan chan Job

// define worker channer
type WorkerChan chan JobChan

我们分别维护一个全局的job队列和工作池。

var (
    JobQueue          JobChan
    WorkerPool        WorkerChan
)

worker的实现。每一个worker都有一个job channel,在启动worker的时候会被注册到work pool中。启动后通过自身的job channel取到job并执行job。

type Worker struct {
    JobChannel JobChan
    quit       chan bool
}

func (w *Worker) Start() {
    go func() {
        for {
            // regist current job channel to worker pool
            WorkerPool <- w.JobChannel
            select {
            case job := <-w.JobChannel:
                if err := job.Do(); err != nil {
                    fmt.printf("excute job failed with err: %v", err)
                }
            // recieve quit event, stop worker
            case <-w.quit:
                return
            }
        }
    }()
}

实现一个分发器(Dispatcher)。分发器包含一个worker的指针数组,启动时实例化并启动最大数目的worker,然后从job队列中不断取job选择可用的worker来执行job。

type Dispatcher struct {
    Workers []*Worker
    quit    chan bool
}

func (d *Dispatcher) Run() {
    for i := 0; i < MaxWorkerPoolSize; i++ {
        worker := NewWorker()
        d.Workers = append(d.Workers, worker)
        worker.Start()
    }

    for {
        select {
        case job := <-JobQueue:
            go func(job Job) {
                jobChan := <-WorkerPool
                jobChan <- job
            }(job)
        // stop dispatcher
        case <-d.quit:
            return
        }
    }
}

有疑问加站长微信联系(非本文作者)

本文来自:简书

感谢作者:魔改谢馒头

查看原文:架构学习之路(二)-- 高并发实践

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

920 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传