创建kafka并使用

yellowone · · 2218 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

kafka系列分为两个篇幅,分别是实用篇,讲使用命令和一些使用中会遇到的概念名词,理论篇,讲kafka为了实现高可用和高性能做了哪些努力。这篇我们从搭建开始,然后用kafka脚本去发送和接受信息,最后用go语言展示在代码之中怎么使用。

大家可以在kafka官网上面下载最新包。要是嫌弃网速太慢的话可以用一下我这个包,我下载了传到了百度云,提取码是:klei。

使用的系统是linux,要是没有服务器,我特别推荐windows10的linux子系统,在上面也可以运行,好用又舒服,这里我用了一个测试虚拟机。

启动kafka服务

下载好了压缩包之后,从本地scp到服务器上

root@DESKTOP-888:/mnt/e/BaiduNetdiskDownload#
scp -C -i /root/curt/id_rsa_zelin.huang -P 33335 kafka_2.11-1.0.0.tgz zelin.huang@dhzltest01.***:/tmp

然后登陆到服务器上,把/tmp/kafka_2.11-1.0.0.tgz 弄到你想存放的目录上,然后

[zelin.huang@dhzltest01.*** app]$ ls
cron   kafka_2.11-1.0.0.tgz  
[zelin.huang@dhzltest01.*** app]$ sudo tar -xzf kafka_2.11-1.0.0.tgz
[zelin.huang@dhzltest01.*** app]$ ls
cron   kafka_2.11-1.0.0  kafka_2.11-1.0.0.tgz  
[zelin.huang@dhzltest01.*** app]$ ls kafka_2.11-1.0.0
bin  config  libs  LICENSE  NOTICE  site-docs

首先kafka的启动是需要ZooKeeper来托管的,至于为什么需要,理论篇我们再提一下,现在要是自己有机子起了ZooKeeper服务的话,可以跳过下面这一步。

[zelin.huang@dhzltest01.*** kafka_2.11-1.0.0]$ sudo bin/zookeeper-server-start.sh config/zookeeper.properties
/home/app/kafka_2.11-1.0.0/bin/kafka-run-class.sh: line 270: exec: java: not found

但是我在运行的时候发现这台虚拟机竟然没有java环境(公司后台语言是golang+php),没办法,只能装java。

[zelin.huang@dhzltest01.*** ~]$ sudo yum install java-1.8.0-openjdk
.......(安装输出)
[zelin.huang@dhzltest01.*** ~]$ java -version                      
openjdk version "1.8.0_242"
OpenJDK Runtime Environment (build 1.8.0_242-b07)
OpenJDK 64-Bit Server VM (build 25.242-b07, mixed mode)

安装好了再来,我们可以在linux中挂载运行zookeeper,这样当我们只是暂时练下手而不是真正使用还是很好的,这样我们退出shell之后,我们启动的服务也会关闭,不会占用到系统资源(要是后台运行想关了,请用ps+kill)。如果真的是想用在生产或者测试环境,而不是顺便玩玩的话,虚拟机可以托管在
supervisor或者是以nohub模式运行。

这里我们起多个终端,可以更好地看到各个工具的输出。

[zelin.huang@dhzltest01.*** kafka_2.11-1.0.0]$ sudo bin/zookeeper-server-start.sh config/zookeeper.properties 
[2020-02-29 11:57:10,691] INFO Reading configuration from: config/zookeeper.properties 
。。。
[2020-02-29 11:57:10,739] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)
......

需要注意的是zookeeper占用的端口号是2181
然后起另一个终端

[zelin.huang@dhzltest01.*** kafka_2.11-1.0.0]$ sudo bin/kafka-server-start.sh config/server.properties
[2020-02-29 11:59:36,346] INFO KafkaConfig values: 
。。。
        port = 9092
。。。
[2020-02-29 11:59:36,451] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
。。。

其中可以看到kafka连接的zookeeper是上面所启动的2181端口号,所以kafka是依赖zookeeper启动的,如果我们要启动多个kafka形成一个集群,那么我们设定的连接zookeeper的服务是同一个。
kafka占用的端口号是,9092。
好,执行到这一步,我们的kafka是启动起来了。
接下来,我们使用kafka来实现一个消息队列的功能。
首先该创建一个topic,topic相当于kafka的一个消息类型,通过选择不同的topic发送,或者是监听某个topic,就可以实现消息队列。发消息的时候是需要指定topic的。
或者,您也可将topic配置为:发消息指定的topic不存在时,自动创建topic,而不是手动创建。(ps:我们公司的测试环境是不需要创建topic的,但是正式环境需要,所以曾经导致测试环境跑得好好的代码,到了正式环境就不行了)

[zelin.huang@dhzltest01.*** kafka_2.11-1.0.0]$ sudo bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Created topic "test".
[zelin.huang@dhzltest01.*** kafka_2.11-1.0.0]$ sudo  bin/kafka-topics.sh --list --zookeeper localhost:2181 
test

创建消费者和生产者

这里创建了一个topic和查看所有的topic。
然后我们创建生产者和消费者,尝试发送一些消息。
一个终端做消费者,一个终端做生产者

#生产者终端
[zelin.huang@dhzltest01.*** kafka_2.11-1.0.0]$ sudo bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>huangzelin
>huangzliyong
>
# 消费者终端
[zelin.huang@dhzltest01.*** kafka_2.11-1.0.0]$ sudo bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
huangzelin
huangzliyong

这里我们可以看到可以正常的通讯信息。

在go语言中使用

go创建生产者

package easy_kafka

import (
    "fmt"
    "github.com/pkg/errors"
    "gopkg.in/Shopify/sarama.v1"
)

type KafkaProducer struct {
    producer sarama.SyncProducer
}

func (self *KafkaProducer) Init(ip string,port int) error {
    //这里可以初始化多个kafka的,因为是集群,最好多传几个,但是只传一个也可以使用
    servers := []string{fmt.Sprintf("%s:%d", ip, port)}
    p, err := sarama.NewSyncProducer(servers, sarama.NewConfig())
    if err != nil {
        return err
    }
    self.producer = p
    return nil
}

func (self *KafkaProducer) SendMessage(topic string, data []byte) error {
    if self.producer == nil {
        return errors.New("no producer while send message")
    }
    kafkaMsg := &sarama.ProducerMessage{
        Topic: topic,
        Key:   nil,
        Value: sarama.ByteEncoder(data),
    }
    _, _, err := self.producer.SendMessage(kafkaMsg)
    return err
}

func (self *KafkaProducer) Close() error {
    if self.producer != nil {
        return self.producer.Close()
    }
    return nil
}

go启动一个监听卡夫卡对象

package easy_kafka

import (
    "fmt"
    "github.com/Shopify/sarama"
    "github.com/bsm/sarama-cluster"
    "github.com/golang/glog"
    "github.com/pkg/errors"
)

type KafkaConsumer struct {
    consumer *cluster.Consumer
}

func (self *KafkaConsumer) Init(zookeeperIp string, zookeeperPort int, topic []string, group string) error {
    zookeepeServers := []string{fmt.Sprintf("%s:%d", zookeeperIp, zookeeperPort)}
    config := cluster.NewConfig()
    //配置是否接受错误信息
    config.Consumer.Return.Errors = true
    //配置是否接受注意消息
    config.Group.Return.Notifications = true
    //配置是否接受最新消息
    config.Consumer.Offsets.Initial = sarama.OffsetNewest
    //这个消费者是谁,同一个消费者如果对一条信息确认了,则不会重复发送
    config.ClientID = group
    //topic是指要收到的消息对象
    cg, err := cluster.NewConsumer(zookeepeServers, group, topic, config)
    if err != nil {
        return err
    }
    self.consumer = cg
    return nil
}

//注意该方法是非阻塞的,如果调用了该方法,并且没有其他的阻塞方法,记得手动阻塞他
func (self *KafkaConsumer) StartKafkaListen(listenMsg func(*sarama.ConsumerMessage)) error {
    if self.consumer == nil {
        return errors.New("还没初始化消费者对象")
    }
    go func(cg *cluster.Consumer) {
        for message := range cg.Messages() {
            go listenMsg(message)
            //确认这条消息收到
            cg.MarkOffset(message, "")
        }
    }(self.consumer)
    go func(cg *cluster.Consumer) {
        for ntf := range cg.Notifications() {
            glog.Infof("%+v", *ntf)
        }
    }(self.consumer)
    go func(cg *cluster.Consumer) {
        for err := range cg.Errors() {
            glog.Errorf("%+v", err)
        }
    }(self.consumer)
    return nil
}

小结

kafka的安装包十分友好,启动服务过程相当简单,但是可配置内容还是很多的,不过简单使用直接默认的配置文件去启动过程就可以啦。

利用

启动服务
sudo bin/zookeeper-server-start.sh config/zookeeper.properties
sudo bin/kafka-server-start.sh config/server.properties

创建topic
sudo bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

启动消费者脚本和消费者脚本
sudo bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
 sudo bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

更多操作命令可以去(kafka中文文档官网)查看
还有用go语言展示了在写代码的时候怎么使用kafka,可以直接拿去用的没问题。


有疑问加站长微信联系(非本文作者)

本文来自:简书

感谢作者:yellowone

查看原文:创建kafka并使用

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

2218 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传