confluent-kafka-go 批量发送到kafka的问题后反馈信息顺序问题

zhaohao · · 3206 次点击 · 开始浏览    置顶
这是一个创建于 的主题,其中的信息可能已经有所发展或是发生改变。

为什么数据存入队列的顺序和发送到kafka后得到反馈的信息顺序不同??? ``` package main import ( "fmt" "log" "math/rand" "os" "time" "github.com/confluentinc/confluent-kafka-go/kafka" ) var ( conf kafka.ConfigMap p *kafka.Producer ERR *log.Logger ) func initConf() { conf = kafka.ConfigMap{"bootstrap.servers": "192.168.2.60", "go.batch.producer": true, ///是否批量 "go.delivery.reports": true, ///当使用async模式时,在producer必须被阻塞或者数据必须丢失之前,可以缓存到队列中的未发送的最大消息条数 "queue.buffering.max.messages": 100000, "queue.buffering.max.ms": 5, ///async模式时,用户缓存数据的最大时间间隔 "api.version.request": "true", "broker.version.fallback": "0.9.0.1", "socket.blocking.max.ms": 5, "message.timeout.ms": 100, //本地等待时间 "default.topic.config": kafka.ConfigMap{"acks": 0, "request.required.acks": 0, "message.timeout.ms": 50}} } func initKafka() { //初始化 initConf() var err error p, err = kafka.NewProducer(&conf) if err != nil { fmt.Printf("Failed to create producer: %s\n", err) os.Exit(1) } fmt.Println("initKafka()", p, err) fmt.Printf("Created Producer %v\n", p) } func judge(s1 []byte, s2 []byte) bool { if len(s1) != len(s2) { return false } for i := 0; i < len(s1); i++ { if s1[i] != s2[i] { return false } } return true } func show() { r := rand.New(rand.NewSource(time.Now().UnixNano())) deliveryChan := make(chan kafka.Event) //str := []string{"one", "two", "three", "four", "five"} topic := "Test" tmp := `channel_id": ` var err error num := 100 for i := 0; i < num; i++ { value := tmp + fmt.Sprintln(i) // topic = str[0] PartitionNum := r.Intn(0) err = p.Produce(&kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: int32(PartitionNum)}, Value: []byte(value)}, deliveryChan) //存入kafka c库的队列中 if err != nil { fmt.Println("p.Produce() err", i, err) //break } } for i := 0; i < num; i++ { tmp1 := tmp + fmt.Sprintln(i) e := <-deliveryChan //得到反馈信息 m := e.(*kafka.Message) if !judge([]byte(tmp1), m.Value) { ERR.Println(tmp1, "!=", string(m.Value)) } if m.TopicPartition.Error != nil { fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error) } // else { //fmt.Printf("Delivered message to topic %s [%d] at offset %v\n", // *m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset) //} //fmt.Println("TopicPartition", m.TopicPartition) //fmt.Println("Value", string(m.Value)) //fmt.Println("Key", m.Key) //fmt.Println("Timestamp", m.Timestamp) //fmt.Println("TimestampType", m.TimestampType) //fmt.Println("Opaque", m.Opaque) } close(deliveryChan) } func init() { initKafka() } func main() { //filename, err := os.Create("webLog.log") filename, err := os.OpenFile("error.log", os.O_RDWR|os.O_APPEND|os.O_CREATE, 0664) if err != nil { fmt.Println(err) } ERR = log.New(filename, "[error]", log.LstdFlags|log.Lmicroseconds) show() } ```

有疑问加站长微信联系(非本文作者)

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

3206 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传