Go语言并发之道

xxx小M · · 662 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

第三章Go语言并发组件

goroutine

func gorountineExample() {
   sayHello := func() {
      fmt.Println("Hello world!")
   }
   go sayHello()
}

sync包

func syncExample() {
   var wg sync.WaitGroup
 sayHello := func() {
      defer wg.Done()
      fmt.Println("Hello")
   }
   wg.Add(1)
   go sayHello()
   wg.Wait()
}
func syncExample2() {
   var wg sync.WaitGroup
 salutation := "hello"
 wg.Add(1)
   go func() {
      defer wg.Done()
      salutation = "welcome"
 }()
   wg.Wait()
   fmt.Println(salutation)
}
func syncExample3() {
   var wg sync.WaitGroup
 for _, salutation := range []string{"hello", "greetings", "good day"} {
      wg.Add(1)
      go func() {
         defer wg.Done()
         fmt.Println(salutation)
      }()
   }
   wg.Wait()
}
func syncExample4() {
   var wg sync.WaitGroup
 for _, salutation := range []string{"hello", "greetings", "good day"} {
      wg.Add(1)
      go func(salutation string) {
         defer wg.Done()
         fmt.Println(salutation)
      }(salutation)
   }
   wg.Wait()
}

WaitGroup

func waitGroupExample() {
   var wg sync.WaitGroup
   wg.Add(1)
   go func() {
      defer wg.Done()
      fmt.Println("1st gorountine sleeping...")
      time.Sleep(1 * time.Second)
   }()
   wg.Add(1)
   go func() {
      defer wg.Done()
      fmt.Println("2nd gorountine sleeping...")
      time.Sleep(2 * time.Second)
   }()
   wg.Wait()
   fmt.Println("All gorountines complete.")
   // waitGroup 可以看作一个并发—安全的计数器
   // 调用通过传入的整数执行 add 方法增加计数器的增量 // 并调用 Done 方法对计数器进行增减,Wait 阻塞,直到计数器为零。}
func waitGroupExample2() {
   hello := func(wg *sync.WaitGroup, id int) {
      defer wg.Done()
      fmt.Printf("Hello from %v!n", id)
   }
   const numGreeters = 5
   var wg sync.WaitGroup
   wg.Add(numGreeters)
   for i := 0; i < numGreeters; i++ {
      go hello(&wg, i+1)
   }
   wg.Wait()
}

互斥锁和读写锁

func mutexExample() {
   var count int
   var lock sync.Mutex
   increment := func() {
      lock.Lock()
      defer lock.Unlock()
      count++
      fmt.Printf("Incrementing: %dn", count)
   }
   decrement := func() {
      lock.Lock()
      defer lock.Unlock()
      count--
      fmt.Printf("Decrementing: %dn", count)
   }
   // 增量
   var arithmetic sync.WaitGroup
   for i := 0; i <= 5; i++ {
      arithmetic.Add(1)
      go func() {
         defer arithmetic.Done()
         increment()
      }()
   }
   // 减量
   for i := 0; i <= 5; i++ {
      arithmetic.Add(1)
      go func() {
         defer arithmetic.Done()
         decrement()
      }()
   }
   arithmetic.Wait()
   fmt.Println("Arithmetic complete.")
}
func mutexExample2() {
   producer := func(wg *sync.WaitGroup, l sync.Locker) {
   // 第二个参数是 sync.Locker 类型,
   // 这个接口有两个方法 Lock 和 Unlock, // 分别对应 Mutex 和      RWMutex defer wg.Done()
      for i := 5; i > 0; i-- {
         l.Lock()
         l.Unlock()
         time.Sleep(time.Second / 100)
      }
   }
   observer := func(wg *sync.WaitGroup, l sync.Locker) {
      defer wg.Done()
      l.Lock()
      defer l.Unlock()
   }
   test := func(count int, mutex, rwMutex sync.Locker)  
   time.Duration {
   var wg sync.WaitGroup
   wg.Add(count + 1)
   beginTestTime := time.Now()
   go producer(&wg, mutex)
      for i := count; i > 0; i-- {
         go observer(&wg, rwMutex)
      }
      wg.Wait()
      return time.Since(beginTestTime)
   }
   var b byte
   tw := tabwriter.NewWriter(os.Stdout, 0, 1, 2, b, 0)
   defer tw.Flush()
   var m sync.RWMutex
   fmt.Fprintf(tw, "ReaderstRWMutextMutexn")
   for i := 0; i < 20; i++ {
     count := int(math.Pow(2, float64(i)))
     fmt.Fprintf(
         tw,
         "%dt%vt%vn",
         count,
         test(count, &m, m.RLocker()),
         test(count, &m, &m),
     )
   }
}

cond

Cond 类型,一个 gorountine 的集合点,等待或发布一个 event
一个 "event" 是两个或两个以上的 gorountine 之间的任意信号
c.Signal() 方法,它提供通知 gorountine 阻塞的调用 Wait,条件已经被触发。
Signal 发现等待最长时间的 gorountine 并通知它。
另一个 boardcast() 方法,是向所有等待的 gorountine 发送信号。它提供了一种
同时与多个 gorountine 通信的方法。
与 channel 相比,Cond 类型的性能要高很多。

func condExample() {
   c := sync.NewCond(&sync.Mutex{})
   queue := make([]interface{}, 0, 10)
   removeFromQueue := func(delay time.Duration) {
      time.Sleep(delay)
      c.L.Lock()
      queue = queue[1:]
      fmt.Println("Removed from queue")
      c.L.Unlock()
      c.Signal()
   }
   for i := 0; i < 10; i++ {
      c.L.Lock()
      for len(queue) == 2 {
         c.Wait()
      }
      fmt.Println("Adding to queue")
      queue = append(queue, struct{}{})
      go removeFromQueue(time.Second)
      c.L.Unlock()
   }
}
func condExample2() {
   type Button struct {
      Clicked *sync.Cond
   }
   button := Button{Clicked: sync.NewCond(&sync.Mutex{})}
   subscribe := func(c *sync.Cond, fn func()) {
   // 允许我们注册函数处理来自条件的信号,每个处理程序都在自己的 gorountine 上运行
   // 并且订阅不会退出,直到 gorountine 被确认运行为止。 var gorountineRunning
   sync.WaitGroup
   gorountineRunning.Add(1)
   go func() {
         gorountineRunning.Done()
         c.L.Lock()
         defer c.L.Unlock()
         c.Wait()
         fn()
      }()
      gorountineRunning.Wait()
      fmt.Println("subscribe end")
   }
   var clickRegistered sync.WaitGroup
   clickRegistered.Add(3)
   subscribe(button.Clicked, func() {
      fmt.Println("Maximizing window.")
      clickRegistered.Done()
   })
   subscribe(button.Clicked, func() {
      fmt.Println("Displaying annoying dialog box!")
      clickRegistered.Done()
   })
   subscribe(button.Clicked, func() {
      fmt.Println("Mouse clicked.")
      clickRegistered.Done()
   })
   time.Sleep(time.Second)
   button.Clicked.Broadcast()
   // 在 Clicked Cond 调用 Broadcast,所以三个处理程序都将运行
   clickRegistered.Wait()
}

once

这是因为 sync.Once 只计算调用Do的次数,而不是多少次唯一调用Do方怯

func onceExample() {
   var count int
   increment := func() {
      count++
   }
   var once sync.Once
   var increments sync.WaitGroup
   increments.Add(100)
   for i := 0; i < 100; i++ {
      go func() {
         defer increments.Done()
         once.Do(increment)
      }()
   }
   increments.Wait()
   fmt.Printf("Count is %dn", count)
}

pool

func BenchmarkNetworkRequest(b *testing.B) {
   for i := 0; i < b.N; i++ {
      conn, err := net.Dial("tcp", "localhost:8080")
      if err != nil {
         b.Fatalf("cannot dial host: %v", err)
      }
      if _, err := ioutil.ReadAll(conn); err != nil {
         b.Fatalf("cannot read: %v", err)
      }
      conn.Close()
   }
}

// 用 pool 可以尽可能快地将预先分配的对象缓存加载启动
func connectToService() interface{} {
   fmt.Println("connectToService")
   time.Sleep(time.Second)
   return struct{}{}
}

func startNetworkDaemon() *sync.WaitGroup {
   var wg sync.WaitGroup
   wg.Add(1)
   go func() {
      server, err := net.Listen("tcp", "localhost:8080")
      if err != nil {
         log.Fatalf("cannot listen: %v", err)
      }
      defer server.Close()
      wg.Done()
      for {
         fmt.Println("sasa")
         conn, err := server.Accept()
         if err != nil {
            log.Printf("cannot accept connection: %v", err)
            continue
         }
         connectToService()
         fmt.Fprintln(conn, "")
         conn.Close()
      }
   }()
   return &wg
}

func init() {
   daemonStarted := startNetworkDaemon()
   daemonStarted.Wait()
   //daemonStarted := startNetworkCacheDaemon()
   //daemonStarted.Wait()
}

func warmServiceConnCache() *sync.Pool {
   p := &sync.Pool{
      New: connectToService,
   }
   for i := 0; i < 10; i++ {
      p.Put(p.New())
   }
   return p
}

func startNetworkCacheDaemon() *sync.WaitGroup {
   var wg sync.WaitGroup
   wg.Add(1)
   go func() {
      connPool := warmServiceConnCache()
      server, err := net.Listen("tcp", "localhost:8080")
      if err != nil {
         log.Fatalf("cannot listen: %v", err)
      }
      defer server.Close()
      wg.Done()
      for {
         conn, err := server.Accept()
         if err != nil {
            log.Printf("cannot accept connection: %v", err)
         }
         svcConn := connPool.Get()
         fmt.Fprintln(conn, "")
         connPool.Put(svcConn)
         conn.Close()
      }
   }()
   return &wg
}

select

func selectExample4() {
   var c <-chan int
 select {
   case <-c: // c 是 nil,会一直阻塞
   case <-time.After(time.Second):
      fmt.Println("Time Out")
   }
}
func selectExample5() {
   start := time.Now()
   var c1, c2 <-chan int
   select {
     case <-c1:
     case <-c2:
     default:
       fmt.Printf("In default after %vnn", time.Since(start))
   }
}

func selectExample6() {
   done := make(chan interface{})
   go func() {
      time.Sleep(5 * time.Second)
      close(done)
   }()
   workCounter := 0
loop:
   for {
      select {
      case <-done:
         break loop
 default:
      }
      workCounter++
      time.Sleep(time.Second)
   }
   fmt.Printf("Achieved %v cycles of work before signalled to stop.n", workCounter)
}

第四章

约束

for-select

// forSelectExample
// for-select 循环
func forSelectExample() {
   for { // 无限循环,使用 range 循环
 select {
      // 使用 channel 进行作业
 }
   }
}
// foreSelectExample2
// 向 channel 发送迭代变量
func forSelectExample2() {
   var done chan interface{}
   var stringStream chan string
   // ...
   for _, s := range []string{"a", "b", "c"} {
      select {
      case <-done:
         return
      case stringStream <- s:
      }
   }
}
// forSelectExample3
// 循环等待停止,变体1
func forSelectExample3() {
   var done chan interface{}
   for {
      select {
      case <-done:
         return
      default:
      }
      // 进行非抢占式任务
   }
}
// forSelectExample4
// 循环等待停止,变体2
func forSelectExample4() {
   var done chan interface{}
   for {
      select {
      case <-done:
         return
      default:
         // 进行非抢占式任务
      }
   }
}

防止 goroutine 泄漏


有疑问加站长微信联系(非本文作者)

本文来自:Segmentfault

感谢作者:xxx小M

查看原文:Go语言并发之道

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

662 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传