原生支持并发是Go语言最强大的特性,比如channels和goroutines。但是对于Go语言初学者尝试并接受并发概念并不是很轻松。
Go团队发布的第一个 goroutines 的管理工具是 sync.WaitGroup,WaitGroup会阻塞直到指定数量的goroutine已经完成执行,这是文档中的一个例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
var wg sync.WaitGroup var urls = []string{ "http://www.golang.org/", "http://www.google.com/", "http://www.somestupidname.com/", } for _, url := range urls { // Increment the WaitGroup counter. wg.Add(1) // Launch a goroutine to fetch the URL. go func(url string) { // Decrement the counter when the goroutine completes. defer wg.Done() // Fetch the URL. http.Get(url) }(url) } // Wait for all HTTP fetches to complete. wg.Wait() |
WaitGroup 使你在处理并发任务时对goroutines的创建和停止的数量控制变得简单。每次你创建一个goroutine的时候只要调用Add()方法就可以了。当这个任务结束调用wg.Done()。等待所有的任务完成,调用 wg.Wait()。但是用 WatiGroup唯一的问题就是当你的goroutines出错时,你并不能知道出错的原因。
Extending sync.WaitGroup’s functionality
最近,Go团队在实验仓库中添加了一个名为sync.ErrGroup的新软件包。 sync.ErrGroup再sync.WaitGroup功能的基础上,增加了错误传递,以及在发生不可恢复的错误时取消整个goroutine集合,或者等待超时。我们同样来看一个示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
var g errgroup.Group var urls = []string{ "http://www.golang.org/", "http://www.google.com/", "http://www.somestupidname.com/", } for _, url := range urls { // Launch a goroutine to fetch the URL. url := url // https://golang.org/doc/faq#closures_and_goroutines g.Go(func() error { // Fetch the URL. resp, err := http.Get(url) if err == nil { resp.Body.Close() } return err }) } // Wait for all HTTP fetches to complete. if err := g.Wait(); err == nil { fmt.Println("Successfully fetched all URLs.") } |
g.Go()方法不仅允许你传一个匿名的函数,而且还能捕获错误信息,你只要像这样返回一个错误 return err,这使开发者使用goroutines时开发效率显著提高。
为了测试sync.ErrGroup的所有功能,我写了一个小程序,用一个指定的模式递归搜索目录中的Go文件。这有助于在Go源代码树中查找已使用已弃用或更新的包的实例。要测试sync.ErrGroup的所有功能,我还为应用程序添加了超时设置在功能。 如果达到时间限制,所有搜索和处理goroutine将被取消,程序将结束
当程序运行时,它会生成以下结果:
1 2 3 4 |
$ gogrep -timeout 1000ms . fmt gogrep.go 1 hits |
如果你使用的参数不正确的话,你会看到下面的输出
1 2 3 4 5 6 7 |
gogrep by Brian Ketelsen Flags: -timeout duration timeout in milliseconds (default 500ms) Usage: gogrep [flags] path pattern |
How sync.ErrGroup makes application building easier
让我们看看我们是如何利用 sync.ErrGroup 来使程序写的更简单。我们将从一个 main() 函数开始。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
package main import ( "bytes" "flag" "fmt" "io/ioutil" "log" "os" "path/filepath" "strings" "time" "golang.org/x/net/context" "golang.org/x/sync/errgroup" ) func main() { duration := flag.Duration("timeout", 500*time.Millisecond, "timeout in milliseconds") flag.Usage = func() { fmt.Printf("%s by Brian Ketelsen\n", os.Args[0]) fmt.Println("Usage:") fmt.Printf(" gogrep [flags] path pattern \n") fmt.Println("Flags:") flag.PrintDefaults() } flag.Parse() if flag.NArg() != 2 { flag.Usage() os.Exit(-1) } path := flag.Arg(0) pattern := flag.Arg(1) ctx, _ := context.WithTimeout(context.Background(), *duration) m, err := search(ctx, path, pattern) if err != nil { log.Fatal(err) } for _, name := range m { fmt.Println(name) } fmt.Println(len(m), "hits") } func search(ctx context.Context, root string, pattern string) ([]string, error) { g, ctx := errgroup.WithContext(ctx) paths := make(chan string, 100) // get all the paths g.Go(func() error { defer close(paths) return filepath.Walk(root, func(path string, info os.FileInfo, err error) error { if err != nil { return err } if !info.Mode().IsRegular() { return nil } if !info.IsDir() && !strings.HasSuffix(info.Name(), ".go") { return nil } select { case paths <- path: case <-ctx.Done(): return ctx.Err() } return nil }) }) c := make(chan string, 100) for path := range paths { p := path g.Go(func() error { data, err := ioutil.ReadFile(p) if err != nil { return err } if !bytes.Contains(data, []byte(pattern)) { return nil } select { case c <- p: case <-ctx.Done(): return ctx.Err() } return nil }) } go func() { g.Wait() close(c) }() var m []string for r := range c { m = append(m, r) } return m, g.Wait() } |
在这行代码以前,是对命令行传进来的参数做了解析。这行以后才是我们真正感兴趣的代码。
1 2 |
ctx, _ := context.WithTimeout(context.Background(), *duration) |
我创建了一个context.Context并且为它添加了超时设置,当超时时间到了,”ctx”将接收到channel的超时警告。WithTimeout同样也会返回一个取消的方法,但是我们不需要,所以用 “_” 来忽略掉了。
下面的search() 方法的有context,search path,和 search pattern。最后把找到的文件和数量输出。
参考文献:https://www.oreilly.com/learning/run-strikingly-fast-parallel-file-searches-in-go-with-sync-errgroup
https://godoc.org/golang.org/x/sync/errgroup#example-Group–Parallel
有疑问加站长微信联系(非本文作者)