0x00 关于
最近一个项目使用的技术栈有点乱,也爬了不少坑, 记录备忘一下.
处理流程:
- 在springboot项目中监听kafka数据
- 在springboot中, 使用grpc调用golang的grpc服务程序,作格式转换
- 把处理结果合并后, 推送给远程redis
0x01 关于grpc代码的生成
经过实践, 发现网上提供的pom.xml
插入大量代码,使用插件来生成proto
代码的方式并不优雅.
笔者认为, 采用命令行的方式来生成代码更为简洁灵活高效.
(这里就不细说插件方式生成的代码 是在classes中,而不是放到src下, 更不想说, proto内容更新后, mvn compile各种失败带来的痛苦)
- 关于java grpc代码生成
准备两个执行程序:-
protoc
: https://github.com/protocolbuffers/protobuf/releases
按平台, 下载平台对应的应用, 并放到PATH路径下 -
protoc-gen-grpc-java
:https://repo1.maven.org/maven2/io/grpc/protoc-gen-grpc-java/1.24.0/
下载平台对应执行文件, 并修改名称为protoc-gen-grpc-java.exe(windows)
代码生成命令
-
cd prj\src\main
protoc.exe -I=proto --grpc-java_out=.\java proto\lua.proto
protoc.exe -I=proto --java_out=.\java proto\lua.proto
-
pom.xml
java grpc 依赖的库:
<properties>
<grpc.version>1.23.0</grpc.version>
<protobuf.version>3.9.0</protobuf.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
<version>${grpc.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
<version>${protobuf.version}</version>
</dependency>
</dependencies>
0x02 如何让Kafka每次接入时消费最新数据
给项目配置了kafka参数 :auto-offset-reset: latest
但是, 项目重启后, 还是会从上一次消费结束的地方接着消息.
如何解决这个问题呢? 方法如下:
给 @KafkaListener
所在类, 继承一下ConsumerSeekAware
重点是现在的 seekToEnd
方法
@Override
public void registerSeekCallback(ConsumerSeekCallback callback) {
}
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
assignments.forEach((t, o) -> callback.seekToEnd(t.topic(), t.partition()));
}
@Override
public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
}
0x03 增强Kafka消费能力
在实测过程中, 发现消费能力有问题, 导致kafka的消费队列lag很大.
开始以为是向远程 redis单次写数据耗时太大, 从而优化成leftPushAll
方法.
但问题依旧, 后来经同事提醒, 可能是带宽问题.
果然, 提一下带宽,问题立即解决.
有疑问加站长微信联系(非本文作者)