golang 线程池及一些总结

jianzi123 · 2017-11-24 08:38:01 · 8051 次点击 · 预计阅读时间 1 分钟 · 大约8小时之前 开始浏览    
这是一个创建于 2017-11-24 08:38:01 的文章,其中的信息可能已经有所发展或是发生改变。

先说生产者消费者问题,就是多个生产者生产商品,放到一块缓存中,然后多个消费者从缓存取出并消费商品; 这里就有了三个对象或说实体,就是生产者,消费者,固定大小的缓存;
然后就要说下一些限制条件,

  1. 如果缓存满了,生产者就不能生产商品了,只能等着有空间了再生产;
  2. 如果缓存空了,消费者就不能消费商品了,只能等以后商品了再消费;
    之前用c实现的时候,稍微麻烦一下;
    消费者:
    mutex.lock()
    while(container.size == 0){
    pthread_cond_wait(nullcond, mutex) // 具体方法忘了... 就是如果缓存为空,利用条件变量,阻塞在这里,并把锁释放出来,好让生产者往里写啊...这里用while...是因为多个消费者都饿着的情况下,要一个一个来啊...
    }
    if(container.size == MAXSIZE) {
    container->pop// 消费一个商品
    phtread_cond_sign(nullcond)// 同样具体方法不详,给条件变量发个信号,让它把因为缓存满的情况下的阻塞的条件放开,然后锁该等继续等...
    }else{
    container->pop// 消费一个商品
    }
    mutex.unlock()
    生产者:
    跟上面差不多,满的话,加个条件变量等着,放开锁,非满的信号来了,往里写...不能往里写,还得判断是不是空,空的话,给条件变量发个信号,可以先放一放了,然后往里写,unlock锁,结束 这个虽然有点绕,但是把共享资源的使用逻辑表现的很好...
    如果用golang来写, 就没有那么绕了...
    先把代码贴上...有空再写剩下的部分.

接着写... 先来介绍一个一个工具,就是平时练习或者写demo的工具tour,在终端输入go tool tour就可以启动tour服务(图就算了). 然后上代码

var stop = make(chan int)
var queue = make(chan interface{})
func push(src interface{}) {
    select{
    case <-stop:
        return
    default:
    }
    select{
    case <-stop:
        return
    case queue <-src:

    }
}

func pop() interface{}{
    select{
    case <-stop:
        return nil
    default:
    }
    select{
    case <-stop:
        return nil
    case a := <-queue:
        fmt.Println(a)
        return a
    }
}

首先...用go确实挺简单的...然后这里利用前一个select解决一部分select满足条件case随机选择问题...然后...比较坑的问题就来了...如果突然close(stop)肯定有一部分放到queue里,最后没有被处理...
这个例子主要是说下生产者消费者逻辑,以及利用close channel当作广播使用...
先写到这...

package main

import (
    "fmt"
    "reflect"
    "sync"
    "time"
)

type _Thread struct {
    Body  interface{}
    Args  []interface{}
}

var queue = make(chan _Thread, 100)
var stop = make(chan int)
var wg = sync.WaitGroup{}

func app(a int, para, item string)  {
    fmt.Printf("%d; %s %s", a, para, item)

}

func sendMsg(sed _Thread) {
    select {
    case <-stop:
        return
    default:

    }
    select {
    case <-stop:
        return
    case a := <- sed:
        queue <- a
    }
}
func main() {
    go func() {
        wg.Add(100)
        for i := 0; i < 100; i++ {
            go func() {
                defer wg.Done()
                for{
                    select {
                    case <-stop:
                        return
                    default:
                    }
                    select {
                    case v := <-queue:
                        if _, ok := <-stop; ok == false {
                            fmt.Print("stop closed")
                        }
                        obj := reflect.ValueOf(v.Body)
                        if obj.Kind() == reflect.Func {
                            params := make([]reflect.Value, len(v.Args))
                            for i, value := range v.Args {
                                params[i] = reflect.ValueOf(value)

                            }
                            obj.Call(params)
                            fmt.Println("jion ", i)
                        }
                    case <-stop:
                        return
                    }
                }


            }()
        }
        wg.Wait()
    }()

    go func(){
        example := _Thread{
            Body: app,
            Args: []interface{}{123, "123", "456"},
        }
        for {
            sendMsg(example)
        }
    }()
    time.Sleep(2)
    close(stop)
    a := <-stop
    t := make(chan int)
    t <- 1
}

有疑问加站长微信联系(非本文作者))

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

8051 次点击  
加入收藏 微博
1 回复  |  直到 2019-01-22 22:31:44
olzhy
olzhy · #1 · 6年之前
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传