What you are wasting today is tomorrow for those who died yesterday; what you hate now is the future you can not go back.
你所浪费的今天是昨天死去的人奢望的明天; 你所厌恶的现在是未来的你回不去的曾经。
Buffered channel
之前我们说的channel都是不带缓冲的,无论发送和接收都会导致阻塞。
缓冲Channel的特点是:只有当发送至缓冲区存满后导致阻塞, 接受也是如此。
创建方式: ch:= make(chan Type , capacity)
capacity 容量, 当capacity = 0 时, 为无缓冲channel,通常省略而已。
package main
import (
"fmt"
)
func main() {
ch := make(chan string, 2)
ch <- "naveen"
ch <- "paul"
fmt.Println(<- ch)
fmt.Println(<- ch) // 注释此行,会不会deadlock???
}
下这个例子请认真思考,有助于理解buffered channel:
package main
import (
"fmt"
"time"
)
func write(ch chan int) {
for i := 0; i < 5; i++ {
ch <- i
fmt.Println("successfully wrote", i, "to ch")
}
close(ch)
}
func main() {
ch := make(chan int, 2)
go write(ch)
time.Sleep(2 * time.Second)
for v := range ch {
fmt.Println("read value", v,"from ch")
time.Sleep(2 * time.Second)
}
}
解释:
当main程创建一个有容量为2的channel,然后在goroutine中循环写入, 在写入两次后, goroutine阻塞, main程同时也进入了sleep中,当range开始接收后,goroutine发现又可以继续写入。
输出:
successfully wrote 0 to ch
successfully wrote 1 to ch
read value 0 from ch
successfully wrote 2 to ch
read value 1 from ch
successfully wrote 3 to ch
read value 2 from ch
successfully wrote 4 to ch
read value 3 from ch
read value 4 from ch
一开始写入两次,是因为channel容量为2, 不需要读取就可写入。
最后连续两次读,是因为当range读取一次后, goroutine立刻写入一次,所以channel中始终保持2个数据。
概念: 长度与容量
容量是指channel最大的存储长度。 长度是指当前channel中正在排队的数据长度。
代码说明:
package main
import (
"fmt"
)
func main() {
ch := make(chan string, 3)
ch <- "数据1"
ch <- "数据2"
//容量为3, 但是其中数据只有2个
fmt.Println("capacity is", cap(ch))
//数据长度为2
fmt.Println("length is", len(ch))
//读取一次
fmt.Println("read value", <-ch)
//数据长度为1, 但是容量还是3
fmt.Println("new length is", len(ch))
}
输出:
capacity is 3
length is 2
read value 数据1
new length is 1
WaitGroup
工作池的实现离不开WaitGroup, 下面讲一下关于WariGroup。
如果一个main程中有三个goroutine, 要想获得这三个goroutine的输出,那么 需要使用WaitGroup阻塞main程,等待所有goroutine结束。
package main
import (
"fmt"
"sync"
"time"
)
func ProcessEcho( i int , w *sync.WaitGroup){
fmt.Println("协程", i , "开始")
time.Sleep(1*time.Second)
fmt.Println("协程", i , "结束")
w.Done()
}
func main(){
var w sync.WaitGroup
Max := 10
for i:= 0; i<Max ;i++ {
w.Add(1)
go ProcessEcho(i, &w)
}
w.Wait()
fmt.Println("main执行完成并退出。")
}
解释:
main程启动10个协程, 每天启动都高速WaitGroup来添加一个监听,每个goroutine结束都标记一次结束。 main程中等待所有标记完成,结束阻塞。
注意点:
1. 为什么go ProcessEcho中使用的是w的指针?!
2. goroutine的输出是没有规律的。
细看工作池的实现吧:
package main
import (
"fmt"
"sync"
"time"
)
//任务结构
type Job struct {
id int
randomno int
}
//接受数据结构
type Result struct {
job Job
sumofdigits int
}
var jobs = make(chan Job, 10)
var results = make(chan Result, 10)
func digits(number int) int {
time.Sleep(2 * time.Second)
return number
}
func worker(i int , wg *sync.WaitGroup) {
for job := range jobs {
output := Result{job, digits(i)}
results <- output
}
wg.Done()
}
func createWorkerPool(noOfWorkers int) {
var wg sync.WaitGroup
for i := 0; i < noOfWorkers; i++ {
wg.Add(1)
go worker(i,&wg)
}
wg.Wait()
close(results)
}
func allocate(noOfJobs int) {
for i := 0; i < noOfJobs; i++ {
randomno := i
job := Job{i, randomno}
jobs <- job
}
close(jobs)
}
func result(done chan bool) {
for result := range results {
fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
}
done <- true
}
func main() {
startTime := time.Now()
noOfJobs := 12 // 任务数
go allocate(noOfJobs)
done := make(chan bool)
go result(done)
noOfWorkers := 3 // 执行者
createWorkerPool(noOfWorkers)
<-done
endTime := time.Now()
diff := endTime.Sub(startTime)
fmt.Println("total time taken ", diff.Seconds(), "seconds")
}
有疑问加站长微信联系(非本文作者)