Importing and Exporting Data from a Kafka-Based Channel


A Go implementation for exporting the contents of a Kafka-based channel and importing them back:
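Both programs share a simple on-disk format: each Kafka message becomes one record in <topic>.dat, stored as two length-prefixed fields (the key, then the value), where each field is a little-endian int32 byte count followed by the raw bytes. The payloads are Hyperledger Fabric orderer KafkaMessage protobufs, so both programs also tally the Connect, TimeToCut, and Regular message types they see.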

  1. Exporting a channel
package main

import (
    "flag"
    "fmt"
    "log"
    "time"
    "os"
    "strings"
    "encoding/binary"

    "github.com/Shopify/sarama"
    "github.com/golang/protobuf/proto"
 ab "github.com/hyperledger/fabric/protos/orderer"
)

var (
    brokers     string
    topic       string
    partition   int
)

func main() {
    flag.StringVar(&brokers,    "brokers",      "localhost:9093",   "Kafka brokers")
    flag.StringVar(&topic,      "topic",        "topic",            "Kafka topic")
    flag.IntVar(&partition,     "partition",    0,                  "Kafka topic partition")
    flag.Parse()

    config := sarama.NewConfig()
    client, err := sarama.NewClient(strings.Split(brokers, ","), config)
    if err != nil {
        log.Fatalf("Unable to create kafka client, error: %v\n", err)
    }

    err = exportTopic(client, topic, partition)
    if err != nil {
        log.Fatalf("Unabled to export topic, error: %v\n", err)
    }
}

func exportTopic(client sarama.Client, topic string, partition int) error {
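    // Export every message in the partition to <topic>.dat and count each
    // Fabric Kafka message type along the way.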
    consumer, err := sarama.NewConsumerFromClient(client)
    if err != nil {
        return err
    }
    defer consumer.Close()

    // Start from the oldest offset so the entire channel history is exported
    // (use sarama.OffsetNewest to start from the tail instead).
    partitionConsumer, err := consumer.ConsumePartition(topic, int32(partition), sarama.OffsetOldest)
    if err != nil {
        return err
    }
    defer partitionConsumer.Close()

    // Truncate any previous export so stale records are not left behind.
    file, err := os.OpenFile(fmt.Sprintf("%s.dat", topic), os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0644)
    if err != nil {
        return err
    }
    defer file.Close()


    var countConnect   int64 = 0
    var countTimeToCut int64 = 0
    var countRegular   int64 = 0
    var lastOffset     int64 = 0
    msg := new(ab.KafkaMessage)
    for {
        select {
        case err = <- partitionConsumer.Errors():
            return err
        case in, ok := <- partitionConsumer.Messages():
            if !ok {
                return fmt.Errorf("kafka consumer closed")
            }
            if err := proto.Unmarshal(in.Value, msg); err != nil {
                return err
            }

            // export the message and remember its offset
            lastOffset = in.Offset
            if err := exportMessage(file, in.Key, in.Value); err != nil {
                return err
            }

            switch msg.Type.(type) {
            case *ab.KafkaMessage_Connect:
                countConnect++
            case *ab.KafkaMessage_TimeToCut:
                countTimeToCut++
            case *ab.KafkaMessage_Regular:
                countRegular++
            default:
                return fmt.Errorf("unknown kafka message")
            }
        case <- time.After(5 * time.Second):
            fmt.Printf("export summary total: %d (Connect=%d, TimeToCut=%d, Regular=%d)\n", lastOffset+1, countConnect, countTimeToCut, countRegular)
            return nil
        }
    }
}

func exportMessage(file *os.File, key []byte, value []byte) error {
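    // A record is the key followed by the value, each as a length-prefixed field.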
    if err := exportField(file, key); err != nil {
        return err
    }

    if err := exportField(file, value); err != nil {
        return err
    }

    return nil
}

func exportField(file *os.File, data []byte) error {
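    // Write a little-endian int32 byte count, then the raw bytes themselves.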
    l := len(data)
    if err := binary.Write(file, binary.LittleEndian, int32(l)); err != nil {
        return err
    }

    if l > 0 {
        if n, err := file.Write(data); err != nil {
            return err
        } else if n != l {
            return fmt.Errorf("incorrect bytes written expect %d, but %d", l, n)
        }
    }
    return nil
}
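Before replaying an export, it can be handy to verify the .dat file offline. The following is a minimal sketch, not part of the original article; it assumes only the little-endian, length-prefixed layout written by exportField above and prints the size of every record.

package main

import (
    "encoding/binary"
    "fmt"
    "io"
    "log"
    "os"
)

// readField mirrors the exporter's layout: a little-endian int32 length,
// then exactly that many bytes.
func readField(r io.Reader) ([]byte, error) {
    var l int32
    if err := binary.Read(r, binary.LittleEndian, &l); err != nil {
        return nil, err
    }
    if l <= 0 {
        return nil, nil
    }
    data := make([]byte, l)
    if _, err := io.ReadFull(r, data); err != nil {
        return nil, err
    }
    return data, nil
}

func main() {
    if len(os.Args) != 2 {
        log.Fatalf("usage: %s <topic>.dat", os.Args[0])
    }
    file, err := os.Open(os.Args[1])
    if err != nil {
        log.Fatal(err)
    }
    defer file.Close()

    for i := 0; ; i++ {
        key, err := readField(file)
        if err == io.EOF { // clean EOF: every record was read
            fmt.Printf("total records: %d\n", i)
            return
        } else if err != nil {
            log.Fatal(err)
        }
        value, err := readField(file)
        if err != nil {
            log.Fatal(err)
        }
        fmt.Printf("record %d: key=%d bytes, value=%d bytes\n", i, len(key), len(value))
    }
}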
  2. Importing a channel
package main

import (
    "flag"
    "io"
    "fmt"
    "log"
    "os"
    "strings"
    "encoding/binary"

    "github.com/Shopify/sarama"
    "github.com/golang/protobuf/proto"
 ab "github.com/hyperledger/fabric/protos/orderer"
)

var (
    brokers     string
    topic       string
)

func main() {
    flag.StringVar(&brokers,    "brokers",      "localhost:9093",   "Kafka brokers")
    flag.StringVar(&topic,      "topic",        "topic",            "Kafka topic")
    flag.Parse()

    config := sarama.NewConfig()
    client, err := sarama.NewClient(strings.Split(brokers, ","), config)
    if err != nil {
        log.Fatalf("Unable to create kafka client, error: %v\n", err)
    }

    err = importTopic(client, topic)
    if err != nil {
        log.Fatalf("Unabled to export topic, error: %v\n", err)
    }
}

func importTopic(client sarama.Client, topic string) error {
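    // Replay every record from <topic>.dat into the target topic and count
    // each Fabric Kafka message type along the way.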
    producer, err := sarama.NewAsyncProducerFromClient(client)
    if err != nil {
        return err
    }
    defer producer.Close()

    // Drain the producer's Errors() channel (Producer.Return.Errors defaults
    // to true); without a reader the async producer can deadlock.
    go func() {
        for err := range producer.Errors() {
            log.Printf("failed to produce message: %v", err)
        }
    }()

    file, err := os.Open(fmt.Sprintf("%s.dat", topic))
    if err != nil {
        return err
    }
    defer file.Close()

    var countConnect   int64 = 0
    var countTimeToCut int64 = 0
    var countRegular   int64 = 0
    msg := new(ab.KafkaMessage)
    for {
        key, value, err := importMessage(file)
        if err == io.EOF {
            fmt.Printf("import summary total: %d (Connect=%d, TimeToCut=%d, Regular=%d)\n", (countConnect + countTimeToCut + countRegular), countConnect, countTimeToCut, countRegular)
            return nil
        } else if err != nil {
            return err
        }

        if err := proto.Unmarshal(value, msg); err != nil {
            return err
        }

        switch msg.Type.(type) {
        case *ab.KafkaMessage_Connect:
            countConnect++
        case *ab.KafkaMessage_TimeToCut:
            countTimeToCut++
        case *ab.KafkaMessage_Regular:
            countRegular++
        default:
            return fmt.Errorf("unknown kafka message")
        }

        producer.Input() <- &sarama.ProducerMessage{Topic: topic, Key: sarama.ByteEncoder(key), Value: sarama.ByteEncoder(value)}
    }
}

func importMessage(file *os.File) ([]byte, []byte, error) {
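    // io.EOF while reading the key marks a clean end of file; io.EOF while
    // reading the value means the file was truncated mid-record.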
    key, err := importField(file)
    if err != nil {
        return nil, nil, err
    }

    value, err := importField(file)
    if err == io.EOF {
        return nil, nil, fmt.Errorf("invalid EOF meet")
    } else if err != nil {
        return nil, nil, err
    }

    return key, value, nil
}


func importField(file *os.File) ([]byte, error) {
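    // Read a little-endian int32 byte count, then exactly that many bytes.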
    var l int32
    err := binary.Read(file, binary.LittleEndian, &l)
    if err != nil {
        return nil, err
    }

    if l == 0 {
        return nil, nil
    }

    data := make([]byte, l)
    // io.ReadFull retries short reads, which a plain file.Read does not.
    if _, err := io.ReadFull(file, data); err != nil {
        return nil, err
    }

    return data, nil
}
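The importer relies on the async producer and only guarantees that buffered messages are flushed when Close runs. If each record should be acknowledged before the next one is read, a synchronous variant is straightforward. The sketch below is not from the original article; it reuses importMessage above and assumes the sarama client was created with config.Producer.Return.Successes = true, which sarama's SyncProducer requires.

// importTopicSync is a sketch of a synchronous alternative to importTopic.
// It assumes the client was created with config.Producer.Return.Successes
// set to true; otherwise NewSyncProducerFromClient returns an error.
func importTopicSync(client sarama.Client, topic string) error {
    producer, err := sarama.NewSyncProducerFromClient(client)
    if err != nil {
        return err
    }
    defer producer.Close()

    file, err := os.Open(fmt.Sprintf("%s.dat", topic))
    if err != nil {
        return err
    }
    defer file.Close()

    for {
        key, value, err := importMessage(file)
        if err == io.EOF {
            return nil
        } else if err != nil {
            return err
        }

        // SendMessage blocks until the broker acknowledges the write.
        if _, _, err := producer.SendMessage(&sarama.ProducerMessage{
            Topic: topic,
            Key:   sarama.ByteEncoder(key),
            Value: sarama.ByteEncoder(value),
        }); err != nil {
            return err
        }
    }
}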

Source: 简书 (Jianshu). Author: CodingCode. Original article: 导入和导出kafka based channel的数据.
