厦门注册公司网上申请入口重庆seo推广
一、带标签的Tag消息
1.1、概述
RocketMQ提供消息过滤的功能,通过Tag或者Key进行区分。我们往一个主题里面发送消息的时候,根据业务逻辑可能需要区分,比如带有tagA标签的消息被消费者A消费,带有tagB标签的消息被消费者B消费,还有在事务监听的类里面,只要是事务消息都要走同一个监听,我们也需要通过过滤才能区别对待。
1.2、什么时候该用Topic,什么时候该用Tag
不同的业务应该使用不同的Topic,如果仅仅是相同的业务里边有不同的表现形式,那么我们要使用Tag进行区分。至于说具体怎么选择,可以从以下几个方面进行区分:
(1)消息类型是否一致:如普通消息、事务消息、延时消息、顺序消息、不同的消息类型使用不同的Topic,无法通过Tag进行区分;
(2)业务是否相关联:没有直接关联的消息,如淘宝交易信息、京东物流消息使用不同的Topic进行区分;而同样是淘宝交易消息,电器类订单、女装类订单、化妆品类订单的消息可以用Tag进行区分;
(3)消息优先级是否一致:如同样是物流消息,盒马必须2小时内送达,天猫超市24小时内送达,淘宝物流则相对会慢一些,不同优先级的消息用不同的Topic进行区分;
(4)消息量级是否相当:有些业务消息虽然量小但是实时性要求高,如果跟某些万亿量级别的消息使用同一个Topic,则有可能会因为过长的等待时间而"饿死",此时需要将不同量级的消息进行区分,使用不同的Topic;
总的来说,针对消息分类、可以选择创建多个Topic或者在同一个Topic下创建多个Tag。但是通常情况下,不同Topic之间的消息没有必然的联系。而Tag则用来区分同一个Topic下相互关联的消息,例如:全集和子集的关系,流程先后的关系。
二、案例代码
2.1、pom
同案例五
2.2、RocketMQConstant
同案例五
2.3、消费者
2.3.1、TagConsumer1
package org.star.tag.consumer;import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.star.constants.RocketMQConstant;import java.util.List;/*** @Author: 一叶浮萍归大海* @Date: 2023/8/30 10:33* @Description: Tag消息消费者*/
@Slf4j
public class TagConsumer1 {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TagConsumer1Group");consumer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);consumer.subscribe("TagTopic","NBA");consumer.setMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {if (CollectionUtils.isNotEmpty(list)) {log.info("消费者[TagConsumer1]收到消息,消息详情:{}", StrUtil.utf8Str(list.get(0).getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();log.info("消费者[TagConsumer1] start success");}}
2.3.2、TagConsumer2
package org.star.tag.consumer;import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.star.constants.RocketMQConstant;import java.util.List;/*** @Author: 一叶浮萍归大海* @Date: 2023/8/30 10:33* @Description: Tag消息消费者*/
@Slf4j
public class TagConsumer2 {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TagConsumer2Group");consumer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);consumer.subscribe("TagTopic","RUN");consumer.setMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {if (CollectionUtils.isNotEmpty(list)) {log.info("消费者[TagConsumer2]收到消息,消息详情:{}", StrUtil.utf8Str(list.get(0).getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();log.info("消费者[TagConsumer2] start success");}}
2.3.3、TagConsumer3
package org.star.tag.consumer;import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.star.constants.RocketMQConstant;import java.util.List;/*** @Author: 一叶浮萍归大海* @Date: 2023/8/30 10:33* @Description: Tag消息消费者*/
@Slf4j
public class TagConsumer3 {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TagConsumer3Group");consumer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);consumer.subscribe("TagTopic","STAR || CAR");consumer.setMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {if (CollectionUtils.isNotEmpty(list)) {log.info("消费者[TagConsumer3]收到消息,消息详情:{}", StrUtil.utf8Str(list.get(0).getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();log.info("消费者[TagConsumer3] start success");}}
2.3.4、TagConsumer4
package org.star.tag.consumer;import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.star.constants.RocketMQConstant;import java.util.List;/*** @Author: 一叶浮萍归大海* @Date: 2023/8/30 10:33* @Description: Tag消息消费者*/
@Slf4j
public class TagConsumer4 {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TagConsumer4Group");consumer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);consumer.subscribe("TagTopic","CAR || MOBILE || TOURISM");consumer.setMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {if (CollectionUtils.isNotEmpty(list)) {log.info("消费者[TagConsumer4]收到消息,消息详情:{}", StrUtil.utf8Str(list.get(0).getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();log.info("消费者[TagConsumer4] start success");}}
2.4、TagProducer
package org.star.tag.producer;import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.star.constants.RocketMQConstant;import java.nio.charset.StandardCharsets;/*** @Author: 一叶浮萍归大海* @Date: 2023/8/30 10:22* @Description: Tag消息生产者*/
@Slf4j
public class TagProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("TagProducer");producer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);producer.start();log.info("Tag消息生产者 start success");String[] tags = new String[]{"NBA", "RUN", "STAR","CAR","MOBILE","TOURISM"};for (int i = 0; i < 6; i++) {String tag = tags[i % tags.length];String content = "";if ("NBA".equals(tag)) {content = "NBA message,消息编号[" + i + "]";} else if ("RUN".equals(tag)) {content = "RUN message,消息编号[" + i + "]";} else if ("STAR".equals(tag)) {content = "STAR message,消息编号[" + i + "]";} else if ("CAR".equals(tag)) {content = "CAR message,消息编号[" + i + "]";} else if ("MOBILE".equals(tag)) {content = "MOBILE message,消息编号[" + i + "]";} else if ("TOURISM".equals(tag)) {content = "TOURISM message,消息编号[" + i + "]";}log.info("当前tag:{},消息内容:{}", tag, content);Message message = new Message("TagTopic", tag, content.getBytes(StandardCharsets.UTF_8));SendResult result = producer.send(message);log.info("sendStatus:{},brokerName:{},queueId:{},msgId:{}", result.getSendStatus(), result.getMessageQueue().getBrokerName(), result.getMessageQueue().getQueueId(), result.getMsgId());}producer.shutdown();}}
2.5 、控制台打印结果
# 生产者
09:53:44.850 [main] INFO org.star.tag.producer.TagProducer - Tag消息生产者 start success
09:53:44.850 [main] INFO org.star.tag.producer.TagProducer - 当前tag:NBA,消息内容:NBA message,消息编号[0]
09:53:45.308 [main] INFO org.star.tag.producer.TagProducer - sendStatus:SEND_OK,brokerName:broker-a,queueId:0,msgId:0AA867618F8018B4AAC2262C1D530000
09:53:45.308 [main] INFO org.star.tag.producer.TagProducer - 当前tag:RUN,消息内容:RUN message,消息编号[1]
09:53:45.315 [main] INFO org.star.tag.producer.TagProducer - sendStatus:SEND_OK,brokerName:broker-a,queueId:1,msgId:0AA867618F8018B4AAC2262C1D5C0001
09:53:45.315 [main] INFO org.star.tag.producer.TagProducer - 当前tag:STAR,消息内容:STAR message,消息编号[2]
09:53:45.319 [main] INFO org.star.tag.producer.TagProducer - sendStatus:SEND_OK,brokerName:broker-a,queueId:2,msgId:0AA867618F8018B4AAC2262C1D640002
09:53:45.319 [main] INFO org.star.tag.producer.TagProducer - 当前tag:CAR,消息内容:CAR message,消息编号[3]
09:53:45.322 [main] INFO org.star.tag.producer.TagProducer - sendStatus:SEND_OK,brokerName:broker-a,queueId:3,msgId:0AA867618F8018B4AAC2262C1D680003
09:53:45.323 [main] INFO org.star.tag.producer.TagProducer - 当前tag:MOBILE,消息内容:MOBILE message,消息编号[4]
09:53:45.326 [main] INFO org.star.tag.producer.TagProducer - sendStatus:SEND_OK,brokerName:broker-a,queueId:0,msgId:0AA867618F8018B4AAC2262C1D6B0004
09:53:45.326 [main] INFO org.star.tag.producer.TagProducer - 当前tag:TOURISM,消息内容:TOURISM message,消息编号[5]
09:53:45.329 [main] INFO org.star.tag.producer.TagProducer - sendStatus:SEND_OK,brokerName:broker-a,queueId:1,msgId:0AA867618F8018B4AAC2262C1D6E0005# 消费者TagConsumer1
09:53:45.310 [ConsumeMessageThread_2] INFO org.star.tag.consumer.TagConsumer1 - 消费者[TagConsumer1]收到消息,消息详情:NBA message,消息编号[0]# 消费者TagConsumer2
09:53:45.316 [ConsumeMessageThread_2] INFO org.star.tag.consumer.TagConsumer2 - 消费者[TagConsumer2]收到消息,消息详情:RUN message,消息编号[1]# 消费者TagConsumer3
09:53:45.322 [ConsumeMessageThread_3] INFO org.star.tag.consumer.TagConsumer3 - 消费者[TagConsumer3]收到消息,消息详情:STAR message,消息编号[2]
09:53:45.327 [ConsumeMessageThread_4] INFO org.star.tag.consumer.TagConsumer3 - 消费者[TagConsumer3]收到消息,消息详情:CAR message,消息编号[3]# 消费者TagConsumer4
09:53:45.327 [ConsumeMessageThread_4] INFO org.star.tag.consumer.TagConsumer4 - 消费者[TagConsumer4]收到消息,消息详情:CAR message,消息编号[3]
09:53:45.344 [ConsumeMessageThread_6] INFO org.star.tag.consumer.TagConsumer4 - 消费者[TagConsumer4]收到消息,消息详情:MOBILE message,消息编号[4]
09:53:45.344 [ConsumeMessageThread_5] INFO org.star.tag.consumer.TagConsumer4 - 消费者[TagConsumer4]收到消息,消息详情:TOURISM message,消息编号[5]