在多线程编程中,我经常会用到两方面的线程控制:一是启停线程,二是控制线程数量。在 Go 语言中,启动的是协程(goroutine),一种比线程更轻量的并发单元。为了实现这两个功能,我在 GitHub 上 fork 了 grtm 并进行了功能增强。
更多例子请看 github.com/fy138/grtm
一、启停线程
package main import ( "fmt" // "runtime" "time" "github.com/fy138/grtm" ) func myfunc(me interface{}) { fmt.Println("hello+" + me.(string)) time.Sleep(time.Second * 2) } func main() { gm := grtm.NewGrManager() //在创建gm后新建一个进程接收出错信息 go func(gm *grtm.GrManager) { for { select { case err := <-gm.ErrChan: fmt.Println("Received error:", err.Error()) case notify := <-gm.NotiChan: fmt.Println("Received Notify:", notify) } } }(gm) gm.NewLoopGoroutine("myfunc", myfunc, "1") gm.NewLoopGoroutine("myfunc2", myfunc, "2") time.Sleep(time.Second * 3) gm.StopLoopGoroutine("aaaaaa") time.Sleep(time.Second * 3) gm.StopLoopGoroutine("myfunc2") time.Sleep(time.Second * 3) gm.NewLoopGoroutine("myfunc", myfunc, "1") time.Sleep(time.Second * 3) for { for k, v := range gm.GetAllTask() { fmt.Printf("task name:%s,task id:%d,task name2:%s\n", k, v.Gid, v.Name) } fmt.Printf("NumTask:%d\n", gm.GetTaskTotal()) time.Sleep(time.Second * 1) } }
输出是这样的
hello+1
hello+2
hello+1
hello+2
Received error: not found goroutine name :aaaaaa
hello+1
hello+2
hello+1
Received Notify: gid[1597969999]quit
hello+1
Received error: goroutine channel already defined: "myfunc"
hello+1
hello+1
task name:myfunc,task id:5577006791947779410,task name2:myfunc
NumTask:1
task name:myfunc,task id:5577006791947779410,task name2:myfunc
NumTask:1
hello+1
task name:myfunc,task id:5577006791947779410,task name2:myfunc
NumTask:1
二、限制线程数量
package main import ( "fmt" "runtime" "time" "github.com/fy138/grtm" ) func main() { go func() { for { //get goroutine total fmt.Println("go goroutines:", runtime.NumGoroutine()) time.Sleep(time.Second * 1) } }() //建立线程池 pool_1 := grtm.NewPool(3) pool_2 := grtm.NewPool(2) for i := 100; i >= 1; i-- { fmt.Println("I=", i) //通过通道来限制goroutine 数量 /* 下面是第一种调用方法 */ pool_1.LimitChan <- true //importan pool_1.AddTask(Download, i, "Download_1") /* 下面是第二种调用方法 */ pool_2.LimitChan <- true //importan go func(i int, str string) { Download2(i, str) //函数执行完释放通道 defer func() { <-pool_2.LimitChan }() }(i, "Download_2") } time.Sleep(time.Second * 20) //防止主线程提前退出 } func Download(args ...interface{}) { time.Sleep(2 * time.Second) fmt.Printf("%s => %d \n", args[0].([]interface{})[1].(string), args[0].([]interface{})[0].(int)) } func Download2(i int, str string) { time.Sleep(2 * time.Second) fmt.Printf("%s => %d \n", str, i) }
输出结果
I= 100
go goroutines: 4
I= 99
I= 98
go goroutines: 9
Download_2 => 100
I= 97
Download_1 => 100
go goroutines: 9
Download_2 => 99
I= 96
Download_1 => 99
Download_1 => 98
go goroutines: 8
Download_2 => 98
I= 95
Download_1 => 97
go goroutines: 8
Download_2 => 97
I= 94
Download_1 => 96
go goroutines: 8
Download_2 => 96
I= 93
Download_1 => 95
go goroutines: 8
Download_2 => 95
I= 92
Download_1 => 94
go goroutines: 8
Download_2 => 94
I= 91