书籍:The Way To Go,第四部分

月光独奏 · · 2229 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

Channels

var ch1 chan string
ch1 = make(chan string)
ch1 := make(chan string)
buf := 100
ch1 := make(chan string, buf)
chanOfChans := make(chan chan int)
funcChan := chan func()
func main() {
    ch := make(chan string)
    go sendData(ch)
    go getData(ch)
    time.Sleep(1e9)
}
func sendData(ch chan string) {
    ch <- “Washington”
    ch <- “Tripoli”
    ch <- “London”
}
func getData(ch chan string) {
    var input string
    for { 
        input = <-ch; 
        fmt.Printf("%s ", input) 
    }
}
  • Semaphore pattern

type Empty interface {}
var empty Empty
...
data := make([]float64, N)
res := make([]float64, N)
sem := make(chan Empty, N)      // semaphore 
...
for i, xi := range data {
     go func (i int, xi float64) {
           res[i] = doSomething(i,xi)
           sem <- empty
     } (i, xi)
}
for i := 0; i < N; i++ {      // wait for goroutines to finish
     <-sem 
}
  • Channel Factory pattern

func main() {
    stream := pump()
    go suck(stream)        // shortened : go suck( pump() )
    time.Sleep(1e9)
}
func pump() chan int {
    ch := make(chan int)
    go func() {
        for i := 0; ; i++ {
            ch <- i
        }
    }()
    return ch
}
func suck(ch chan int) {
    for {
        fmt.Println(<-ch)
    }
}
func suck(ch chan int) {
    go func() {
        for v := range ch {
            fmt.Println(v)
        }
    }()
}
  • Channel Directionality

// channel can only receive data and cannot be closed
var send_only chan<- int
var recv_only <-chan int        // channel can only send data
...
var c = make(chan int)          // bidirectional
go source(c)
go sink(c)
func source(ch chan<- int) {
    for { ch <- 1 }
}
func sink(ch <-chan int) {
    for { <-ch }
}
...
// closing a channel
func sendData(ch chan string) {
     ch <- "Washington"
     ch <- "Tripoli"
     ch <- "London"
     ch <- "Beijing"
     ch <- "Tokio"
     close(ch)
}
func getData(ch chan string) {
     for {
          input, open := <-ch
          if !open {
               break
          }
          fmt.Printf("%s ", input)
     }
}
  • Switching between goroutines with select

select {
case u:= <- ch1:
     ...
case v:= <- ch2:
 ...
default: // no value ready to be received
  ...
}
  1. if all are blocked, it waits until one can proceed 

  2. if multiple can proceed, it chooses one at random.

  3. when none of the channel operations can proceed and the default clause is present, then this is executed: the default is always runnable (that is: ready to execute). Using a send operation in a select statement with a default case guarantees that the send will be non-blocking!

  • channels with timeouts and tickers

// func Tick(d Duration) <-chan Time
import "time"
rate_per_sec := 10
var dur Duration = 1e8          // rate_per_sec
chRate := time.Tick(dur)        // every 1/10th of a second
for req := range requests {
    <- chRate                   // rate limit our Service.Method RPC calls
    go client.Call("Service.Method", req, ...)
}
// func After(d Duration) <-chan Time
func main() {
     tick := time.Tick(1e8)
     boom := time.After(5e8)
    for {
        select {
            case <-tick:
                fmt.Println(“tick.”)
            case <-boom:
                fmt.Println(“BOOM!”)
                return
            default:
                fmt.Println(“    .”)
                time.Sleep(5e7)
         }
     }
}
  • using recover with goroutines

func server(workChan <-chan *Work) {
    for work := range workChan {
        go safelyDo(work) 
    }
}
func safelyDo(work *Work) {
    defer func() {
        if err := recover(); err != nil {
            log.Printf(“work failed with %s in %v:”, err, work)
        }
    }()
    do(work)
}

Tasks and Worker Processes

type Pool struct {
    Mu    sync.Mutex
    Tasks []Task
}
func Worker(pool *Pool) { 
    for {
        pool.Mu.Lock()
        // begin critical section:
        task := pool.Tasks[0]            // take the first task
        pool.Tasks = pool.Tasks[1:]      // update the pool
        // end critical section
        pool.Mu.Unlock()
        process(task)
    }
}
func main() {
    pending, done := make(chan *Task), make(chan *Task)
    go sendWork(pending)         // put tasks with work
    for i := 0; i < N; i++ {     // start N goroutines to do
        go Worker(pending, done)
    }
    consumeWork(done)
}
func Worker(in, out chan *Task) {
    for {
        t := <-in
        process(t)
        out <- t
    }
}
  • rule - use locking (mutexes) when: caching information in a shared data structure; holding state information;

  • rule - use channels when: communicating asynchronous results; distributing units of work; passing ownership of data;

lazy generator

var resume chan int
func integers() chan int {
    yield := make (chan int)
    count := 0
    go func () {
        for {
            yield <- count
            count++
        }
    } ()
    return yield
}
func generateInteger() int {
    return <-resume
}
func main() {
    resume = integers()
    fmt.Println(generateInteger())     //=> 0
    fmt.Println(generateInteger())     //=> 1
}

Benchmarking goroutines

func main() {
     fmt.Println("sync", testing.Benchmark(BenchmarkChannelSync).String())
     fmt.Println("buffered",  
     testing.Benchmark(BenchmarkChannelBuffered).String())
}
func BenchmarkChannelSync(b *testing.B) {
     ch := make(chan int)
     go func() {
          for i := 0; i < b.N; i++ {
               ch <- i
          }
          close(ch)
     }()
     for _ = range ch {
     }
}
func BenchmarkChannelBuffered(b *testing.B) {
     ch := make(chan int, 128)
     go func() {
          for i := 0; i < b.N; i++ {
               ch <- i
          }
          close(ch)
     }()
     for _ = range ch {
     }
}
// Output:
// Windows:       N         Time 1 op        Operations per sec
// sync           1000000   2443 ns/op  -->  409 332 / s
// buffered       1000000   4850 ns/op  -->  810 477 / s
  • implement a mutex

/* mutexes */
func (s semaphore) Lock() {
    s.P(1)
}
func (s semaphore) Unlock() {
     s.V(1)
}
/* signal-wait */
func (s semaphore) Wait(n int) { 
     s.P(n)
}
func (s semaphore) Signal() {
    s.V(1)
}



有疑问加站长微信联系(非本文作者)

本文来自:开源中国博客

感谢作者:月光独奏

查看原文:书籍:The Way To Go,第四部分

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

2229 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传