第三章 Go 语言并发组件
goroutine
// gorountineExample starts a goroutine that prints a greeting and then
// returns immediately. Nothing waits for the goroutine, so the greeting may
// not have been printed yet when this function returns.
func gorountineExample() {
	go func() {
		fmt.Println("Hello world!")
	}()
}
sync包
// syncExample prints "Hello" from a goroutine and blocks until that goroutine
// has finished, using a sync.WaitGroup as the join point.
func syncExample() {
	var done sync.WaitGroup
	done.Add(1)
	go func() {
		// Signal completion even if the print panics.
		defer done.Done()
		fmt.Println("Hello")
	}()
	done.Wait()
}
// syncExample2 shows that a goroutine's closure operates on the variable of
// the enclosing scope itself: the goroutine overwrites the greeting, and
// after waiting for it the new value is printed.
func syncExample2() {
	greeting := "hello"

	var pending sync.WaitGroup
	pending.Add(1)
	go func() {
		defer pending.Done()
		greeting = "welcome"
	}()
	pending.Wait()

	fmt.Println(greeting)
}
func syncExample3() {
var wg sync.WaitGroup
for _, salutation := range []string{"hello", "greetings", "good day"} {
wg.Add(1)
go func() {
defer wg.Done()
fmt.Println(salutation)
}()
}
wg.Wait()
}
// syncExample4 launches one goroutine per greeting and hands each goroutine
// its own copy of the current greeting as an argument, so every greeting is
// printed exactly once (in no particular order).
func syncExample4() {
	greetings := []string{"hello", "greetings", "good day"}

	var pending sync.WaitGroup
	for _, salutation := range greetings {
		pending.Add(1)
		go func(text string) {
			defer pending.Done()
			fmt.Println(text)
		}(salutation)
	}
	pending.Wait()
}
WaitGroup
// waitGroupExample starts two goroutines that sleep for one and two seconds
// respectively and blocks until both have finished.
//
// A WaitGroup can be viewed as a concurrency-safe counter: Add increases the
// counter by the given integer, Done decreases it by one, and Wait blocks
// until the counter is zero.
func waitGroupExample() {
	var wg sync.WaitGroup

	wg.Add(1)
	go func() {
		defer wg.Done()
		fmt.Println("1st gorountine sleeping...")
		time.Sleep(1 * time.Second)
	}()

	wg.Add(1)
	go func() {
		defer wg.Done()
		fmt.Println("2nd gorountine sleeping...")
		time.Sleep(2 * time.Second)
	}()

	wg.Wait()
	fmt.Println("All gorountines complete.")
}
// waitGroupExample2 starts numGreeters goroutines that each print a greeting
// with their 1-based id, and waits for all of them. The counter is raised
// once, up front, for the whole batch instead of once per goroutine.
func waitGroupExample2() {
	// hello prints one greeting line and marks itself done on wg.
	hello := func(wg *sync.WaitGroup, id int) {
		defer wg.Done()
		fmt.Printf("Hello from %v!\n", id)
	}

	const numGreeters = 5
	var wg sync.WaitGroup
	wg.Add(numGreeters)
	for i := 0; i < numGreeters; i++ {
		go hello(&wg, i+1)
	}
	wg.Wait()
}
互斥锁和读写锁
// mutexExample guards a shared counter with a sync.Mutex while six
// goroutines increment it and six goroutines decrement it concurrently.
// Each operation prints the counter value it produced, one per line, and a
// final line is printed once all twelve goroutines have finished.
func mutexExample() {
	var count int
	var lock sync.Mutex

	// increment and decrement hold the lock for the whole read-modify-print
	// sequence so that the printed value is the one this call produced.
	increment := func() {
		lock.Lock()
		defer lock.Unlock()
		count++
		fmt.Printf("Incrementing: %d\n", count)
	}
	decrement := func() {
		lock.Lock()
		defer lock.Unlock()
		count--
		fmt.Printf("Decrementing: %d\n", count)
	}

	var arithmetic sync.WaitGroup

	// Increments.
	for i := 0; i <= 5; i++ {
		arithmetic.Add(1)
		go func() {
			defer arithmetic.Done()
			increment()
		}()
	}

	// Decrements.
	for i := 0; i <= 5; i++ {
		arithmetic.Add(1)
		go func() {
			defer arithmetic.Done()
			decrement()
		}()
	}

	arithmetic.Wait()
	fmt.Println("Arithmetic complete.")
}
// mutexExample2 measures how long a group of "observer" goroutines takes to
// get through a lock while one "producer" goroutine repeatedly takes the
// write lock. For reader counts 2^0 .. 2^19 it prints a table with one
// column timed with the observers on the RWMutex read lock (m.RLocker())
// and one with the observers on the write lock (&m).
func mutexExample2() {
	// producer takes and releases l five times, pausing 10ms after each
	// round. The second parameter is a sync.Locker, the two-method interface
	// (Lock and Unlock) satisfied by both Mutex and RWMutex.
	producer := func(wg *sync.WaitGroup, l sync.Locker) {
		defer wg.Done()
		for i := 5; i > 0; i-- {
			// The critical section is intentionally empty: only lock
			// traffic is being generated.
			l.Lock()
			l.Unlock()
			time.Sleep(time.Second / 100)
		}
	}

	// observer acquires l once and releases it on return.
	observer := func(wg *sync.WaitGroup, l sync.Locker) {
		defer wg.Done()
		l.Lock()
		defer l.Unlock()
	}

	// test runs one producer on mutex and count observers on rwMutex and
	// returns the wall-clock time until all of them have finished.
	test := func(count int, mutex, rwMutex sync.Locker) time.Duration {
		var wg sync.WaitGroup
		wg.Add(count + 1)
		beginTestTime := time.Now()
		go producer(&wg, mutex)
		for i := count; i > 0; i-- {
			go observer(&wg, rwMutex)
		}
		wg.Wait()
		return time.Since(beginTestTime)
	}

	// Pad cells with spaces so the columns line up in a terminal.
	tw := tabwriter.NewWriter(os.Stdout, 0, 1, 2, ' ', 0)
	defer tw.Flush()

	var m sync.RWMutex
	fmt.Fprintf(tw, "Readers\tRWMutex\tMutex\n")
	for i := 0; i < 20; i++ {
		count := int(math.Pow(2, float64(i)))
		fmt.Fprintf(
			tw,
			"%d\t%v\t%v\n",
			count,
			test(count, &m, m.RLocker()),
			test(count, &m, &m),
		)
	}
}
cond
Cond 类型:一个 goroutine 的集合点,用于等待或发布一个事件(event)。
这里的 "event" 指两个或两个以上 goroutine 之间的任意信号。
c.Signal() 方法用于通知阻塞在 Wait 调用上的 goroutine:条件已经被触发。
Signal 会找到等待时间最长的 goroutine 并通知它。
另一个方法 Broadcast() 则向所有等待的 goroutine 发送信号,它提供了一种
同时与多个 goroutine 通信的方法。
与 channel 相比,Cond 类型的性能要高很多。
// condExample uses a sync.Cond to keep a queue at no more than two pending
// items. The loop adds ten items; before each add it waits, under the
// condition's lock, while the queue holds two items. Every add schedules a
// goroutine that removes the head of the queue one second later and signals
// the condition. The function returns after the tenth add without waiting
// for the remaining removals.
func condExample() {
	cond := sync.NewCond(&sync.Mutex{})
	items := make([]interface{}, 0, 10)

	// dequeueAfter sleeps for wait, drops the first item under the lock and
	// then wakes one goroutine blocked in Wait.
	dequeueAfter := func(wait time.Duration) {
		time.Sleep(wait)
		cond.L.Lock()
		items = items[1:]
		fmt.Println("Removed from queue")
		cond.L.Unlock()
		cond.Signal()
	}

	for n := 0; n < 10; n++ {
		cond.L.Lock()
		// Re-check the condition after every wake-up.
		for len(items) == 2 {
			cond.Wait()
		}
		fmt.Println("Adding to queue")
		items = append(items, struct{}{})
		go dequeueAfter(time.Second)
		cond.L.Unlock()
	}
}
// condExample2 models a button whose Clicked condition fans a single click
// out to three handlers with Broadcast. It returns once all three handlers
// have run.
func condExample2() {
	type Button struct {
		Clicked *sync.Cond
	}
	button := Button{Clicked: sync.NewCond(&sync.Mutex{})}

	// subscribe registers fn to run when c is signalled. Each handler runs
	// on its own goroutine, and subscribe does not return until that
	// goroutine is confirmed to be running.
	subscribe := func(c *sync.Cond, fn func()) {
		var goroutineRunning sync.WaitGroup
		goroutineRunning.Add(1)
		go func() {
			// Report readiness only while holding the lock. The lock is
			// next released inside Wait, after this goroutine has been
			// added to the condition's wait list, so whoever acquires the
			// lock afterwards knows this handler cannot miss a Broadcast.
			c.L.Lock()
			defer c.L.Unlock()
			goroutineRunning.Done()
			c.Wait()
			fn()
		}()
		goroutineRunning.Wait()
		fmt.Println("subscribe end")
	}

	var clickRegistered sync.WaitGroup
	clickRegistered.Add(3)
	subscribe(button.Clicked, func() {
		fmt.Println("Maximizing window.")
		clickRegistered.Done()
	})
	subscribe(button.Clicked, func() {
		fmt.Println("Displaying annoying dialog box!")
		clickRegistered.Done()
	})
	subscribe(button.Clicked, func() {
		fmt.Println("Mouse clicked.")
		clickRegistered.Done()
	})

	// Broadcast on the Clicked condition so that all three handlers run.
	// Acquiring the lock first guarantees every handler is already waiting,
	// which replaces the previous fixed one-second sleep.
	button.Clicked.L.Lock()
	button.Clicked.Broadcast()
	button.Clicked.L.Unlock()

	clickRegistered.Wait()
}
once
这是因为 sync.Once 只计算 Do 方法被调用的次数,而不是统计传入 Do 的各个不同函数分别被调用了多少次。
// onceExample calls once.Do(increment) from 100 goroutines and prints the
// resulting counter. Because all calls go through the same sync.Once, the
// increment runs a single time and the printed count is 1.
func onceExample() {
	var count int
	increment := func() {
		count++
	}

	var once sync.Once
	var increments sync.WaitGroup
	increments.Add(100)
	for i := 0; i < 100; i++ {
		go func() {
			defer increments.Done()
			once.Do(increment)
		}()
	}
	increments.Wait()

	fmt.Printf("Count is %d\n", count)
}
pool
// BenchmarkNetworkRequest measures one full request against the daemon on
// localhost:8080: dial, read until the server closes the connection, then
// close the client side. Any dial or read error aborts the benchmark.
func BenchmarkNetworkRequest(b *testing.B) {
	for n := 0; n < b.N; n++ {
		client, dialErr := net.Dial("tcp", "localhost:8080")
		if dialErr != nil {
			b.Fatalf("cannot dial host: %v", dialErr)
		}
		_, readErr := ioutil.ReadAll(client)
		if readErr != nil {
			b.Fatalf("cannot read: %v", readErr)
		}
		client.Close()
	}
}
// connectToService simulates an expensive connection setup: it logs the
// call, sleeps for one second and returns an empty struct as a placeholder
// connection. Pre-creating these objects and caching them in a pool lets
// callers obtain one without paying the setup delay.
func connectToService() interface{} {
	fmt.Println("connectToService")
	time.Sleep(time.Second)
	var placeholder struct{}
	return placeholder
}
// startNetworkDaemon starts a TCP server on localhost:8080 in a background
// goroutine and returns a WaitGroup that is released once the listener is
// up. For every accepted connection the server calls connectToService,
// writes an empty line to the client and closes the connection. If the
// listener cannot be created the process exits via log.Fatalf.
func startNetworkDaemon() *sync.WaitGroup {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		server, err := net.Listen("tcp", "localhost:8080")
		if err != nil {
			log.Fatalf("cannot listen: %v", err)
		}
		defer server.Close()

		// The listener is ready; unblock whoever waits on the WaitGroup.
		wg.Done()

		for {
			conn, err := server.Accept()
			if err != nil {
				log.Printf("cannot accept connection: %v", err)
				continue
			}
			// The returned placeholder is discarded; the call stands in for
			// the per-request connection cost.
			connectToService()
			fmt.Fprintln(conn, "")
			conn.Close()
		}
	}()
	return &wg
}
// init starts the network daemon when the package is loaded and blocks
// until the returned WaitGroup is released.
func init() {
	startNetworkDaemon().Wait()
	// To exercise the pooled variant instead, swap in:
	//startNetworkCacheDaemon().Wait()
}
// warmServiceConnCache builds a sync.Pool whose New function is
// connectToService and pre-fills it with ten freshly created objects before
// returning it.
func warmServiceConnCache() *sync.Pool {
	pool := &sync.Pool{New: connectToService}
	for warmed := 0; warmed < 10; warmed++ {
		pool.Put(pool.New())
	}
	return pool
}
// startNetworkCacheDaemon is the pooled variant of the network daemon. In a
// background goroutine it first warms a pool via warmServiceConnCache, then
// listens on localhost:8080 and releases the returned WaitGroup. For every
// accepted connection it takes an object from the pool, writes an empty
// line to the client, returns the object to the pool and closes the
// connection. If the listener cannot be created the process exits via
// log.Fatalf.
func startNetworkCacheDaemon() *sync.WaitGroup {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		connPool := warmServiceConnCache()

		server, err := net.Listen("tcp", "localhost:8080")
		if err != nil {
			log.Fatalf("cannot listen: %v", err)
		}
		defer server.Close()

		// The listener is ready; unblock whoever waits on the WaitGroup.
		wg.Done()

		for {
			conn, err := server.Accept()
			if err != nil {
				log.Printf("cannot accept connection: %v", err)
				// conn is nil on error; skip this iteration instead of
				// writing to and closing a nil connection.
				continue
			}
			svcConn := connPool.Get()
			fmt.Fprintln(conn, "")
			connPool.Put(svcConn)
			conn.Close()
		}
	}()
	return &wg
}
select
// selectExample4 shows a timeout in a select: the receive on a nil channel
// can never proceed, so after one second the time.After case fires and
// "Time Out" is printed.
func selectExample4() {
	var never <-chan int // nil channel: receiving from it blocks forever
	timeout := time.After(time.Second)
	select {
	case <-never:
	case <-timeout:
		fmt.Println("Time Out")
	}
}
// selectExample5 shows a non-blocking select: both channels are nil, so
// neither receive can proceed and the default case runs immediately,
// printing how long the select took followed by a blank line.
func selectExample5() {
	start := time.Now()
	var c1, c2 <-chan int
	select {
	case <-c1:
	case <-c2:
	default:
		fmt.Printf("In default after %v\n\n", time.Since(start))
	}
}
// selectExample6 does one-second units of work in a loop while polling a
// done channel without blocking. A helper goroutine closes done after five
// seconds; the loop then exits and the number of completed work cycles is
// printed.
func selectExample6() {
	done := make(chan interface{})
	go func() {
		time.Sleep(5 * time.Second)
		close(done)
	}()

	workCounter := 0
loop:
	for {
		select {
		case <-done:
			break loop
		default:
			// Not signalled yet: fall through to the work below.
		}

		// Simulated unit of work.
		workCounter++
		time.Sleep(time.Second)
	}

	fmt.Printf("Achieved %v cycles of work before signalled to stop.\n", workCounter)
}
第四章
约束
for-select
// forSelectExample is a skeleton of the for-select loop pattern: an outer
// loop whose body is a select over channel operations.
//
// NOTE(review): as written the select has no cases, and a select with no
// cases blocks forever, so this function never returns. It is a template to
// be filled in, not something to call.
func forSelectExample() {
	for { // Loop forever, or use a range loop instead.
		select {
		// Do some work with channels here.
		}
	}
}
// forSelectExample2 shows how to send iteration variables out on a channel:
// each element of a fixed slice is offered to stringStream, and the function
// returns early if done becomes readable first.
//
// NOTE(review): in the visible code done and stringStream are declared but
// never initialized, so both are nil and the select would block forever;
// presumably the "// ..." placeholder stands for their setup.
func forSelectExample2() {
	var done chan interface{}
	var stringStream chan string
	// ...
	for _, s := range []string{"a", "b", "c"} {
		select {
		case <-done:
			// Stop requested: abandon the remaining values.
			return
		case stringStream <- s:
			// Value handed off; continue with the next element.
		}
	}
}
// forSelectExample3 shows variant 1 of looping until told to stop: poll done
// with a non-blocking select, then do the work after the select statement.
//
// NOTE(review): done is declared but never initialized in the visible code,
// so it is nil, the default case is always taken and the loop never exits.
func forSelectExample3() {
	var done chan interface{}
	for {
		select {
		case <-done:
			return
		default:
			// Not signalled: leave the select and fall through to the work.
		}
		// Do non-preemptable work here.
	}
}
// forSelectExample4 shows variant 2 of looping until told to stop: the same
// non-blocking poll of done, but with the work placed inside the default
// case of the select.
//
// NOTE(review): done is declared but never initialized in the visible code,
// so it is nil, the default case is always taken and the loop never exits.
func forSelectExample4() {
	var done chan interface{}
	for {
		select {
		case <-done:
			return
		default:
			// Do non-preemptable work here.
		}
	}
}
防止 goroutine 泄漏
有疑问加站长微信联系(非本文作者)