grpc kafka redis 杂记

aside section ._1OhGeD · · 632 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

0x00 关于

最近一个项目使用的技术栈有点乱,也爬了不少坑, 记录备忘一下.
处理流程:

  • 在springboot项目中监听kafka数据
  • 在springboot中, 使用grpc调用golang的grpc服务程序,作格式转换
  • 把处理结果合并后, 推送给远程redis
0x01 关于grpc代码的生成

经过实践, 发现网上提供的pom.xml插入大量代码,使用插件来生成proto代码的方式并不优雅.
笔者认为, 采用命令行的方式来生成代码更为简洁灵活高效.
(这里就不细说插件方式生成的代码 是在classes中,而不是放到src下, 更不想说, proto内容更新后, mvn compile各种失败带来的痛苦)

cd prj\src\main
protoc.exe -I=proto --grpc-java_out=.\java proto\lua.proto
protoc.exe -I=proto --java_out=.\java proto\lua.proto
  • pom.xml java grpc 依赖的库:
    <properties>
        <grpc.version>1.23.0</grpc.version>
        <protobuf.version>3.9.0</protobuf.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-netty-shaded</artifactId>
            <version>${grpc.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-protobuf</artifactId>
            <version>${grpc.version}</version>
        </dependency>
        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-stub</artifactId>
            <version>${grpc.version}</version>
        </dependency>

        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java-util</artifactId>
            <version>${protobuf.version}</version>
        </dependency>
    </dependencies>

0x02 如何让Kafka每次接入时消费最新数据

给项目配置了kafka参数 :auto-offset-reset: latest
但是, 项目重启后, 还是会从上一次消费结束的地方接着消息.

如何解决这个问题呢? 方法如下:
@KafkaListener 所在类, 继承一下ConsumerSeekAware

重点是现在的 seekToEnd 方法

    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback) {
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        assignments.forEach((t, o) -> callback.seekToEnd(t.topic(), t.partition()));
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
    }
0x03 增强Kafka消费能力

在实测过程中, 发现消费能力有问题, 导致kafka的消费队列lag很大.
开始以为是向远程 redis单次写数据耗时太大, 从而优化成leftPushAll方法.
但问题依旧, 后来经同事提醒, 可能是带宽问题.

果然, 提一下带宽,问题立即解决.


有疑问加站长微信联系(非本文作者)

本文来自:简书

感谢作者:aside section ._1OhGeD

查看原文:grpc kafka redis 杂记

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

632 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传