为什么数据存入队列的顺序和发送到 Kafka 后收到的反馈信息的顺序不一致?
```
package main
import (
	"bytes"
	"fmt"
	"log"
	"math/rand"
	"os"
	"time"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)
var (
conf kafka.ConfigMap // producer configuration; populated by initConf
p *kafka.Producer // global Kafka producer; created in initKafka (via init)
ERR *log.Logger // mismatch/error logger writing to error.log; set up in main
)
// initConf fills the package-level conf with the producer settings used by
// initKafka. Only comments below were touched; values are unchanged.
func initConf() {
conf = kafka.ConfigMap{"bootstrap.servers": "192.168.2.60",
"go.batch.producer": true, // batch messages in the Go client before handing them to librdkafka
"go.delivery.reports": true, // emit a per-message delivery report event for every produced message
"queue.buffering.max.messages": 100000, // max unsent messages buffered client-side before Produce blocks/fails
"queue.buffering.max.ms": 5, // max time to buffer messages before sending a batch
"api.version.request": "true",
"broker.version.fallback": "0.9.0.1",
"socket.blocking.max.ms": 5,
"message.timeout.ms": 100, // local (client-side) delivery timeout
// NOTE(review): "message.timeout.ms" appears both above (100) and inside the
// topic config (50) — presumably the topic-level 50 wins; verify which one
// librdkafka applies. With acks=0 the broker never confirms receipt, so
// delivery reports do not prove the broker stored the message.
"default.topic.config": kafka.ConfigMap{"acks": 0, "request.required.acks": 0, "message.timeout.ms": 50}}
}
// initKafka builds the producer configuration and creates the global
// Producer p. It terminates the process if the client cannot be created
// (e.g. invalid configuration); it does not verify broker connectivity.
func initKafka() {
	initConf()
	var err error
	p, err = kafka.NewProducer(&conf)
	if err != nil {
		fmt.Printf("Failed to create producer: %s\n", err)
		os.Exit(1)
	}
	// Dropped the leftover debug line `fmt.Println("initKafka()", p, err)`:
	// err is always nil here and p is already printed below.
	fmt.Printf("Created Producer %v\n", p)
}
// judge reports whether the two byte slices have identical contents.
// It is a thin wrapper kept for the existing call site in show; the
// hand-rolled length/element loop is replaced by the standard library's
// bytes.Equal, which has the same semantics (nil and empty compare equal).
func judge(s1 []byte, s2 []byte) bool {
	return bytes.Equal(s1, s2)
}
// show produces num sequential messages to a single partition and then reads
// back the delivery reports, logging any message whose payload does not come
// back in send order.
//
// This answers the original question: Kafka only guarantees ordering WITHIN a
// single partition. The original code chose a random partition per message,
// so delivery reports for messages on different partitions could arrive in a
// different order than they were produced. Pinning every message to one
// partition makes the report order match the send order.
func show() {
	r := rand.New(rand.NewSource(time.Now().UnixNano()))
	deliveryChan := make(chan kafka.Event)
	topic := "Test"
	tmp := `channel_id": `
	var err error
	num := 100
	for i := 0; i < num; i++ {
		value := tmp + fmt.Sprintln(i)
		// BUG FIX: the original called r.Intn(0), which panics at runtime
		// ("invalid argument to Intn" — the bound must be > 0). Using a
		// bound of 1 always yields partition 0, which both fixes the panic
		// and preserves delivery-report ordering; raise the bound only if
		// out-of-order reports across partitions are acceptable.
		PartitionNum := r.Intn(1)
		// Enqueue into librdkafka's internal queue; the delivery report will
		// arrive asynchronously on deliveryChan.
		err = p.Produce(&kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: int32(PartitionNum)}, Value: []byte(value)}, deliveryChan)
		if err != nil {
			fmt.Println("p.Produce() err", i, err)
		}
	}
	// Read back exactly num delivery reports and compare each payload with
	// the value produced at the same index.
	for i := 0; i < num; i++ {
		expect := tmp + fmt.Sprintln(i)
		e := <-deliveryChan
		m := e.(*kafka.Message)
		if !judge([]byte(expect), m.Value) {
			ERR.Println(expect, "!=", string(m.Value))
		}
		if m.TopicPartition.Error != nil {
			fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error)
		}
	}
	close(deliveryChan)
}
// init creates the global Kafka producer before main runs.
// NOTE(review): this side effect fires before main initializes ERR, and
// init() with external side effects is generally discouraged — consider
// moving the call into main.
func init() {
initKafka()
}
// main opens (or creates, append-mode) error.log for the mismatch logger,
// then runs the produce/verify demo in show.
func main() {
	logFile, err := os.OpenFile("error.log", os.O_RDWR|os.O_APPEND|os.O_CREATE, 0664)
	if err != nil {
		// BUG FIX: the original printed the error and kept going with a nil
		// *os.File, so the first ERR.Println in show would panic writing to
		// a nil writer. Exit instead.
		fmt.Println(err)
		os.Exit(1)
	}
	defer logFile.Close() // original leaked the file handle
	ERR = log.New(logFile, "[error]", log.LstdFlags|log.Lmicroseconds)
	show()
}
```
有疑问加站长微信联系(非本文作者)