Channels
var ch1 chan string ch1 = make(chan string) ch1 := make(chan string) buf := 100 ch1 := make(chan string, buf) chanOfChans := make(chan chan int) funcChan := chan func()
// main starts the producer in its own goroutine and consumes on the main
// goroutine, so the program ends exactly when every value has been printed
// rather than after a fixed sleep.
func main() {
	ch := make(chan string)
	go sendData(ch)
	getData(ch)
}

// sendData sends three city names on ch and then closes it, which tells the
// receiver that no more values will follow.
func sendData(ch chan string) {
	ch <- "Washington"
	ch <- "Tripoli"
	ch <- "London"
	close(ch)
}

// getData prints every value received from ch, each followed by a space, and
// returns once ch has been closed and drained.
func getData(ch chan string) {
	for input := range ch {
		fmt.Printf("%s ", input)
	}
}
Semaphore pattern
type Empty interface {} var empty Empty ... data := make([]float64, N) res := make([]float64, N) sem := make(chan Empty, N) // semaphore ... for i, xi := range data { go func (i int, xi float64) { res[i] = doSomething(i,xi) sem <- empty } (i, xi) } for i := 0; i < N; i++ { // wait for goroutines to finish <-sem }
Channel Factory pattern
// main connects the producer to the consumer and lets them run for one
// second before the program exits.
func main() {
	stream := pump()
	go suck(stream) // shortened: go suck(pump())
	time.Sleep(time.Second)
}

// pump is a channel factory: it creates a channel, starts a goroutine that
// sends 0, 1, 2, ... on it without end, and returns the channel.
func pump() chan int {
	ch := make(chan int)
	go func() {
		for i := 0; ; i++ {
			ch <- i
		}
	}()
	return ch
}

// suck prints every value received from ch, one per line. It runs in the
// calling goroutine, so start it with go when the caller must not block.
//
// Alternative formulation, in which suck starts the goroutine itself and the
// caller simply writes suck(stream):
//
//	func suck(ch chan int) {
//		go func() {
//			for v := range ch {
//				fmt.Println(v)
//			}
//		}()
//	}
//
// Only one of the two definitions can exist in a package.
func suck(ch chan int) {
	for v := range ch {
		fmt.Println(v)
	}
}
Channel Directionality
// channel can only receive data and cannot be closed var send_only chan<- int var recv_only <-chan int // channel can only send data ... var c = make(chan int) // bidirectional go source(c) go sink(c) func source(ch chan<- int) { for { ch <- 1 } } func sink(ch <-chan int) { for { <-ch } } ... // closing a channel func sendData(ch chan string) { ch <- "Washington" ch <- "Tripoli" ch <- "London" ch <- "Beijing" ch <- "Tokio" close(ch) } func getData(ch chan string) { for { input, open := <-ch if !open { break } fmt.Printf("%s ", input) } }
Switching between goroutines with select
// select proceeds with a case whose channel operation is ready. Because a
// default case is present, it does not block when neither channel has a value.
select {
case u := <-ch1:
	// ...
case v := <-ch2:
	// ...
default:
	// no value ready to be received
	// ...
}
|
channels with timeouts and tickers
// func Tick(d Duration) <-chan Time import "time" rate_per_sec := 10 var dur Duration = 1e8 // rate_per_sec chRate := time.Tick(dur) // every 1/10th of a second for req := range requests { <- chRate // rate limit our Service.Method RPC calls go client.Call("Service.Method", req, ...) }
// func After(d Duration) <-chan Time

// main prints "tick." on every tick, " ." while nothing is ready, and
// "BOOM!" once the timeout fires, after which it returns.
func main() {
	tick := time.Tick(100 * time.Millisecond)
	boom := time.After(500 * time.Millisecond)
	for {
		select {
		case <-tick:
			fmt.Println("tick.")
		case <-boom:
			fmt.Println("BOOM!")
			return
		default:
			// Neither channel is ready: report it and pause briefly
			// instead of spinning.
			fmt.Println(" .")
			time.Sleep(50 * time.Millisecond)
		}
	}
}
using recover with goroutines
func server(workChan <-chan *Work) { for work := range workChan { go safelyDo(work) } } func safelyDo(work *Work) { defer func() { if err := recover(); err != nil { log.Printf(“work failed with %s in %v:”, err, work) } }() do(work) }
Tasks and Worker Processes
type Pool struct { Mu sync.Mutex Tasks []Task } func Worker(pool *Pool) { for { pool.Mu.Lock() // begin critical section: task := pool.Tasks[0] // take the first task pool.Tasks = pool.Tasks[1:] // update the pool // end critical section pool.Mu.Unlock() process(task) } }
func main() { pending, done := make(chan *Task), make(chan *Task) go sendWork(pending) // put tasks with work for i := 0; i < N; i++ { // start N goroutines to do go Worker(pending, done) } consumeWork(done) } func Worker(in, out chan *Task) { for { t := <-in process(t) out <- t } }
Rule: use locking (mutexes) when caching information in a shared data structure or when holding state information.
Rule: use channels when communicating asynchronous results, distributing units of work, or passing ownership of data.
lazy generator
// resume is the channel generateInteger reads from; main points it at the
// generator returned by integers.
var resume chan int

// integers returns an unbuffered channel that yields 0, 1, 2, ... The
// generating goroutine blocks in its send until a value is received, so each
// number is produced only on demand.
func integers() chan int {
	yield := make(chan int)
	go func() {
		for count := 0; ; count++ {
			yield <- count
		}
	}()
	return yield
}

// generateInteger returns the next value received from resume.
func generateInteger() int {
	return <-resume
}

func main() {
	resume = integers()
	fmt.Println(generateInteger()) //=> 0
	fmt.Println(generateInteger()) //=> 1
}
Benchmarking goroutines
// main runs both benchmarks through testing.Benchmark and prints one result
// line for each.
func main() {
	fmt.Println("sync", testing.Benchmark(BenchmarkChannelSync).String())
	fmt.Println("buffered", testing.Benchmark(BenchmarkChannelBuffered).String())
}

// BenchmarkChannelSync sends b.N integers over an unbuffered channel from one
// goroutine and drains them in the calling goroutine.
func BenchmarkChannelSync(b *testing.B) {
	ch := make(chan int)
	go func() {
		for i := 0; i < b.N; i++ {
			ch <- i
		}
		close(ch) // ends the range loop below
	}()
	for range ch {
	}
}

// BenchmarkChannelBuffered is the same measurement over a channel with a
// buffer of 128 elements.
func BenchmarkChannelBuffered(b *testing.B) {
	ch := make(chan int, 128)
	go func() {
		for i := 0; i < b.N; i++ {
			ch <- i
		}
		close(ch) // ends the range loop below
	}()
	for range ch {
	}
}

// Output:
// Windows:  N        Time 1 op     Operations per sec
// sync      1000000  2443 ns/op    --> 409 332 / s
// buffered  1000000  4850 ns/op    --> 810 477 / s
//
// NOTE(review): 4850 ns/op does not correspond to 810 477 operations per
// second (that rate is about 1234 ns/op); one of the two buffered figures is
// presumably a typo. Confirm against the original measurements.
implement a mutex
/* mutexes */ func (s semaphore) Lock() { s.P(1) } func (s semaphore) Unlock() { s.V(1) } /* signal-wait */ func (s semaphore) Wait(n int) { s.P(n) } func (s semaphore) Signal() { s.V(1) }
有疑问加站长微信联系(非本文作者)