用 Golang 实现从 Kafka 导出、以及向 Kafka 导入一个 channel 的内容:
- 导出一个channel
package main
import (
"flag"
"fmt"
"log"
"time"
"os"
"strings"
"encoding/binary"
"github.com/Shopify/sarama"
"github.com/golang/protobuf/proto"
ab "github.com/hyperledger/fabric/protos/orderer"
)
// Command-line options of the export tool; main binds them to flags and
// fills them in with flag.Parse.
var (
// brokers holds the -brokers flag value; main splits it on "," to obtain
// the broker address list.
brokers string
// topic holds the -topic flag value: the Kafka topic to export. exportTopic
// also uses it to name the output file "<topic>.dat".
topic string
// partition holds the -partition flag value: the partition of topic to read.
partition int
)
// main parses the -brokers, -topic and -partition flags, connects to the
// Kafka brokers and exports the selected topic partition via exportTopic.
// Any failure is reported with log.Fatalf, which terminates the process.
func main() {
	flag.StringVar(&brokers, "brokers", "localhost:9093", "Kafka brokers")
	flag.StringVar(&topic, "topic", "topic", "Kafka topic")
	flag.IntVar(&partition, "partition", 0, "Kafka topic partition")
	flag.Parse()

	config := sarama.NewConfig()
	client, err := sarama.NewClient(strings.Split(brokers, ","), config)
	if err != nil {
		log.Fatalf("Unable to create kafka client, error: %v\n", err)
	}

	err = exportTopic(client, topic, partition)

	// A consumer built from a client does not own it, so the client has to be
	// closed here. log.Fatalf exits without running deferred calls, hence the
	// explicit Close before the export result is reported.
	if closeErr := client.Close(); closeErr != nil {
		log.Printf("Unable to close kafka client, error: %v\n", closeErr)
	}
	if err != nil {
		log.Fatalf("Unabled to export topic, error: %v\n", err)
	}
}
// exportTopic reads one partition of topic from its oldest available offset
// and appends every message, as a length-prefixed key/value record, to the
// file "<topic>.dat" in the current directory.
//
// The partition is considered drained when no message arrives for five
// seconds; a per-type summary is then printed and nil is returned.
//
// An error is returned when the consumer cannot be set up, the file cannot be
// opened, written or closed, the consumer reports an error, or a message is
// not a decodable ab.KafkaMessage of a known type.
func exportTopic(client sarama.Client, topic string, partition int) (err error) {
	// idleTimeout is how long to wait for another message before the
	// partition is treated as fully consumed.
	const idleTimeout = 5 * time.Second

	consumer, err := sarama.NewConsumerFromClient(client)
	if err != nil {
		return err
	}
	defer consumer.Close()

	// sarama.OffsetNewest would only export messages produced from now on;
	// OffsetOldest exports everything the broker still retains.
	partitionConsumer, err := consumer.ConsumePartition(topic, int32(partition), sarama.OffsetOldest)
	if err != nil {
		return err
	}
	defer partitionConsumer.Close()

	// O_TRUNC: without it a previous, longer export would leave stale records
	// behind the freshly written ones.
	file, err := os.OpenFile(fmt.Sprintf("%s.dat", topic), os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
	if err != nil {
		return err
	}
	defer func() {
		// A failed Close on a written file can mean lost data, so surface it
		// unless an earlier error is already being returned.
		if closeErr := file.Close(); closeErr != nil && err == nil {
			err = closeErr
		}
	}()

	var countConnect, countTimeToCut, countRegular int64
	msg := new(ab.KafkaMessage)

	// One reusable timer instead of a new time.After per message.
	idle := time.NewTimer(idleTimeout)
	defer idle.Stop()

	for {
		select {
		case consumerErr, ok := <-partitionConsumer.Errors():
			if !ok {
				// A closed channel yields a nil pointer; returning it as an
				// error would produce a non-nil error wrapping nil.
				return fmt.Errorf("kafka consumer closed")
			}
			return consumerErr
		case in, ok := <-partitionConsumer.Messages():
			if !ok {
				return fmt.Errorf("kafka consumer closed")
			}
			if err := proto.Unmarshal(in.Value, msg); err != nil {
				return err
			}
			// Export the raw key and value exactly as received.
			if err := exportMessage(file, in.Key, in.Value); err != nil {
				return err
			}
			switch msg.Type.(type) {
			case *ab.KafkaMessage_Connect:
				countConnect++
			case *ab.KafkaMessage_TimeToCut:
				countTimeToCut++
			case *ab.KafkaMessage_Regular:
				countRegular++
			default:
				return fmt.Errorf("unknown kafka message")
			}
			// Restart the idle window; drain a tick that fired meanwhile so
			// the next select does not see a stale timeout.
			if !idle.Stop() {
				select {
				case <-idle.C:
				default:
				}
			}
			idle.Reset(idleTimeout)
		case <-idle.C:
			// Count what was actually exported; the last offset is not a
			// reliable total (empty partition, or oldest offset above zero).
			total := countConnect + countTimeToCut + countRegular
			fmt.Printf("export summary total: %d (Connect=%d, TimeToCut=%d, Regular=%d)\n", total, countConnect, countTimeToCut, countRegular)
			return nil
		}
	}
}
// exportMessage writes one record to file: the key field followed by the
// value field, each encoded by exportField. It stops at, and returns, the
// first write error.
func exportMessage(file *os.File, key []byte, value []byte) error {
	for _, field := range [][]byte{key, value} {
		if err := exportField(file, field); err != nil {
			return err
		}
	}
	return nil
}
// exportField writes a single field to file as a little-endian int32 length
// followed by the raw bytes of data. An empty or nil data slice is encoded as
// a zero length with no payload. It returns any write error, or an error if
// fewer bytes than expected were written.
func exportField(file *os.File, data []byte) error {
	size := len(data)
	if err := binary.Write(file, binary.LittleEndian, int32(size)); err != nil {
		return err
	}
	if size == 0 {
		return nil
	}

	written, err := file.Write(data)
	if err != nil {
		return err
	}
	if written != size {
		return fmt.Errorf("incorrect bytes written expect %d, but %d", size, written)
	}
	return nil
}
- 导入一个channel
package main
import (
"flag"
"io"
"fmt"
"log"
"os"
"strings"
"encoding/binary"
"github.com/Shopify/sarama"
"github.com/golang/protobuf/proto"
ab "github.com/hyperledger/fabric/protos/orderer"
)
// Command-line options of the import tool; main binds them to flags and
// fills them in with flag.Parse.
var (
// brokers holds the -brokers flag value; main splits it on "," to obtain
// the broker address list.
brokers string
// topic holds the -topic flag value: the Kafka topic to import into.
// importTopic also uses it to locate the input file "<topic>.dat".
topic string
)
// main parses the -brokers and -topic flags, connects to the Kafka brokers
// and replays the previously exported file into the topic via importTopic.
// Any failure is reported with log.Fatalf, which terminates the process.
func main() {
	flag.StringVar(&brokers, "brokers", "localhost:9093", "Kafka brokers")
	flag.StringVar(&topic, "topic", "topic", "Kafka topic")
	flag.Parse()

	config := sarama.NewConfig()
	client, err := sarama.NewClient(strings.Split(brokers, ","), config)
	if err != nil {
		log.Fatalf("Unable to create kafka client, error: %v\n", err)
	}

	err = importTopic(client, topic)

	// A producer built from a client does not own it, so the client has to be
	// closed here. log.Fatalf exits without running deferred calls, hence the
	// explicit Close before the import result is reported.
	if closeErr := client.Close(); closeErr != nil {
		log.Printf("Unable to close kafka client, error: %v\n", closeErr)
	}
	if err != nil {
		// This is the import tool; the message previously said "export".
		log.Fatalf("Unable to import topic, error: %v\n", err)
	}
}
// importCounts tallies the messages handed to the producer, per message type.
type importCounts struct {
	connect, timeToCut, regular int64
}

// importTopic replays the records stored in "<topic>.dat" (as written by the
// export tool) into topic through an asynchronous producer.
//
// The producer is closed, which flushes buffered messages, before the summary
// is printed, so the summary is only shown when no delivery failure was
// reported. An error is returned when the producer cannot be created, the
// file cannot be read or is malformed, a record is not a decodable
// ab.KafkaMessage of a known type, or the producer reports a failed delivery.
func importTopic(client sarama.Client, topic string) error {
	producer, err := sarama.NewAsyncProducerFromClient(client)
	if err != nil {
		return err
	}

	counts, err := produceFromFile(producer, topic)

	// Close waits for buffered messages and returns the deliveries that
	// failed; it must run on every path, and its result must not be dropped.
	closeErr := producer.Close()
	if err != nil {
		return err
	}
	if closeErr != nil {
		return closeErr
	}

	total := counts.connect + counts.timeToCut + counts.regular
	fmt.Printf("import summary total: %d (Connect=%d, TimeToCut=%d, Regular=%d)\n", total, counts.connect, counts.timeToCut, counts.regular)
	return nil
}

// produceFromFile reads "<topic>.dat" record by record and queues each record
// on producer until the end of the file. It returns the per-type counts of the
// records processed so far together with the first error encountered.
func produceFromFile(producer sarama.AsyncProducer, topic string) (importCounts, error) {
	var counts importCounts

	file, err := os.OpenFile(fmt.Sprintf("%s.dat", topic), os.O_RDONLY, 0644)
	if err != nil {
		return counts, err
	}
	defer file.Close()

	msg := new(ab.KafkaMessage)
	for {
		key, value, err := importMessage(file)
		if err == io.EOF {
			return counts, nil
		}
		if err != nil {
			return counts, err
		}
		if err := proto.Unmarshal(value, msg); err != nil {
			return counts, err
		}
		switch msg.Type.(type) {
		case *ab.KafkaMessage_Connect:
			counts.connect++
		case *ab.KafkaMessage_TimeToCut:
			counts.timeToCut++
		case *ab.KafkaMessage_Regular:
			counts.regular++
		default:
			return counts, fmt.Errorf("unknown kakfa message")
		}

		out := &sarama.ProducerMessage{Topic: topic, Key: sarama.ByteEncoder(key), Value: sarama.ByteEncoder(value)}
		// The error channel has to be serviced while sending: an unread
		// delivery error would otherwise stall the producer and this send.
		select {
		case producer.Input() <- out:
		case produceErr, ok := <-producer.Errors():
			if !ok {
				return counts, fmt.Errorf("kafka producer closed")
			}
			return counts, produceErr
		}
	}
}
// importMessage reads one record from file: a key field followed by a value
// field, each decoded by importField.
//
// An error from reading the key (including io.EOF at the end of the file) is
// returned unchanged. io.EOF while reading the value means the record is
// incomplete and is reported as a distinct error; any other value error is
// returned unchanged.
func importMessage(file *os.File) ([]byte, []byte, error) {
	key, keyErr := importField(file)
	if keyErr != nil {
		return nil, nil, keyErr
	}

	value, valueErr := importField(file)
	switch {
	case valueErr == io.EOF:
		return nil, nil, fmt.Errorf("invalid EOF meet")
	case valueErr != nil:
		return nil, nil, valueErr
	}
	return key, value, nil
}
// importField reads a single field from file: a little-endian int32 length
// followed by that many payload bytes.
//
// It returns (nil, nil) for a zero-length field. io.EOF is returned only when
// the file ends exactly before the length prefix, i.e. at a clean field
// boundary. A negative length, or a payload shorter than announced, yields a
// descriptive error that is never io.EOF, so callers cannot mistake a
// truncated file for a normal end of input.
func importField(file *os.File) ([]byte, error) {
	var size int32
	if err := binary.Read(file, binary.LittleEndian, &size); err != nil {
		return nil, err
	}
	if size < 0 {
		// make() would panic on a negative length read from a corrupt file.
		return nil, fmt.Errorf("invalid field length %d", size)
	}
	if size == 0 {
		return nil, nil
	}

	data := make([]byte, size)
	// A single Read may legally return fewer bytes than requested; ReadFull
	// keeps reading until the buffer is filled or the input ends.
	if n, err := io.ReadFull(file, data); err != nil {
		return nil, fmt.Errorf("incorrect bytes read expect %d, but %d: %v", size, n, err)
	}
	return data, nil
}
有疑问加站长微信联系(非本文作者)