kafka系列分为两个篇幅,分别是实用篇,讲使用命令和一些使用中会遇到的概念名词,理论篇,讲kafka为了实现高可用和高性能做了哪些努力。这篇我们从搭建开始,然后用kafka脚本去发送和接受信息,最后用go语言展示在代码之中怎么使用。
大家可以在kafka官网上面下载最新包。要是嫌弃网速太慢的话可以用一下我这个包,我下载了传到了百度云,提取码是:klei。
使用的系统是linux,要是没有服务器,我特别推荐windows10的linux子系统,在上面也可以运行,好用又舒服,这里我用了一个测试虚拟机。
启动kafka服务
下载好了压缩包之后,从本地scp到服务器上
root@DESKTOP-888:/mnt/e/BaiduNetdiskDownload#
scp -C -i /root/curt/id_rsa_zelin.huang -P 33335 kafka_2.11-1.0.0.tgz zelin.huang@dhzltest01.***:/tmp
然后登陆到服务器上,把/tmp/kafka_2.11-1.0.0.tgz 弄到你想存放的目录上,然后
[zelin.huang@dhzltest01.*** app]$ ls
cron kafka_2.11-1.0.0.tgz
[zelin.huang@dhzltest01.*** app]$ sudo tar -xzf kafka_2.11-1.0.0.tgz
[zelin.huang@dhzltest01.*** app]$ ls
cron kafka_2.11-1.0.0 kafka_2.11-1.0.0.tgz
[zelin.huang@dhzltest01.*** app]$ ls kafka_2.11-1.0.0
bin config libs LICENSE NOTICE site-docs
首先kafka的启动是需要ZooKeeper来托管的,至于为什么需要,理论篇我们再提一下,现在要是自己有机子起了ZooKeeper服务的话,可以跳过下面这一步。
[zelin.huang@dhzltest01.*** kafka_2.11-1.0.0]$ sudo bin/zookeeper-server-start.sh config/zookeeper.properties
/home/app/kafka_2.11-1.0.0/bin/kafka-run-class.sh: line 270: exec: java: not found
但是我在运行的时候发现这台虚拟机竟然没有java环境(公司后台语言是golang+php),没办法,只能装java。
[zelin.huang@dhzltest01.*** ~]$ sudo yum install java-1.8.0-openjdk
.......(安装输出)
[zelin.huang@dhzltest01.*** ~]$ java -version
openjdk version "1.8.0_242"
OpenJDK Runtime Environment (build 1.8.0_242-b07)
OpenJDK 64-Bit Server VM (build 25.242-b07, mixed mode)
安装好了再来,我们可以在linux中挂载运行zookeeper,这样当我们只是暂时练下手而不是真正使用还是很好的,这样我们退出shell之后,我们启动的服务也会关闭,不会占用到系统资源(要是后台运行想关了,请用ps+kill)。如果真的是想用在生产或者测试环境,而不是顺便玩玩的话,虚拟机可以托管在
supervisor或者是以nohub模式运行。
这里我们起多个终端,可以更好地看到各个工具的输出。
[zelin.huang@dhzltest01.*** kafka_2.11-1.0.0]$ sudo bin/zookeeper-server-start.sh config/zookeeper.properties
[2020-02-29 11:57:10,691] INFO Reading configuration from: config/zookeeper.properties
。。。
[2020-02-29 11:57:10,739] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)
......
需要注意的是zookeeper占用的端口号是2181
然后起另一个终端
[zelin.huang@dhzltest01.*** kafka_2.11-1.0.0]$ sudo bin/kafka-server-start.sh config/server.properties
[2020-02-29 11:59:36,346] INFO KafkaConfig values:
。。。
port = 9092
。。。
[2020-02-29 11:59:36,451] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
。。。
其中可以看到kafka连接的zookeeper是上面所启动的2181端口号,所以kafka是依赖zookeeper启动的,如果我们要启动多个kafka形成一个集群,那么我们设定的连接zookeeper的服务是同一个。
kafka占用的端口号是,9092。
好,执行到这一步,我们的kafka是启动起来了。
接下来,我们使用kafka来实现一个消息队列的功能。
首先该创建一个topic,topic相当于kafka的一个消息类型,通过选择不同的topic发送,或者是监听某个topic,就可以实现消息队列。发消息的时候是需要指定topic的。
或者,您也可将topic配置为:发消息指定的topic不存在时,自动创建topic,而不是手动创建。(ps:我们公司的测试环境是不需要创建topic的,但是正式环境需要,所以曾经导致测试环境跑得好好的代码,到了正式环境就不行了)
[zelin.huang@dhzltest01.*** kafka_2.11-1.0.0]$ sudo bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Created topic "test".
[zelin.huang@dhzltest01.*** kafka_2.11-1.0.0]$ sudo bin/kafka-topics.sh --list --zookeeper localhost:2181
test
创建消费者和生产者
这里创建了一个topic和查看所有的topic。
然后我们创建生产者和消费者,尝试发送一些消息。
一个终端做消费者,一个终端做生产者
#生产者终端
[zelin.huang@dhzltest01.*** kafka_2.11-1.0.0]$ sudo bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>huangzelin
>huangzliyong
>
# 消费者终端
[zelin.huang@dhzltest01.*** kafka_2.11-1.0.0]$ sudo bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
huangzelin
huangzliyong
这里我们可以看到可以正常的通讯信息。
在go语言中使用
go创建生产者
package easy_kafka
import (
"fmt"
"github.com/pkg/errors"
"gopkg.in/Shopify/sarama.v1"
)
type KafkaProducer struct {
producer sarama.SyncProducer
}
func (self *KafkaProducer) Init(ip string,port int) error {
//这里可以初始化多个kafka的,因为是集群,最好多传几个,但是只传一个也可以使用
servers := []string{fmt.Sprintf("%s:%d", ip, port)}
p, err := sarama.NewSyncProducer(servers, sarama.NewConfig())
if err != nil {
return err
}
self.producer = p
return nil
}
func (self *KafkaProducer) SendMessage(topic string, data []byte) error {
if self.producer == nil {
return errors.New("no producer while send message")
}
kafkaMsg := &sarama.ProducerMessage{
Topic: topic,
Key: nil,
Value: sarama.ByteEncoder(data),
}
_, _, err := self.producer.SendMessage(kafkaMsg)
return err
}
func (self *KafkaProducer) Close() error {
if self.producer != nil {
return self.producer.Close()
}
return nil
}
go启动一个监听卡夫卡对象
package easy_kafka
import (
"fmt"
"github.com/Shopify/sarama"
"github.com/bsm/sarama-cluster"
"github.com/golang/glog"
"github.com/pkg/errors"
)
type KafkaConsumer struct {
consumer *cluster.Consumer
}
func (self *KafkaConsumer) Init(zookeeperIp string, zookeeperPort int, topic []string, group string) error {
zookeepeServers := []string{fmt.Sprintf("%s:%d", zookeeperIp, zookeeperPort)}
config := cluster.NewConfig()
//配置是否接受错误信息
config.Consumer.Return.Errors = true
//配置是否接受注意消息
config.Group.Return.Notifications = true
//配置是否接受最新消息
config.Consumer.Offsets.Initial = sarama.OffsetNewest
//这个消费者是谁,同一个消费者如果对一条信息确认了,则不会重复发送
config.ClientID = group
//topic是指要收到的消息对象
cg, err := cluster.NewConsumer(zookeepeServers, group, topic, config)
if err != nil {
return err
}
self.consumer = cg
return nil
}
//注意该方法是非阻塞的,如果调用了该方法,并且没有其他的阻塞方法,记得手动阻塞他
func (self *KafkaConsumer) StartKafkaListen(listenMsg func(*sarama.ConsumerMessage)) error {
if self.consumer == nil {
return errors.New("还没初始化消费者对象")
}
go func(cg *cluster.Consumer) {
for message := range cg.Messages() {
go listenMsg(message)
//确认这条消息收到
cg.MarkOffset(message, "")
}
}(self.consumer)
go func(cg *cluster.Consumer) {
for ntf := range cg.Notifications() {
glog.Infof("%+v", *ntf)
}
}(self.consumer)
go func(cg *cluster.Consumer) {
for err := range cg.Errors() {
glog.Errorf("%+v", err)
}
}(self.consumer)
return nil
}
小结
kafka的安装包十分友好,启动服务过程相当简单,但是可配置内容还是很多的,不过简单使用直接默认的配置文件去启动过程就可以啦。
利用
启动服务
sudo bin/zookeeper-server-start.sh config/zookeeper.properties
sudo bin/kafka-server-start.sh config/server.properties
创建topic
sudo bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
启动消费者脚本和消费者脚本
sudo bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
sudo bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
更多操作命令可以去(kafka中文文档官网)查看
还有用go语言展示了在写代码的时候怎么使用kafka,可以直接拿去用的没问题。
有疑问加站长微信联系(非本文作者)