go sarama kafka 使用异步生产者导致to many file的问题

zhangcm · · 2002 次点击 · 开始浏览    置顶
这是一个创建于 的主题,其中的信息可能已经有所发展或是发生改变。

//初始化的地方,生成一个全局的异步生产者 func init(){ config := sarama.NewConfig() ////等待服务器所有副本都保存成功后的响应 //config.Producer.RequiredAcks = sarama.WaitForAll //随机向partition发送消息 config.ChannelBufferSize = 512 config.Producer.Flush.Messages = 50000 config.Net.MaxOpenRequests = 2000 config.Producer.Partitioner = sarama.NewRandomPartitioner //是否等待成功和失败后的响应,只有上面的RequireAcks设置不是NoReponse这里才有用. //config.Producer.Return.Successes = true config.Producer.Return.Errors = true //设置使用的kafka版本,如果低于V0_10_0_0版本,消息中的timestrap没有作用.需要消费和生产同时配置 //注意,版本设置不对的话,kafka会返回很奇怪的错误,并且无法成功发送消息 config.Version = sarama.V0_10_0_1 //使用配置,新建一个异步生产者 producer ,_ = sarama.NewAsyncProducer([]string{“******”}, config) logger.Infof("huawei-Rece new Async kafka producer success : %s ",producer) } func handleReceipt(request *http.Request, result []byte) error { var err error if request.Method == "POST" { logger.Infof("receive post request Message: %s ; remoteAddr: %s ", result, request.RemoteAddr) //异步写到kafka message := &sarama.ProducerMessage{ Topic: "huaweipushreceipt", Value: sarama.ByteEncoder(‘ok’)} producer.Input() <- message } return err } //使用的地方,这个方法会接受回调请求,量很大,将每个请求会打到kafka,但是运行一段时间就会报错: // http: Accept error: accept tcp [::]:8080: accept4: too many open files; retrying in 1s //之前没有写kafka操作就没事,有没有同学知道怎么回事呀,是不是我的生产端配置的问题

有疑问加站长微信联系(非本文作者)

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

2002 次点击  
加入收藏 微博
2 回复  |  直到 2019-11-20 21:50:19
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传