慎重声明,只代表本人观点,不一定代表实际。看了差不多半天Spring Cloud Stream中的kafka源代码,前后断断续续折腾了一个月,终于在golang中通过kafka把消息发送给Spring Cloud Stream并且成功处理。
Spring Cloud Stream在使用@StreamListener的condition、通过header对消息进行选择的时候,MessageHeader需要包含以下三个信息:
id UUID类型
contentType 字符串类型,内容类型,可以为:application/json
spring_json_header_types JSON字符串,说明header中每个值对应的Java类型;使用golang发送时,例如:{"partitionKey":"java.lang.String","scst_partition":"java.lang.Integer","contentType":"java.lang.String"}
注意id必须基于java序列化格式,可以参考:https://www.jianshu.com/p/08fe6ffe26d5
直接上代码:
// testKafka sends a single ServiceErrorLog, JSON-encoded, to the
// "log-service-topic" topic on the broker at localhost:9092 and prints the
// resulting partition, offset and send error.
//
// The record carries four Kafka headers: "id", "contentType", "partitionKey"
// and "spring_json_header_types". According to the accompanying article these
// are what a Spring Cloud Stream @StreamListener condition needs in order to
// select on message headers.
//
// Any setup failure (UUID generation, JSON encoding, client or producer
// creation) terminates the process via log.Fatalf.
func testKafka() {
	sarama.Logger = log.New(os.Stdout, "", log.LstdFlags)

	// Prepare everything that can fail locally before opening any network
	// resources, so that log.Fatalf never skips a pending deferred Close.
	key, err := uuid.NewRandom()
	if err != nil {
		log.Fatalf("unable to create uuid: %q", err)
	}
	serviceErrorLog := &ServiceErrorLog{
		ApplicationName: "test-service",
		ServerIp:        "127.0.0.1",
		Path:            "/",
		QueryParams:     "/QueryParams",
		Message:         "Message",
		Trace:           "Trace",
		LogTime:         time.Now().Format(DATE_TIME_PATTERN),
	}
	text, err := json.Marshal(serviceErrorLog)
	if err != nil {
		log.Fatalf("unable to marshal service error log: %q", err)
	}

	// The metadata travels as Kafka record headers; the record value holds
	// only the JSON payload (no generic-message envelope around it).
	// NOTE(review): utility.UUIDJavaBytes is defined elsewhere; per the
	// article the id must be in Java serialization format — confirm.
	id := utility.UUIDJavaBytes(key)
	contentType := []byte("application/json")
	partitionKey := []byte("service-error-logs")
	// Maps each header name to a Java type name for the consumer side.
	springJsonHeaderTypes := []byte(`{"partitionKey":"java.lang.String","scst_partition":"java.lang.Integer","contentType":"java.lang.String"}`)

	config := sarama.NewConfig()
	// A SyncProducer requires Return.Successes to be enabled.
	config.Producer.Return.Successes = true
	config.Producer.Return.Errors = true
	config.Version = sarama.MaxVersion

	client, err := sarama.NewClient(strings.Split("localhost:9092", ","), config)
	if err != nil {
		log.Fatalf("unable to create kafka client: %q", err)
	}
	// A producer built from an existing client does not close that client,
	// so it has to be closed here explicitly.
	defer func() {
		if closeErr := client.Close(); closeErr != nil {
			log.Printf("unable to close kafka client: %q", closeErr)
		}
	}()

	producer, err := sarama.NewSyncProducerFromClient(client)
	if err != nil {
		log.Fatalf("unable to create producer: %q", err)
	}
	// Deferred calls run last-in-first-out: the producer closes before the client.
	defer func() {
		if closeErr := producer.Close(); closeErr != nil {
			log.Printf("unable to close producer: %q", closeErr)
		}
	}()

	message := &sarama.ProducerMessage{
		Topic: "log-service-topic",
		Headers: []sarama.RecordHeader{
			{Key: []byte("id"), Value: id},
			{Key: []byte("contentType"), Value: contentType},
			{Key: []byte("partitionKey"), Value: partitionKey},
			{Key: []byte("spring_json_header_types"), Value: springJsonHeaderTypes},
		},
		Value: sarama.StringEncoder(text),
	}
	fmt.Println(message)

	partition, offset, sendErr := producer.SendMessage(message)
	fmt.Println(partition, offset, sendErr)
}
有疑问加站长微信联系(非本文作者)