confluent-kafka-go 批量发送到kafka的问题后反馈信息顺序问题

zhaohao · 2017-08-15 08:57:53 · 3990 次点击 · 大约8小时之前 开始浏览    置顶
这是一个创建于 2017-08-15 08:57:53 的主题,其中的信息可能已经有所发展或是发生改变。

为什么数据存入队列的顺序和发送到kafka后得到反馈的信息顺序不同???

package main

import (
    "fmt"
    "log"
    "math/rand"
    "os"
    "time"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

var (
    conf kafka.ConfigMap
    p    *kafka.Producer
    ERR  *log.Logger
)

func initConf() {
    conf = kafka.ConfigMap{"bootstrap.servers": "192.168.2.60",
        "go.batch.producer":            true, ///是否批量
        "go.delivery.reports":          true, ///当使用async模式时,在producer必须被阻塞或者数据必须丢失之前,可以缓存到队列中的未发送的最大消息条数
        "queue.buffering.max.messages": 100000,
        "queue.buffering.max.ms":       5, ///async模式时,用户缓存数据的最大时间间隔
        "api.version.request":          "true",
        "broker.version.fallback":      "0.9.0.1",
        "socket.blocking.max.ms":       5,
        "message.timeout.ms":           100, //本地等待时间
        "default.topic.config":         kafka.ConfigMap{"acks": 0, "request.required.acks": 0, "message.timeout.ms": 50}}
}

func initKafka() { //初始化
    initConf()

    var err error
    p, err = kafka.NewProducer(&conf)
    if err != nil {
        fmt.Printf("Failed to create producer: %s\n", err)
        os.Exit(1)
    }
    fmt.Println("initKafka()", p, err)
    fmt.Printf("Created Producer %v\n", p)
}

func judge(s1 []byte, s2 []byte) bool {
    if len(s1) != len(s2) {
        return false
    }
    for i := 0; i < len(s1); i++ {
        if s1[i] != s2[i] {
            return false
        }
    }
    return true
}

func show() {
    r := rand.New(rand.NewSource(time.Now().UnixNano()))
    deliveryChan := make(chan kafka.Event)
    //str := []string{"one", "two", "three", "four", "five"}
    topic := "Test"
    tmp := `channel_id": `
    var err error
    num := 100
    for i := 0; i < num; i++ {
        value := tmp + fmt.Sprintln(i)
    //    topic = str[0]
        PartitionNum := r.Intn(0)
        err = p.Produce(&kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: int32(PartitionNum)}, Value: []byte(value)}, deliveryChan) //存入kafka c库的队列中
        if err != nil {
            fmt.Println("p.Produce() err", i, err)
            //break
        }
    }
    for i := 0; i < num; i++ {
        tmp1 := tmp + fmt.Sprintln(i)
        e := <-deliveryChan //得到反馈信息
        m := e.(*kafka.Message)
        if !judge([]byte(tmp1), m.Value) {
            ERR.Println(tmp1, "!=", string(m.Value))
        }
        if m.TopicPartition.Error != nil {
            fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error)
        } // else {
        //fmt.Printf("Delivered message to topic %s [%d] at offset %v\n",
        //    *m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
        //}

        //fmt.Println("TopicPartition", m.TopicPartition)
        //fmt.Println("Value", string(m.Value))
        //fmt.Println("Key", m.Key)
        //fmt.Println("Timestamp", m.Timestamp)
        //fmt.Println("TimestampType", m.TimestampType)
        //fmt.Println("Opaque", m.Opaque)

    }
    close(deliveryChan)
}

func init() {
    initKafka()
}

func main() {
    //filename, err := os.Create("webLog.log")
    filename, err := os.OpenFile("error.log", os.O_RDWR|os.O_APPEND|os.O_CREATE, 0664)
    if err != nil {
        fmt.Println(err)
    }
    ERR = log.New(filename, "[error]", log.LstdFlags|log.Lmicroseconds)
    show()

}

有疑问加站长微信联系(非本文作者)

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

3990 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传