当前位置: 首页 > news >正文

外贸网站建设怎么选海南seo顾问服务

外贸网站建设怎么选,海南seo顾问服务,比较好的做网站公司,专做网站的公司RocketMQ 一、MQ概述 Message Queue,是一种提供消息队列服务的中间件。提供了消息生产、存储、消费全过程API的软件系统。 MQ的作用 限流削峰:当用户发送超量请求时,将请求暂存,以便后期慢慢处理。如果不使用MQ暂存直接请求到…

RocketMQ

一、MQ概述

Message Queue,是一种提供消息队列服务的中间件。提供了消息生产、存储、消费全过程API的软件系统。

MQ的作用

  • 限流削峰:当用户发送超量请求时,将请求暂存,以便后期慢慢处理。如果不使用MQ暂存直接请求到业务系统中容易引起系统崩溃。
  • 异步解耦:若上游系统和下游系统为同步调用,会大大降低系统的吞吐量和并发量。MQ层实现两个系统之间的异步调用
  • 数据收集:分布式系统会产生海量数据流,如业务日志、监控数据、用户行为。针对这些数据流采集汇总,进行大数据分析。

主流应用的MQ产品

  • Kafka:Scala/Java语言开发。特点是高吞吐量,但会丢数据,常用与大数据领域的实时计算、日志采集等场景。不遵循任何MQ协议,使用自研协议。
  • RocketMQ:Java语言开发。经过数年阿里双十一考验,性能与稳定性非常高,功能全面。不遵循任何MQ协议,使用自研协议。开源版不如云上版(阿里商业版)

MQ常见协议

  • JMS:Java Messaging Service。Java平台上有关MOM(Message Orientated Middleware)的技术规范。他便于Java应用程序的消息交换,提供标准的接口简化开发。ActiveMQ时典型实现

  • STOMP:Streaming Text Orientated Message Protocol。是一种MOM的简单文本协议。STOMP提供一个可互操作的连接格式,允许 客户端与任意STOMP消息代理进行交互。ActiveMQ时典型实现

  • AMQP:Advanced Message Queuing Protocol。一个提供统一消息服务的应用层标准,是应用层协议的一个开放标准。RabbitMQ是典型实现

  • MQTT:Message Queueing Telemetry Transport。IBM开发的一个即时通讯协议(二进制协议),主要用于服务器和低功耗IoT设备之间的通信

二、基本概念

主题(Topic):表示一类消息的集合(可以理解为消息的类型),每个消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。一个生产者可以同时发送多种Topic消息,而一个消费者只能接收一种Topic消息

标签(Tag):用于快速过滤消息

三、Linux部署RocketMQ服务

1、在官网下载编译好的二进制压缩包,版本5.0.0即可,上传到Linux中

2、进行解压

3、配置环境变量ROCKETMQ_HOME和NAMESRV_ADDR

在这里插入图片描述

4、配置bin目录下的runserver.sh,根据实际情况修改JVM的内存参数

5、配置bin目录下的runbroker.sh,根据实际情况修改JVM的内存参数

6、执行nohup命令后台运行RocketMQ服务(nameserver必须先启动,broker需要再nameserver上注册)

# 启动nameserver
nohup bin/mqnamesrv &	# 启动broker
nohup bin/mqbroker -c [confFile] & # -c可指定加载的配置文件,默认为conf/broker.conf# 查看日志rocketmq是否成功启动
tail nohup.out	# 查看进程
jps		# 停止broker
sh bin/mqshutdown broker# 停止namesrv
sh bin/mqshutdown namesrv

7、执行命令测试(rocketmq提供的测试样例,生产者会发送一千条消息)

bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

8、执行命令测试(rocketmq提供的测试样例,消费者会接受一千条消息)

bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

四、RocketMQ API

生产者同步发送消息

public void test_SyncProducer() throws MQClientException {DefaultMQProducer producer = new DefaultMQProducer("producer_group_name");//设置注册服务的ip地址的端口producer.setNamesrvAddr(RocketMQConstant.NAME_SRV_ADDR);//启动生产者producer.start();for(int i=0; i<3; i++){try {// 封装消息,设置topic,tag(用于消息快速过滤),消息数据Message message = new Message("TopicTest","TagA","ID04287777",("Hello, RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));//同步发送消息并获取发送结果,producer从broker获取发送结果SendResult sendResult = producer.send(message);System.out.println(sendResult);Thread.sleep(1500);} catch (Exception e) {throw new RuntimeException(e);}}producer.shutdown();
}

生产者异步发送消息

public void test_AsyncProducer() throws Exception{DefaultMQProducer producer = new DefaultMQProducer(RocketMQConstant.PRODUCER_GROUP_NAME);producer.setNamesrvAddr(RocketMQConstant.NAME_SRV_ADDR);producer.start();producer.setRetryTimesWhenSendAsyncFailed(0);int messageCount = 10;final CountDownLatch countDownLatch = new CountDownLatch(messageCount);for(int i=0; i<messageCount; i++){final int index = i;// 封装消息,设置topic,tag(用于消息快速过滤),消息数据Message message = new Message("TopicTest","TagA","ID04287777",("Hello, RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));// 异步发送消息,若broker有响应会调用SendCallback中的方法producer.send(message, new SendCallback() {public void onSuccess(SendResult sendResult) {countDownLatch.countDown();System.out.println("    Send Message "+ index +" OK: "+sendResult);}public void onException(Throwable throwable) {countDownLatch.countDown();System.out.println("    Send Message "+ index +" Exception: "+throwable);}});//单向发送producer.sendOneway(message);System.out.println("Message "+index+" send done");}//在100条消息发送完后关闭countDownLatch.await(5, TimeUnit.SECONDS);producer.shutdown();
}

生产者单向发送消息

public void test_OneWayProducer() throws Exception{DefaultMQProducer producer = new DefaultMQProducer(RocketMQConstant.PRODUCER_GROUP_NAME);producer.setNamesrvAddr(RocketMQConstant.NAME_SRV_ADDR);producer.start();producer.setRetryTimesWhenSendAsyncFailed(0);int messageCount = 10;final CountDownLatch countDownLatch = new CountDownLatch(messageCount);for(int i=0; i<messageCount; i++){final int index = i;// 封装消息,设置topic,tag(用于消息快速过滤),消息数据Message message = new Message("TopicTest","TagA","ID04287777",("Hello, RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));//单向发送producer.sendOneway(message);System.out.println("Message "+index+" send done");}//在100条消息发送完后关闭countDownLatch.await(5, TimeUnit.SECONDS);producer.shutdown();
}

消费者推模式

public static void test_PushConsumer() throws Exception{DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group_name");consumer.setNamesrvAddr(RocketMQConstant.NAME_SRV_ADDR);consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);//消费者订阅的消息topic和tag(subExpression,*表示任意)consumer.subscribe("TopicTest", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {System.out.println("Receive New Message : "+list);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.println("Consumer Start...");
}

消费者拉模式

不同于推模式消费者,拉模式下需要手动管理消息队列MessageQueue和偏移量offset的映射关系。但是最新的LitePullConsumer底层源码已经实现对mq和offset的管理,比较方便。

//拉模式消费者
public static void test_LitePullConsumer() throws Exception{DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(RocketMQConstant.CONSUMER_GROUP_NAME);litePullConsumer.setNamesrvAddr(RocketMQConstant.NAME_SRV_ADDR);litePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);litePullConsumer.subscribe("TopicTest", "*");litePullConsumer.start();try {while(true){List<MessageExt> messageExts = litePullConsumer.poll();System.out.printf("%s%n", messageExts);}}finally {litePullConsumer.shutdown();}
}

RocketMQ传递对象,对象所属类需要实现序列化接口,并且将对象转换为字节数组存入消息体中。

顺序消息

保证消息的局部有序(其中几条消息的有序,不一定是全部消息都要有序),以防止受到网络传输的影响。

实现原理

生产者将一组有序的消息一次发到同一个MessageQueue中(依靠队列的特点保证局部有序性)。消费者消费完一个MessageQueue的消息后才会去消费下一个MessageQueue的消息。

public class OrderProducer {public static void main(String[] args) {DefaultMQProducer producer = new DefaultMQProducer(WanfengConstant.PRODUCER_GROUP_NAME);try {producer.setNamesrvAddr(WanfengConstant.NAMESRV_ADDR);producer.start();for(int i=0; i<5; i++){//用于指定顺序的idint orderId = i;for(int j=0; j<5; j++){Message message = new Message(WanfengConstant.ORDER_TOPIC,"order_"+orderId,"KEY"+orderId,("order_"+orderId+" step "+j).getBytes(RemotingHelper.DEFAULT_CHARSET));//实现消息队列选择器对象,使同一个orderId的消息发送到同一个消息队列SendResult sendResult = producer.send(message,new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Integer id = (Integer) arg;int index = id % mqs.size();return mqs.get(index);}},orderId);System.out.printf("%s%n", sendResult);}}}catch(Exception e){e.printStackTrace();producer.shutdown();}}
}
public class OrderConsumer {public static void main(String[] args) {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(WanfengConstant.CONSUMER_GROUP_NAME);consumer.setNamesrvAddr(WanfengConstant.NAMESRV_ADDR);consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);try {consumer.subscribe(WanfengConstant.ORDER_TOPIC, "*");//实现顺序消息监听者接口consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {context.setAutoCommit(true);for(MessageExt messageExt : msgs){System.out.println("Receive Message: " + new String(messageExt.getBody()));}return ConsumeOrderlyStatus.SUCCESS;}});consumer.start();System.out.println("Consumer Start...");} catch (Exception e) {e.printStackTrace();consumer.shutdown();}}
}

广播消息

生产者发送的消息推送给所有group的消费者

实现原理:将消费者设置MessageModel为广播模式。

public class BroadcastConsumer {public static void main(String[] args) {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(WanfengConstant.CONSUMER_GROUP_NAME);consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);//设定消息模式为广播consumer.setMessageModel(MessageModel.BROADCASTING);try {consumer.subscribe(WanfengConstant.ARCHIVE_TOPIC, "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {msgs.forEach(messageExt -> {Archive archive = (Archive) WanfengObjectUtil.bytesToObject(messageExt.getBody());System.out.println("Receive Message : "+archive.getId());});return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.println("Broadcast Consumer Start...");}catch (Exception e){e.printStackTrace();consumer.shutdown();}}
}

若指定MessageModel为CLUSTERING,则生产者发送的消息会随机指定消费者消费。

延迟消息

顾名思义就是消息发送到broker时延迟指定的时间后再发送给消费者。常用于定时发送

过滤消息

过滤消息通过tag实现,在消费者端指定过滤的tag即可。

//消费者订阅tag1或tag2的消息
consumer.subscribe("TopicTest", "tag1 || tag2");

在RocketMQ中,消费者指定过滤条件后,将其上推到Broker中,在Broker中进行tag过滤,以减少网络IO,但同时也增加了Broker的繁忙。

事务消息

在这里插入图片描述

public class TransactionProducer {public static void main(String[] args) {TransactionMQProducer producer = new TransactionMQProducer(WanfengConstant.PRODUCER_GROUP_NAME);TransactionListener transactionListener = new TransactionListener() {@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {System.out.println("[WANFENG-INFO] TransactionProducer.executeLocalTransaction(): 执行成功...");String tags = msg.getTags();if (StringUtils.contains(tags, "TagA")) {//消息提交(发送出去)return LocalTransactionState.COMMIT_MESSAGE;} else if (StringUtils.contains(tags, "TagB")) {//消息回滚(丢掉消息)return LocalTransactionState.ROLLBACK_MESSAGE;} else {return LocalTransactionState.UNKNOW;}}@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {System.out.println("[WANFENG-INFO] TransactionProducer.checkLocalTransaction(): 执行成功...");String tags = msg.getTags();if (StringUtils.contains(tags, "TagC")) {return LocalTransactionState.COMMIT_MESSAGE;} else {return LocalTransactionState.UNKNOW;}}};ExecutorService executorService = new ThreadPoolExecutor(2,5,100, TimeUnit.SECONDS,new ArrayBlockingQueue<>(3));producer.setExecutorService(executorService);producer.setTransactionListener(transactionListener);try {producer.start();} catch (Exception e) {e.printStackTrace();}String[] tags = new String[]{"TagA", "TagB", "TagC"};CountDownLatch countDownLatch = new CountDownLatch(9);for (int i = 0; i < 9; i++) {try {Message message = new Message("TopicTest", tags[i % tags.length], "Key" + i, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.sendMessageInTransaction(message, null);System.out.println(sendResult);Thread.sleep(1000);countDownLatch.countDown();} catch (Exception e) {e.printStackTrace();}}try {countDownLatch.await();} catch (InterruptedException e) {throw new RuntimeException(e);} finally {try {Thread.sleep(100000);} catch (InterruptedException e) {throw new RuntimeException(e);}producer.shutdown();}}
}

ACL权限控制

ACL对用户对Topic资源的访问权限进行控制

在pom依赖中引入acl的依赖包

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-acl</artifactId><version>5.0.0</version>
</dependency>

在服务端的conf/broker.conf文件,添加配置,开启acl

aclEnable=true

在服务端的conf/plain_acl.yml文件,配置具体权限规则(热加载,不需要重启mq)

accounts:- accessKey: RocketMQ #用户名secretKey: 12345678 #密码whiteRemoteAddress:   #访问地址白名单admin: false	#是否为管理员(管理员可以访问所有Topic)defaultTopicPerm: DENY #默认Topic访问权限defaultGroupPerm: SUB  #默认组权限topicPerms:		#Topic对应的权限,若这里找不到则采用defaultTopicPerm- topicA=DENY 	- topicB=PUB|SUB- topicC=SUBgroupPerms:# the group should convert to retry topic- groupA=DENY- groupB=PUB|SUB- groupC=SUB

在创建生产者对象时需加入RPCHook(acl的用户信息)

public class AclProducer {private static final String ACL_ACCESS_KEY = "RocketMQ";private static final String ACL_SECRET_KEY = "12345678";/*** 通过用户名和密码获取RPCHook* @return*/public static RPCHook getAclRPCHook(){return new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY, ACL_SECRET_KEY));}public static void main(String[] args) throws MQClientException, InterruptedException {//创建生产者时加入用户信息,即RPCHookDefaultMQProducer producer = new DefaultMQProducer(WanfengConstant.PRODUCER_GROUP_NAME, getAclRPCHook());producer.setNamesrvAddr(WanfengConstant.NAMESRV_ADDR);producer.start();for (int i = 0; i < 20; i++) {try {Message message = new Message("TopicTest",WanfengConstant.TAGS_NAME,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /*消息体转换成二进制数组*/);SendResult sendResult = producer.send(message);System.out.printf("%s%n", sendResult);} catch (Exception e) {e.printStackTrace();Thread.sleep(1000);}}}
}

消息轨迹

Producer,Consumer,Broker处理消息的相关信息

消息轨迹的实现原理是MQ把消息轨迹都往RMQ_SYS_TRACE_TOPIC的Topic中放

在Broker端配置文件开启消息轨迹

traceTopicEnable=true

创建生产者时指定enableMsgTrace参数为true,开启消息轨迹。也可以指定customizedTraceTopic参数来自定义消息轨迹的Topic。

public class TraceProducer {public static void main(String[] args) throws MQClientException {//指定enableMsgTrace参数为true,开启消息轨迹DefaultMQProducer producer = new DefaultMQProducer(WanfengConstant.PRODUCER_GROUP_NAME, true);producer.setNamesrvAddr(WanfengConstant.NAMESRV_ADDR);producer.start();for (int i = 0; i < 20; i++) {try {Message message = new Message("TopicTest",WanfengConstant.TAGS_NAME,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /*消息体转换成二进制数组*/);SendResult sendResult = producer.send(message);System.out.printf("%s%n", sendResult);} catch (Exception e) {e.printStackTrace();}}}
}
http://www.hrbkazy.com/news/29709.html

相关文章:

  • 建个微商城网站多少钱如何去推广自己的产品
  • 网站如何做问卷调查问卷池州网站seo
  • 领导不愿意做招聘网站怎么办灰色产业推广引流渠道
  • wordpress 站点更改免费创建网站平台
  • 主要是WordPress专业网站优化外包
  • 做宣传的网站站长工具流量统计
  • 网站必须做商标么深圳网站推广
  • 杭州做网站 做小程序百度竞价点击神器奔奔
  • 网站的下载链接怎么做武汉大学人民医院
  • 公司网站开发费用放在什么科目seo优化推广技巧
  • b2b典型的网站个人接外包项目平台
  • 手机产品展示网站模板高级搜索引擎
  • wordpress小程序商城做网站seo优化
  • 成都网站优化步骤免费域名注册申请
  • 凡科小程序直播杭州网站seo推广软件
  • 网站工作和网站建设管理工作微信朋友圈广告推广
  • 医院建设官方网站必要性海外互联网推广平台
  • 苏州做网站品牌公司企业seo排名外包
  • 济南公司建站seo搜索引擎优化知乎
  • 有哪些做微信小游戏的网站网站推广关键词排名优化
  • 焦作网站设计公司外贸网站建设优化推广
  • 有关做美食的网站有哪些nba湖人队最新消息
  • 河北做网站网络推广计划书
  • 顺德网站制作案例信息手机百度下载app
  • 做旅游宣传图的网站有哪些百度小说排行榜第一名
  • 关停网站的申请网址制作
  • 社保网站是每月1-6号都是在建设中的吗提高网站流量的软文案例
  • 网站建设进度及实过程免费crm客户管理系统
  • 大型门户网站建设企业长春网站优化指导
  • 网站制作工作室哪家比较好电子商务说白了就是干什么的