这一段代码有何地方需要修改

GeorgeYuen · · 811 次点击 · 开始浏览    置顶
这是一个创建于 的主题,其中的信息可能已经有所发展或是发生改变。

```go package main import ( "bufio" "fmt" "github.com/Shopify/sarama" "io" "os/exec" "strings" ) var ( asyncProducer *sarama.AsyncProducer ) func main() { execCommand() } func execCommand() { cmd := exec.Command("/fabio", "-cfg", "/etc/fabio/fabio.properties") fmt.Println(cmd.Args) stdout, err := cmd.StdoutPipe() if err != nil { fmt.Println(err) } stderr, err := cmd.StderrPipe() if err != nil { fmt.Println(err) } cmd.Start() go printLog(stdout) go printLog(stderr) cmd.Wait() } func printLog(readCloser io.ReadCloser) { reader := bufio.NewReader(readCloser) for { line, err2 := reader.ReadString('\n') if err2 != nil || io.EOF == err2 { break } fmt.Print(line) go sendMessage(line) } } func gerProducer() sarama.AsyncProducer { if asyncProducer != nil { return *asyncProducer } else { config := sarama.NewConfig() config.Producer.Return.Successes = true config.Producer.RequiredAcks = sarama.WaitForAll p, err := sarama.NewAsyncProducer(strings.Split("192.168.1.1:9092", ","), config) if err != nil { fmt.Println("kafka failed") } asyncProducer = &p return *asyncProducer } } func sendMessage(message string) { msg := &sarama.ProducerMessage{ Topic: "test", Value: sarama.ByteEncoder(message), } p := gerProducer() go func(p sarama.AsyncProducer) { errors := p.Errors() success := p.Successes() for { select { case err := <-errors: if err != nil { fmt.Println("kafka error") } case <-success: } } }(p) p.Input() <- msg } ```

有疑问加站长微信联系(非本文作者)

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

811 次点击  
加入收藏 微博
2 回复  |  直到 2017-07-17 14:57:58
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传