Teleport with ClickHouse

霍霍霍霍晨 · · 1774 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

前言

Base on Teleport 2.2.0

在之前的文章中我们提到过如果通过Hangout将Kafka中的数据接入ClickHouse中,相关文章。Hangout固然是一个很好的工具,能够快速的实现将Kafka里的数据经过处理写入ClickHouse。但是在以下场景下Hangout无法很好满足我们的需求

数据的QPS大或者单条数据长度特别大

这两种场景下很容易达到hangout-output-clickhouse插件的瓶颈,因为说到底,hangout-output-clickhosue插件底层使用的是clickhouse-jdbc,而clickhouse-jdbc是通过http请求发送数据,hangout-output-clickhouse最先达到的瓶颈其实就是http接口发送数据的瓶颈。

在这种背景下,我们尝试通过TCP协议来完成数据的写入。于是我们通过golang完成了Teleport,通过Teleport将数据从Kafka中导入ClickHouse

Teleport

Teleport是由golang编写的专门用于从Kafka中拉取数据进行处理最后写入ClickHouse中的ETL工具。其中写入ClickHouse的部分是通过clickhouse实现的。

接下来还是以Nginx日志为例说明如何使用Teleport。

Prerequisites

  1. 下载Teleport并给予权限
wget https://github.com/RickyHuo/teleport/releases/download/v2.2.0/teleport

chomd 777 teleport
  1. 安装Kafka依赖(librdkafka)

librdkafka.x86_64 0.11.4-1.el7

yum install librdkafka

Configuration Example: Nginx Log

Log Sample

001.cms.msina..sinanode.com[27/Dec/2017:16:01:03 +0800]-”GET /n/front/w636h3606893220.jpg/w720q75apl.webp HTTP/1.1””SinaNews/201706071542.1 CFNetwork/758.1.6 Darwin/15.0.0”200[127.0.0.1]-”-“0.02110640-127.0.0.1l.sinaimg.cn-

Kafka

如下所示, 指定kafka数据源

kafka:
    kafka.bootstrap.servers: "localhost:9092"
    kafka.group.id: "teleport_nginx_sample"
    kafka.auto.offset.reset: "earliest"
    topic: "teleport_nginx"

Input

指定输入数据处理方式,目前支持JSON和Grok

input:
    format: "line"
    match: "%{NOTSPACE:_hostname}`\[%{HTTPDATE:timestamp}\]`%{NOTSPACE:upstream}`\"%{NOTSPACE:_method}\s%{NOTSPACE:_uri}\s%{NOTSPACE:httpversion}\"`%{QS:_ua}`%{NUMBER:_http_code}`\[%{IP:_remote_addr}\]`%{NOTSPACE:unknow1}`%{QS:_reference}`%{NUMBER:_request_time}`%{NUMBER:_data_size}`%{NOTSPACE:unknow3}`%{IP:_http_x_forwarded_for}`%{NOTSPACE:_domain}`%{DATA:unknow4}$"

Filter

在Filter部分,这里有一系列转化的步骤,包括时间转换、类型转换等

filter:
    - date:
        source_field: "timestamp"
        format: "02/Jan/2006:15:04:05 -0700"
    - add:
        idc_ip: "{{Split ._hostname \".\" |Slice 0 2 | Join \".\"}}"

Output

最后我们将处理好的结构化数据写入ClickHouse

output:
    - clickhouse:
        table: "cms_msg"
        host: "localhost:9000"
        clickhouse.read_timeout: 10
        clickhouse.write_timeout: 20
        clickhouse.debug: "false"
        clickhouse.compress: "true"
        clickhouse.database: "cms"
        fields: ['date', 'datetime','hour', '_hostname', '_domain', '_data_size', '_uri', '_request_time', '_ua', '_http_code', '_remote_addr', '_method', '_reference', '_url', 'idc_ip']
        bulk_size: 30000

ClickHouse Schema

当然, ClickHouse存储这些数据的前提是我们已经建立好了这些数据表。具体建表操作如下:

CREATE TABLE cms.cms_msg
(
    date Date, 
    datetime DateTime, 
    hour Int8, 
    _uri String, 
    _url String, 
    _request_time Float32, 
    _http_code String, 
    _hostname String, 
    _domain String, 
    _http_x_forwarded_for String, 
    _remote_addr String, 
    _reference String, 
    _data_size Int32, 
    _method String, 
    _rs String, 
    _rs_time Float32, 
    _ua String,
    idc_ip String
) ENGINE = MergeTree(date, (hour, date), 8192)


CREATE TABLE cms.cms_msg_all
(
    date Date, 
    datetime DateTime, 
    hour Int8, 
    _uri String, 
    _url String, 
    _request_time Float32, 
    _http_code String, 
    _hostname String, 
    _domain String, 
    _http_x_forwarded_for String, 
    _remote_addr String, 
    _reference String, 
    _data_size Int32, 
    _method String, 
    _ua String,
    idc_ip String
) ENGINE = Distributed(bip_ck_cluster, 'cms', 'cms_msg', rand())

Conclusion

在这篇文章中,我们介绍了如何使用Teleport将Nginx日志文件写入ClickHouse中,整个配置与Hangout一样灵活,入手成本不高。目前Teleport支持的插件较少,后期会逐步完善,有任何问题都可以在Teleport中提ISSUE


有疑问加站长微信联系(非本文作者)

本文来自:简书

感谢作者:霍霍霍霍晨

查看原文:Teleport with ClickHouse

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

1774 次点击  
加入收藏 微博
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传