最近项目全部转golang
,内心既激动又害怕,因为其实自己code经验不足,更何况用一门不熟悉的语言,写代码的时候老是会去Google语法,而打断设计的思路。归其原因还是对golang的系统练习太少,所以我觉得有必要过一遍Golang In Action,多练习掌握最常用的特性,然后再深度研究Kubernetes的源码(非常漂亮),用到项目中。
所以这一系列文章就是,用Golang In Action里面的例子练习Golang,顺序就是看一章,关掉书,自己敲一遍代码,再对例子代码简单升级和总结。菜鸡共勉...
1.chan例子
package main
import (
"fmt"
"sync"
)
var wg sync.WaitGroup
func printer(ch <-chan int) {
for i := range ch {
fmt.Println("Input: ", i)
}
wg.Done()
}
func main() {
c := make(chan int)
go printer(c)
wg.Add(1)
for i := 0; i < 10; i++ {
c <- i
}
close(c)
wg.Wait()
}
不得不说Golang In Action第一个例子就这么凶猛,不仅上了chan
,还使用了sync
。其实从这个例子里面我们可以学到下面几点:
-
range chan
是一个阻塞(Blocked)的操作,如果不是阻塞的,此处goroutine应该马上就退出了 -
range chan
在chan
被关闭之后,会将chan
中的数据全部读取,再退出循环(当然也有人怀疑是不是c<-i
这个操作是一个阻塞的,导致的呢?我们后面可以再做一个实验证明这点) - sync.WaitGroup是可以用来确保任务完成的利器,嗯,这个我们也可以改装一把代码让他变成一个worker pool刷点经验。
2.带buffer的chan
如果有阅读过其他关于chan
的文章,其实知道c<-i
这个操作是一个blocked的。那么为了证明上面的第二点,这里我们:
- 直接换成带10个单位buffer的
chan
- 在goroutine里面sleep 1 second
package main
import (
"fmt"
"sync"
"time"
)
var wg sync.WaitGroup
func printer(ch <-chan int) {
for i := range ch {
time.Sleep(time.Second)
fmt.Println("Input: ", i)
}
wg.Done()
}
func main() {
c := make(chan int, 10)
go printer(c)
wg.Add(1)
for i := 0; i < 10; i++ {
c <- i
}
close(c)
wg.Wait()
}
结果和之前一样。
3.简单的worker pool
恩,之前谈到sync.WaitGroup
可以用来确保任务的完成,这里我觉得书里的例子用的不是很好,并没有体会到sync.WaitGroup
的价值,所以这里我们再改造一下,写个简单的worker pool。
package main
import (
"fmt"
"sync"
)
type Task int
type Pool interface {
New(int)
Run(<-chan Task)
WaitClose()
}
type WorkPool struct {
workers int
wg sync.WaitGroup
}
func (p *WorkPool) New(workerCnt int) {
var wg sync.WaitGroup
p.wg = wg
p.workers = workerCnt
}
func (p *WorkPool) Run(taskChan <-chan Task) {
for id := 0; id < p.workers; id++ {
p.wg.Add(1)
go worker(taskChan, id, &(p.wg))
}
}
func worker(taskChan <-chan Task, id int, wg *sync.WaitGroup) {
for task := range taskChan {
fmt.Printf("Task: %v, Worker ID@%v \n", task, id)
}
wg.Done()
}
func (p *WorkPool) WaitClose() {
p.wg.Wait()
}
func main() {
// 1. Init a taskChan with 10 buffersize
taskChan := make(chan Task)
// 2. Init a 3 worker pool
var pool WorkPool
pool.New(3)
// 3. Listen to the taskChan and start to Run
pool.Run(taskChan)
// 4. Start send data from taskChan
for i := 0; i < 10; i++ {
taskChan <- Task(i)
}
close(taskChan)
// 5. Wait for worker finish job and close
pool.WaitClose()
}
这里我实现了一个简单workpool,用一个chan TaskChan
来传递Task,WaitGroup来控制和等待所有的Task完成。(代码可能长得不太好看)运行结果如下:
➜ channels git:(master) ✗ go build
➜ channels git:(master) ✗ ./channels
Task: 1, Worker ID@0
Task: 3, Worker ID@0
Task: 4, Worker ID@0
Task: 0, Worker ID@2
Task: 2, Worker ID@1
Task: 6, Worker ID@2
Task: 8, Worker ID@2
Task: 9, Worker ID@2
Task: 7, Worker ID@1
Task: 5, Worker ID@0
这里发现Goroutine的负载大致均衡,所以我想如果我们把chan改成一个带buffer的,是不是第一个goroutine就会一直work,直到buffer里面数据被消耗完。
func main() {
// 1. Init a taskChan
taskChan := make(chan Task, 1000)
// 2. Init a 3 worker pool
var pool WorkPool
pool.New(3)
// 3. Listen to the taskChan and start to Run
pool.Run(taskChan)
// 4. Start send data from taskChan
for i := 0; i < 1000; i++ {
taskChan <- Task(i)
}
close(taskChan)
// 5. Wait for worker finish job and close
pool.WaitClose()
}
这里我用了一个buffer 1000的chan,并且传了1000个数据,运行结果:
Task: 1, Worker ID@2
Task: 2, Worker ID@1
Task: 4, Worker ID@1
Task: 0, Worker ID@0
Task: 6, Worker ID@0
Task: 7, Worker ID@0
Task: 3, Worker ID@2
Task: 9, Worker ID@2
...Always Worker ID@2
Task: 996, Worker ID@2
Task: 997, Worker ID@2
Task: 998, Worker ID@2
Task: 999, Worker ID@2
Task: 5, Worker ID@1
Task: 822, Worker ID@0
所以实验发现,正常情况下这个goroutine会消耗大部分数据,说明Golang的调度器会尽量让一个Goroutine工作完,但是为了保证公平性,还是会给他一个时间片,不会让它一直独占. 暂时猜测是这样,需要看源码验证一下。另外如果,我们在Goroutine里面加个time.Sleep
,或者执行一个Blocked操作,Goroutine也会释放时间片,达到负载均衡。
小结
这一章简单的学到:
-
chan
是一个blocked的操作 - 可以加buffer让
chan
变成Non-blocked - sync.WaitGroup可以用来写workpool来确保任务的完成
- Goroutine通常会尽量独占调度,但是为了公平性会设定时间片(此处为猜测,需要后期验证)
另外写的时候还是有一些坑,比如不熟悉interface
的写法,另外我发现go build
会生成package名字一样的可执行文件。看书总是会觉得很简单,都懂了,关书写一个程序调通,并且修改加强才有真实的理解。
有疑问加站长微信联系(非本文作者)