Configuring the Go client sarama to connect to Kafka over SSL

copy202

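  • Preface

For background on asymmetric keys, certificates, signatures, keystores, and truststores, please consult a search engine; this post only records the steps involved.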

  • Certificate generation
#!/bin/sh
# 1. Generate the broker key pair in a new keystore.
keytool -keystore kafka.server.keystore.jks -alias localhost -validity 3650 -keyalg RSA -genkey
# 2. Create a self-signed CA and import it into the client and server truststores.
openssl req -new -x509 -keyout ca-key -out ca-cert -days 3650
keytool -keystore kafka.client.truststore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert

# 3. Export a signing request, sign it with the CA, then import the CA
#    certificate and the signed certificate back into the keystore.
keytool -keystore kafka.server.keystore.jks -alias localhost -certreq -file cert-file
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 3650 -CAcreateserial -passin pass:123456
keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.server.keystore.jks -alias localhost -import -file cert-signed

# 4. Convert the JKS stores to PKCS12 and export the PEM files used by the Go client.
keytool -importkeystore -srckeystore kafka.server.truststore.jks -destkeystore server.p12 -deststoretype PKCS12
openssl pkcs12 -in server.p12 -nokeys -out server.cer.pem
keytool -importkeystore -srckeystore kafka.server.keystore.jks -destkeystore client.p12 -deststoretype PKCS12
openssl pkcs12 -in client.p12 -nokeys -out client.cer.pem
openssl pkcs12 -in client.p12 -nodes -nocerts -out client.key.pem
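Before importing cert-signed into the keystore, it can be useful to confirm that it really carries the CA's signature. The following is a minimal sketch of such a check using only the Go standard library. The names firstCert, signedBy, issue, and toPEM are hypothetical helpers introduced here for illustration, and the demo generates throwaway certificates in memory instead of reading ca-cert and cert-signed from disk. Note that recent Go releases may reject certificates signed with SHA-1, which older OpenSSL defaults can produce.

```go
package main

import (
	"crypto/ecdsa"
	"crypto/elliptic"
	"crypto/rand"
	"crypto/x509"
	"crypto/x509/pkix"
	"encoding/pem"
	"errors"
	"fmt"
	"math/big"
	"time"
)

// firstCert decodes the first CERTIFICATE block found in PEM data.
func firstCert(data []byte) (*x509.Certificate, error) {
	for {
		var block *pem.Block
		block, data = pem.Decode(data)
		if block == nil {
			return nil, errors.New("no CERTIFICATE block found")
		}
		if block.Type == "CERTIFICATE" {
			return x509.ParseCertificate(block.Bytes)
		}
	}
}

// signedBy returns nil when the first certificate in certPEM carries a valid
// signature from the first certificate in caPEM. It checks the signature
// only, not validity dates or host names.
func signedBy(certPEM, caPEM []byte) error {
	cert, err := firstCert(certPEM)
	if err != nil {
		return err
	}
	ca, err := firstCert(caPEM)
	if err != nil {
		return err
	}
	return cert.CheckSignatureFrom(ca)
}

// issue is a hypothetical demo helper: it creates a short-lived certificate,
// self-signed as a CA when parent is nil, otherwise signed by parent.
func issue(cn string, parent *x509.Certificate, parentKey *ecdsa.PrivateKey) (*x509.Certificate, *ecdsa.PrivateKey, error) {
	key, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
	if err != nil {
		return nil, nil, err
	}
	tmpl := &x509.Certificate{
		SerialNumber: big.NewInt(time.Now().UnixNano()),
		Subject:      pkix.Name{CommonName: cn},
		NotBefore:    time.Now().Add(-time.Hour),
		NotAfter:     time.Now().Add(time.Hour),
	}
	if parent == nil {
		tmpl.IsCA, tmpl.BasicConstraintsValid = true, true
		tmpl.KeyUsage = x509.KeyUsageCertSign
		parent, parentKey = tmpl, key
	}
	der, err := x509.CreateCertificate(rand.Reader, tmpl, parent, &key.PublicKey, parentKey)
	if err != nil {
		return nil, nil, err
	}
	cert, err := x509.ParseCertificate(der)
	return cert, key, err
}

// toPEM encodes a certificate the way ca-cert and cert-signed are stored.
func toPEM(c *x509.Certificate) []byte {
	return pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: c.Raw})
}

func main() {
	ca, caKey, err := issue("CARoot", nil, nil)
	if err != nil {
		panic(err)
	}
	leaf, _, err := issue("localhost", ca, caKey)
	if err != nil {
		panic(err)
	}
	fmt.Println("signed by CA:", signedBy(toPEM(leaf), toPEM(ca)) == nil)
}
```

With real files, the contents of cert-signed and ca-cert (read with os.ReadFile) would be passed to signedBy in place of the generated certificates.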
  • Server configuration

After running the script above, the Kafka broker uses kafka.server.truststore.jks and kafka.server.keystore.jks. Edit the configuration file server.properties:

 listeners=PLAINTEXT://x.x.x.x:9092,SSL://x.x.x.x:9093
 ssl.keystore.location=kafka.server.keystore.jks
 ssl.keystore.password=123456
 ssl.key.password=123456
 ssl.truststore.location=kafka.server.truststore.jks
 ssl.truststore.password=123456
 ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
 ssl.keystore.type=JKS
 ssl.truststore.type=JKS
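  • Client configuration

The client uses the files generated above: server.cer.pem, client.cer.pem (important: this one needs to be modified), and client.key.pem. Loading client.cer.pem together with client.key.pem may fail with the error "private key does not match public key"; in that case client.cer.pem has to be edited by hand.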

Bag Attributes
    friendlyName: caroot
    2.16.840.1.xxx.xxx.1.1: <Unsupported tag 6>
subject=/C=cn/ST=bj/L=bj/O=xx/OU=xx/CN=192.168.xx.xx
issuer=/C=cn/ST=bj/L=bj/O=xx/OU=xx/CN=192.168.xx.xx
-----BEGIN CERTIFICATE-----
MIIDgzCCAmugAwIBAgIJAL95jWSrh9jfMA0GCSqGSIb3DQEBCwUAMFgxCzAJBgNV
BAYTAmNuMQswCQYDVQQIDAJiajELMAkGA1UEBwwCYmoxCzAJBgNVBAoMAnp3MQsw
CQYDVQQLDAJ6dzEVMBMGA1UEAwwMMTkyLjE2OC4yLjMxMB4XDTE4MDMxOTEyMTAx
OVoXDTI4MDMxNjEyMTAxOVowW9....(n characters omitted)
-----END CERTIFICATE-----
Bag Attributes
    friendlyName: localhost
    localKeyID: 54 69 6D 65 20 31 35 32 31 34 36 31 34 34 36 xx xx xx
subject=/C=cn/ST=bj/L=bj/O=xx/OU=xx/CN=192.168.xx.xx
issuer=/C=cn/ST=bj/L=bj/O=xx/OU=xx/CN=192.168.xx.xx
-----BEGIN CERTIFICATE-----
MIIDLDCCAhQCCQDqwOxGdLTDLjANBgkqhkiG9w0BAQUFADBYMQswCQYDVQQGEwJj
bjELMAkGA1UECAwCYmoxCzAJBgNVBAcMAmJqMQswCQYDVQQKDAJ6dzELMAkGA1UE
CwwCencxFTATBgNVBAMMDDE5Mi4xNjguMi4zMTAeFw0xODAzMTkxMjEwMzBaFw0y
ODAzMTYxMjEwMzBaMFgxCzAJBgNVBAYTAmNuMQswCQYDVQQIEwJiajELMAkGA1UE
BxMCYmoxCzAJBgNVBAoTAnp3MQswCQYDVQQLEwJ6dzEVMBMGA1UEAxMMMTkyLjE2
OC4yLjMxMIIBIjANBgkqhkiG9w0BAQ....(n characters omitted)
-----END CERTIFICATE-----
Bag Attributes
    friendlyName: CN=192.168.xx.xx,OU=xx,O=xx,L=bj,ST=bj,C=cn
subject=/C=cn/ST=bj/L=bj/O=xx/OU=xx/CN=192.168.xx.xx
issuer=/C=cn/ST=bj/L=bj/O=xx/OU=xx/CN=192.168.xx.xx
-----BEGIN CERTIFICATE-----
MIIDgzCCAmugAwIBAgIJAL95jWSrh9jfMA0GCSqGSIb3DQEBCwUAMFgxCzAJBgNV
BAYTAmNuMQswCQYDVQQIDAJiajELMAkGA1UEBwwCYmoxCzAJBgNVBAoMAnp3MQsw
CQYDVQQLDAJ6dzEVMBMGA1UEAwwMMTkyLjE2OC4yLjMxMB4XDTE4MDMxOTEyMTAx
OVoXDTI4MDMxNjEyMTAxOVowWDELMAkGA1UEBhMCY24xCzAJBgNVBAgMAmJqMQsw
CQYDVQQHDAJiajELMAkGA1UECg....(n characters omitted)
-----END CERTIFICATE-----

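The file contains three -----BEGIN CERTIFICATE----- / -----END CERTIFICATE----- blocks. Opening client.key.pem shows a single entry, with friendlyName: localhost. Find the block with the same friendlyName in client.cer.pem (the middle one in the listing above) and delete everything else, which leaves: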

-----BEGIN CERTIFICATE-----
MIIDLDCCAhQCCQDqwOxGdLTDLjANBgkqhkiG9w0BAQUFADBYMQswCQYDVQQGEwJj
bjELMAkGA1UECAwCYmoxCzAJBgNVBAcMAmJqMQswCQYDVQQKDAJ6dzELMAkGA1UE
CwwCencxFTATBgNVBAMMDDE5Mi4xNjguMi4zMTAeFw0xODAzMTkxMjEwMzBaFw0y
ODAzMTYxMjEwMzBaMFgxCzAJBgNVBAYTAmNuMQswCQYDVQQIEwJiajELMAkGA1UE
BxMCYmoxCzAJBgNVBAoTAnp3MQswCQYDVQQLEwJ6dzEVMBMGA1UEAxMMMTkyLjE2
OC4yLjMxMIIBIjANBgkqhkiG9w0BAQ....(n characters omitted)
-----END CERTIFICATE-----
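The manual edit can also be automated. Go's tls.X509KeyPair (used by tls.LoadX509KeyPair) compares the private key with the first certificate in the file, which here is the caroot entry, so the check fails. The sketch below tries each certificate block in turn and keeps the one that pairs with the key. matchingCert and selfSigned are hypothetical names introduced for illustration, and the demo builds an in-memory bundle in the same caroot, localhost, caroot order instead of reading client.cer.pem.

```go
package main

import (
	"crypto/ecdsa"
	"crypto/elliptic"
	"crypto/rand"
	"crypto/tls"
	"crypto/x509"
	"crypto/x509/pkix"
	"encoding/pem"
	"errors"
	"fmt"
	"math/big"
	"time"
)

// matchingCert returns the certificate block from bundlePEM whose public key
// belongs to the private key in keyPEM, re-encoded without the
// "Bag Attributes" text that openssl writes in front of each block.
func matchingCert(bundlePEM, keyPEM []byte) ([]byte, error) {
	rest := bundlePEM
	for {
		var block *pem.Block
		block, rest = pem.Decode(rest)
		if block == nil {
			return nil, errors.New("no certificate in the bundle matches the private key")
		}
		if block.Type != "CERTIFICATE" {
			continue
		}
		single := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: block.Bytes})
		// X509KeyPair performs the same key check as tls.LoadX509KeyPair.
		if _, err := tls.X509KeyPair(single, keyPEM); err == nil {
			return single, nil
		}
	}
}

// selfSigned is a hypothetical demo helper: it returns a throwaway
// self-signed certificate and its private key, both PEM encoded.
func selfSigned(cn string) (certPEM, keyPEM []byte, err error) {
	key, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
	if err != nil {
		return nil, nil, err
	}
	tmpl := &x509.Certificate{
		SerialNumber: big.NewInt(1),
		Subject:      pkix.Name{CommonName: cn},
		NotBefore:    time.Now().Add(-time.Hour),
		NotAfter:     time.Now().Add(time.Hour),
	}
	der, err := x509.CreateCertificate(rand.Reader, tmpl, tmpl, &key.PublicKey, key)
	if err != nil {
		return nil, nil, err
	}
	keyDER, err := x509.MarshalPKCS8PrivateKey(key)
	if err != nil {
		return nil, nil, err
	}
	certPEM = pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: der})
	keyPEM = pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: keyDER})
	return certPEM, keyPEM, nil
}

func main() {
	caPEM, _, err := selfSigned("caroot")
	if err != nil {
		panic(err)
	}
	leafPEM, leafKeyPEM, err := selfSigned("localhost")
	if err != nil {
		panic(err)
	}
	// Same layout as the exported client.cer.pem: CA, key owner, CA again.
	bundle := []byte("Bag Attributes\n    friendlyName: caroot\n" +
		string(caPEM) + string(leafPEM) + string(caPEM))

	match, err := matchingCert(bundle, leafKeyPEM)
	if err != nil {
		panic(err)
	}
	fmt.Println("kept the block that matches the key:", string(match) == string(leafPEM))
}
```

Applied to the real files, the returned bytes would be written back as the new client.cer.pem, which is the same result as the hand edit described above.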
  • Test program:
package main

// Run with:
// go build examples/base-client/*.go
// ./base-client

import (
	"crypto/tls"
	"crypto/x509"
	"io/ioutil"
	"log"
	"os"
	"os/signal"
	"sync"
	"github.com/Shopify/sarama"
)

func main() {
	tlsConfig, err := NewTLSConfig("bundle/client.cer.pem",
		"bundle/client.key.pem",
		"bundle/server.cer.pem")
	if err != nil {
		log.Fatal(err)
	}
	// This can be used on test server if domain does not match cert:
	tlsConfig.InsecureSkipVerify = true

	consumerConfig := sarama.NewConfig()
	consumerConfig.Net.TLS.Enable = true
	consumerConfig.Net.TLS.Config = tlsConfig

	client, err := sarama.NewClient([]string{"192.168.2.31:9093"}, consumerConfig)
	if err != nil {
		log.Fatalf("unable to create kafka client: %q", err)
	}
	// A consumer built with NewConsumerFromClient does not close the client.
	defer client.Close()

	consumer, err := sarama.NewConsumerFromClient(client)
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	consumerLoop(consumer, "Test")
}

// NewTLSConfig generates a TLS configuration used to authenticate on server with
// certificates.
// Parameters are the three pem files path we need to authenticate: client cert, client key and CA cert.
func NewTLSConfig(clientCertFile, clientKeyFile, caCertFile string) (*tls.Config, error) {
	tlsConfig := tls.Config{}

	// Load client cert
	cert, err := tls.LoadX509KeyPair(clientCertFile, clientKeyFile)
	if err != nil {
		return &tlsConfig, err
	}
	tlsConfig.Certificates = []tls.Certificate{cert}

	// Load CA cert
	caCert, err := ioutil.ReadFile(caCertFile)
	if err != nil {
		return &tlsConfig, err
	}
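	caCertPool := x509.NewCertPool()
	if !caCertPool.AppendCertsFromPEM(caCert) {
		log.Println("no CA certificate could be parsed from", caCertFile)
	}
	tlsConfig.RootCAs = caCertPool

	// BuildNameToCertificate is deprecated; the library selects a compatible
	// certificate from Certificates on its own.
	return &tlsConfig, nil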
}

func consumerLoop(consumer sarama.Consumer, topic string) {
	partitions, err := consumer.Partitions(topic)
	if err != nil {
		log.Println("unable to fetch partition IDs for the topic", topic, err)
		return
	}

	// Trap SIGINT to trigger a shutdown. Closing done wakes every partition
	// goroutine; a single signal value would only reach one of them.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)
	done := make(chan struct{})
	go func() {
		<-signals
		close(done)
	}()

	var wg sync.WaitGroup
	// range yields (index, value); the partition ID is the value.
	for _, partition := range partitions {
		wg.Add(1)
		go func(partition int32) {
			defer wg.Done()
			consumePartition(consumer, topic, partition, done)
		}(partition)
	}
	wg.Wait()
}

func consumePartition(consumer sarama.Consumer, topic string, partition int32, done <-chan struct{}) {
	log.Println("Receiving on partition", partition)
	partitionConsumer, err := consumer.ConsumePartition(topic, partition, sarama.OffsetNewest)
	if err != nil {
		log.Println(err)
		return
	}
	defer func() {
		if err := partitionConsumer.Close(); err != nil {
			log.Println(err)
		}
	}()

	consumed := 0
ConsumerLoop:
	for {
		select {
		case msg := <-partitionConsumer.Messages():
			log.Printf("Consumed message offset %d\nData: %s\n", msg.Offset, msg.Value)
			consumed++
		case <-done:
			break ConsumerLoop
		}
	}
	log.Printf("Consumed: %d\n", consumed)
}
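The test program sets InsecureSkipVerify = true, which switches off both the certificate chain check and the host name check. A possible middle ground, sketched here under assumptions, keeps the chain check against the CA from server.cer.pem and skips only the host name comparison, using the VerifyConnection callback available since Go 1.15. verifyChainOnly, handshakeWith, and issue are hypothetical helpers introduced for illustration; the demo uses throwaway in-memory certificates and simulates the callback instead of connecting to a broker.

```go
package main

import (
	"crypto/ecdsa"
	"crypto/elliptic"
	"crypto/rand"
	"crypto/tls"
	"crypto/x509"
	"crypto/x509/pkix"
	"errors"
	"fmt"
	"math/big"
	"time"
)

// verifyChainOnly makes cfg verify the broker certificate against
// cfg.RootCAs while skipping the host name comparison.
func verifyChainOnly(cfg *tls.Config) {
	roots := cfg.RootCAs
	// Turns off the built-in checks; VerifyConnection restores the chain check.
	cfg.InsecureSkipVerify = true
	cfg.VerifyConnection = func(cs tls.ConnectionState) error {
		if len(cs.PeerCertificates) == 0 {
			return errors.New("broker presented no certificate")
		}
		opts := x509.VerifyOptions{Roots: roots, Intermediates: x509.NewCertPool()}
		for _, c := range cs.PeerCertificates[1:] {
			opts.Intermediates.AddCert(c)
		}
		_, err := cs.PeerCertificates[0].Verify(opts)
		return err
	}
}

// issue is a hypothetical demo helper: it creates a short-lived certificate,
// self-signed as a CA when parent is nil, otherwise signed by parent.
func issue(cn string, parent *x509.Certificate, parentKey *ecdsa.PrivateKey) (*x509.Certificate, *ecdsa.PrivateKey, error) {
	key, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
	if err != nil {
		return nil, nil, err
	}
	tmpl := &x509.Certificate{
		SerialNumber: big.NewInt(time.Now().UnixNano()),
		Subject:      pkix.Name{CommonName: cn},
		NotBefore:    time.Now().Add(-time.Hour),
		NotAfter:     time.Now().Add(time.Hour),
	}
	if parent == nil {
		tmpl.IsCA, tmpl.BasicConstraintsValid = true, true
		tmpl.KeyUsage = x509.KeyUsageCertSign
		parent, parentKey = tmpl, key
	}
	der, err := x509.CreateCertificate(rand.Reader, tmpl, parent, &key.PublicKey, parentKey)
	if err != nil {
		return nil, nil, err
	}
	cert, err := x509.ParseCertificate(der)
	return cert, key, err
}

// handshakeWith is a hypothetical demo helper: it builds a config that trusts
// only ca and runs the callback as a handshake would for the given peer.
func handshakeWith(ca, peer *x509.Certificate) error {
	pool := x509.NewCertPool()
	pool.AddCert(ca)
	cfg := &tls.Config{RootCAs: pool}
	verifyChainOnly(cfg)
	return cfg.VerifyConnection(tls.ConnectionState{PeerCertificates: []*x509.Certificate{peer}})
}

func main() {
	ca, caKey, err := issue("CARoot", nil, nil)
	if err != nil {
		panic(err)
	}
	broker, _, err := issue("192.168.2.31", ca, caKey)
	if err != nil {
		panic(err)
	}
	stranger, _, err := issue("stranger", nil, nil)
	if err != nil {
		panic(err)
	}
	fmt.Println("broker signed by the trusted CA accepted:", handshakeWith(ca, broker) == nil)
	fmt.Println("unrelated certificate rejected:", handshakeWith(ca, stranger) != nil)
}
```

In the test program this would amount to calling verifyChainOnly(tlsConfig) in place of the tlsConfig.InsecureSkipVerify = true line, so that a broker certificate not issued by the expected CA is still refused.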

Following the tutorials found online produced all kinds of errors; the main cause was that client.cer.pem has to be edited by hand.

References:

https://docs.confluent.io/current/tutorials/security_tutorial.html#creating-ssl-keys-and-certificates

https://medium.com/processone/using-tls-authentication-for-your-go-kafka-client-3c5841f2a625

https://github.com/FluuxIO/kafka/blob/master/examples/base-client/base-client.go



Source: OSChina blog

Author: copy202
