SpringBoot3.0 + RocketMq 构建企业级数据中台

biancheng1 · · 746 次点击 · · 开始浏览    

### download:[SpringBoot3.0 + RocketMq 构建企业级数据中台](https://www.zxit666.com/6588/) SpringBoot整合RocketMQ(商业云端版)——发送普通音讯 首先去阿里云控制台获取相关的资源,如topic、groupId、以及鉴权需求的AccessKet. 在springboot项目的pom.xml中添加以来 com.aliyun.openservices ons-client 1.8.4.Final 配置application配置文件 #RocketMQ 将xxxxx交换为你的资源 rocketmq.accessKey=xxxxx rocketmq.secretKey=xxxxx rocketmq.nameSrvAddr=http://xxxxx:8080 rocketmq.topic=xxxxx rocketmq.groupId=xxxxx rocketmq.tag=* 封装MQ配置类 @Configuration @ConfigurationProperties(prefix = "rocketmq") public class MqConfig { private String accessKey; private String secretKey; private String nameSrvAddr; public Properties getMqPropertie() { Properties properties = new Properties(); properties.setProperty(PropertyKeyConst.AccessKey, this.accessKey); properties.setProperty(PropertyKeyConst.SecretKey, this.secretKey); properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, this.nameSrvAddr); return properties; } public String getAccessKey() { return accessKey; } public void setAccessKey(String accessKey) { this.accessKey = accessKey; } public String getSecretKey() { return secretKey; } public void setSecretKey(String secretKey) { this.secretKey = secretKey; } public String getNameSrvAddr() { return nameSrvAddr; } public void setNameSrvAddr(String nameSrvAddr) { this.nameSrvAddr = nameSrvAddr; } } 封装 MQ发送配置 @Component @RefreshScope public class TopicProperites { @Value("${rocketmq.groupId}") private String groupId; @Value("${rocketmq.topic}") private String topic; @Value("${rocketmq.tag}") private String tag; public String getGroupId() { return groupId; } public void setGroupId(String groupId) { this.groupId = groupId; } public String getTopic() { return topic; } public void setTopic(String topic) { this.topic = topic; } public String getTag() { return tag; } public void setTag(String tag) { this.tag = tag; } @Override public String toString() { return "TopicProperites{" + "groupId='" + groupId + '\'' + ", topic='" + topic + '\'' + ", tag='" + tag + '\'' + '}'; } } 给音讯消费者注入配置信息,ProducerBean用于将Producer集成至SpringBoot中 @Configuration @RefreshScope public class ProducerClient { @Autowired private MqConfig mqConfig; @Bean(initMethod = "start", destroyMethod = "shutdown") public ProducerBean buildProducer() { ProducerBean producer = new ProducerBean(); producer.setProperties(mqConfig.getMqPropertie()); return producer; } } 音讯消费者Producer @Component public class Producer { private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class); @Autowired private ProducerBean producer; @Autowired private TopicProperites topicProperites; public SendResult send (String body) { Message msg = new Message( // Message所属的Topic topicProperites.getTopic(), // Message Tag 可了解为Gmail中的标签,抵消息停止再归类,便当Consumer指定过滤条件在MQ效劳器过滤 opicProperites.getTag(), // Message Body 能够是任何二进制方式的数据, MQ不做任何干预 // 需求Producer与Consumer协商好分歧的序列化和反序列化方式 body.getBytes()); // 设置代表音讯的业务关键属性,请尽可能全局独一 // 以便当您在无法正常收到音讯状况下,可经过MQ 控制台查询音讯并补发 // 留意:不设置也不会影响音讯正常收发 msg.setKey(null); LOGGER.info("准备发送音讯为:{}", body); // 发送音讯,只需不抛异常就是胜利 try { SendResult sendResult = producer.send(msg); if (sendResult != null) { success(msg, sendResult.getMessageId()); return sendResult; } else { error(msg, new Exception()); return null; } } catch (ONSClientException e) { error(msg, e); //呈现异常意味着发送失败,为了防止音讯丧失,倡议缓存该音讯然后停止重试。 } } private void error(Message msg, Exception e) { LOGGER.error("发送MQ音讯失败 -- Topic:{},Key:{},tag:{},body:{}" , msg.getTopic(), msg.getKey(), msg.getTag(), new String(msg.getBody())); LOGGER.error("Exception -- : ", e); } private void success(Message msg, String messageId) { LOGGER.info("发送MQ音讯胜利 -- Topic:{},msgId:{},Key:{},tag:{},body:{}" , msg.getTopic(), messageId, msg.getKey(), msg.getTag(), new String(msg.getBody())); } } 音讯消费者配置 @Configuration @RefreshScope public class ConsumerClient { @Autowired private MqConfig mqConfig; @Autowired private TopicProperites topicProperites; @Autowired private MqMessageListener messageListener; @Bean(initMethod = "start", destroyMethod = "shutdown") public ConsumerBean buildConsumer() { ConsumerBean consumerBean = new ConsumerBean(); //配置文件 Properties properties = mqConfig.getMqPropertie(); properties.setProperty(PropertyKeyConst.GROUP_ID, topicProperites.getGroupId()); //将消费者线程数固定为20个 20为默许值 properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "20"); //重试次数 -1代表重试16次 properties.setProperty(PropertyKeyConst.MaxReconsumeTimes, "-1"); consumerBean.setProperties(properties); //订阅关系 Map<Subscription, MessageListener> subscriptionTable = new HashMap<>(); Subscription subscription = new Subscription(); subscription.setTopic(topicProperites.getTopic()); subscription.setExpression(topicProperites.getTag()); subscriptionTable.put(subscription, messageListener); //订阅多个topic如上面设置 consumerBean.setSubscriptionTable(subscriptionTable); return consumerBean; } } 音讯消费者 @Component public class MqMessageListener implements MessageListener { private static final Logger LOGGER = LoggerFactory.getLogger(MqMessageListener.class); @Override public Action consume(Message message, ConsumeContext consumeContext) { try { //消费音讯 String msgTag = message.getTag(); String msgBody = new String(message.getBody(), StandardCharsets.UTF_8); LOGGER.info("接纳到MQ音讯 -- Topic:{},tag:{},msgId:{},Key:{},body:{}", message.getTopic(), msgTag, message.getMsgID(), message.getKey(), new String(message.getBody())); //获取音讯重试次数 int reconsumeTimes = message.getReconsumeTimes(); LOGGER.info("获取音讯重试次数为:" + reconsumeTimes); //消费胜利,继续消费下一条音讯 return Action.CommitMessage; } catch (Exception e) { LOGGER.error("消费MQ音讯失败! msgId:" + message.getMsgID() + " ---Exception:" + e); //消费失败,告知效劳器稍后再投递这条音讯,继续消费其他音讯 return Action.ReconsumeLater; } } } ————————————————

有疑问加站长微信联系(非本文作者))

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

746 次点击  
加入收藏 微博
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传