Go并发处理

mb6018e67ba1c26 · · 656 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

写了一个web接口,想高并发的请求这个接口,进行压力测试,所以服务端就实现了一个线程池。

代码从网上理解了之后写的。代码实例

简单的介绍:

  首先实现一个Job接口,只要有方法实现了Do方法即可

  定义个分发器结构体,主要是WorkPool线程池,用于存储Worker的JobChannel

  init的时候,先初始化一个JobQueue队列,其他的函数调用这个线程池的时候,把任务放在这个队列即可。

  然后Run的时候,创建多个Worker,起初的时候,woker会把自身的JobChannel先注册到线程池workerPool中,

  然后worker.start就是for{select } 阻塞等待JobChannel中的job任务。

  此时又启一个go d.Dispatcher() ,将JobQueue中的job任务放在worker的Jobchannle中。这样上面的for{select} 就可以拿到任务去执行。

  

注: maxWorkers 是内核CPU数量,本机4核,就是线程池可以放4个JobChannel,所以,在newWorker的时候,就创建了4个Worker来并发的处理job任务。

 

任务处理

package workPool

import "fmt"

type Worker struct {
	WorkerPool chan chan Job
	JobChannel chan Job
	Quit chan bool
}

func NewWorker(workpool chan chan Job) *Worker {
	return &Worker{WorkerPool: workpool,JobChannel: make(chan Job),Quit: make(chan bool)}
}

func (w *Worker) Start()  {
	go func() {
		for{
			w.WorkerPool <-w.JobChannel
			select {
			case job := <-w.JobChannel:
				if err := job.Do();err !=nil{
					fmt.Println("exec some failed ....")
				}
			case <-w.Quit:
				return
			}
		}
	}()
}

func (w *Worker) Stop()  {
	go func() {
		w.Quit <-true
	}()
}

  

 

实现一个分发器

package workPool

import "runtime"

var(
	MaxWorkers = runtime.NumCPU()
	MaxQueue = 512
)

type Job interface {
	Do() error
}

var JobQueue chan Job

type Dispatcher struct {
	MaxWorkers int
	WorkerPool chan chan Job
	Quit chan bool
}

func init()  {
	runtime.GOMAXPROCS(MaxWorkers)
	JobQueue  = make(chan Job,MaxQueue)
	dispatcher := NewDispatcher(MaxWorkers)
	dispatcher.Run()
}

func NewDispatcher(maxWorkers int) *Dispatcher {
	pool := make(chan chan Job,maxWorkers)
	return &Dispatcher{MaxWorkers: maxWorkers,WorkerPool: pool,Quit: make(chan bool)}
}

func (d *Dispatcher) Run()  {
	for i:=0;i<d.MaxWorkers;i++{
		worker := NewWorker(d.WorkerPool)
		worker.Start()
	}
	go d.Dispatcher()
}

func (d *Dispatcher) Dispatcher() {
		for  {
			select {
			case job := <-JobQueue:
				jobChannel := <-d.WorkerPool
				jobChannel <- job
			case <-d.Quit:
				return
			}
		}
}

  

main函数中可以这样使用
package main

import (
	"context_http/workPool"
	"fmt"
	"net/http"
)

type Msg struct {
	mobile string
}

func (m *Msg) Do() error {
	m.mobile = m.mobile+"_test"
	fmt.Println(m.mobile)

	return nil
}

func getMobile(w http.ResponseWriter,r *http.Request)  {
	defer r.Body.Close()

	r.ParseForm()
	mobile := r.PostForm.Get("mobile")

	var work workPool.Job
	m := Msg{mobile: mobile}
	work = &m
	workPool.JobQueue <- work
	status := `{"status":"ok"}`
	w.Write([]byte(status))
}


func main() {
	http.HandleFunc("/test",getMobile)
	err := http.ListenAndServe(":8081",nil)
	if err !=nil{
		fmt.Println("server failure :",err)
		return
	}
}

  


有疑问加站长微信联系(非本文作者)

本文来自:51CTO博客

感谢作者:mb6018e67ba1c26

查看原文:Go并发处理

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

656 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传