包裹网站源码分享(包裹网站源码分享怎么弄)

大家好,关于包裹网站源码分享很多朋友都还不太明白,不过没关系,因为今天小编就来为大家分享关于包裹网站源码分享怎么弄的知识点,相信应该可以解决大家的一些困惑和问题,如果碰巧可以解决您的问题,还望关注下本站哦,希望对各位有所帮助!

本系列Netty源码解析文章基于4.1.56.Final版本

主从Reactor组完整结构.png

在?《Netty如何高效接收网络数据》一文中,我们介绍了Netty的SubReactor处理网络数据读取的完整过程,当Netty为我们读取了网络请求数据,并且我们在自己的业务线程中完成了业务处理后,就需要将业务处理结果返回给客户端了,那么本文我们就来介绍下SubReactor如何处理网络数据发送的整个过程。

我们都知道Netty是一款高性能的异步事件驱动的网络通讯框架,既然是网络通讯框架那么它主要做的事情就是:

接收客户端连接。读取连接上的网络请求数据。向连接发送网络响应数据。

前边系列文章在介绍?Netty的启动以及?接收连接的过程中,我们只看到OP_ACCEPT事件以及OP_READ事件的注册,并未看到OP_WRITE事件的注册。

那么在什么情况下Netty才会向SubReactor去注册OP_WRITE事件呢?Netty又是怎么对写操作做到异步处理的呢?

本文笔者将会为大家一一揭晓这些谜底。我们还是以之前的EchoServer为例进行说明。

@Sharable\npublicclassEchoServerHandlerextendsChannelInboundHandlerAdapter{\n\n@Override\npublicvoidchannelRead(ChannelHandlerContextctx,Objectmsg){\n//此处的msg就是Netty在readloop中从NioSocketChannel中读取到的ByteBuffer\nctx.write(msg);\n}\n\n}\n

我们将在?《Netty如何高效接收网络数据》一文中读取到的ByteBuffer(这里的Objectmsg),直接发送回给客户端,用这个简单的例子来揭开Netty如何发送数据的序幕~~

在实际开发中,我们首先要通过解码器将读取到的ByteBuffer解码转换为我们的业务Request类,然后在业务线程中做业务处理,在通过编码器对业务Response类编码为ByteBuffer,最后利用ChannelHandlerContextctx的引用发送响应数据。

本文我们只聚焦Netty写数据的过程,对于Netty编解码相关的内容,笔者会在后续的文章中专门介绍。

本文概要.png

1.ChannelHandlerContext

pipeline结构.png

通过前面几篇文章的介绍,我们知道Netty会为每个Channel分配一个pipeline,pipeline是一个双向链表的结构。Netty中产生的IO异步事件会在这个pipeline中传播。

Netty中的IO异步事件大体上分为两类:

inbound事件:入站事件,比如前边介绍的ChannelActive事件,ChannelRead事件,它们会从pipeline的头结点HeadContext开始一直向后传播。outbound事件:出站事件,比如本文中即将要介绍到的write事件以及flush事件,出站事件会从相反的方向从后往前传播直到HeadContext。最终会在HeadContext中完成出站事件的处理。本例中用到的channelHandlerContext.write()会使write事件从当前ChannelHandler也就是这里的EchoServerHandler开始沿着pipeline向前传播。而channelHandlerContext.channel().write()则会使write事件从pipeline的尾结点TailContext开始向前传播直到HeadContext。

客户端channelpipeline结构.png

而pipeline这样一个双向链表数据结构中的类型正是ChannelHandlerContext,由ChannelHandlerContext包裹我们自定义的IO处理逻辑ChannelHandler。

ChannelHandler并不需要感知到它所处的pipeline中的上下文信息,只需要专心处理好IO逻辑即可,关于pipeline的上下文信息全部封装在ChannelHandlerContext中。

ChannelHandler在Netty中的作用只是负责处理IO逻辑,比如编码,解码。它并不会感知到它在pipeline中的位置,更不会感知和它相邻的两个ChannelHandler。事实上ChannelHandler也并不需要去关心这些,它唯一需要关注的就是处理所关心的异步事件

而ChannelHandlerContext中维护了pipeline这个双向链表中的pre以及next指针,这样可以方便的找到与其相邻的ChannelHandler,并可以过滤出一些符合执行条件的ChannelHandler。正如它的命名一样,ChannelHandlerContext正是起到了维护ChannelHandler上下文的一个作用。而Netty中的异步事件在pipeline中的传播靠的就是这个ChannelHandlerContext。

这样设计就使得ChannelHandlerContext和ChannelHandler的职责单一,各司其职,具有高度的可扩展性。

2.write事件的传播

我们无论是在业务线程或者是在SubReactor线程中完成业务处理后,都需要通过channelHandlerContext的引用将write事件在pipeline中进行传播。然后在pipeline中相应的ChannelHandler中监听write事件从而可以对write事件进行自定义编排处理(比如我们常用的编码器),最终传播到HeadContext中执行发送数据的逻辑操作。

前边也提到Netty中有两个触发write事件传播的方法,它们的传播处理逻辑都是一样的,只不过它们在pipeline中的传播起点是不同的。

channelHandlerContext.write()方法会从当前ChannelHandler开始在pipeline中向前传播write事件直到HeadContext。channelHandlerContext.channel().write()方法则会从pipeline的尾结点TailContext开始在pipeline中向前传播write事件直到HeadContext。

客户端channelpipeline结构.png

在我们清楚了write事件的总体传播流程后,接下来就来看看在write事件传播的过程中Netty为我们作了些什么?这里我们以channelHandlerContext.write()方法为例说明。

3.write方法发送数据

write事件传播流程.png

abstractclassAbstractChannelHandlerContextimplementsChannelHandlerContext,ResourceLeakHint{\n\n@Override\npublicChannelFuturewrite(Objectmsg){\nreturnwrite(msg,newPromise());\n}\n\n@Override\npublicChannelFuturewrite(finalObjectmsg,finalChannelPromisepromise){\nwrite(msg,false,promise);\nreturnpromise;\n}\n\n}\n

这里我们看到Netty的写操作是一个异步操作,当我们在业务线程中调用channelHandlerContext.write()后,Netty会给我们返回一个ChannelFuture,我们可以在这个ChannelFutrue中添加ChannelFutureListener,这样当Netty将我们要发送的数据发送到底层Socket中时,Netty会通过ChannelFutureListener通知我们写入结果。

@Override\npublicvoidchannelRead(finalChannelHandlerContextctx,finalObjectmsg){\n//此处的msg就是Netty在readloop中从NioSocketChannel中读取到的ByteBuffer\nChannelFuturefuture=ctx.write(msg);\nfuture.addListener(newChannelFutureListener(){\n@Override\npublicvoidoperationComplete(ChannelFuturefuture)throwsException{\nThrowablecause=future.cause();\nif(cause!=null){\n处理异常情况\n}else{\n写入Socket成功后,Netty会通知到这里\n}\n}\n});\n}\n

当异步事件在pipeline传播的过程中发生异常时,异步事件就会停止在pipeline中传播。所以我们在日常开发中,需要对写操作异常情况进行处理。

其中inbound类异步事件发生异常时,会触发exceptionCaught事件传播。exceptionCaught事件本身也是一种inbound事件,传播方向会从当前发生异常的ChannelHandler开始一直向后传播直到TailContext。而outbound类异步事件发生异常时,则不会触发exceptionCaught事件传播。一般只是通知相关ChannelFuture。但如果是flush事件在传播过程中发生异常,则会触发当前发生异常的ChannelHandler中exceptionCaught事件回调。

我们继续回归到写操作的主线上来~~~

privatevoidwrite(Objectmsg,booleanflush,ChannelPromisepromise){\nObjectUtil.checkNotNull(msg,&34;);\n\n…………….省略检查promise的有效性……………\n\n//flush=true表示channelHandler中调用的是writeAndFlush方法,这里需要找到pipeline中覆盖write或者flush方法的channelHandler\n//flush=false表示调用的是write方法,只需要找到pipeline中覆盖write方法的channelHandler\nfinalAbstractChannelHandlerContextnext=findContextOutbound(flush?\n(MASK_WRITE|MASK_FLUSH):MASK_WRITE);\n//用于检查内存泄露\nfinalObjectm=pipeline.touch(msg,next);\n//获取pipeline中下一个要被执行的channelHandler的executor\nEventExecutorexecutor=next.executor();\n//确保OutBound事件由ChannelHandler指定的executor执行\nif(executor.inEventLoop()){\n//如果当前线程正是channelHandler指定的executor则直接执行\nif(flush){\nnext.invokeWriteAndFlush(m,promise);\n}else{\nnext.invokeWrite(m,promise);\n}\n}else{\n//如果当前线程不是ChannelHandler指定的executor,则封装成异步任务提交给指定executor执行,注意这里的executor不一定是reactor线程。\nfinalWriteTasktask=WriteTask.newInstance(next,m,promise,flush);\nif(!safeExecute(executor,task,promise,m,!flush)){\ntask.cancel();\n}\n}\n}\n

write事件要向前在pipeline中传播,就需要在pipeline上找到下一个具有执行资格的ChannelHandler,因为位于当前ChannelHandler前边的可能是ChannelInboundHandler类型的也可能是ChannelOutboundHandler类型的ChannelHandler,或者有可能压根就不关心write事件的ChannelHandler(没有实现write回调方法)。

write事件的传播.png

这里我们就需要通过findContextOutbound方法在当前ChannelHandler的前边找到ChannelOutboundHandler类型并且覆盖实现write回调方法的ChannelHandler作为下一个要执行的对象。

3.1findContextOutbound

privateAbstractChannelHandlerContextfindContextOutbound(intmask){\nAbstractChannelHandlerContextctx=this;\n//获取当前ChannelHandler的executor\nEventExecutorcurrentExecutor=executor();\ndo{\n//获取前一个ChannelHandler\nctx=ctx.prev;\n}while(skipContext(ctx,currentExecutor,mask,MASK_ONLY_OUTBOUND));\nreturnctx;\n}\n//判断前一个ChannelHandler是否具有响应Write事件的资格\nprivatestaticbooleanskipContext(\nAbstractChannelHandlerContextctx,EventExecutorcurrentExecutor,intmask,intonlyMask){\n\nreturn(ctx.executionMask&(onlyMask|mask))==0||\n(ctx.executor()==currentExecutor&&(ctx.executionMask&mask)==0);\n}\n

findContextOutbound方法接收的参数是一个掩码,这个掩码表示要向前查找具有什么样执行资格的ChannelHandler。因为我们这里调用的是ChannelHandlerContext的write方法所以flush=false,传递进来的掩码为MASK_WRITE,表示我们要向前查找覆盖实现了write回调方法的ChannelOutboundHandler。

3.1.1掩码的巧妙应用

Netty中将ChannelHandler覆盖实现的一些异步事件回调方法用int型的掩码来表示,这样我们就可以通过这个掩码来判断当前ChannelHandler具有什么样的执行资格。

finalclassChannelHandlerMask{\n………………..省略………………….\n\nstaticfinalintMASK_CHANNEL_ACTIVE=1<<3;\nstaticfinalintMASK_CHANNEL_READ=1<<5;\nstaticfinalintMASK_CHANNEL_READ_COMPLETE=1<<6;\nstaticfinalintMASK_WRITE=1<<15;\nstaticfinalintMASK_FLUSH=1<<16;\n\n//outbound事件掩码集合\nstaticfinalintMASK_ONLY_OUTBOUND=MASK_BIND|MASK_CONNECT|MASK_DISCONNECT|\nMASK_CLOSE|MASK_DEREGISTER|MASK_READ|MASK_WRITE|MASK_FLUSH;\n………………..省略………………….\n}\n

在ChannelHandler被添加进pipeline的时候,Netty会根据当前ChannelHandler的类型以及其覆盖实现的异步事件回调方法,通过|运算向ChannelHandlerContextaddLast(EventExecutorGroup,ChannelHandler……)方法指定执行该ChannelHandler的executor。如果不特殊指定,那么执行该ChannelHandler的executor默认为该Channel绑定的Reactor线程。

执行ChannelHandler中异步事件回调方法的线程必须是ChannelHandler指定的executor。

所以这里首先我们需要获取在findContextOutbound方法查找出来的下一个符合执行条件的ChannelHandler指定的executor。

EventExecutorexecutor=next.executor()\n

并通过executor.inEventLoop()方法判断当前线程是否是该ChannelHandler指定的executor。

如果是,那么我们直接在当前线程中执行ChannelHandler中的write方法。

如果不是,我们就需要将ChannelHandler对write事件的回调操作封装成异步任务WriteTask并提交给ChannelHandler指定的executor中,由executor负责执行。

这里需要注意的是这个executor并不一定是channel绑定的reactor线程。它可以是我们自定义的线程池,不过需要我们通过ChannelPipelinewrite方法,所以这里的flush=false。触发调用nextChannelHandler的write方法。

voidinvokeWrite(Objectmsg,ChannelPromisepromise){\nif(invokeHandler()){\ninvokeWrite0(msg,promise);\n}else{\n//当前channelHandler虽然添加到pipeline中,但是并没有调用handlerAdded\n//所以不能调用当前channelHandler中的回调方法,只能继续向前传递write事件\nwrite(msg,promise);\n}\n}\n

这里首先需要通过invokeHandler()方法判断这个nextChannelHandler中的handlerAdded方法是否被回调过。因为ChannelHandler只有被正确的添加到对应的ChannelHandlerContext中并且准备好处理异步事件时,ChannelHandlerwrite方法继续向前传播write事件。

@Override\npublicChannelFuturewrite(finalObjectmsg,finalChannelPromisepromise){\n//继续向前传播write事件,回到流程起点\nwrite(msg,false,promise);\nreturnpromise;\n}\n

如果invokeHandler()返回true,说明这个nextChannelHandler已经在pipeline中被正确的初始化了,Netty直接调用这个ChannelHandler的write方法,这样就实现了write事件从当前ChannelHandler传播到了nextChannelHandler。

privatevoidinvokeWrite0(Objectmsg,ChannelPromisepromise){\ntry{\n//调用当前ChannelHandler中的write方法\n((ChannelOutboundHandler)handler()).write(this,msg,promise);\n}catch(Throwablet){\nnotifyOutboundHandlerException(t,promise);\n}\n}\n

这里我们看到在write事件的传播过程中如果发生异常,那么write事件就会停止在pipeline中传播,并通知注册的ChannelFutureListener。

客户端channelpipeline结构.png

从本文示例的pipeline结构中我们可以看到,当在EchoServerHandler调用ChannelHandlerContext34;unsupportedmessagetype:&34;estimatorHandle&HandleImpl。

publicfinalclassDefaultMessageSizeEstimatorimplementsMessageSizeEstimator{\n\nprivatestaticfinalclassHandleImplimplementsHandle{\nprivatefinalintunknownSize;\n\nprivateHandleImpl(intunknownSize){\nthis.unknownSize=unknownSize;\n}\n\n@Override\npublicintsize(Objectmsg){\nif(msginstanceofByteBuf){\nreturn((ByteBuf)msg).readableBytes();\n}\nif(msginstanceofByteBufHolder){\nreturn((ByteBufHolder)msg).content().readableBytes();\n}\nif(msginstanceofFileRegion){\nreturn0;\n}\nreturnunknownSize;\n}\n}\n

这里我们看到ByteBuffer的大小即为Buffer中未读取的字节数writerIndex-readerIndex。

当我们验证了待写入数据msg的类型以及计算了msg的大小后,我们就可以通过ChannelOutboundBufferwrite方法中返回给用户的ChannelPromise。这样可以在数据写入Socket之后异步通知应用程序。

此外ChannelOutboundBuffer中还封装了三个重要的指针:

unflushedEntry:该指针指向ChannelOutboundBuffer中第一个待发送数据的Entry。tailEntry:该指针指向ChannelOutboundBuffer中最后一个待发送数据的Entry。通过unflushedEntry和tailEntry这两个指针,我们可以很方便的定位到待发送数据的Entry范围。flushedEntry:当我们通过flush操作需要将ChannelOutboundBuffer中缓存的待发送数据发送到Socket中时,flushedEntry指针会指向unflushedEntry的位置,这样flushedEntry指针和tailEntry指针之间的Entry就是我们即将发送到Socket中的网络数据。

这三个指针在初始化的时候均为null。

ChannelOutboundBuffer结构.png

3.3.1Entry

Entry作为ChannelOutboundBuffer链表结构中的节点元素类型,里边封装了待发送数据的各种信息,ChannelOutboundBuffer其实就是对Entry结构的组织和操作。因此理解Entry结构是理解整个ChannelOutboundBuffer运作流程的基础。

下面我们就来看下Entry结构具体封装了哪些待发送数据的信息。

staticfinalclassEntry{\n//Entry的对象池,用来创建和回收Entry对象\nprivatestaticfinalObjectPool<Entry>RECYCLER=ObjectPool.newPool(newObjectCreator<Entry>(){\n@Override\npublicEntrynewObject(Handle<Entry>handle){\nreturnnewEntry(handle);\n}\n});\n\n//DefaultHandle用于回收对象\nprivatefinalHandle<Entry>handle;\n//ChannelOutboundBuffer下一个节点\nEntrynext;\n//待发送数据\nObjectmsg;\n//msg转换为jdknio中的byteBuffer\nByteBuffer[]bufs;\nByteBufferbuf;\n//异步write操作的future\nChannelPromisepromise;\n//已发送了多少\nlongprogress;\n//总共需要发送多少,不包含entry对象大小。\nlongtotal;\n//pendingSize表示entry对象在堆中需要的内存总量待发送数据大小+entry对象本身在堆中占用内存大小(96)\nintpendingSize;\n//msg中包含了几个jdkniobytebuffer\nintcount=-1;\n//write操作是否被取消\nbooleancancelled;\n}\n

我们看到Entry结构中一共有12个字段,其中1个静态字段和11个实例字段。

下面笔者就为大家介绍下这12个字段的含义及其作用,其中有些字段会在后面的场景中使用到,这里大家可能对有些字段理解起来比较模糊,不过没关系,这里能看懂多少是多少,不理解也没关系,这里介绍只是为了让大家混个眼熟,在后面流程的讲解中,笔者还会重新提到这些字段。

ObjectPool<Entry>RECYCLER:Entry的对象池,负责创建管理Entry实例,由于Netty是一个网络框架,所以IO读写就成了它的核心操作,在一个支持高性能高吞吐的网络框架中,会有大量的IO读写操作,那么就会导致频繁的创建Entry对象。我们都知道,创建一个实例对象以及GC回收这些实例对象都是需要性能开销的,那么在大量频繁创建Entry对象的场景下,引入对象池来复用创建好的Entry对象实例可以抵消掉由频繁创建对象以及GC回收对象所带来的性能开销。

关于对象池的详细内容,感兴趣的同学可以回看下笔者的这篇文章?《详解Recycler对象池的精妙设计与实现》

Handle<Entry>handle:默认实现类型为DefaultHandle,用于数据发送完毕后,对象池回收Entry对象。由对象池RECYCLER在创建Entry对象的时候传递进来。Entrynext:ChannelOutboundBuffer是一个单链表的结构,这里的next指针用于指向当前Entry节点的后继节点。Objectmsg:应用程序待发送的网络数据,这里msg的类型为DirectByteBuffer或者FileRegion(用于通过零拷贝的方式网络传输文件)。ByteBuffer[]bufs:这里的ByteBuffer类型为JDKNIO原生的ByteBuffer类型,因为Netty最终发送数据是通过JDKNIO底层的SocketChannel进行发送,所以需要将Netty中实现的ByteBuffer类型转换为JDKNIOByteBuffer类型。应用程序发送的ByteBuffer可能是一个也可能是多个,如果发送多个就用ByteBuffer[]bufs封装在Entry对象中,如果是一个就用ByteBufferbuf封装。intcount:表示待发送数据msg中一共包含了多少个ByteBuffer需要发送。ChannelPromisepromise:ChannelHandlerContext34;io.netty.transport.outboundBufferEntrySizeOverhead&34;totalPendingSize&34;unwritable&addLast(EventExecutorGroupgroup,ChannelHandler…handlers)为ChannelHandler指定的executor。如果不指定,默认的executor为channel绑定的reactor线程。

如果当前线程不是ChannelHandler指定的executor,则需要将invokeFlush()方法的调用封装成Task交给指定的executor执行。

4.1.1触发nextChannelHandler的flush方法回调

privatevoidinvokeFlush(){\nif(invokeHandler()){\ninvokeFlush0();\n}else{\n//如果该ChannelHandler并没有加入到pipeline中则继续向前传递flush事件\nflush();\n}\n}\n

这里和write事件的相关处理一样,首先也是需要调用invokeHandler()方法来判断这个nextChannelHandler是否在pipeline中被正确的初始化。

如果nextChannelHandler中的handlerAdded方法并没有被回调过,那么这里就只能跳过nextChannelHandler,并调用ChannelHandlerContextaddFlush

ChannelOutboundBuffer结构.png

这里就到了真正要发送数据的时候了,在addFlush方法中会将flushedEntry指针指向unflushedEntry指针表示的第一个未被flush的Entry节点。并将unflushedEntry指针置为空,准备开始flush发送数据流程。

此时ChannelOutboundBuffer由待发送数据的缓冲队列变为了即将要flush进Socket的数据队列

这样在flushedEntry与tailEntry之间的Entry节点即为本次flush操作需要发送的数据范围。

publicvoidaddFlush(){\nEntryentry=unflushedEntry;\nif(entry!=null){\nif(flushedEntry==null){\nflushedEntry=entry;\n}\ndo{\nflushed++;\n//如果当前entry对应的write操作被用户取消,则释放msg,并降低channelOutboundBuffer水位线\nif(!entry.promise.setUncancellable()){\nintpending=entry.cancel();\ndecrementPendingOutboundBytes(pending,false,true);\n}\nentry=entry.next;\n}while(entry!=null);\n\n//AllflushedsoresetunflushedEntry\nunflushedEntry=null;\n}\n}\n

在flush发送数据流程开始时,数据的发送流程就不能被取消了,在这之前我们都是可以通过ChannelPromise取消数据发送流程的。

所以这里需要对ChannelOutboundBuffer中所有Entry节点包裹的ChannelPromise设置为不可取消状态。

publicinterfacePromise<V>extendsFuture<V>{\n\n/**\n*设置当前future为不可取消状态\n*\n*返回true的情况:\n*1:成功的将future设置为uncancellable\n*2:当future已经成功完成\n*\n*返回false的情况:\n*1:future已经被取消,则不能在设置uncancellable状态\n*\n*/\nbooleansetUncancellable();\n\n}\n

如果这里的setUncancellable()方法返回false则说明在这之前用户已经将ChannelPromise取消掉了,接下来就需要调用entry.cancel()方法来释放为待发送数据msg分配的堆外内存。

staticfinalclassEntry{\n//write操作是否被取消\nbooleancancelled;\n\nintcancel(){\nif(!cancelled){\ncancelled=true;\nintpSize=pendingSize;\n\n//releasemessageandreplacewithanemptybuffer\nReferenceCountUtil.safeRelease(msg);\nmsg=Unpooled.EMPTY_BUFFER;\n\npendingSize=0;\ntotal=0;\nprogress=0;\nbufs=null;\nbuf=null;\nreturnpSize;\n}\nreturn0;\n}\n\n}\n

当Entry对象被取消后,就需要减少ChannelOutboundBuffer的内存占用总量的水位线totalPendingSize。

privatestaticfinalAtomicLongFieldUpdater<ChannelOutboundBuffer>TOTAL_PENDING_SIZE_UPDATER=\nAtomicLongFieldUpdater.newUpdater(ChannelOutboundBuffer.class,&34;);\n\n//水位线指针.ChannelOutboundBuffer中的待发送数据的内存占用总量:所有Entry对象本身所占用内存大小+所有待发送数据的大小\nprivatevolatilelongtotalPendingSize;\n\nprivatevoiddecrementPendingOutboundBytes(longsize,booleaninvokeLater,booleannotifyWritability){\nif(size==0){\nreturn;\n}\n\nlongnewWriteBufferSize=TOTAL_PENDING_SIZE_UPDATER.addAndGet(this,-size);\nif(notifyWritability&&newWriteBufferSize<channel.config().getWriteBufferLowWaterMark()){\nsetWritable(invokeLater);\n}\n}\n

当更新后的水位线低于低水位线DEFAULT_LOW_WATER_MARK=32*1024时,就将当前channel设置为可写状态。

privatevoidsetWritable(booleaninvokeLater){\nfor(;;){\nfinalintoldValue=unwritable;\nfinalintnewValue=oldValue&~1;\nif(UNWRITABLE_UPDATER.compareAndSet(this,oldValue,newValue)){\nif(oldValue!=0&&newValue==0){\nfireChannelWritabilityChanged(invokeLater);\n}\nbreak;\n}\n}\n}\n

当Channel的状态是第一次从不可写状态变为可写状态时,Netty会在pipeline中再次触发ChannelWritabilityChanged事件的传播。

响应channelWritabilityChanged事件.png

4.2.2发送数据前的最后检查—flush0

flush0方法这里主要做的事情就是检查当channel的状态是否正常,如果channel状态一切正常,则调用doWrite方法发送数据。

protectedabstractclassAbstractUnsafeimplementsUnsafe{\n\n//是否正在进行flush操作\nprivatebooleaninFlush0;\n\nprotectedvoidflush0(){\nif(inFlush0){\n//Avoidre-entrance\nreturn;\n}\n\nfinalChannelOutboundBufferoutboundBuffer=this.outboundBuffer;\n//channel已经关闭或者outboundBuffer为空\nif(outboundBuffer==null||outboundBuffer.isEmpty()){\nreturn;\n}\n\ninFlush0=true;\n\nif(!isActive()){\ntry{\nif(!outboundBuffer.isEmpty()){\nif(isOpen()){\n//当前channel处于disConnected状态通知promise写入失败并触发channelWritabilityChanged事件\noutboundBuffer.failFlushed(newNotYetConnectedException(),true);\n}else{\n//当前channel处于关闭状态通知promise写入失败但不触发channelWritabilityChanged事件\noutboundBuffer.failFlushed(newClosedChannelException(initialCloseCause,&34;),false);\n}\n}\n}finally{\ninFlush0=false;\n}\nreturn;\n}\n\ntry{\n//写入Socket\ndoWrite(outboundBuffer);\n}catch(Throwablet){\nhandleWriteError(t);\n}finally{\ninFlush0=false;\n}\n}\n\n}\n

outboundBuffer==null||outboundBuffer.isEmpty():如果channel已经关闭了或者对应写缓冲区中没有任何数据,那么就停止发送流程,直接return。!isActive():如果当前channel处于非活跃状态,则需要调用outboundBufferfailFlushed

publicfinalclassChannelOutboundBuffer{\n\nprivatebooleaninFail;\n\nvoidfailFlushed(Throwablecause,booleannotify){\nif(inFail){\nreturn;\n}\n\ntry{\ninFail=true;\nfor(;;){\nif(!remove0(cause,notify)){\nbreak;\n}\n}\n}finally{\ninFail=false;\n}\n}\n}\n

该方法用于在Netty在发送数据的时候,如果发现当前channel处于非活跃状态,则将ChannelOutboundBuffer中flushedEntry与tailEntry之间的Entry对象节点全部删除,并释放发送数据占用的内存空间,同时回收Entry对象实例。

4.2.2.2ChannelOutboundBuffernioBuffers方法完成以上ByteBuffer类型的转换。

maxBytesPerGatheringWrite:表示本次writeloop中最多从ChannelOutboundBuffer中转换maxBytesPerGatheringWrite个字节出来。也就是本次writeloop最多能发送多少字节。1024:本次writeloop最多转换1024个ByteBuffer(JDKNIO实现)。也就是说本次writeloop最多批量发送多少个ByteBuffer。

通过ChannelOutboundBuffertransferTo方法底层用到的系统调用为sendFile实现零拷贝网络文件的传输。

\npublicclassNioSocketChannelextendsAbstractNioByteChannelimplementsio.netty.channel.socket.SocketChannel{\n\n@Override\nprotectedlongdoWriteFileRegion(FileRegionregion)throwsException{\nfinallongposition=region.transferred();\nreturnregion.transferTo(javaChannel(),position);\n}\n\n}\n\n

关于Netty中涉及到的零拷贝,笔者会有一篇专门的文章为大家讲解,本文的主题我们还是先聚焦于把发送流程的主线打通。

我们继续回到发送数据流程主线上来~~

case0:\n//这里主要是针对网络传输文件数据的处理FileRegion\nwriteSpinCount-=doWrite0(in);\nbreak;\n

region.transferred()>=region.count():表示当前FileRegion中的文件数据已经传输完毕。那么在这种情况下本次writeloop没有写入任何数据到Socket,所以返回0,writeSpinCount-0意思就是本次writeloop不算,继续循环。localFlushedAmount>0:表示本writeloop中写入了一些数据到Socket中,会有返回1,writeSpinCount-1减少一次writeloop次数。localFlushedAmount<=0:表示当前Socket发送缓冲区已满,无法写入数据,那么就返回WRITE_STATUS_SNDBUF_FULL=Integer.MAX_VALUE。writeSpinCount-Integer.MAX_VALUE必然是负数,直接退出循环,向Reactor注册OP_WRITE事件并退出flush流程。等Socket发送缓冲区可写了,Reactor会通知channel继续发送文件数据。记住这里,我们后面还会提到

5.2.2发送普通数据

剩下两个case1和default分支主要就是处理ByteBuffer装载的普通数据发送逻辑。

其中case1表示当前Channel的ChannelOutboundBuffer中只包含了一个NioByteBuffer的情况。

default表示当前Channel的ChannelOutboundBuffer中包含了多个NioByteBuffers的情况。

@Override\nprotectedvoiddoWrite(ChannelOutboundBufferin)throwsException{\nSocketChannelch=javaChannel();\nintwriteSpinCount=config().getWriteSpinCount();\ndo{\n\n………将待发送数据转换到JDKNIOByteBuffer中………\n\n//本次writeloop中需要发送的JDKByteBuffer个数\nintnioBufferCnt=in.nioBufferCount();\n\nswitch(nioBufferCnt){\ncase0:\n……….处理网络文件传输………\ncase1:{\nByteBufferbuffer=nioBuffers[0];\nintattemptedBytes=buffer.remaining();\nfinalintlocalWrittenBytes=ch.write(buffer);\nif(localWrittenBytes<=0){\n//如果当前Socket发送缓冲区满了写不进去了,则注册OP_WRITE事件,等待Socket发送缓冲区可写时在写\n//SubReactor在处理OP_WRITE事件时,直接调用flush方法\nincompleteWrite(true);\nreturn;\n}\n//根据当前实际写入情况调整maxBytesPerGatheringWrite数值\nadjustMaxBytesPerGatheringWrite(attemptedBytes,localWrittenBytes,maxBytesPerGatheringWrite);\n//如果ChannelOutboundBuffer中的某个Entry被全部写入则删除该Entry\n//如果Entry被写入了一部分还有一部分未写入则更新Entry中的readIndex等待下次writeLoop继续写入\nin.removeBytes(localWrittenBytes);\n–writeSpinCount;\nbreak;\n}\ndefault:{\n//ChannelOutboundBuffer中总共待写入数据的字节数\nlongattemptedBytes=in.nioBufferSize();\n//批量写入\nfinallonglocalWrittenBytes=ch.write(nioBuffers,0,nioBufferCnt);\nif(localWrittenBytes<=0){\nincompleteWrite(true);\nreturn;\n}\n//根据实际写入情况调整一次写入数据大小的最大值\n//maxBytesPerGatheringWrite决定每次可以从channelOutboundBuffer中获取多少发送数据\nadjustMaxBytesPerGatheringWrite((int)attemptedBytes,(int)localWrittenBytes,\nmaxBytesPerGatheringWrite);\n//移除全部写完的BUffer,如果只写了部分数据则更新buffer的readerIndex,下一个writeLoop写入\nin.removeBytes(localWrittenBytes);\n–writeSpinCount;\nbreak;\n}\n}\n}while(writeSpinCount>0);\n\n…………处理本轮writeloop未写完的情况…….\n}\n

case1和default这两个分支在处理发送数据时的逻辑是一样的,唯一的区别就是case1是处理单个NioByteBuffer的发送,而default分支是批量处理多个NioByteBuffers的发送。

下面笔者就以经常被触发到的default分支为例来为大家讲述Netty在处理数据发送时的逻辑细节:

首先从当前NioSocketChannel中的ChannelOutboundBuffer中获取本次writeloop需要发送的字节总量attemptedBytes。这个nioBufferSize是在前边介绍ChannelOutboundBuffer34;msg”);\n\n…………….省略检查promise的有效性……………\n\n//flush=true表示channelHandler中调用的是writeAndFlush方法,这里需要找到pipeline中覆盖write或者flush方法的channelHandler\n//flush=false表示调用的是write方法,只需要找到pipeline中覆盖write方法的channelHandler\nfinalAbstractChannelHandlerContextnext=findContextOutbound(flush?\n(MASK_WRITE|MASK_FLUSH):MASK_WRITE);\n//用于检查内存泄露\nfinalObjectm=pipeline.touch(msg,next);\n//获取下一个要被执行的channelHandler的executor\nEventExecutorexecutor=next.executor();\n//确保OutBound事件由ChannelHandler指定的executor执行\nif(executor.inEventLoop()){\n//如果当前线程正是channelHandler指定的executor则直接执行\nif(flush){\nnext.invokeWriteAndFlush(m,promise);\n}else{\nnext.invokeWrite(m,promise);\n}\n}else{\n//如果当前线程不是ChannelHandler指定的executor,则封装成异步任务提交给指定executor执行,注意这里的executor不一定是reactor线程。\nfinalWriteTasktask=WriteTask.newInstance(next,m,promise,flush);\nif(!safeExecute(executor,task,promise,m,!flush)){\ntask.cancel();\n}\n}\n}\n

由于在writeAndFlush流程的处理中,flush标志被设置为true,所以这里有两个地方会和write事件的处理有所不同。

findContextOutbound(MASK_WRITE|MASK_FLUSH):这里在pipeline中向前查找的ChanneOutboundHandler需要实现write方法或者flush方法。这里需要注意的是write方法和flush方法只需要实现其中一个即可满足查找条件。因为一般我们自定义ChannelOutboundHandler时,都会继承ChannelOutboundHandlerAdapter类,而在ChannelInboundHandlerAdapter类中对于这些outbound事件都会有默认的实现。

publicclassChannelOutboundHandlerAdapterextendsChannelHandlerAdapterimplementsChannelOutboundHandler{\n\n@Skip\n@Override\npublicvoidwrite(ChannelHandlerContextctx,Objectmsg,ChannelPromisepromise)throwsException{\nctx.write(msg,promise);\n}\n\n\n@Skip\n@Override\npublicvoidflush(ChannelHandlerContextctx)throwsException{\nctx.flush();\n}\n\n}\n

这样在后面传播write事件或者flush事件的时候,我们通过上面逻辑找出的ChannelOutboundHandler中可能只实现了一个flush方法或者write方法。不过这样没关系,如果这里在传播outbound事件的过程中,发现找出的ChannelOutboundHandler中并没有实现对应的outbound事件回调函数,那么就直接调用在ChannelOutboundHandlerAdapter中的默认实现。

在向前传播writeAndFlush事件的时候会通过调用ChannelHandlerContext的invokeWriteAndFlush方法,先传播write事件然后在传播flush事件。

voidinvokeWriteAndFlush(Objectmsg,ChannelPromisepromise){\nif(invokeHandler()){\n//向前传递write事件\ninvokeWrite0(msg,promise);\n//向前传递flush事件\ninvokeFlush0();\n}else{\nwriteAndFlush(msg,promise);\n}\n}\n\nprivatevoidinvokeWrite0(Objectmsg,ChannelPromisepromise){\ntry{\n//调用当前ChannelHandler中的write方法\n((ChannelOutboundHandler)handler()).write(this,msg,promise);\n}catch(Throwablet){\nnotifyOutboundHandlerException(t,promise);\n}\n}\n\nprivatevoidinvokeFlush0(){\ntry{\n((ChannelOutboundHandler)handler()).flush(this);\n}catch(Throwablet){\ninvokeExceptionCaught(t);\n}\n}\n\n

这里我们看到了writeAndFlush的核心处理逻辑,首先向前传播write事件,经过write事件的流程处理后,最后向前传播flush事件。

根据前边的介绍,这里在向前传播write事件的时候,可能查找出的ChannelOutboundHandler只是实现了flush方法,不过没关系,这里会直接调用write方法在ChannelOutboundHandlerAdapter父类中的默认实现。同理flush也是一样。

总结

到这里,Netty处理数据发送的整个完整流程,笔者就为大家详细地介绍完了,可以看到Netty在处理读取数据和处理发送数据的过程中,虽然核心逻辑都差不多,但是发送数据的过程明显细节比较多,而且更加复杂一些。

这里笔者将读取数据和发送数据的不同之处总结如下几点供大家回忆对比:

在每次readloop之前,会分配一个大小固定的diretByteBuffer用来装载读取数据。每轮readloop完全结束之后,才会决定是否对下一轮的读取过程分配的directByteBuffer进行扩容或者缩容。在每次writeloop之前,都会获取本次writeloop最大能够写入的字节数,根据这个最大写入字节数从ChannelOutboundBuffer中转换JDKNIOByteBuffer。每次写入Socket之后都需要重新评估是否对这个最大写入字节数进行扩容或者缩容。readloop和writeloop都被默认限定最多执行16次。在一个完整的readloop中,如果还读取不完数据,直接退出。等到reactor线程执行完其他channel上的IO事件再来读取未读完的数据。而在一个完整的writeloop中,数据发送不完,则分两种情况。Socket缓冲区满无法在继续写入。这时就需要向reactor注册OP_WRITE事件。等Socket缓冲区变的可写时,epoll通知reactor线程继续发送。Socket缓冲区可写,但是由于发送数据太多,导致虽然写满16次但依然没有写完。这时就直接向reactor丢一个flushTask进去,等到reactor线程执行完其他channel上的IO事件,在回过头来执行flushTask。OP_READ事件的注册是在NioSocketChannel被注册到对应的Reactor中时就会注册。而OP_WRITE事件只会在Socket缓冲区满的时候才会被注册。当Socket缓冲区再次变得可写时,要记得取消OP_WRITE事件的监听。否则的话就会一直被通知

好了,本文的全部内容就到这里了,我们下篇文章见~~~~

关于本次包裹网站源码分享和包裹网站源码分享怎么弄的问题分享到这里就结束了,如果解决了您的问题,我们非常高兴。

Published by

风君子

独自遨游何稽首 揭天掀地慰生平