//初始化的地方,生成一个全局的异步生产者
func init(){
config := sarama.NewConfig()
////等待服务器所有副本都保存成功后的响应
//config.Producer.RequiredAcks = sarama.WaitForAll
//随机向partition发送消息
config.ChannelBufferSize = 512
config.Producer.Flush.Messages = 50000
config.Net.MaxOpenRequests = 2000
config.Producer.Partitioner = sarama.NewRandomPartitioner
//是否等待成功和失败后的响应,只有上面的RequireAcks设置不是NoReponse这里才有用.
//config.Producer.Return.Successes = true
config.Producer.Return.Errors = true
//设置使用的kafka版本,如果低于V0_10_0_0版本,消息中的timestrap没有作用.需要消费和生产同时配置
//注意,版本设置不对的话,kafka会返回很奇怪的错误,并且无法成功发送消息
config.Version = sarama.V0_10_0_1
//使用配置,新建一个异步生产者
producer ,_ = sarama.NewAsyncProducer([]string{“******”}, config)
logger.Infof("huawei-Rece new Async kafka producer success : %s ",producer)
}
func handleReceipt(request *http.Request, result []byte) error {
var err error
if request.Method == "POST" {
logger.Infof("receive post request Message: %s ; remoteAddr: %s ", result, request.RemoteAddr)
//异步写到kafka
message := &sarama.ProducerMessage{
Topic: "huaweipushreceipt",
Value: sarama.ByteEncoder(‘ok’)}
producer.Input() <- message
}
return err
}
//使用的地方,这个方法会接受回调请求,量很大,将每个请求会打到kafka,但是运行一段时间就会报错:
// http: Accept error: accept tcp [::]:8080: accept4: too many open files; retrying in 1s
//之前没有写kafka操作就没事,有没有同学知道怎么回事呀,是不是我的生产端配置的问题
这个错误是,已经达到系统限定的最大同时打开文件数量,,,不知道这个数量可不可以改,,,可以改的话,改大一点,治标不治本,,
最好是用带指定容量的 channel 通信,来异步写入,,,这样就可以避免打开文件数量过多导致这个错误,,但是请求太多时,后面的写入会阻塞,,
PS:我没实践过 go,上面的东西,是我从书上看来的,,,,,,,,,所以代码具体怎么写,,,我不知道,,,
#2