golang如何使用Spring Cloud Stream

EasyNetCN · · 635 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

慎重声明,只代表本人观点,不一定代表实际。看了差不多半天Spring Cloud Stream中的kafka源代码,差不多断断续续折腾了一个月,终于在golang中使用kafka发送给Spring Cloud Stream并且成功处理

Spring Cloud Stream当使用@StreamListener中的condition,通过head进行选择的时候,其中MessageHeader是需要包含三个信息:

id UUID类型

contentType 字符串类型,内容类型,可以为:application/json

spring_json_header_types header中的值类型,使用golang的时候,例如:{"partitionKey":"java.lang.String","scst_partition":"java.lang.Integer","contentType":"java.lang.String"}

注意id必须基于java序列化格式,可以参考:https://www.jianshu.com/p/08fe6ffe26d5

直接上代码:

func testKafka() {
    sarama.Logger = log.New(os.Stdout, "", log.LstdFlags)

    config := sarama.NewConfig()

    config.Producer.Return.Successes = true
    config.Producer.Return.Errors = true
    config.Version = sarama.MaxVersion

    client, err := sarama.NewClient(strings.Split("localhost:9092", ","), config)

    if err != nil {
        log.Fatalf("unable to create kafka client: %q", err)
    }

    producer, err := sarama.NewSyncProducerFromClient(client)

    if err != nil {
        log.Fatalf("unable to create producer: %q", err)
    }

    defer producer.Close()

    serviceErrorLog := &ServiceErrorLog{ApplicationName: "test-service", ServerIp: "127.0.0.1", Path: "/", QueryParams: "/QueryParams", Message: "Message", Trace: "Trace", LogTime: time.Now().Format(DATE_TIME_PATTERN)}

    if err == nil {
        key, err1 := uuid.NewRandom()

        if err1 != nil {
            log.Fatalf("unable to create uuid: %q", err1)
        }

        //headers := &MessageHeader{Id: key.String(), ContentType: "application/json", PartitionKey: "service-error-logs", Timestamp: time.Now().UnixNano()}

        //genericMessage := &ServiceErrorLogGenericMessage{Headers: *headers, Payload: *serviceErrorLog}

        text, _ := json.Marshal(serviceErrorLog)

        id := utility.UUIDJavaBytes(key)
        contentType := []byte("application/json")
        partitionKey := []byte("service-error-logs")
        springJsonHeaderTypes := []byte{123, 34, 112, 97, 114, 116, 105, 116, 105, 111, 110, 75, 101, 121, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103, 34, 44, 34, 115, 99, 115, 116, 95, 112, 97, 114, 116, 105, 116, 105, 111, 110, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 73, 110, 116, 101, 103, 101, 114, 34, 44, 34, 99, 111, 110, 116, 101, 110, 116, 84, 121, 112, 101, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103, 34, 125}

        message := &sarama.ProducerMessage{
            Topic: "log-service-topic",
            Headers: []sarama.RecordHeader{
                {Key: []byte("id"), Value: id},
                {Key: []byte("contentType"), Value: contentType},
                {Key: []byte("partitionKey"), Value: partitionKey},
                {Key: []byte("spring_json_header_types"), Value: springJsonHeaderTypes},
            },
            Value: sarama.StringEncoder(text)}

        fmt.Println(message)

        partition, offset, err := producer.SendMessage(message)

        fmt.Println(partition, offset, err)

    }

}

有疑问加站长微信联系(非本文作者)

本文来自:简书

感谢作者:EasyNetCN

查看原文:golang如何使用Spring Cloud Stream

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

635 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传