接码网站源码分享 接码网页

本篇文章给大家谈谈接码网站源码分享,以及接码网页对应的知识点,文章可能有点长,但是希望大家可以阅读完,增长自己的知识,最重要的是希望对各位有所帮助,可以解决了您的问题,不要忘了收藏本站喔。

从本文开始,我们来分析rocketMq消息接收、分发以及投递流程。

RocketMq消息处理整个流程如下:

消息接收:消息接收是指接收producer的消息,处理类是SendMessageProcessor,将消息写入到commigLog文件后,接收流程处理完毕;消息分发:broker处理消息分发的类是ReputMessageService,它会启动一个线程,不断地将commitLong分到到对应的consumerQueue,这一步操作会写两个文件:consumerQueue与indexFile,写入后,消息分发流程处理完毕;消息投递:消息投递是指将消息发往consumer的流程,consumer会发起获取消息的请求,broker收到请求后,调用PullMessageProcessor类处理,从consumerQueue文件获取消息,返回给consumer后,投递流程处理完毕。

以上就是rocketMq处理消息的流程了,接下来我们就从源码来看相关流程的实现。

1.remotingServer的启动流程

在正式分析接收与投递流程前,我们来了解下remotingServer的启动。

remotingServer是一个netty服务,他开启了一个端口用来处理producer与consumer的网络请求。

remotingServer是在BrokerControllerstart方法:

publicvoidstart(){\n…\n\nServerBootstrapchildHandler=\nthis.serverBootstrap.group(this.eventLoopGroupBoss,this.eventLoopGroupSelector)\n…\n.childHandler(newChannelInitializer<SocketChannel>(){\n@Override\npublicvoidinitChannel(SocketChannelch)throwsException{\nch.pipeline()\n.addLast(defaultEventExecutorGroup,\nHANDSHAKE_HANDLER_NAME,handshakeHandler)\n.addLast(defaultEventExecutorGroup,\nencoder,\nnewNettyDecoder(),\nnewIdleStateHandler(0,0,\nnettyServerConfig.getServerChannelMaxIdleTimeSeconds()),\nconnectionManageHandler,\n//处理业务请求的handler\nserverHandler\n);\n}\n});\n\n…\n\n}

这就是一个标准的netty服务启动流程了,套路与nameServer的启动是一样的。关于netty的相关内容,这里我们仅关注pipeline上的channelHandler,在netty中,处理读写请求的操作为一个个ChannelHandler,remotingServer中处理读写请求的ChanelHandler为NettyServerHandler,代码如下:

@ChannelHandler.Sharable\nclassNettyServerHandlerextendsSimpleChannelInboundHandler<RemotingCommand>{\n\n@Override\nprotectedvoidchannelRead0(ChannelHandlerContextctx,RemotingCommandmsg)throwsException{\nprocessMessageReceived(ctx,msg);\n}\n}

这块的操作与nameServer对外提供的服务极相似(就是同一个类),最终调用的是NettyRemotingAbstractinitialize方法)注册的。之前在分析BrokerController的初始化流程时,就提到过Processor的提供操作,这里再回顾下:

BrokerController的初始化方法initialize会调用BrokerControllerprocessRequestCommand方法。

在NettyRemotingAbstractprocessRequest看看它的流程:

publicRemotingCommandprocessRequest(ChannelHandlerContextctx,\nRemotingCommandrequest)throwsRemotingCommandException{\nRemotingCommandresponse=null;\ntry{\n//broker处理接收消息\nresponse=asyncProcessRequest(ctx,request).get();\n}catch(InterruptedException|ExecutionExceptione){\nlog.error(&34;+request.toString(),e);\n}\nreturnresponse;\n}

没干啥事,一路跟下去,直接看普通消息的流程,进入SendMessageProcessorasyncPutMessage:

publicCompletableFuture<PutMessageResult>asyncPutMessage(MessageExtBrokerInnermsg){\n…\n//保存到commitLog\nCompletableFuture<PutMessageResult>putResultFuture=this.commitLog.asyncPutMessage(msg);\n…\n}

3.1commitLog写入原理

一个broker逻辑上对应着一个commitLog,你可以把它看作一个大文件,然后这个broker收到的所有消息都写到这个里面,但是物理上ROCKET_HOME/commitlog/00000000000000000000这个路径存储的,它是由若干个文件组成的,每个文件默认大小是1G,然后每个文件都对应这个一个MappedFile,00000000000000000000就是第一个MappedFile对应的物理文件,每个文件的文件名就是在commitLog里面的一个其实offset,第二个文件名就是00000000001073741824,也就是上一个MappedFile文件起始offset加上每个文件的大小,这个MappedFile就是RocketMQ的黑科技,使用了内存映射技术来提高文件的访问速度与写入速度,然后都是采用追加写的方式提高写入速度。

我们直接看官方的描述(链接:github.com/apache/rock…):

rocketMq消息存储架构图

消息存储架构图中主要有下面三个跟消息存储相关的文件构成。

(1)CommitLog:消息主体以及元数据的存储主体,存储Producer端写入的消息主体内容,消息内容不是定长的。单个文件大小默认1G,文件名长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件;

(2)ConsumeQueue:消息消费队列,引入的目的主要是提高消息消费的性能,由于RocketMQ是基于主题topic的订阅模式,消息消费是针对主题进行的,如果要遍历commitlog文件中根据topic检索消息是非常低效的。Consumer即可根据ConsumeQueue来查找待消费的消息。其中,ConsumeQueue(逻辑消费队列)作为消费消息的索引,保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。consumequeue文件可以看成是基于topic的commitlog索引文件,故consumequeue文件夹的组织方式如下:topic/queue/file三层组织结构,具体存储路径为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同样consumequeue文件采取定长设计,每一个条目共20个字节,分别为8字节的commitlog物理偏移量、4字节的消息长度、8字节taghashcode,单个文件由30W个条目组成,可以像数组一样随机访问每一个条目,每个ConsumeQueue文件大小约5.72M;

(3)IndexFile:IndexFile(索引文件)提供了一种可以通过key或时间区间来查询消息的方法。Index文件的存储位置是:HOME\\store\\index{fileName},文件名fileName是以创建时的时间戳命名的,固定的单个IndexFile文件大小约为400M,一个IndexFile可以保存2000W个索引,IndexFile的底层存储设计为在文件系统中实现HashMap结构,故rocketmq的索引文件其底层实现为hash索引。

在上面的RocketMQ的消息存储整体架构图中可以看出,RocketMQ采用的是混合型的存储结构,即为Broker单个实例下所有的队列共用一个日志数据文件(即为CommitLog)来存储。RocketMQ的混合型存储结构(多个Topic的消息实体内容都存储于一个CommitLog中)针对Producer和Consumer分别采用了数据和索引部分相分离的存储结构,Producer发送消息至Broker端,然后Broker端使用同步或者异步的方式对消息刷盘持久化,保存至CommitLog中。只要消息被刷盘持久化至磁盘文件CommitLog中,那么Producer发送的消息就不会丢失。

正因为如此,Consumer也就肯定有机会去消费这条消息。当无法拉取到消息后,可以等下一次消息拉取,同时服务端也支持长轮询模式,如果一个消息拉取请求未拉取到消息,Broker允许等待30s的时间,只要这段时间内有新消息到达,将直接返回给消费端。这里,RocketMQ的具体做法是,使用Broker端的后台服务线程—ReputMessageService不停地分发请求并异步构建ConsumeQueue(逻辑消费队列)和IndexFile(索引文件)数据。

3.2CommitLogasyncPutMessage方法,这个方法有点长,我们分部分:

publicCompletableFuture<PutMessageResult>asyncPutMessage(finalMessageExtBrokerInnermsg){\n//Setthestoragetime设置存储时间\nmsg.setStoreTimestamp(System.currentTimeMillis());\n//SetthemessagebodyBODYCRC(considerthemostappropriatesetting\n//ontheclient)\n//设置crc\nmsg.setBodyCRC(UtilAll.crc32(msg.getBody()));\n//BacktoResults\nAppendMessageResultresult=null;\n\nStoreStatsServicestoreStatsService=this.defaultMessageStore.getStoreStatsService();\n\nStringtopic=msg.getTopic();\nintqueueId=msg.getQueueId();\n\n//获取事务状态\nfinalinttranType=MessageSysFlag.getTransactionValue(msg.getSysFlag());\n//事务\nif(tranType==MessageSysFlag.TRANSACTION_NOT_TYPE\n||tranType==MessageSysFlag.TRANSACTION_COMMIT_TYPE){\n//DelayDelivery延时消息的处理\nif(msg.getDelayTimeLevel()>0){\nif(msg.getDelayTimeLevel()>this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()){\nmsg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());\n}\n\n//设置延迟队列\ntopic=TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;\nqueueId=ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());\n\n//Backuprealtopic,queueId\nMessageAccessor.putProperty(msg,MessageConst.PROPERTY_REAL_TOPIC,msg.getTopic());\nMessageAccessor.putProperty(msg,MessageConst.PROPERTY_REAL_QUEUE_ID,String.valueOf(msg.getQueueId()));\nmsg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));\n\nmsg.setTopic(topic);\nmsg.setQueueId(queueId);\n}\n}\n…

这一部分其实就是从msg中获取一些信息,判断处理一下这个延时消息。

longelapsedTimeInLock=0;\nMappedFileunlockMappedFile=null;\n//获取最后一个MappedFile\nMappedFilemappedFile=this.mappedFileQueue.getLastMappedFile();\n\n//获取写入锁\nputMessageLock.lock();//spinorReentrantLock,dependingonstoreconfig\ntry{\nlongbeginLockTimestamp=this.defaultMessageStore.getSystemClock().now();\n//开始在锁里的时间\nthis.beginTimeInLock=beginLockTimestamp;\n\n//Heresettingsarestoredtimestamp,inordertoensureanorderly\n//global\n//设置写入的时间戳,确保它是有序的,\nmsg.setStoreTimestamp(beginLockTimestamp);\n\n//判断MappedFile是否是null或者是否是满了\nif(null==mappedFile||mappedFile.isFull()){\nmappedFile=this.mappedFileQueue.getLastMappedFile(0);//Mark:NewFilemaybecausenoise\n}\nif(null==mappedFile){\nlog.error(&34;+msg.getTopic()+&34;+msg.getBornHostString());\nbeginTimeInLock=0;\nreturnCompletableFuture.completedFuture(newPutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED,null));\n}\n\n//todo往mappedFile追加消息\nresult=mappedFile.appendMessage(msg,this.appendMessageCallback);\n\n…

这一部分就比较重要了,首先是从mappedFileQueue中获取最后一个MappedFile,这个就是拿集合最后一个元素,因为都是有序的,最后一个元素就是最后一个MappedFile,接着就是获取锁了,这个锁也是比较有讲究的,可以设置使用ReentrantLock也可以设置使用cas,默认是使用cas,接着就是设置beginTimeInLock这个变量了,这个变量我们在判断ospagecache繁忙的时候说过,就是获取到锁的一个时间戳,在释放锁之前会重置成0,接着就是判断mappedFile是不是null或者是不是满了,如果是的话就要新建一个了。

接着就是最最最重要的了往mappedFile中追加消息,

mappedFile.appendMessage

/**\n*将消息追加到MappedFile文件中\n*/\npublicAppendMessageResultappendMessagesInner(finalMessageExtmessageExt,finalAppendMessageCallbackcb){\nassertmessageExt!=null;\nassertcb!=null;\n\n//获取MappedFile当前文件写指针\nintcurrentPos=this.wrotePosition.get();\n\n//如果currentPos小于文件大小\nif(currentPos<this.fileSize){\nByteBufferbyteBuffer=writeBuffer!=null?writeBuffer.slice():this.mappedByteBuffer.slice();\nbyteBuffer.position(currentPos);\nAppendMessageResultresult;\n//单个消息\nif(messageExtinstanceofMessageExtBrokerInner){\n//todo\nresult=cb.doAppend(this.getFileFromOffset(),byteBuffer,this.fileSize-currentPos,(MessageExtBrokerInner)messageExt);\n//批量消息\n}elseif(messageExtinstanceofMessageExtBatch){\nresult=cb.doAppend(this.getFileFromOffset(),byteBuffer,this.fileSize-currentPos,(MessageExtBatch)messageExt);\n}else{\nreturnnewAppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);\n}\nthis.wrotePosition.addAndGet(result.getWroteBytes());\nthis.storeTimestamp=result.getStoreTimestamp();\nreturnresult;\n}\n//如果currentPos大于或等于文件大小,表明文件已写满,抛出异常\nlog.error(&34;,currentPos,this.fileSize);\nreturnnewAppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);\n}

这里首先获取了一下这个mappedFile写到哪个位置了,它这个位置是从0开始的,然后判断一下当前位置与文件大小做对比,要是大于的就超了文件大小了,接着是获取writerbuffer因为这里是没有开启transientStorePool的,所以它是个空的,就会使用mmapedByteBuffer,接着就是调用回调的doAppend追加消息了,我们看下它的参数,第一个是开始offset,这个offset是commitlog的一个offset,举个例子,第一个MappedFile的开始offset是0,然后一个MappedFile的大小是1g,然后第二个MappedFile就得从1073741824(1g)开始了,第二个参数是bytebuffer,这个不用多说,第三个是这个MappedFile还空多少字节没用,第四个就是消息了。

我们来看下这个doAppend方法,这个也有点长,我们需要分开看下:

/**\n*//只是将消息追加到内存中\n*@paramfileFromOffset文件的第一个偏移量(就是MappedFile是从哪个地方开始的)\n*/\npublicAppendMessageResultdoAppend(finallongfileFromOffset,finalByteBufferbyteBuffer,finalintmaxBlank,\nfinalMessageExtBrokerInnermsgInner){\n//STORETIMESTAMP+STOREHOSTADDRESS+OFFSET<br>\n\n//PHYOFFSET\nlongwroteOffset=fileFromOffset+byteBuffer.position();\n\nintsysflag=msgInner.getSysFlag();\n\nintbornHostLength=(sysflag&MessageSysFlag.BORNHOST_V6_FLAG)==0?4+4:16+4;\nintstoreHostLength=(sysflag&MessageSysFlag.STOREHOSTADDRESS_V6_FLAG)==0?4+4:16+4;\nByteBufferbornHostHolder=ByteBuffer.allocate(bornHostLength);\nByteBufferstoreHostHolder=ByteBuffer.allocate(storeHostLength);\n\nthis.resetByteBuffer(storeHostHolder,storeHostLength);\n//创建全局唯一消息id\nStringmsgId;\nif((sysflag&MessageSysFlag.STOREHOSTADDRESS_V6_FLAG)==0){\nmsgId=MessageDecoder.createMessageId(this.msgIdMemory,msgInner.getStoreHostBytes(storeHostHolder),wroteOffset);\n}else{\nmsgId=MessageDecoder.createMessageId(this.msgIdV6Memory,msgInner.getStoreHostBytes(storeHostHolder),wroteOffset);\n}\n\n//RecordConsumeQueueinformation\nkeyBuilder.setLength(0);\nkeyBuilder.append(msgInner.getTopic());\nkeyBuilder.append(&39;);\nkeyBuilder.append(msgInner.getQueueId());\nStringkey=keyBuilder.toString();\n//获取该消息在消息队列的物理偏移量\nLongqueueOffset=CommitLog.this.topicQueueTable.get(key);\nif(null==queueOffset){\nqueueOffset=0L;\nCommitLog.this.topicQueueTable.put(key,queueOffset);\n}\n\n//Transactionmessagesthatrequirespecialhandling\nfinalinttranType=MessageSysFlag.getTransactionValue(msgInner.getSysFlag());\nswitch(tranType){\n//PreparedandRollbackmessageisnotconsumed,willnotenterthe\n//consumerqueue\ncaseMessageSysFlag.TRANSACTION_PREPARED_TYPE:\ncaseMessageSysFlag.TRANSACTION_ROLLBACK_TYPE:\nqueueOffset=0L;\nbreak;\ncaseMessageSysFlag.TRANSACTION_NOT_TYPE:\ncaseMessageSysFlag.TRANSACTION_COMMIT_TYPE:\ndefault:\nbreak;\n}

这一部分主要就是计算了一下这个消息写在commitlog中的一个offset,接着就是生成一个msgId,然后根据topic与queueId从缓存中获取了一下这个queueId对应的一个queue的offset,这个其实就是添加一个消息加1,然后就是事务的东西了,如果有事务,然后还在准备阶段或者回滚阶段,就将queueoffset设置成0,再往下其实就是处理消息,然后写到buffer中了。

/**\n*Serializemessage\n*/\nfinalbyte[]propertiesData=\nmsgInner.getPropertiesString()==null?null:msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);\n\nfinalintpropertiesLength=propertiesData==null?0:propertiesData.length;\n\nif(propertiesLength>Short.MAX_VALUE){\nlog.warn(&34;,propertiesData.length);\nreturnnewAppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);\n}\n\nfinalbyte[]topicData=msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);\nfinalinttopicLength=topicData.length;\n\nfinalintbodyLength=msgInner.getBody()==null?0:msgInner.getBody().length;\n\n//todo计算消息总长度\nfinalintmsgLen=calMsgLength(msgInner.getSysFlag(),bodyLength,topicLength,propertiesLength);\n\n//Exceedsthemaximummessage\nif(msgLen>this.maxMessageSize){//最大消息长度65536\nCommitLog.log.warn(&34;+msgLen+&34;+bodyLength\n+&34;+this.maxMessageSize);\nreturnnewAppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);\n}\n…

这里首先是获取了一下消息里面的properties,将它转成字节数组,计算了一下长度,接着就是将topic转成字节数据,计算了一下长度,获取了一下body的长度,就是你往Message塞得内容长度,重点来,计算消息的总长度,然后判断一下长度是否超长。其中calMsgLength如下:

//计算消息长度\nprotectedstaticintcalMsgLength(intsysFlag,intbodyLength,inttopicLength,intpropertiesLength){\nintbornhostLength=(sysFlag&MessageSysFlag.BORNHOST_V6_FLAG)==0?8:20;\nintstorehostAddressLength=(sysFlag&MessageSysFlag.STOREHOSTADDRESS_V6_FLAG)==0?8:20;\nfinalintmsgLen=4//TOTALSIZE消息条目总长度,4字节\n+4//MAGICCODE魔数,4字节。固定值0xdaa320a7\n+4//BODYCRC消息体的crc校验码,4字节\n+4//QUEUEID消息消费队列ID,4字节\n+4//FLAG消息标记,RocketMQ对其不做处理,供应用程序使用,默认4字节\n+8//QUEUEOFFSET消息在ConsumeQuene文件中的物理偏移量,8字节。\n+8//PHYSICALOFFSET消息在CommitLog文件中的物理偏移量,8字节\n+4//SYSFLAG消息系统标记,例如是否压缩、是否是事务消息等,4字节\n+8//BORNTIMESTAMP消息生产者调用消息发送API的时间戳,8字节\n+bornhostLength//BORNHOST消息发送者IP、端口号,8字节\n+8//STORETIMESTAMP消息存储时间戳,8字节\n+storehostAddressLength//STOREHOSTADDRESSBroker服务器IP+端口号,8字节\n+4//RECONSUMETIMES消息重试次数,4字节\n+8//PreparedTransactionOffset事务消息的物理偏移量,8字节。\n+4//消息体长度,4字节\n+(bodyLength>0?bodyLength:0)//BODY消息体内容,长度为bodyLenth中存储的值\n+1//主题存储长度,1字节,表示主题名称不能超过255个字符。\n+topicLength//TOPIC主题,长度为TopicLength中存储的值\n+2//消息属性长度,2字节,表示消息属性长度不能超过65536个字符。\n+(propertiesLength>0?propertiesLength:0)//propertiesLength消息属性,长度为PropertiesLength中存储的值\n+0;\nreturnmsgLen;\n}

继续:

…\n//todo消息长度+END_FILE_MIN_BLANK_LENGTH大于commitLog的空闲空间,则返回END_OF_FILE\nif((msgLen+END_FILE_MIN_BLANK_LENGTH)>maxBlank){\nthis.resetByteBuffer(this.msgStoreItemMemory,maxBlank);\n//1TOTALSIZE4字节存储当前文件的剩余空间\nthis.msgStoreItemMemory.putInt(maxBlank);\n//2MAGICCODE4字节存储魔数\nthis.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);\n//3Theremainingspacemaybeanyvalue\n//HerethelengthofthespeciallysetmaxBlank\nfinallongbeginTimeMills=CommitLog.this.defaultMessageStore.now();\nbyteBuffer.put(this.msgStoreItemMemory.array(),0,maxBlank);\nreturnnewAppendMessageResult(AppendMessageStatus.END_OF_FILE,wroteOffset,maxBlank,msgId,msgInner.getStoreTimestamp(),\nqueueOffset,CommitLog.this.defaultMessageStore.now()-beginTimeMills);\n}\n…\n

判断剩下的空间能不能放开,如果放不开的话,就塞上一个结束的东西,8个字节是正经的,剩下的随意,然后返回文件满了的状态。

…\n//Initializationofstoragespace\n//初始化存储空间\nthis.resetByteBuffer(msgStoreItemMemory,msgLen);\n//1TOTALSIZE\nthis.msgStoreItemMemory.putInt(msgLen);\n//2MAGICCODE\nthis.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);\n//3BODYCRC\nthis.msgStoreItemMemory.putInt(msgInner.getBodyCRC());\n//4QUEUEID\nthis.msgStoreItemMemory.putInt(msgInner.getQueueId());\n//5FLAG\nthis.msgStoreItemMemory.putInt(msgInner.getFlag());\n//6QUEUEOFFSET\nthis.msgStoreItemMemory.putLong(queueOffset);\n//7PHYSICALOFFSET\nthis.msgStoreItemMemory.putLong(fileFromOffset+byteBuffer.position());\n//8SYSFLAG\nthis.msgStoreItemMemory.putInt(msgInner.getSysFlag());\n//9BORNTIMESTAMP\nthis.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());\n//10BORNHOST\nthis.resetByteBuffer(bornHostHolder,bornHostLength);\nthis.msgStoreItemMemory.put(msgInner.getBornHostBytes(bornHostHolder));\n//11STORETIMESTAMP\nthis.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());\n//12STOREHOSTADDRESS\nthis.resetByteBuffer(storeHostHolder,storeHostLength);\nthis.msgStoreItemMemory.put(msgInner.getStoreHostBytes(storeHostHolder));\n//13RECONSUMETIMES\nthis.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());\n//14PreparedTransactionOffset\nthis.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());\n//15BODY\nthis.msgStoreItemMemory.putInt(bodyLength);\nif(bodyLength>0)\nthis.msgStoreItemMemory.put(msgInner.getBody());\n//16TOPIC\nthis.msgStoreItemMemory.put((byte)topicLength);\nthis.msgStoreItemMemory.put(topicData);\n//17PROPERTIES\nthis.msgStoreItemMemory.putShort((short)propertiesLength);\nif(propertiesLength>0)\nthis.msgStoreItemMemory.put(propertiesData);\n\nfinallongbeginTimeMills=CommitLog.this.defaultMessageStore.now();\n//Writemessagestothequeuebuffer\nbyteBuffer.put(this.msgStoreItemMemory.array(),0,msgLen);\n…

这个就是封装消息了,最后将消息放到byteBuffer中。

…\n//创建AppendMessageResult\nAppendMessageResultresult=newAppendMessageResult(AppendMessageStatus.PUT_OK,wroteOffset,msgLen,msgId,\nmsgInner.getStoreTimestamp(),queueOffset,CommitLog.this.defaultMessageStore.now()-beginTimeMills);\n\nswitch(tranType){\ncaseMessageSysFlag.TRANSACTION_PREPARED_TYPE:\ncaseMessageSysFlag.TRANSACTION_ROLLBACK_TYPE:\nbreak;\ncaseMessageSysFlag.TRANSACTION_NOT_TYPE:\ncaseMessageSysFlag.TRANSACTION_COMMIT_TYPE:\n//ThenextupdateConsumeQueueinformation\n//更新消息队列的逻辑偏移量\nCommitLog.this.topicQueueTable.put(key,++queueOffset);\nbreak;\ndefault:\nbreak;\n}\nreturnresult;\n}

最后就是封装追加消息的结果是put_ok,然后更新queueoffset,其实就是+1。

接下来我们回过头来看下appendMessagesInner的后半部分,

…\nthis.wrotePosition.addAndGet(result.getWroteBytes());\nthis.storeTimestamp=result.getStoreTimestamp();\nreturnresult;

这里其实就是更新了一下这个MappedFile写到哪个地方了,更新了下写入时间。

回到commitLog的putMessage方法:

…\n//todo往mappedFile追加消息\nresult=mappedFile.appendMessage(msg,this.appendMessageCallback);\nswitch(result.getStatus()){\ncasePUT_OK:\nbreak;\ncaseEND_OF_FILE:\nunlockMappedFile=mappedFile;\n//Createanewfile,re-writethemessage\nmappedFile=this.mappedFileQueue.getLastMappedFile(0);\nif(null==mappedFile){\n//XXX:warnandnotifyme\nlog.error(&34;+msg.getTopic()+&34;+msg.getBornHostString());\nbeginTimeInLock=0;\nreturnCompletableFuture.completedFuture(newPutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED,result));\n}\nresult=mappedFile.appendMessage(msg,this.appendMessageCallback);\nbreak;\ncaseMESSAGE_SIZE_EXCEEDED:\ncasePROPERTIES_SIZE_EXCEEDED:\nbeginTimeInLock=0;\nreturnCompletableFuture.completedFuture(newPutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL,result));\ncaseUNKNOWN_ERROR:\nbeginTimeInLock=0;\nreturnCompletableFuture.completedFuture(newPutMessageResult(PutMessageStatus.UNKNOWN_ERROR,result));\ndefault:\nbeginTimeInLock=0;\nreturnCompletableFuture.completedFuture(newPutMessageResult(PutMessageStatus.UNKNOWN_ERROR,result));\n}\n\nelapsedTimeInLock=this.defaultMessageStore.getSystemClock().now()-beginLockTimestamp;\nbeginTimeInLock=0;\n}finally{\nputMessageLock.unlock();\n}\n…

这里追加完成了,就需要判断追加状态了,如果是那种MappedFile放不开消息的情况,它会重新获取一个MappedFile,然后重新追加,在释放锁之前,它还会将beginTimeInLock这个字段重置为0;

…\nif(elapsedTimeInLock>500){\nlog.warn(&34;,elapsedTimeInLock,msg.getBody().length,result);\n}\n\nif(null!=unlockMappedFile&&this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()){\nthis.defaultMessageStore.unlockMappedFile(unlockMappedFile);\n}\n\nPutMessageResultputMessageResult=newPutMessageResult(PutMessageStatus.PUT_OK,result);\n\n//Statistics\nstoreStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();\nstoreStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());\n\n//todo消息首先进入pagecache,然后执行刷盘操作,\nCompletableFuture<PutMessageStatus>flushResultFuture=submitFlushRequest(result,msg);\n//todo接着调用submitReplicaRequest方法将消息提交到HaService,进行数据复制\nCompletableFuture<PutMessageStatus>replicaResultFuture=submitReplicaRequest(result,msg);\n\n//todo这里使用了ComplateFuture的thenCombine方法,将刷盘、复制当成一\n//todo个联合任务执行,这里设置消息追加的最终状态\nreturnflushResultFuture.thenCombine(replicaResultFuture,(flushStatus,replicaStatus)->{\nif(flushStatus!=PutMessageStatus.PUT_OK){\nputMessageResult.setPutMessageStatus(flushStatus);\n}\nif(replicaStatus!=PutMessageStatus.PUT_OK){\nputMessageResult.setPutMessageStatus(replicaStatus);\nif(replicaStatus==PutMessageStatus.FLUSH_SLAVE_TIMEOUT){\nlog.error(&34;,\nmsg.getTopic(),msg.getTags(),msg.getBornHostNameString());\n}\n}\nreturnputMessageResult;\n});\n}

判断了一下耗时,如果是大于500ms的话,打印警告,封装put消息的结果,统计store,可以看到后面调用了2个方法,一个是刷盘的,一个是同步消息的,我们这里要看下这个刷盘动作:

publicCompletableFuture<PutMessageStatus>submitFlushRequest(AppendMessageResultresult,MessageExtmessageExt){\n//Synchronizationflush同步刷盘\nif(FlushDiskType.SYNC_FLUSH==this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()){\nfinalGroupCommitServiceservice=(GroupCommitService)this.flushCommitLogService;\nif(messageExt.isWaitStoreMsgOK()){\n//构建GroupCommitRequest同步任务并提交到GroupCommitRequest\nGroupCommitRequestrequest=newGroupCommitRequest(result.getWroteOffset()+result.getWroteBytes(),\nthis.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());\n//刷盘请求\nservice.putRequest(request);\nreturnrequest.future();\n}else{\nservice.wakeup();\nreturnCompletableFuture.completedFuture(PutMessageStatus.PUT_OK);\n}\n}\n//Asynchronousflush异步刷盘这个就是靠os\nelse{\nif(!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()){\nflushCommitLogService.wakeup();\n}else{\ncommitLogService.wakeup();\n}\nreturnCompletableFuture.completedFuture(PutMessageStatus.PUT_OK);\n}\n}

如果broker配置的SYNC_FLUSH并且是个同步消息,这个时候就会创建一个刷盘请求,然后提交刷盘请求,这个时候会等着刷盘完成,默认就是5s。

接着就是到存储器的putMessage方法的后半部分了:

…\n//todo存储消息\nCompletableFuture<PutMessageResult>putResultFuture=this.commitLog.asyncPutMessage(msg);\n\nputResultFuture.thenAccept((result)->{\nlongelapsedTime=this.getSystemClock().now()-beginTime;\nif(elapsedTime>500){\nlog.warn(&34;,elapsedTime,msg.getBody().length);\n}\n//记录状态\nthis.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);\n\nif(null==result||!result.isOk()){\n//记录状态\nthis.storeStatsService.getPutMessageFailedTimes().incrementAndGet();\n}\n});\n\nreturnputResultFuture;

commitlog存入消息之后,咱们这块也就算是完成了,最后就是回到那个processor,然后将put结果写入对应的channel给返回去,告诉消息生产者消息写入结果。消息存储其实就是找对应的MappedFile,按照一定的格式往文件里面写入,需要注意的是内存映射文件。

这里附一张消息存储字段存储顺序与字段长度的图:

4.总结

本文主要分析了broker接收producer消息的流程,流程如下:

处理消息接收的底层服务为netty,在BrokerControllerprocessRequest来处理消息接收消息接收流程的最后,MappedFile#appendMessage(…)方法会将消息内容写入到commitLog文件中。

关于接码网站源码分享到此分享完毕,希望能帮助到您。

Published by

风君子

独自遨游何稽首 揭天掀地慰生平