当Elasticsearch遇见Kafka

qcloudcommunity · · 2030 次点击 · 开始浏览    置顶
这是一个创建于 的主题,其中的信息可能已经有所发展或是发生改变。

**欢迎大家前往[腾讯云+社区](https://cloud.tencent.com/developer/?fromSource=waitui),获取更多腾讯海量技术实践干货哦~** > 本文由[michelmu](https://cloud.tencent.com/developer/user/1330019)发表于[云+社区专栏](https://cloud.tencent.com/developer/column/4008?fromSource=waitui) Elasticsearch作为当前主流的全文检索引擎,除了强大的全文检索能力和高扩展性之外,对多种数据源的兼容能力也是其成功的秘诀之一。而Elasticsearch强大的数据源兼容能力,主要来源于其核心组件之一的Logstash, Logstash通过插件的形式实现了对多种数据源的输入和输出。Kafka是一种高吞吐量的分布式发布订阅消息系统,是一种常见的数据源,也是Logstash支持的众多输入输出源的其中一个。本文将从实践的角度,研究使用Logstash Kafka Input插件实现将Kafka中数据导入到Elasticsearch的过程。 ![img](https://main.qcloudimg.com/raw/2ea2256d44409a657557fcaaa62b8288.png)使用Logstash Kafka插件连接Kafka和Elasticsearch ### 1 Logstash Kafka input插件简介 Logstash Kafka Input插件使用Kafka API从Kafka topic中读取数据信息,使用时需要注意Kafka的版本及对应的插件版本是否一致。该插件支持通过SSL和Kerveros SASL方式连接Kafka。另外该插件提供了group管理,并使用默认的offset管理策略来操作Kafka topic。 Logstash默认情况下会使用一个单独的group来订阅Kafka消息,每个Logstash Kafka Consumer会使用多个线程来增加吞吐量。当然也可以多个Logstash实例使用同一个group_id,来均衡负载。另外建议把Consumer的个数设置为Kafka分区的大小,以提供更好的性能。 ### 2 测试环境准备 #### 2.1 创建Elasticsearch集群 为了简化搭建过程,本文使用了[腾讯云Elasticsearch service](https://cloud.tencent.com/product/es)。腾讯云Elasticsearch service不仅可以实现Elasticsearch集群的快速搭建,还提供了内置Kibana,集群监控,专用主节点,Ik分词插件等功能,极大的简化了Elasticsearch集群的创建和管理工作。 #### 2.2 创建Kafka服务 Kafka服务的搭建采用[腾讯云CKafka](https://cloud.tencent.com/product/CKafka)来完成。与Elasticsearch Service一样,腾讯云CKafka可以实现Kafka服务的快速创建,100%兼容开源Kafka API(0.9版本)。 #### 2.3 服务器 除了准备Elasticsearch和Kafka,另外还需要准备一台服务器,用于运行Logstash以连接Elasticsearch和Kafka。本文采用[腾讯云CVM](https://cloud.tencent.com/product/cvm)服务器 #### 2.4 注意事项 1) 需要将Elasticsearch、Kafka和服务器创建在同一个网络下,以便实现网络互通。由于本文采用的是腾讯云相关的技术服务,因此只需要将Elasticsearch service,CKafka和CVM创建在同一个私有网路(VPC)下即可。 2) 注意获取Elasticsearch serivce,CKafka和CVM的内网地址和端口,以便后续服务使用 本次测试中: | 服务 | ip | port | | --------------------- | ------------- | ---- | | Elasticsearch service | 192.168.0.8 | 9200 | | Ckafka | 192.168.13.10 | 9092 | | CVM | 192.168.0.13 | - | ### 3 使用Logstash连接Elasticsearch和Kafka #### 3.1 Kafka准备 可以参考[[CKafka 使用入门](https://cloud.tencent.com/document/product/597/10112)] 按照上面的教程 1) 创建名为**kafka_es_test**的topic 2) 安装JDK 3) 安装Kafka工具包 4) 创建producer和consumer验证kafka功能 #### 3.2 安装Logstash Logstash的安装和使用可以参考[[一文快速上手Logstash](https://cloud.tencent.com/developer/article/1353068)] #### 3.3 配置Logstash Kafka input插件 创建kafka_test_pipeline.conf文件内容如下: ```js input{ kafka{ bootstrap_servers=>"192.168.13.10:9092" topics=>["kafka_es_test"] group_id=>"logstash_kafka_test" } } output{ elasticsearch{ hosts=>["192.168.0.8:9200"] } } ``` 其中定义了一个kafka的input和一个elasticsearch的output 对于Kafka input插件上述三个参数为必填参数,除此之外还有一些对插件行为进行调整的一些参数如: **auto_commit_interval_ms** 用于设置Consumer提交offset给Kafka的时间间隔 **consumer_threads** 用于设置Consumer的线程数,默认为1,实际中应设置与Kafka Topic分区数一致 **fetch_max_wait_ms** 用于指定Consumer等待一个fetch请求达到fetch_min_bytes的最长时间 **fetch_min_bytes** 用于指定Consumer fetch请求应返回的最小数据量 **topics_pattern** 用于通过正则订阅符合某一规则的一组topic 更多参数参考:[[Kafka Input Configuration Options](https://www.elastic.co/guide/en/logstash/5.6/plugins-inputs-kafka.html#plugins-inputs-kafka-options)] #### 3.4 启动Logstash 以下操作在Logstash根目录中进行 1) 验证配置 ```js ./bin/logstash -f kafka_test_pipeline.conf --config.test_and_exit ``` 如有错误,根据提示修改配置文件。若配置正确会得到如下结果 ```js Sending Logstash's logs to /root/logstash-5.6.13/logs which is now configured via log4j2.properties [2018-11-11T15:24:01,598][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"netflow", :directory=>"/root/logstash-5.6.13/modules/netflow/configuration"} [2018-11-11T15:24:01,603][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache", :directory=>"/root/logstash-5.6.13/modules/fb_apache/configuration"} Configuration OK [2018-11-11T15:24:01,746][INFO ][logstash.runner ] Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash ``` 2) 启动Logstash ```js ./bin/logstash -f kafka_test_pipeline.conf --config.reload.automatic ``` 观察日志是否有错误提示,并及时处理 #### 3.4 启动Kafka Producer 以下操作在Kafka工具包根目录下进行 ```js ./bin/kafka-console-producer.sh --broker-list 192.168.13.10:9092 --topic kafka_es_test ``` 写入测试数据 ```js This is a message ``` #### 3.5 Kibana验证结果 登录Elasticsearch对应Kibana, 在Dev Tools中进行如下操作 1) 查看索引 ```js GET _cat/indices ``` 可以看到一个名为logstash-xxx.xx.xx的索引被创建成功 ```js green open .kibana QUw45tN0SHqeHbF9-QVU6A 1 1 1 0 5.5kb 2.7kb green open logstash-2018.11.11 DejRdNJVQ1e1MwbyJjJjLw 5 1 1 0 8.7kb 4.3kb ``` 2) 查看写入的数据 ```js GET logstash-2018.11.11/_search ``` 可以看到数据已经被成功写入 ```js { "took": 0, "timed_out": false, "_shards": { "total": 5, "successful": 5, "skipped": 0, "failed": 0 }, "hits": { "total": 1, "max_score": 1, "hits": [ { "_index": "logstash-2018.11.11", "_type": "logs", "_id": "AWcBsEegMu-Dkjm1ap3H", "_score": 1, "_source": { "message": "This is a message", "@version": "1", "@timestamp": "2018-11-11T07:33:09.079Z" } } ] } } ``` ### 4 总结 Logstash作为Elastic Stack中数据采集和处理的核心组件,为Elasticsearch提供了强大的数据源兼容能力。从测试过程可以看出,使用Logstash实现kafka和Elaticsearch的连接过程相当简单方便。另外Logstash的数据处理功能,也使得采用该架构的系统对数据映射和处理有天然的优势。 然而,使用Logstash实现Kafka和Elasticsearch的连接,并不是连接Kafka和Elasticsearch的唯一方案,另一种常见的方案是使用Kafka Connect, 可以参考“[当Elasticsearch遇见Kafka--Kafka Connect](https://cloud.tencent.com/developer/article/1362324)” >**相关阅读** >[【每日课程推荐】机器学习实战!快速入门在线广告业务及CTR相应知识](https://cloud.tencent.com/developer/edu/course-1128?fromSource=waitui) **此文已由作者授权腾讯云+社区发布,更多原文请[点击](https://cloud.tencent.com/developer/article/1362320?fromSource=waitui )** **搜索关注公众号「云加社区」,第一时间获取技术干货,关注后回复1024 送你一份技术课程大礼包!** 海量技术实践经验,尽在[云加社区](https://cloud.tencent.com/developer?fromSource=waitui)!

有疑问加站长微信联系(非本文作者)

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

2030 次点击  
加入收藏 微博
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传