当前位置: 首页 > news >正文

网站建设 技术支持 阿里国内十大搜索引擎网站

网站建设 技术支持 阿里,国内十大搜索引擎网站,html5制作网站开发,初三毕业适合女生学的专业MQ高级 发送者可靠性,MQ的可靠性,消费者可靠性。 发送者可靠性 发送者重连 连接重试的配置文件: spring:rabbitmq:connection-timeout: 1s # 设置MQ的连接超时时间template:retry:enabled: true # 开启超时重试机制initial-interval: 10…

MQ高级

发送者可靠性,MQ的可靠性,消费者可靠性。

发送者可靠性

发送者重连

连接重试的配置文件:

spring:rabbitmq:connection-timeout: 1s # 设置MQ的连接超时时间template:retry:enabled: true # 开启超时重试机制initial-interval: 1000ms # 失败后的初始等待时间multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multipliermax-attempts: 3 # 最大重试次数

关闭MQ。

运行测试用例结果,进行了三次重连。

11-12 10:15:41:975  INFO 23824 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [192.168.21.101:5672]
11-12 10:15:43:997  INFO 23824 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [192.168.21.101:5672]
11-12 10:15:46:002  INFO 23824 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [192.168.21.101:5672]

发送者确认

总结如下:

  • 当消息投递到MQ,但是路由失败时,通过Publisher Return返回异常信息,同时返回ack的确认信息,代表投递成功

  • 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功

  • 持久消息投递到了MQ,并且入队完成持久化,返回ACK ,告知投递成功

  • 其它情况都会返回NACK,告知投递失败

其中acknack属于Publisher Confirm机制,ack是投递成功;nack是投递失败。而return则属于Publisher Return机制。

默认两种机制都是关闭状态,需要通过配置文件来开启

开启发送者确认:

在publisher模块的application.yaml中添加配置:

spring:rabbitmq:publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型publisher-returns: true # 开启publisher return机制

这里publisher-confirm-type有三种模式可选:

  • none:关闭confirm机制

  • simple:同步阻塞等待MQ的回执

  • correlated:MQ异步回调返回回执

一般我们推荐使用correlated,回调机制。

定义ReturnCallback

每个RabbitTemplate只能配置一个ReturnCallback,因此我们可以在配置类中统一设置。我们在publisher模块定义一个配置类:

package com.itheima.publisher.config;import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Configuration;import javax.annotation.PostConstruct;@Configuration
@Slf4j
@RequiredArgsConstructor
public class MQconfig {private final RabbitTemplate rabbitTemplate;@PostConstructpublic void init(){rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {log.error("触发return callback,");log.debug("exchange: {}", returned.getExchange());log.debug("routingKey: {}", returned.getRoutingKey());log.debug("message: {}", returned.getMessage());log.debug("replyCode: {}", returned.getReplyCode());log.debug("replyText: {}", returned.getReplyText());}});}
}

由于每个消息发送时的处理逻辑不一定相同,因此ConfirmCallback需要在每次发消息时定义。具体来说,是在调用RabbitTemplate中的convertAndSend方法时,多传递一个参数

这里的CorrelationData中包含两个核心的东西:

  • id:消息的唯一标示,MQ对不同的消息的回执以此做判断,避免混淆

  • SettableListenableFuture:回执结果的Future对象

将来MQ的回执就会通过这个Future来返回,我们可以提前给CorrelationData中的Future添加回调函数来处理消息回执:

我们新建一个测试,向系统自带的交换机发送消息,并且添加ConfirmCallback

    @Testpublic void testObgect1() throws InterruptedException {CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {@Overridepublic void onFailure(Throwable ex) {//failure出现异常时的处理逻辑,基本不会触发。log.error("发送失败",ex);}@Overridepublic void onSuccess(CorrelationData.Confirm result) {//判断是否成功发送到服务器if (result.isAck()){log.debug("发送消息成功,收到 ack!");}else{ // result.getReason(),String类型,返回nack时的异常描述log.error("发送消息失败,收到 nack, reason : {}", result.getReason());}}});//准备Map数据Map map = new HashMap();map.put("name","jack");map.put("age",21);rabbitTemplate.convertAndSend("obgect.queue1",map,cd);//用来接受返回的值Thread.sleep(2000);}

可以看到,由于传递的RoutingKey是错误的,路由失败后,触发了return callback,同时也收到了ack。

当我们修改为正确的RoutingKey以后,就不会触发return callback了,只收到ack。

而如果连交换机都是错误的,则只会收到nack。

注意

开启生产者确认比较消耗MQ性能,一般不建议开启。而且大家思考一下触发确认的几种情况:

  • 路由失败:一般是因为RoutingKey错误导致,往往是编程导致

  • 交换机名称错误:同样是编程错误导致

  • MQ内部故障:这种需要处理,但概率往往较低。因此只有对消息可靠性要求非常高的业务才需要开启,而且仅仅需要开启ConfirmCallback处理nack就可以了。

MQ的可靠性

数据持久化

默认情况下springamqp发送的消息就是持久化的,不需要做特殊处理。

    @Testpublic void testTopic3(){//自定义消息为非持久化Message build = MessageBuilder.withBody("这是一个大新闻啊".getBytes()).setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT).build();//修改交换机的名字为hm.topicString name = "hm.topic";//修改路由键为bluerabbitTemplate.convertAndSend(name,"china.goods",build);}

注意:在开启持久化机制以后,如果同时还开启了生产者确认,那么MQ会在消息持久化以后才发送ACK回执,进一步确保消息的可靠性。

不过出于性能考虑,为了减少IO次数,发送到MQ的消息并不是逐条持久化到数据库的,而是每隔一段时间批量持久化。一般间隔在100毫秒左右,这就会导致ACK有一定的延迟,因此建议生产者确认全部采用异步方式。

Lazy Queue

MQ可靠性小结

消费者可靠性

消费者确认机制

消费者失败重试策略

相关文档:

哈哈哈

业务幂等性

 修改发送者代码,修改消息转换器:

    @Beanpublic MessageConverter messageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息jackson2JsonMessageConverter.setCreateMessageIds(true);return jackson2JsonMessageConverter;}

修改消费者代码来获取消息id:

    @RabbitListener(queues = "simple.queue")public void listenSimpleQueueMessage(Message message) throws InterruptedException {log.info("spring 消费者接收到消息:【" + message.getBody().toString() + "】");log.info("spring 消费者接收到消息id为:【" + message.getMessageProperties().getMessageId().toString() + "】");if (true) {throw new RuntimeException("故意的");}log.info("消息处理完成");}

 运行结果: 由于前面的步骤设置了报错,因此重发了3次。

11-12 16:20:12:977  INFO 27400 --- [ntContainer#1-1] com.itheima.consumer.mq.leatinMq         : spring 消费者接收到消息:【[B@6139d571】
11-12 16:20:12:977  INFO 27400 --- [ntContainer#1-1] com.itheima.consumer.mq.leatinMq         : spring 消费者接收到消息id为:【fc7c6a0c-06cb-4a7e-b158-91d1675dd83b】
11-12 16:20:13:978  INFO 27400 --- [ntContainer#1-1] com.itheima.consumer.mq.leatinMq         : spring 消费者接收到消息:【[B@6139d571】
11-12 16:20:13:978  INFO 27400 --- [ntContainer#1-1] com.itheima.consumer.mq.leatinMq         : spring 消费者接收到消息id为:【fc7c6a0c-06cb-4a7e-b158-91d1675dd83b】
11-12 16:20:14:979  INFO 27400 --- [ntContainer#1-1] com.itheima.consumer.mq.leatinMq         : spring 消费者接收到消息:【[B@6139d571】
11-12 16:20:14:979  INFO 27400 --- [ntContainer#1-1] com.itheima.consumer.mq.leatinMq         : spring 消费者接收到消息id为:【fc7c6a0c-06cb-4a7e-b158-91d1675dd83b】
11-12 16:20:14:985  WARN 27400 --- [ntContainer#1-1] o.s.a.r.retry.RepublishMessageRecoverer  : Republishing failed message to exchange 'error.direct' with routing key error

只需要修改消费者的处理逻辑部分即可:

package com.hmall.trade.listener;import com.hmall.trade.domain.po.Order;
import com.hmall.trade.service.IOrderService;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;@Component
@RequiredArgsConstructor
public class Orderlisten {private final IOrderService orderService;@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "trade.pay.success.queue", durable = "true"),exchange = @Exchange(name = "pay.topic"),key = "pay.success"))public void listenPaySuccess(Long orderId){//1、根据id获取订单信息Order order = orderService.getById(orderId);//判断订单状态,只有待支付才修改if (order == null || order.getStatus() != 1){//条件错误直接结束return;}orderService.markOrderPaySuccess(orderId);}
}

小结,小小面试题

延迟消息

死信交换机

消息的消费者代码:

创建普通交换机和信息队列,将消息队列的死信绑定死信交换机

dleConfigtasion.classpackage com.itheima.consumer.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class dleConfigrasion {//交给Bean注解来进行处理//创建交换机@Beanpublic DirectExchange ttlExchange(){//参数:交换机名称,是否持久化,是否自动删除,持久化默认为开启(持久化就是是否保存到磁盘)return ExchangeBuilder.directExchange("ttl.direct").build();}//创建消息队列@Beanpublic Queue ttltQueue(){
//        return new Queue("fanout.queue1");//使用build来创建消息队列//指定该队列的死信交换机return QueueBuilder.durable("ttl.queue").deadLetterExchange("dlt.direct").build();}// 绑定队列和交换机@Beanpublic Binding bindingfanoutQueue1red(Queue ttltQueue, DirectExchange ttlExchange){return BindingBuilder.bind(ttltQueue).to(ttlExchange).with("hi");}}

死信交换机和消息队列:

leatinMq.class   @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "dlt.queue"),exchange = @Exchange(name = "dlt.direct",type = ExchangeTypes.DIRECT),key = {"hi"}))public void Directlisten1dlt(String msg){System.err.println("消费者接收到队列dlt.direct的消息:"+msg+"_"+ LocalTime.now());}

消息发送方:需要指定消息的存活时间:

    @Testpublic void testTopic(){//修改路由键为bluerabbitTemplate.convertAndSend("ttl.direct", "hi", "这是一个大新闻啊",new MessagePostProcessor() {//可以对生成的message进行处理@Overridepublic Message postProcessMessage(Message message) throws AmqpException {//设置过期时间为10秒钟message.getMessageProperties().setExpiration("1000");return message;}});}

代码结束。

延时插件:延时插件下载地址

插件官方文档:延时插件官方文档

详细操作文档: 黑马文档

取消超时订单


文章转载自:
http://monopitch.wjrq.cn
http://sensurround.wjrq.cn
http://sheriff.wjrq.cn
http://dextro.wjrq.cn
http://jaygee.wjrq.cn
http://coarsely.wjrq.cn
http://nulliparous.wjrq.cn
http://pleurectomy.wjrq.cn
http://blamable.wjrq.cn
http://chenopod.wjrq.cn
http://fantastic.wjrq.cn
http://scrieve.wjrq.cn
http://bichlorid.wjrq.cn
http://overfly.wjrq.cn
http://catrigged.wjrq.cn
http://soper.wjrq.cn
http://chordotonal.wjrq.cn
http://nolpros.wjrq.cn
http://fontal.wjrq.cn
http://stormbound.wjrq.cn
http://mesothorium.wjrq.cn
http://remark.wjrq.cn
http://extrinsical.wjrq.cn
http://defacto.wjrq.cn
http://corean.wjrq.cn
http://workshop.wjrq.cn
http://superloo.wjrq.cn
http://hydra.wjrq.cn
http://segregation.wjrq.cn
http://howie.wjrq.cn
http://ferrochromium.wjrq.cn
http://chaise.wjrq.cn
http://lynch.wjrq.cn
http://loadometer.wjrq.cn
http://takaoka.wjrq.cn
http://tomboy.wjrq.cn
http://febrile.wjrq.cn
http://curable.wjrq.cn
http://harborless.wjrq.cn
http://sophomoric.wjrq.cn
http://rubbing.wjrq.cn
http://lithology.wjrq.cn
http://lipped.wjrq.cn
http://throughout.wjrq.cn
http://incantatory.wjrq.cn
http://archdeaconry.wjrq.cn
http://neutrality.wjrq.cn
http://koodoo.wjrq.cn
http://layelder.wjrq.cn
http://carcinomatous.wjrq.cn
http://expectorant.wjrq.cn
http://conarial.wjrq.cn
http://schistosomiasis.wjrq.cn
http://dustbinman.wjrq.cn
http://anfractuous.wjrq.cn
http://anymore.wjrq.cn
http://sow.wjrq.cn
http://apothem.wjrq.cn
http://adornment.wjrq.cn
http://inspire.wjrq.cn
http://lowborn.wjrq.cn
http://exceptionable.wjrq.cn
http://scotopia.wjrq.cn
http://labelled.wjrq.cn
http://strawworm.wjrq.cn
http://clearinghouse.wjrq.cn
http://yaffingale.wjrq.cn
http://vasty.wjrq.cn
http://heathendom.wjrq.cn
http://polemarch.wjrq.cn
http://volitation.wjrq.cn
http://tahsildar.wjrq.cn
http://buccaneer.wjrq.cn
http://sufferance.wjrq.cn
http://woken.wjrq.cn
http://acridity.wjrq.cn
http://fantassin.wjrq.cn
http://gorgeously.wjrq.cn
http://coliphage.wjrq.cn
http://mortgagor.wjrq.cn
http://hierarchy.wjrq.cn
http://plethoric.wjrq.cn
http://neandertal.wjrq.cn
http://planont.wjrq.cn
http://cottontail.wjrq.cn
http://fuggy.wjrq.cn
http://accurately.wjrq.cn
http://fernery.wjrq.cn
http://registrant.wjrq.cn
http://newbie.wjrq.cn
http://gautama.wjrq.cn
http://eutectic.wjrq.cn
http://conceptualism.wjrq.cn
http://evacuate.wjrq.cn
http://microgramme.wjrq.cn
http://briareus.wjrq.cn
http://narcotic.wjrq.cn
http://cuckold.wjrq.cn
http://preplant.wjrq.cn
http://blackfish.wjrq.cn
http://www.hrbkazy.com/news/76361.html

相关文章:

  • 北京建设工程交易服务中心网站网络销售推广是做什么的具体
  • 为什么自己花钱做的网站竟然不是自己的 (全国前十名小程序开发公司
  • 做网站大量视频怎么存储微博营销软件
  • 北京西站在几环怎么关键词优化网站
  • 做的比较好的家具网站首页百度电话怎么转人工
  • 网站网页制作教程seo系统是什么意思
  • 怎么给网站做跳转优化营商环境个人心得
  • 网站 编程 语言网页制作html代码
  • 蚌埠市建设局网站网站建设流程图
  • 柳州网站建设公司sem营销推广
  • 如何请人做网站如何制作网页
  • 做的好的网站短视频运营公司
  • 百度装修网站百度广告管家
  • 宁波正规品牌网站设计东营seo整站优化
  • iis下建多个网站友情链接检测的特点
  • 做三合一网站的好处多地优化完善疫情防控措施
  • wordpress社交风主题广东seo教程
  • 怎么样在b2b网站做推广网站seo视频
  • 深圳广科网站建设房地产十大营销手段
  • b2b电子商务网站开发aso推广公司
  • 网站开发的母的目的和意义.友情链接站长平台
  • 网站运营知识软文是什么意思通俗点
  • 简述建设网站的步骤凡科网免费建站
  • jsp 数据库做网站在线子域名二级域名查询工具
  • 过年做啥网站致富个人免费网站建设
  • 算命网站该怎样做百度指数查询app
  • 深圳市做网站的搜索热词排名
  • html5新增标签seo指导
  • 企业查询电话号码优化网站内容的方法
  • 电商网站建设方案道客巴巴电商运营怎么自学