大家好,个人服务器的网站源码分享相信很多的网友都不是很明白,包括个人网站服务器配置也是一样,不过没有关系,接下来就来为大家分享关于个人服务器的网站源码分享和个人网站服务器配置的一些知识点,大家可以关注收藏,免得下次来找不到哦,下面我们开始吧!
1.处理业务请求的ChannelHandler:serverHandler
NamesrvController启动后,就可以处理Broker/Producer/Consumer的请求消息了,处理该类型消息的ChannelHandler为serverHandler,也就是NettyRemotingServer.NettyServerHandler(NettyServerHandler是NettyRemotingServer的内部类),代码如下:
classNettyServerHandlerextendsSimpleChannelInboundHandler<RemotingCommand>{\n\n@Override\nprotectedvoidchannelRead0(ChannelHandlerContextctx,RemotingCommandmsg)\nthrowsException{\nprocessMessageReceived(ctx,msg);\n}\n}\n复制代码
继续跟进NettyRemotingAbstractprocessRequestCommand:
publicvoidprocessRequestCommand(finalChannelHandlerContextctx,finalRemotingCommandcmd){\n//根据code从processorTable获取Pair\nfinalPair<NettyRequestProcessor,ExecutorService>matched=this.processorTable.get(cmd.getCode());\n//找不到默认值\nfinalPair<NettyRequestProcessor,ExecutorService>pair=null==matched?this.defaultRequestProcessor:matched;\nfinalintopaque=cmd.getOpaque();\n\nif(pair!=null){\nRunnablerun=newRunnable(){\n@Override\npublicvoidrun(){\ntry{\ndoBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()),cmd);\nfinalRemotingResponseCallbackcallback=newRemotingResponseCallback(){\n@Override\npublicvoidcallback(RemotingCommandresponse){\ndoAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()),cmd,response);\n//不是单向\nif(!cmd.isOnewayRPC()){\nif(response!=null){\nresponse.setOpaque(opaque);\nresponse.markResponseType();\ntry{\nctx.writeAndFlush(response);\n}catch(Throwablee){\nlog.error(&34;,e);\nlog.error(cmd.toString());\nlog.error(response.toString());\n}\n}else{\n}\n}\n}\n};\n//异步netty请求处理器\nif(pair.getObject1()instanceofAsyncNettyRequestProcessor){\nAsyncNettyRequestProcessorprocessor=(AsyncNettyRequestProcessor)pair.getObject1();\nprocessor.asyncProcessRequest(ctx,cmd,callback);\n}else{\n//不是异步请求处理器从pair中拿到Processor进行处理\nNettyRequestProcessorprocessor=pair.getObject1();\n//todo处理请求\nRemotingCommandresponse=processor.processRequest(ctx,cmd);\ncallback.callback(response);\n}\n}\n…\n}\n};\n\nif(pair.getObject1().rejectRequest()){\nfinalRemotingCommandresponse=RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,\n&34;);\nresponse.setOpaque(opaque);\nctx.writeAndFlush(response);\nreturn;\n}\n\ntry{\nfinalRequestTaskrequestTask=newRequestTask(run,ctx.channel(),cmd);\npair.getObject2().submit(requestTask);\n}catch(RejectedExecutionExceptione){\n…\n}\n}else{\n…\n}\n}\n复制代码
这个方法主要流程为,先获取Pair对象,然后将处理操作封装为Runnable对象,接着把Runnable对象提交到线程池中。
这个Pair对象是啥呢?还记得我们在NamesrvControllerrun方法中:
publicvoidrun(){\ntry{\ndoBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()),cmd);\n…\n//处理请求\nif(pair.getObject1()instanceofAsyncNettyRequestProcessor){\nAsyncNettyRequestProcessorprocessor=\n(AsyncNettyRequestProcessor)pair.getObject1();\nprocessor.asyncProcessRequest(ctx,cmd,callback);\n}else{\nNettyRequestProcessorprocessor=pair.getObject1();\n//处理请求\nRemotingCommandresponse=processor.processRequest(ctx,cmd);\ncallback.callback(response);\n}\n}catch(Throwablee){\n…\n}\n}\n复制代码
代码中区分了同步与异步请求两种方式,实际上最终都会进入到DefaultRequestProcessorregisterBrokerWithFilterServer,我们直接看重要代码:
publicRemotingCommandregisterBrokerWithFilterServer(ChannelHandlerContextctx,\nRemotingCommandrequest)throwsRemotingCommandException{\n…\n\n//处理注册\nRegisterBrokerResultresult=this.namesrvController.getRouteInfoManager().registerBroker(\nrequestHeader.getClusterName(),\nrequestHeader.getBrokerAddr(),\nrequestHeader.getBrokerName(),\nrequestHeader.getBrokerId(),\nrequestHeader.getHaServerAddr(),\nregisterBrokerBody.getTopicConfigSerializeWrapper(),\nregisterBrokerBody.getFilterServerList(),\nctx.channel());\n…\n}\n复制代码
这里我们去除了不必要的代码,仅保留了注册的方法,这里调用的是RouteInfoManagerregisterBroker方法,我们就会发现所谓的注册就是往以上几个HashMap中put数据的操作:
publicRegisterBrokerResultregisterBroker(\nfinalStringclusterName,\nfinalStringbrokerAddr,\nfinalStringbrokerName,\nfinallongbrokerId,\nfinalStringhaServerAddr,\nfinalTopicConfigSerializeWrappertopicConfigWrapper,\nfinalList<String>filterServerList,\nfinalChannelchannel){\nRegisterBrokerResultresult=newRegisterBrokerResult();\ntry{\ntry{\n//第一步路由注册加写锁\nthis.lock.writeLock().lockInterruptibly();\n\n//判断broker所属集群是否存在\nSet<String>brokerNames=this.clusterAddrTable.get(clusterName);\nif(null==brokerNames){\nbrokerNames=newHashSet<String>();\nthis.clusterAddrTable.put(clusterName,brokerNames);\n}\nbrokerNames.add(brokerName);\n\nbooleanregisterFirst=false;\n\n//第二步,维护brokerData信息\nBrokerDatabrokerData=this.brokerAddrTable.get(brokerName);\nif(null==brokerData){\nregisterFirst=true;\nbrokerData=newBrokerData(clusterName,brokerName,newHashMap<Long,String>());\nthis.brokerAddrTable.put(brokerName,brokerData);\n}\nMap<Long,String>brokerAddrsMap=brokerData.getBrokerAddrs();\n//Switchslavetomaster:firstremove<1,IP:PORT>innamesrv,thenadd<0,IP:PORT>\n//ThesameIP:PORTmustonlyhaveonerecordinbrokerAddrTable\nIterator<Entry<Long,String>>it=brokerAddrsMap.entrySet().iterator();\nwhile(it.hasNext()){\nEntry<Long,String>item=it.next();\nif(null!=brokerAddr&&brokerAddr.equals(item.getValue())&&brokerId!=item.getKey()){\nit.remove();\n}\n}\n\nStringoldAddr=brokerData.getBrokerAddrs().put(brokerId,brokerAddr);\nregisterFirst=registerFirst||(null==oldAddr);\n\n//第三步创建或更新topic路由元数据\n//broker是主节点\nif(null!=topicConfigWrapper\n&&MixAll.MASTER_ID==brokerId){\n//topic配置信息发生变化或初次注册\nif(this.isBrokerTopicConfigChanged(brokerAddr,topicConfigWrapper.getDataVersion())\n||registerFirst){\nConcurrentMap<String,TopicConfig>tcTable=\ntopicConfigWrapper.getTopicConfigTable();\nif(tcTable!=null){\nfor(Map.Entry<String,TopicConfig>entry:tcTable.entrySet()){\n//todo创建或更新topic路由元数据,并填充topicQueueTable\nthis.createAndUpdateQueueData(brokerName,entry.getValue());\n}\n}\n}\n}\n\n//第四步更新brokerLiveTable\nBrokerLiveInfoprevBrokerLiveInfo=this.brokerLiveTable.put(brokerAddr,\nnewBrokerLiveInfo(\nSystem.currentTimeMillis(),\ntopicConfigWrapper.getDataVersion(),\nchannel,\nhaServerAddr));\nif(null==prevBrokerLiveInfo){\nlog.info(&34;,brokerAddr,haServerAddr);\n}\n\n//第5,注册broker的过滤器Server地址列表\nif(filterServerList!=null){\nif(filterServerList.isEmpty()){\nthis.filterServerTable.remove(brokerAddr);\n}else{\nthis.filterServerTable.put(brokerAddr,filterServerList);\n}\n}\n\nif(MixAll.MASTER_ID!=brokerId){\nStringmasterAddr=brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);\nif(masterAddr!=null){\nBrokerLiveInfobrokerLiveInfo=this.brokerLiveTable.get(masterAddr);\nif(brokerLiveInfo!=null){\nresult.setHaServerAddr(brokerLiveInfo.getHaServerAddr());\nresult.setMasterAddr(masterAddr);\n}\n}\n}\n}finally{\n//释放锁\nthis.lock.writeLock().unlock();\n}\n}catch(Exceptione){\nlog.error(&34;,e);\n}\n\nreturnresult;\n}\n复制代码
这样一来,这个方法所做的工作就一目了然了,就是把broker上报的信息包装下,然后放到这几个hashMap中。
了解完成注册操作后,注销操作就不难理解了,它是跟注册相反的操作,所做的事就是从这几个hashMap中移除broker对应的信息,处理方法为RouteInfoManagergetRouteInfoByTopic
publicRemotingCommandgetRouteInfoByTopic(ChannelHandlerContextctx,\nRemotingCommandrequest)throwsRemotingCommandException{\n//创建response\nfinalRemotingCommandresponse=RemotingCommand.createResponseCommand(null);\n//获取请求头\nfinalGetRouteInfoRequestHeaderrequestHeader=\n(GetRouteInfoRequestHeader)request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);\n\n//todo获取某个topic的路由信息\nTopicRouteDatatopicRouteData=this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());\n\n//topicRouteData不为null\nif(topicRouteData!=null){\n//是否支持顺序消费默认false\nif(this.namesrvController.getNamesrvConfig().isOrderMessageEnable()){\nStringorderTopicConf=\nthis.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,\nrequestHeader.getTopic());\ntopicRouteData.setOrderTopicConf(orderTopicConf);\n}\n\n//组装响应并返回\n//序列化json\nbyte[]content=topicRouteData.encode();\nresponse.setBody(content);\nresponse.setCode(ResponseCode.SUCCESS);\nresponse.setRemark(null);\nreturnresponse;\n}\n\n//如果不存在返回topic不存在code\nresponse.setCode(ResponseCode.TOPIC_NOT_EXIST);\nresponse.setRemark(&34;+requestHeader.getTopic()\n+FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));\nreturnresponse;\n}\n复制代码
这个方法里调用了RouteInfoManager34;pickupTopicRouteDataException&34;pickupTopicRouteData{}{}&queryBrokerTopicConfig方法,代码如下:
publicRemotingCommandqueryBrokerTopicConfig(ChannelHandlerContextctx,\nRemotingCommandrequest)throwsRemotingCommandException{\nfinalRemotingCommandresponse=RemotingCommand.createResponseCommand(QueryDataVersionResponseHeader.class);\nfinalQueryDataVersionResponseHeaderresponseHeader=(QueryDataVersionResponseHeader)response.readCustomHeader();\nfinalQueryDataVersionRequestHeaderrequestHeader=\n(QueryDataVersionRequestHeader)request.decodeCommandCustomHeader(QueryDataVersionRequestHeader.class);\nDataVersiondataVersion=DataVersion.decode(request.getBody(),DataVersion.class);\n\n//todo关键代码:判断版本是否发生变化\nBooleanchanged=this.namesrvController.getRouteInfoManager().isBrokerTopicConfigChanged(requestHeader.getBrokerAddr(),dataVersion);\nif(!changed){\n//如果没改变,就更新最后一次的上报时间为当前时间\nthis.namesrvController.getRouteInfoManager().updateBrokerInfoUpdateTimestamp(requestHeader.getBrokerAddr());\n}\n\nDataVersionnameSeverDataVersion=this.namesrvController.getRouteInfoManager().queryBrokerTopicConfig(requestHeader.getBrokerAddr());\nresponse.setCode(ResponseCode.SUCCESS);\nresponse.setRemark(null);\n\n//返回nameServer当前的版本号\nif(nameSeverDataVersion!=null){\nresponse.setBody(nameSeverDataVersion.encode());\n}\nresponseHeader.setChanged(changed);\nreturnresponse;\n}\n复制代码
这个方法主要包含3个操作:
4.1判断broker版本是否发生变化
这部分的判断,就是判断上报的版本号与NameServer保存的版本号是否一致:
publicbooleanisBrokerTopicConfigChanged(finalStringbrokerAddr,\nfinalDataVersiondataVersion){\n//继续查询\nDataVersionprev=queryBrokerTopicConfig(brokerAddr);\nreturnnull==prev||!prev.equals(dataVersion);\n}\n复制代码
这个方法就是判断逻辑了,只是一个简单的equals操作,继续看DataVersion的查询:
publicDataVersionqueryBrokerTopicConfig(finalStringbrokerAddr){\nBrokerLiveInfoprev=this.brokerLiveTable.get(brokerAddr);\nif(prev!=null){\nreturnprev.getDataVersion();\n}\nreturnnull;\n}\n复制代码
又是对RouteInfoManager那几个hashMap的操作,最终是从brokerLiveTable获取到了NameServer保存的版本号。
4.2版本没有发生变化时的操作
如果版本没有发生变化,就更新当前时间为最新上报时间,这个流程没法啥好说的,直接上代码:
publicvoidupdateBrokerInfoUpdateTimestamp(finalStringbrokerAddr){\nBrokerLiveInfoprev=this.brokerLiveTable.get(brokerAddr);\nif(prev!=null){\nprev.setLastUpdateTimestamp(System.currentTimeMillis());\n}\n}\n\n复制代码
又是对RouteInfoManager那几个hashMap的操作,这里需要注意的是,当DataVersion没有发生变化,才会更新BrokerLiveInfolastUpdateTimestamp的值了吗?并不是,如果DataVersion发生了变化,就表明broker需要再次注册,BrokerLiveInfoqueryBrokerTopicConfig,在上面的1.判断broker版本是否发生变化中就用过了,这里就不再赘述了。
5.定时任务:检测broker是否存活
在前面分析NamesrvControllerscanNotActiveBroker,代码如下:
publicvoidscanNotActiveBroker(){\n//brokerLiveTable:存放活跃的broker,就是找出其中不活跃的,然后移除,操作的是brokerLiveTable\nIterator<Entry<String,BrokerLiveInfo>>it=this.brokerLiveTable.entrySet().iterator();\nwhile(it.hasNext()){\nEntry<String,BrokerLiveInfo>next=it.next();\n//上一次的心跳时间\nlonglast=next.getValue().getLastUpdateTimestamp();\n//根据心跳时间判断是否存活,超时时间为2min\nif((last+BROKER_CHANNEL_EXPIRED_TIME)<System.currentTimeMillis()){\nRemotingUtil.closeChannel(next.getValue().getChannel());\n//移除\nit.remove();\n//处理channel的关闭,这个方法里会处理其他hashMap的移除\nthis.onChannelDestroy(next.getKey(),next.getValue().getChannel());\n}\n}\n}\n复制代码
这个方法先是遍历brokerLiveTable,然后判断每个BrokerLiveInfo的最近一次的上报时间,判断是否超时,如果最近的上报时间距离当前超过了2分钟,说明该broker可能挂了,就将它从brokerLiveTable移除,然后调用RouteInfoManagerprocessRequest,这个方法会处理众多的请求,我们重点分析了注册/注销broker消息、获取topic路由消息、获取broker版本信息的处理流程。
注册/注销broker消息、获取topic路由消息、获取broker版本信息最终都是在RouteInfoManager类中处理,这个类中有几个非常重要的、类型为HashMap的成员变量如下:
topicQueueTable:存放保存topic与Queue的关系,value类型为List,表明一个topic可以有多个queuebrokerAddrTable:记录broker的具体信息,key为broker名称,value为broker具体信息clusterAddrTable:集群信息,保存集群名称对应的brokerNamebrokerLiveTable:存活的broker信息,key为broker地址,value为具体的broker服务器
这个几成员变量就是NameServer被称为注册中心的原因所在,所谓的注册/注销broker,就是往这几个hashMap中put或remove相关的broker信息;获取topic路由消息就是从topicQueueTable中获取broker/messageQueue等信息。
而nameServer所谓的&34;、“发现”、“心跳”等,都是对RouteInfoManager这几个hashMap成员变量进行操作的。
好了,本文就到这里了,下篇开始我们将进入product的分析
个人服务器的网站源码分享的介绍就聊到这里吧,感谢你花时间阅读本站内容,更多关于个人网站服务器配置、个人服务器的网站源码分享的信息别忘了在本站进行查找哦。
