当前位置: 首页 > news >正文

公司建设网站制作汽车营销活动策划方案

公司建设网站制作,汽车营销活动策划方案,网站建设中关村,基于phpt漫画网站开发🍊 Java学习:Java从入门到精通总结 🍊 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想 🍊 绝对不一样的职场干货:大厂最佳实践经验指南 📆 最近更新:2023年3月4日 &#x1…

🍊 Java学习:Java从入门到精通总结

🍊 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想

🍊 绝对不一样的职场干货:大厂最佳实践经验指南


📆 最近更新:2023年3月4日

🍊 个人简介:通信工程本硕 for NJU💪、Java程序员🌕。做过科研paper,发过专利,优秀的程序员不应该只是CRUD

🍊 点赞 👍 收藏 ⭐留言 📝 都是我最大的动力!


文章目录

  • DefaultMessageStore
  • CommitLog
  • MappedFileQueue
  • MappedFile
  • submitFlushRequest

DefaultMessageStore

接上文,SendMessageProcessor对象接收到消息之后,会把消息变成存储对象DefaultStoreMessage实例:
在这里插入图片描述
DefaultMessageStore的默认存储消息的方法asyncPutMessage如下:

@Override
public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {PutMessageStatus checkStoreStatus = this.checkStoreStatus();if (checkStoreStatus != PutMessageStatus.PUT_OK) {return CompletableFuture.completedFuture(new PutMessageResult(checkStoreStatus, null));}PutMessageStatus msgCheckStatus = this.checkMessage(msg);if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null));}long beginTime = this.getSystemClock().now();CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);putResultFuture.thenAccept((result) -> {long elapsedTime = this.getSystemClock().now() - beginTime;if (elapsedTime > 500) {log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);}this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);if (null == result || !result.isOk()) {this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();}});return putResultFuture;
}

整个存储流程的代码还是比较清晰的:

  1. 先判断消息是否投放成功 及 检查消息的格式
  2. 上述检查都没问题时就会把消息存储在CommitLog
CompletableFuture<PutMessageResult> putResultFuturethis.commitLog.asyncPutMessage(msg);

CommitLog

Broker接收到消息后,最终消息存储在Commitlog对象中,调用的是CommitLogputMessage方法

 public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {// Set the storage timemsg.setStoreTimestamp(System.currentTimeMillis());// Set the message body BODY CRC (consider the most appropriate setting// on the client)msg.setBodyCRC(UtilAll.crc32(msg.getBody()));// Back to ResultsAppendMessageResult result = null;StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();String topic = msg.getTopic();int queueId = msg.getQueueId();// 延时消息处理,事务为TRANSACTION_PREPARED_TYPE 和 TRANSACTION_ROLLBACK_TYPE 消息不支持延时投递final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {// Delay Deliveryif (msg.getDelayTimeLevel() > 0) {if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());}// 存储消息时,延时消息进入SCHEDULE_TOPIC_XXXX的主题下topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;// 消息队列编号 = 延迟级别 - 1queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());// Backup real topic, queueIdMessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));msg.setTopic(topic);msg.setQueueId(queueId);}}InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();if (bornSocketAddress.getAddress() instanceof Inet6Address) {msg.setBornHostV6Flag();}InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();if (storeSocketAddress.getAddress() instanceof Inet6Address) {msg.setStoreHostAddressV6Flag();}PutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get();PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);if (encodeResult != null) {return CompletableFuture.completedFuture(encodeResult);}msg.setEncodedBuff(putMessageThreadLocal.getEncoder().encoderBuffer);PutMessageContext putMessageContext = new PutMessageContext(generateKey(putMessageThreadLocal.getKeyBuilder(), msg));long elapsedTimeInLock = 0;MappedFile unlockMappedFile = null;// 加锁,同一时刻只能有一个线程put消息putMessageLock.lock(); //spin or ReentrantLock ,depending on store configtry {MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();this.beginTimeInLock = beginLockTimestamp;// Here settings are stored timestamp, in order to ensure an orderly// globalmsg.setStoreTimestamp(beginLockTimestamp);// 只有不存在映射文件或文件已存满,才进行创建if (null == mappedFile || mappedFile.isFull()) {mappedFile = this.mappedFileQueue.getLastMappedFile(0); }if (null == mappedFile) {log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());beginTimeInLock = 0;return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));}// 追加消息至MappedFile的缓存中,更新写入位置wrotePositionresult = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);switch (result.getStatus()) {case PUT_OK:break;// 当文件剩余空间不足时,创建新的MappedFile并写入case END_OF_FILE: unlockMappedFile = mappedFile;// Create a new file, re-write the messagemappedFile = this.mappedFileQueue.getLastMappedFile(0);if (null == mappedFile) {// XXX: warn and notify melog.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());beginTimeInLock = 0;return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));}result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);break;case MESSAGE_SIZE_EXCEEDED:case PROPERTIES_SIZE_EXCEEDED:beginTimeInLock = 0;return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));case UNKNOWN_ERROR:beginTimeInLock = 0;return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));default:beginTimeInLock = 0;return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));}elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;beginTimeInLock = 0;} finally {// 释放锁putMessageLock.unlock();}/** log **/if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {this.defaultMessageStore.unlockMappedFile(unlockMappedFile);}PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);// StatisticsstoreStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);// 异步刷盘流程return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {if (flushStatus != PutMessageStatus.PUT_OK) {putMessageResult.setPutMessageStatus(flushStatus);}if (replicaStatus != PutMessageStatus.PUT_OK) {putMessageResult.setPutMessageStatus(replicaStatus);if (replicaStatus == PutMessageStatus.FLUSH_SLAVE_TIMEOUT) {log.error("do sync transfer other node, wait return, but failed, topic: {} tags: {} client address: {}",msg.getTopic(), msg.getTags(), msg.getBornHostNameString());}}return putMessageResult;});}

消息在CommitLog文件中是顺序存储的

RocketMQ消息存储在CommitLog文件里,最终落盘,对应的类为MappedFile,它是从MappedFileQueue中获取的,如果对象不存在,就会创建:

MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();// 只有不存在映射文件或文件已存满,才进行创建
if (null == mappedFile || mappedFile.isFull()) {mappedFile = this.mappedFileQueue.getLastMappedFile(0); 
}

创建(获取)完成对象之后就会把消息插入到mappedFile里,如果文件放不下了,则会重新创建一个mappedFile来对其进行写入,最后就是使用异步Future的方式把消息持久化到磁盘上。


MappedFileQueue

上面的源码里首先从MappedFileQueue映射队列尾部中获取MappedFile对象:

public MappedFile getLastMappedFile() {MappedFile mappedFileLast = null;while (!this.mappedFiles.isEmpty()) {try {mappedFileLast = this.mappedFiles.get(this.mappedFiles.size() - 1);break;} catch (IndexOutOfBoundsException e) {//continue;} catch (Exception e) {log.error("getLastMappedFile has exception.", e);break;}}return mappedFileLast;}

MappedFile对象为空时,表示MappedFile对象不存在,此时就需要重新创建一个MappedFile对象,相应的方法在MappedFileQueue

public class MappedFileQueue {private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);private static final InternalLogger LOG_ERROR = InternalLoggerFactory.getLogger(LoggerName.STORE_ERROR_LOGGER_NAME);// 批量删除文件上限private static final int DELETE_FILES_BATCH_MAX = 10;// 目录private final String storePath;// 每个映射文件的大小private final int mappedFileSize;// 映射文件数组private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();// 分配MappedFile服务private final AllocateMappedFileService allocateMappedFileService;// 最后flush到的offsetprivate long flushedWhere = 0;// 最后commit到的offsetprivate long committedWhere = 0;// 最后保存的时间戳private volatile long storeTimestamp = 0;/***	获取最后一个可写入的映射文件*	当最后一个文件已经满的时候,创建一个新的文件*/public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {long createOffset = -1;MappedFile mappedFileLast = getLastMappedFile();// 不存在映射文件if (mappedFileLast == null) {// 计算从哪个offset开始createOffset = startOffset - (startOffset % this.mappedFileSize);}// 最后一个文件已满if (mappedFileLast != null && mappedFileLast.isFull()) {createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;}// 创建文件if (createOffset != -1 && needCreate) {String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);String nextNextFilePath = this.storePath + File.separator+ UtilAll.offset2FileName(createOffset + this.mappedFileSize);MappedFile mappedFile = null;if (this.allocateMappedFileService != null) {mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,nextNextFilePath, this.mappedFileSize);} else {try {mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);} catch (IOException e) {log.error("create mappedFile exception", e);}}if (mappedFile != null) {if (this.mappedFiles.isEmpty()) {mappedFile.setFirstCreateInQueue(true);}this.mappedFiles.add(mappedFile);}return mappedFile;}return mappedFileLast;}public MappedFile getLastMappedFile(final long startOffset) {return getLastMappedFile(startOffset, true);}
}

当获取到的“最后一个”MappedFile不存在或文件已满时,则新建一个,并计算新文件的createOffset

MappedFile

CommitLog的代码中,最终把消息追加到MappedFile文件的缓冲区中,同时更新其写入位置writePosition,但是还没有刷盘:

result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback, putMessageContext);

其中调用了MappedFileappendMessage方法实现添加消息到消息映射文件:

public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb,PutMessageContext putMessageContext) {assert messageExt != null;assert cb != null;int currentPos = this.wrotePosition.get();if (currentPos < this.fileSize) {// 缓冲区ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();byteBuffer.position(currentPos);AppendMessageResult result;// 判断是否是批量操作if (messageExt instanceof MessageExtBrokerInner) {result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,(MessageExtBrokerInner) messageExt, putMessageContext);} else if (messageExt instanceof MessageExtBatch) {result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,(MessageExtBatch) messageExt, putMessageContext);} else {return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);}// 更新写入位置 + 写入偏移量this.wrotePosition.addAndGet(result.getWroteBytes());this.storeTimestamp = result.getStoreTimestamp();return result;}log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);}

通过源码可以看到,实际上是吧消息放入ByteBuffer,同时更新写入位置和偏移量


submitFlushRequest

提交刷盘请求的源码如下:

public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, MessageExt messageExt) {// 异步flushif (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;if (messageExt.isWaitStoreMsgOK()) {GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());service.putRequest(request);return request.future();} else {service.wakeup();return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);}}// 同步flushelse {if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {flushCommitLogService.wakeup();	// 异步,使用MappedByteBuffer(默认策略)} else  {commitLogService.wakeup();	// 异步,使用字节缓冲区 + FileChannel}return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);}}

CommitLog文件刷盘是由FlushCommitLogService服务具体执行,一共有两种刷盘策略:

  1. FlushDiskType.SYNC_FLUSH:同步刷盘
  2. FlushDiskType.ASYNC_FLUSH:异步刷盘

同步刷盘的情况下,必须要等到数据刷盘成功后才会有返回结果;如果是异步刷盘,只需要把消息放入内存之后即可返回。

上述策略可以在broker.conf配置文件中进行配置

http://www.hrbkazy.com/news/38464.html

相关文章:

  • 大学网站建设多少钱网址提交
  • 做视频网站源码谷歌app下载
  • 做网站算软件开发么多层次网络营销合法吗
  • 简单炫酷的网站网站制作的服务怎么样
  • 忻州网站建设求职简历友情链接外链
  • 营销型网站建设的价格贵阳百度快照优化排名
  • 江苏艺居建设有限公司网站最新百度新闻
  • 网站建设优化建议凡科建站的免费使用
  • 淄博网站建设优化seo就是搜索引擎广告
  • 怎么在阿里做网站百度推广的价格表
  • 能不能同行网站做站长统计seo刷词
  • 网站留言程序怎么做b站黄页推广软件
  • 什么网站做软文北京seo收费
  • 台州企业网站搭建价格网站seo的主要优化内容
  • 北京做网站建设价格低电商网站运营
  • 外贸网站建设平台云服务器免费
  • 模板形的网站制作seo工具有哪些
  • 沈阳网站建设公司怎么样新产品怎样推广
  • 网站页面优化方法有哪些内容sem竞价课程
  • 建一个公司网站要多少钱短视频怎么赚钱
  • 数据线 东莞网站建设百度推广下载安装
  • 服务器搭建网站视频教程中超最新积分榜
  • 建设 网站2022最近的新闻大事10条
  • vps网站管理助手搜索引擎营销的主要方式有哪些?
  • 哪个网站教做西餐长春刚刚最新消息今天
  • logo素材网站大搜推广
  • 道滘镇仿做网站王通seo教程
  • 最专业的网站设计最近的新闻热点
  • 做数学题赚钱的网站邯郸网站seo
  • 网站备案 必须在接入商处营销型网站建设套餐