- 前言
非对称密钥、证书、签名、keystone、truststore等相关概念请移步度娘查询,在此只记录下相关步骤
- 证书生成
#!/bin/sh
keytool -keystore kafka.server.keystore.jks -alias localhost -validity 3650 -keyalg RSA -genkey
openssl req -new -x509 -keyout ca-key -out ca-cert -days 3650
keytool -keystore kafka.client.truststore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.server.keystore.jks -alias localhost -certreq -file cert-file
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 3650 -CAcreateserial -passin pass:123456
keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.server.keystore.jks -alias localhost -import -file cert-signed
keytool -importkeystore -srckeystore kafka.server.truststore.jks -destkeystore server.p12 -deststoretype PKCS12
openssl pkcs12 -in server.p12 -nokeys -out server.cer.pem
keytool -importkeystore -srckeystore kafka.server.keystore.jks -destkeystore client.p12 -deststoretype PKCS12
openssl pkcs12 -in client.p12 -nokeys -out client.cer.pem
openssl pkcs12 -in client.p12 -nodes -nocerts -out client.key.pem
- 服务端配置
通过上面执行的脚本,Kafka的broker使用kafka.server.truststore.jks和kafka.server.keystore.jks,修改配置文件server.properties
listeners=PLAINTEXT://x.x.x.x:9092,SSL://x.x.x.x:9093
ssl.keystore.location=kafka.server.keystore.jks
ssl.keystore.password=123456
ssl.key.password=123456
ssl.truststore.location=kafka.server.truststore.jks
ssl.truststore.password=123456
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.keystore.type=JKS
ssl.truststore.type=JKS
- 客户端配置
客户端会用到上面产生的server.cer.pem,client.cer.pem(重点:需要修改),client.key.pem client.cer.pem,client.key.pem可能出现"private key does not match public key"错误,那么需要手动处理client.cer.pem文件
Bag Attributes
friendlyName: caroot
2.16.840.1.xxx.xxx.1.1: <Unsupported tag 6>
subject=/C=cn/ST=bj/L=bj/O=xx/OU=xx/CN=192.168.xx.xx
issuer=/C=cn/ST=bj/L=bj/O=xx/OU=xx/CN=192.168.xx.xx
-----BEGIN CERTIFICATE-----
MIIDgzCCAmugAwIBAgIJAL95jWSrh9jfMA0GCSqGSIb3DQEBCwUAMFgxCzAJBgNV
BAYTAmNuMQswCQYDVQQIDAJiajELMAkGA1UEBwwCYmoxCzAJBgNVBAoMAnp3MQsw
CQYDVQQLDAJ6dzEVMBMGA1UEAwwMMTkyLjE2OC4yLjMxMB4XDTE4MDMxOTEyMTAx
OVoXDTI4MDMxNjEyMTAxOVowW9....省略n个字符
-----END CERTIFICATE-----
Bag Attributes
friendlyName: localhost
localKeyID: 54 69 6D 65 20 31 35 32 31 34 36 31 34 34 36 xx xx xx
subject=/C=cn/ST=bj/L=bj/O=xx/OU=xx/CN=192.168.xx.xx
issuer=/C=cn/ST=bj/L=bj/O=xx/OU=xx/CN=192.168.xx.xx
-----BEGIN CERTIFICATE-----
MIIDLDCCAhQCCQDqwOxGdLTDLjANBgkqhkiG9w0BAQUFADBYMQswCQYDVQQGEwJj
bjELMAkGA1UECAwCYmoxCzAJBgNVBAcMAmJqMQswCQYDVQQKDAJ6dzELMAkGA1UE
CwwCencxFTATBgNVBAMMDDE5Mi4xNjguMi4zMTAeFw0xODAzMTkxMjEwMzBaFw0y
ODAzMTYxMjEwMzBaMFgxCzAJBgNVBAYTAmNuMQswCQYDVQQIEwJiajELMAkGA1UE
BxMCYmoxCzAJBgNVBAoTAnp3MQswCQYDVQQLEwJ6dzEVMBMGA1UEAxMMMTkyLjE2
OC4yLjMxMIIBIjANBgkqhkiG9w0BAQ....省略n个字符
-----END CERTIFICATE-----
Bag Attributes
friendlyName: CN=192.168.xx.xx,OU=xx,O=xx,L=bj,ST=bj,C=cn
subject=/C=cn/ST=bj/L=bj/O=xx/OU=xx/CN=192.168.xx.xx
issuer=/C=cn/ST=bj/L=bj/O=xx/OU=xx/CN=192.168.xx.xx
-----BEGIN CERTIFICATE-----
MIIDgzCCAmugAwIBAgIJAL95jWSrh9jfMA0GCSqGSIb3DQEBCwUAMFgxCzAJBgNV
BAYTAmNuMQswCQYDVQQIDAJiajELMAkGA1UEBwwCYmoxCzAJBgNVBAoMAnp3MQsw
CQYDVQQLDAJ6dzEVMBMGA1UEAwwMMTkyLjE2OC4yLjMxMB4XDTE4MDMxOTEyMTAx
OVoXDTI4MDMxNjEyMTAxOVowWDELMAkGA1UEBhMCY24xCzAJBgNVBAgMAmJqMQsw
CQYDVQQHDAJiajELMAkGA1UECg....省略n个字符
-----END CERTIFICATE-----
此处查看到有三段-----BEGIN CERTIFICATE----- 和 -----END CERTIFICATE-----,打开client.key.pem看到只有一段friendlyName: localhost,那么找到client.cer.pem(上图为中间一段),删除其余部分,剩余如下:
-----BEGIN CERTIFICATE-----
MIIDLDCCAhQCCQDqwOxGdLTDLjANBgkqhkiG9w0BAQUFADBYMQswCQYDVQQGEwJj
bjELMAkGA1UECAwCYmoxCzAJBgNVBAcMAmJqMQswCQYDVQQKDAJ6dzELMAkGA1UE
CwwCencxFTATBgNVBAMMDDE5Mi4xNjguMi4zMTAeFw0xODAzMTkxMjEwMzBaFw0y
ODAzMTYxMjEwMzBaMFgxCzAJBgNVBAYTAmNuMQswCQYDVQQIEwJiajELMAkGA1UE
BxMCYmoxCzAJBgNVBAoTAnp3MQswCQYDVQQLEwJ6dzEVMBMGA1UEAxMMMTkyLjE2
OC4yLjMxMIIBIjANBgkqhkiG9w0BAQ....省略n个字符
-----END CERTIFICATE-----
- 测试程序如下:
package main
// Run with:
// go build examples/base-client/*.go
// ./base-client
import (
"crypto/tls"
"crypto/x509"
"io/ioutil"
"log"
"os"
"os/signal"
"sync"
"github.com/Shopify/sarama"
)
func main() {
tlsConfig, err := NewTLSConfig("bundle/client.cer.pem",
"bundle/client.key.pem",
"bundle/server.cer.pem")
if err != nil {
log.Fatal(err)
}
// This can be used on test server if domain does not match cert:
tlsConfig.InsecureSkipVerify = true
consumerConfig := sarama.NewConfig()
consumerConfig.Net.TLS.Enable = true
consumerConfig.Net.TLS.Config = tlsConfig
client, err := sarama.NewClient([]string{"192.168.2.31:9093"}, consumerConfig)
if err != nil {
log.Fatalf("unable to create kafka client: %q", err)
}
consumer, err := sarama.NewConsumerFromClient(client)
if err != nil {
log.Fatal(err)
}
defer consumer.Close()
consumerLoop(consumer, "Test")
}
// NewTLSConfig generates a TLS configuration used to authenticate on server with
// certificates.
// Parameters are the three pem files path we need to authenticate: client cert, client key and CA cert.
func NewTLSConfig(clientCertFile, clientKeyFile, caCertFile string) (*tls.Config, error) {
tlsConfig := tls.Config{}
// Load client cert
cert, err := tls.LoadX509KeyPair(clientCertFile, clientKeyFile)
if err != nil {
return &tlsConfig, err
}
tlsConfig.Certificates = []tls.Certificate{cert}
// Load CA cert
caCert, err := ioutil.ReadFile(caCertFile)
if err != nil {
return &tlsConfig, err
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
tlsConfig.RootCAs = caCertPool
tlsConfig.BuildNameToCertificate()
return &tlsConfig, err
}
func consumerLoop(consumer sarama.Consumer, topic string) {
partitions, err := consumer.Partitions(topic)
if err != nil {
log.Println("unable to fetch partition IDs for the topic", topic, err)
return
}
// Trap SIGINT to trigger a shutdown.
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
var wg sync.WaitGroup
for partition := range partitions {
wg.Add(1)
go func() {
consumePartition(consumer, int32(partition), signals)
wg.Done()
}()
}
wg.Wait()
}
func consumePartition(consumer sarama.Consumer, partition int32, signals chan os.Signal) {
log.Println("Receving on partition", partition)
partitionConsumer, err := consumer.ConsumePartition("test", partition, sarama.OffsetNewest)
if err != nil {
log.Println(err)
return
}
defer func() {
if err := partitionConsumer.Close(); err != nil {
log.Println(err)
}
}()
consumed := 0
ConsumerLoop:
for {
select {
case msg := <-partitionConsumer.Messages():
log.Printf("Consumed message offset %d\nData: %s\n", msg.Offset, msg.Value)
consumed++
case <-signals:
break ConsumerLoop
}
}
log.Printf("Consumed: %d\n", consumed)
}
按照网上的教程出现各种错误,主要在于client.cer.pem需要手动修改
参考地址:
https://medium.com/processone/using-tls-authentication-for-your-go-kafka-client-3c5841f2a625
https://github.com/FluuxIO/kafka/blob/master/examples/base-client/base-client.go