当前位置: 首页 > news >正文

厦门注册公司网上申请入口重庆seo推广

厦门注册公司网上申请入口,重庆seo推广,java开发网站轮播图怎么做,怀安县网站建设一、带标签的Tag消息 1.1、概述 RocketMQ提供消息过滤的功能,通过Tag或者Key进行区分。我们往一个主题里面发送消息的时候,根据业务逻辑可能需要区分,比如带有tagA标签的消息被消费者A消费,带有tagB标签的消息被消费者B消费&…

一、带标签的Tag消息

1.1、概述

        RocketMQ提供消息过滤的功能,通过Tag或者Key进行区分。我们往一个主题里面发送消息的时候,根据业务逻辑可能需要区分,比如带有tagA标签的消息被消费者A消费,带有tagB标签的消息被消费者B消费,还有在事务监听的类里面,只要是事务消息都要走同一个监听,我们也需要通过过滤才能区别对待。

1.2、什么时候该用Topic,什么时候该用Tag

        不同的业务应该使用不同的Topic,如果仅仅是相同的业务里边有不同的表现形式,那么我们要使用Tag进行区分。至于说具体怎么选择,可以从以下几个方面进行区分:

(1)消息类型是否一致:如普通消息、事务消息、延时消息、顺序消息、不同的消息类型使用不同的Topic,无法通过Tag进行区分;

(2)业务是否相关联:没有直接关联的消息,如淘宝交易信息、京东物流消息使用不同的Topic进行区分;而同样是淘宝交易消息,电器类订单、女装类订单、化妆品类订单的消息可以用Tag进行区分;

(3)消息优先级是否一致:如同样是物流消息,盒马必须2小时内送达,天猫超市24小时内送达,淘宝物流则相对会慢一些,不同优先级的消息用不同的Topic进行区分;

(4)消息量级是否相当:有些业务消息虽然量小但是实时性要求高,如果跟某些万亿量级别的消息使用同一个Topic,则有可能会因为过长的等待时间而"饿死",此时需要将不同量级的消息进行区分,使用不同的Topic;

        总的来说,针对消息分类、可以选择创建多个Topic或者在同一个Topic下创建多个Tag。但是通常情况下,不同Topic之间的消息没有必然的联系。而Tag则用来区分同一个Topic下相互关联的消息,例如:全集和子集的关系,流程先后的关系。

二、案例代码

2.1、pom

        同案例五

2.2、RocketMQConstant

        同案例五

2.3、消费者

2.3.1、TagConsumer1

package org.star.tag.consumer;import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.star.constants.RocketMQConstant;import java.util.List;/*** @Author: 一叶浮萍归大海* @Date: 2023/8/30 10:33* @Description: Tag消息消费者*/
@Slf4j
public class TagConsumer1 {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TagConsumer1Group");consumer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);consumer.subscribe("TagTopic","NBA");consumer.setMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {if (CollectionUtils.isNotEmpty(list)) {log.info("消费者[TagConsumer1]收到消息,消息详情:{}", StrUtil.utf8Str(list.get(0).getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();log.info("消费者[TagConsumer1] start success");}}

2.3.2、TagConsumer2

package org.star.tag.consumer;import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.star.constants.RocketMQConstant;import java.util.List;/*** @Author: 一叶浮萍归大海* @Date: 2023/8/30 10:33* @Description: Tag消息消费者*/
@Slf4j
public class TagConsumer2 {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TagConsumer2Group");consumer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);consumer.subscribe("TagTopic","RUN");consumer.setMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {if (CollectionUtils.isNotEmpty(list)) {log.info("消费者[TagConsumer2]收到消息,消息详情:{}", StrUtil.utf8Str(list.get(0).getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();log.info("消费者[TagConsumer2] start success");}}

2.3.3、TagConsumer3

package org.star.tag.consumer;import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.star.constants.RocketMQConstant;import java.util.List;/*** @Author: 一叶浮萍归大海* @Date: 2023/8/30 10:33* @Description: Tag消息消费者*/
@Slf4j
public class TagConsumer3 {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TagConsumer3Group");consumer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);consumer.subscribe("TagTopic","STAR || CAR");consumer.setMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {if (CollectionUtils.isNotEmpty(list)) {log.info("消费者[TagConsumer3]收到消息,消息详情:{}", StrUtil.utf8Str(list.get(0).getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();log.info("消费者[TagConsumer3] start success");}}

2.3.4、TagConsumer4

package org.star.tag.consumer;import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.star.constants.RocketMQConstant;import java.util.List;/*** @Author: 一叶浮萍归大海* @Date: 2023/8/30 10:33* @Description: Tag消息消费者*/
@Slf4j
public class TagConsumer4 {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TagConsumer4Group");consumer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);consumer.subscribe("TagTopic","CAR || MOBILE || TOURISM");consumer.setMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {if (CollectionUtils.isNotEmpty(list)) {log.info("消费者[TagConsumer4]收到消息,消息详情:{}", StrUtil.utf8Str(list.get(0).getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();log.info("消费者[TagConsumer4] start success");}}

2.4、TagProducer

package org.star.tag.producer;import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.star.constants.RocketMQConstant;import java.nio.charset.StandardCharsets;/*** @Author: 一叶浮萍归大海* @Date: 2023/8/30 10:22* @Description: Tag消息生产者*/
@Slf4j
public class TagProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("TagProducer");producer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);producer.start();log.info("Tag消息生产者 start success");String[] tags = new String[]{"NBA", "RUN", "STAR","CAR","MOBILE","TOURISM"};for (int i = 0; i < 6; i++) {String tag = tags[i % tags.length];String content = "";if ("NBA".equals(tag)) {content = "NBA message,消息编号[" + i + "]";} else if ("RUN".equals(tag)) {content = "RUN  message,消息编号[" + i + "]";} else if ("STAR".equals(tag)) {content = "STAR  message,消息编号[" + i + "]";} else if ("CAR".equals(tag)) {content = "CAR  message,消息编号[" + i + "]";} else if ("MOBILE".equals(tag)) {content = "MOBILE  message,消息编号[" + i + "]";} else if ("TOURISM".equals(tag)) {content = "TOURISM  message,消息编号[" + i + "]";}log.info("当前tag:{},消息内容:{}", tag, content);Message message = new Message("TagTopic", tag, content.getBytes(StandardCharsets.UTF_8));SendResult result = producer.send(message);log.info("sendStatus:{},brokerName:{},queueId:{},msgId:{}", result.getSendStatus(), result.getMessageQueue().getBrokerName(), result.getMessageQueue().getQueueId(), result.getMsgId());}producer.shutdown();}}

2.5 、控制台打印结果

# 生产者
09:53:44.850 [main] INFO org.star.tag.producer.TagProducer - Tag消息生产者 start success
09:53:44.850 [main] INFO org.star.tag.producer.TagProducer - 当前tag:NBA,消息内容:NBA message,消息编号[0]
09:53:45.308 [main] INFO org.star.tag.producer.TagProducer - sendStatus:SEND_OK,brokerName:broker-a,queueId:0,msgId:0AA867618F8018B4AAC2262C1D530000
09:53:45.308 [main] INFO org.star.tag.producer.TagProducer - 当前tag:RUN,消息内容:RUN  message,消息编号[1]
09:53:45.315 [main] INFO org.star.tag.producer.TagProducer - sendStatus:SEND_OK,brokerName:broker-a,queueId:1,msgId:0AA867618F8018B4AAC2262C1D5C0001
09:53:45.315 [main] INFO org.star.tag.producer.TagProducer - 当前tag:STAR,消息内容:STAR  message,消息编号[2]
09:53:45.319 [main] INFO org.star.tag.producer.TagProducer - sendStatus:SEND_OK,brokerName:broker-a,queueId:2,msgId:0AA867618F8018B4AAC2262C1D640002
09:53:45.319 [main] INFO org.star.tag.producer.TagProducer - 当前tag:CAR,消息内容:CAR  message,消息编号[3]
09:53:45.322 [main] INFO org.star.tag.producer.TagProducer - sendStatus:SEND_OK,brokerName:broker-a,queueId:3,msgId:0AA867618F8018B4AAC2262C1D680003
09:53:45.323 [main] INFO org.star.tag.producer.TagProducer - 当前tag:MOBILE,消息内容:MOBILE  message,消息编号[4]
09:53:45.326 [main] INFO org.star.tag.producer.TagProducer - sendStatus:SEND_OK,brokerName:broker-a,queueId:0,msgId:0AA867618F8018B4AAC2262C1D6B0004
09:53:45.326 [main] INFO org.star.tag.producer.TagProducer - 当前tag:TOURISM,消息内容:TOURISM  message,消息编号[5]
09:53:45.329 [main] INFO org.star.tag.producer.TagProducer - sendStatus:SEND_OK,brokerName:broker-a,queueId:1,msgId:0AA867618F8018B4AAC2262C1D6E0005# 消费者TagConsumer1
09:53:45.310 [ConsumeMessageThread_2] INFO org.star.tag.consumer.TagConsumer1 - 消费者[TagConsumer1]收到消息,消息详情:NBA message,消息编号[0]# 消费者TagConsumer2
09:53:45.316 [ConsumeMessageThread_2] INFO org.star.tag.consumer.TagConsumer2 - 消费者[TagConsumer2]收到消息,消息详情:RUN  message,消息编号[1]# 消费者TagConsumer3
09:53:45.322 [ConsumeMessageThread_3] INFO org.star.tag.consumer.TagConsumer3 - 消费者[TagConsumer3]收到消息,消息详情:STAR  message,消息编号[2]
09:53:45.327 [ConsumeMessageThread_4] INFO org.star.tag.consumer.TagConsumer3 - 消费者[TagConsumer3]收到消息,消息详情:CAR  message,消息编号[3]# 消费者TagConsumer4
09:53:45.327 [ConsumeMessageThread_4] INFO org.star.tag.consumer.TagConsumer4 - 消费者[TagConsumer4]收到消息,消息详情:CAR  message,消息编号[3]
09:53:45.344 [ConsumeMessageThread_6] INFO org.star.tag.consumer.TagConsumer4 - 消费者[TagConsumer4]收到消息,消息详情:MOBILE  message,消息编号[4]
09:53:45.344 [ConsumeMessageThread_5] INFO org.star.tag.consumer.TagConsumer4 - 消费者[TagConsumer4]收到消息,消息详情:TOURISM  message,消息编号[5]

        

http://www.hrbkazy.com/news/42117.html

相关文章:

  • 西安网站建设优化seo上排名
  • 政务公开网站建设方案互联网营销培训平台
  • 慈溪企业网站北京债务优化公司
  • 青海项目信息网太原自动seo
  • 郑州网站制作营销巨量算数
  • 都江堰网站建设公司设计外包网站
  • 萝岗做网站营销网课
  • wordpress主题怎么汉化windows优化大师使用方法
  • 成都网站制作沈阳网络营销的三大基础
  • wordpress使用菜单搜索引擎优化是什么意思啊
  • 重庆所有做网站的公司个人网站seo
  • 网站建设意义模板广州百度提升优化
  • 10.制作一个网站一般先要明确( )招代理最好的推广方式
  • 德阳网站建设求职简历b2b免费发布信息平台
  • 北京昌平网站设计免费网站推广群发软件
  • 网站广东省备案广州从化发布
  • 芜湖小学网站建设电商运营培训正规平台
  • 济南网站搜索引擎优化百度广告联盟下载
  • 关于网站建设总结2021十大网络舆情案例
  • 企业起名昆明优化网站公司
  • 深圳市宝安区投资推广署站长工具seo综合查询推广
  • 深圳建设网站开发今天热搜榜前十名
  • 哪有深圳设计公司福州seo网站管理
  • php做网站开源项目单词优化和整站优化
  • 甘肃疫情最新消息今天新增网络搜索引擎优化
  • 湖南建设信息网站合肥百度关键词推广
  • 做网站的设计软件seo课程培训视频
  • 什么公司做的网站好网站seo搜索引擎的原理是什么
  • 做网站可以使用免费空间吗移动慧生活app下载
  • 电商o2o是什么意思深圳优化排名公司