微信h5游戏网站建设网页优化最为重要的内容是
扇形交换机:Fanout Exchange扇形交换机是最基本的交换机类型,它所能做的事情非常简单———广播消息。扇形交换机会把能接收到的消息全部发送给绑定在自己身上的队列。因为广播不需要“思考”,所以扇形交换机处理消息的速度也是所有的交换机类型里面最快的。 这个交换机没有路由键概念,就算你绑了路由键也是无视的。 =======================创建扇形交换机=============================================
@Autowired AmqpAdmin amqpAdmin;
/*** 创建扇形交换机* @return*/ @GetMapping("/createFanout") public void createFanout() {//交换机String exchangeName = "EW_EXCHANGE_FANOUT";//国内版队列String queueProductName = "EW_PRODUCT_ZH_FANOUT";//国际版队列String queueProductNameTwo = "EW_PRODUCT_EN_FANOUT";//交换机FanoutExchange exchange = new FanoutExchange(exchangeName, true, false);amqpAdmin.declareExchange(exchange);//产品队列Queue queue = new Queue(queueProductName, true, false, false);amqpAdmin.declareQueue(queue);Queue queue2 = new Queue(queueProductNameTwo, true, false, false);amqpAdmin.declareQueue(queue2);//交换机与设备绑定Binding bindingProduct = BindingBuilder.bind(queue).to(exchange);amqpAdmin.declareBinding(bindingProduct);Binding bindingProduct2 = BindingBuilder.bind(queue2).to(exchange);amqpAdmin.declareBinding(bindingProduct2);}
=======================发送消息===============================
/*** rabbitmq 管理组件*/private final RabbitTemplate rabbitTemplate;
/*** @param vo 发送消息* @param topExchange 交换机* @param routingKey 路由 扇形交换机不识别路由* @return*/ public static void syncDataChange(byte[] vo, String topExchange, String routingKey, RabbitTemplate rabbitTemplate) {try {if (null != vo) {Message message = MessageBuilder.withBody(vo).setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("UTF-8").setMessageId(UUID.randomUUID().toString()).build();rabbitTemplate.convertAndSend(topExchange, routingKey, message, new CorrelationData(UUID.randomUUID().toString()));}} catch (Exception e) {e.printStackTrace();} }
==========================接收消息===================================
//缓存 private StringRedisTemplate redisTemplate;
@RabbitListener(bindings = {@QueueBinding(value = @Queue(value ="EW_PRODUCT_ZH_FANOUT"),exchange = @Exchange(value = "EW_EXCHANGE_FANOUT" type = ExchangeTypes.FANOUT))})@RabbitHandlerpublic void onMessage(Channel channel, Message message) throws IOException {//唯一标识String messageId = message.getMessageProperties().getMessageId();//接收消息String msg = new String(message.getBody(), "UTF-8");try {if (StringUtils.isNotBlank(msg)) {//判断messageId在redis中是否存在boolean flage = stringRedisTemplate(messageId, msg);if (!flage) {log.error("消息已重复处理,拒绝再次接收..."); // channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息} else {//如果要防止 重复消费,则需要将 id值存在 redis,每次 都要去redis中拿id比对,是否存在,存在则消费过->messageId // channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);log.info("接收到的消息{}->" + redisTemplate.opsForValue().get(messageId));}} else {log.info("消息为空拒绝接收..."); // channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息}} catch (Exception e) {if (message.getMessageProperties().getRedelivered()) {log.error("消息已重复处理,拒绝再次接收..."); // channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息} else {log.error("消息即将再次返回队列处理..."); // channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);}}log.info("===消费端Payload============ " + msg);}/*** 判断Key是否存在** @param messageId 唯一表示key* @param msg value值* @return*/private boolean stringRedisTemplate(String messageId, String msg) {log.info("messageId=" + messageId);//判断Key是否存在 有则返回true,没有则返回falseif (redisTemplate.hasKey(messageId)) {return false;} else {redisTemplate.opsForValue().setIfAbsent(messageId, msg);}return true;}