### download:[SpringBoot3.0 + RocketMq 构建企业级数据中台](https://www.zxit666.com/6588/)
SpringBoot整合RocketMQ(商业云端版)——发送普通音讯
首先去阿里云控制台获取相关的资源,如topic、groupId、以及鉴权需求的AccessKet.
在springboot项目的pom.xml中添加以来
com.aliyun.openservices
ons-client
1.8.4.Final
配置application配置文件
#RocketMQ 将xxxxx交换为你的资源
rocketmq.accessKey=xxxxx
rocketmq.secretKey=xxxxx
rocketmq.nameSrvAddr=http://xxxxx:8080
rocketmq.topic=xxxxx
rocketmq.groupId=xxxxx
rocketmq.tag=*
封装MQ配置类
@Configuration
@ConfigurationProperties(prefix = "rocketmq")
public class MqConfig {
private String accessKey;
private String secretKey;
private String nameSrvAddr;
public Properties getMqPropertie() {
Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.AccessKey, this.accessKey);
properties.setProperty(PropertyKeyConst.SecretKey, this.secretKey);
properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, this.nameSrvAddr);
return properties;
}
public String getAccessKey() {
return accessKey;
}
public void setAccessKey(String accessKey) {
this.accessKey = accessKey;
}
public String getSecretKey() {
return secretKey;
}
public void setSecretKey(String secretKey) {
this.secretKey = secretKey;
}
public String getNameSrvAddr() {
return nameSrvAddr;
}
public void setNameSrvAddr(String nameSrvAddr) {
this.nameSrvAddr = nameSrvAddr;
}
}
封装 MQ发送配置
@Component
@RefreshScope
public class TopicProperites {
@Value("${rocketmq.groupId}")
private String groupId;
@Value("${rocketmq.topic}")
private String topic;
@Value("${rocketmq.tag}")
private String tag;
public String getGroupId() {
return groupId;
}
public void setGroupId(String groupId) {
this.groupId = groupId;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public String getTag() {
return tag;
}
public void setTag(String tag) {
this.tag = tag;
}
@Override
public String toString() {
return "TopicProperites{" +
"groupId='" + groupId + '\'' +
", topic='" + topic + '\'' +
", tag='" + tag + '\'' +
'}';
}
}
给音讯消费者注入配置信息,ProducerBean用于将Producer集成至SpringBoot中
@Configuration
@RefreshScope
public class ProducerClient {
@Autowired
private MqConfig mqConfig;
@Bean(initMethod = "start", destroyMethod = "shutdown")
public ProducerBean buildProducer() {
ProducerBean producer = new ProducerBean();
producer.setProperties(mqConfig.getMqPropertie());
return producer;
}
}
音讯消费者Producer
@Component
public class Producer {
private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);
@Autowired
private ProducerBean producer;
@Autowired
private TopicProperites topicProperites;
public SendResult send (String body) {
Message msg = new Message(
// Message所属的Topic
topicProperites.getTopic(),
// Message Tag 可了解为Gmail中的标签,抵消息停止再归类,便当Consumer指定过滤条件在MQ效劳器过滤
opicProperites.getTag(),
// Message Body 能够是任何二进制方式的数据, MQ不做任何干预
// 需求Producer与Consumer协商好分歧的序列化和反序列化方式
body.getBytes());
// 设置代表音讯的业务关键属性,请尽可能全局独一
// 以便当您在无法正常收到音讯状况下,可经过MQ 控制台查询音讯并补发
// 留意:不设置也不会影响音讯正常收发
msg.setKey(null);
LOGGER.info("准备发送音讯为:{}", body);
// 发送音讯,只需不抛异常就是胜利
try {
SendResult sendResult = producer.send(msg);
if (sendResult != null) {
success(msg, sendResult.getMessageId());
return sendResult;
} else {
error(msg, new Exception());
return null;
}
} catch (ONSClientException e) {
error(msg, e);
//呈现异常意味着发送失败,为了防止音讯丧失,倡议缓存该音讯然后停止重试。
}
}
private void error(Message msg, Exception e) {
LOGGER.error("发送MQ音讯失败 -- Topic:{},Key:{},tag:{},body:{}"
, msg.getTopic(), msg.getKey(), msg.getTag(), new String(msg.getBody()));
LOGGER.error("Exception -- : ", e);
}
private void success(Message msg, String messageId) {
LOGGER.info("发送MQ音讯胜利 -- Topic:{},msgId:{},Key:{},tag:{},body:{}"
, msg.getTopic(), messageId, msg.getKey(), msg.getTag(), new String(msg.getBody()));
}
}
音讯消费者配置
@Configuration
@RefreshScope
public class ConsumerClient {
@Autowired
private MqConfig mqConfig;
@Autowired
private TopicProperites topicProperites;
@Autowired
private MqMessageListener messageListener;
@Bean(initMethod = "start", destroyMethod = "shutdown")
public ConsumerBean buildConsumer() {
ConsumerBean consumerBean = new ConsumerBean();
//配置文件
Properties properties = mqConfig.getMqPropertie();
properties.setProperty(PropertyKeyConst.GROUP_ID, topicProperites.getGroupId());
//将消费者线程数固定为20个 20为默许值
properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "20");
//重试次数 -1代表重试16次
properties.setProperty(PropertyKeyConst.MaxReconsumeTimes, "-1");
consumerBean.setProperties(properties);
//订阅关系
Map<Subscription, MessageListener> subscriptionTable = new HashMap<>();
Subscription subscription = new Subscription();
subscription.setTopic(topicProperites.getTopic());
subscription.setExpression(topicProperites.getTag());
subscriptionTable.put(subscription, messageListener);
//订阅多个topic如上面设置
consumerBean.setSubscriptionTable(subscriptionTable);
return consumerBean;
}
}
音讯消费者
@Component
public class MqMessageListener implements MessageListener {
private static final Logger LOGGER = LoggerFactory.getLogger(MqMessageListener.class);
@Override
public Action consume(Message message, ConsumeContext consumeContext) {
try {
//消费音讯
String msgTag = message.getTag();
String msgBody = new String(message.getBody(), StandardCharsets.UTF_8);
LOGGER.info("接纳到MQ音讯 -- Topic:{},tag:{},msgId:{},Key:{},body:{}",
message.getTopic(), msgTag, message.getMsgID(), message.getKey(),
new String(message.getBody()));
//获取音讯重试次数
int reconsumeTimes = message.getReconsumeTimes();
LOGGER.info("获取音讯重试次数为:" + reconsumeTimes);
//消费胜利,继续消费下一条音讯
return Action.CommitMessage;
} catch (Exception e) {
LOGGER.error("消费MQ音讯失败! msgId:" + message.getMsgID() + " ---Exception:" + e);
//消费失败,告知效劳器稍后再投递这条音讯,继续消费其他音讯
return Action.ReconsumeLater;
}
}
}
————————————————
有疑问加站长微信联系(非本文作者))
