博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
ChannelHandlerContext 常用Api源码解读
阅读量:6052 次
发布时间:2019-06-20

本文共 7173 字,大约阅读时间需要 23 分钟。

hot3.png

源码地址: 

 

114643_9OYr_3101476.png

我们知道,对于 pipeline 中的每一个 ChannelHandler(入站或者出站),都有一个 ChannelHandlerContext 与其绑定。细节可通过 https://my.oschina.net/LucasZhu/blog/1618012 了解。

ChannelHandlerContext 中常用的方法有(这里主要分析与数据流传递相关的方法):

  1. public ChannelHandlerContext fireChannelRead(final Object msg);
  2. public ChannelFuture write(Object msg);
  3. public ChannelPipeline pipeline().write(Object msg);
  4. public Channel channel().write(Object msg);

1 . public ChannelHandlerContext fireChannelRead(final Object msg);

@Overridepublic ChannelHandlerContext fireChannelRead(final Object msg) {    invokeChannelRead(findContextInbound(), msg);    return this;}向后查找,如果ChannelHandlerContext 为 inbound 的类型 就返回private AbstractChannelHandlerContext findContextInbound() {    AbstractChannelHandlerContext ctx = this;    do {        ctx = ctx.next;    } while (!ctx.inbound);    return ctx;}//调用next的 ContextRead 方法static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);    EventExecutor executor = next.executor();    if (executor.inEventLoop()) {         //主要方法        next.invokeChannelRead(m);    } else {        executor.execute(new Runnable() {            @Override            public void run() {                next.invokeChannelRead(m);            }        });    }}private void invokeChannelRead(Object msg) {    if (invokeHandler()) {        try {            //获取handler对象(ChannelInboundHandler)并且执行该对象的channelRead方法......            ((ChannelInboundHandler) handler()).channelRead(this, msg);        } catch (Throwable t) {            notifyHandlerException(t);        }    } else {        fireChannelRead(msg);    }}

findContextInbound() 是数据流向的关键:它从当前 ChannelHandlerContext 开始,沿 pipeline 向后(next 方向)查找下一个 inbound 类型的 ChannelHandlerContext,随后执行其 handler(InboundHandler)的 channelRead 方法。

2 . public ChannelFuture write(Object msg); 源码解析

130001_JLlk_3101476.png

AbstractChannelHandlerContext.java执行ChannelHandlerContext的write方法@Overridepublic ChannelFuture write(Object msg) {    return write(msg, newPromise());}@Overridepublic ChannelFuture write(final Object msg, final ChannelPromise promise) {    if (msg == null) {        throw new NullPointerException("msg");    }    try {        if (isNotValidPromise(promise, true)) {            ReferenceCountUtil.release(msg);            // cancelled            return promise;        }    } catch (RuntimeException e) {        ReferenceCountUtil.release(msg);        throw e;    }    write(msg, false, promise);    return promise;}private void write(Object msg, boolean flush, ChannelPromise promise) {	//从 this 在队列的位置开始,向上查找OutBound的ChannelHandlerContext对象    AbstractChannelHandlerContext next = findContextOutbound();    final Object m = pipeline.touch(msg, next);    EventExecutor executor = next.executor();    if (executor.inEventLoop()) {        if (flush) {            next.invokeWriteAndFlush(m, promise);        } else {        	调用获取对象的write方法            next.invokeWrite(m, promise);        }    } else {        AbstractWriteTask task;        if (flush) {            task = WriteAndFlushTask.newInstance(next, m, promise);        }  else {            task = WriteTask.newInstance(next, m, promise);        }        safeExecute(executor, task, promise, m);    }}private void invokeWrite(Object msg, ChannelPromise promise) {    if (invokeHandler()) {        invokeWrite0(msg, promise);    } else {        write(msg, promise);    }}private void invokeWrite0(Object msg, ChannelPromise promise) {    try {    	获取OutboundHandlerContext中的handler对象,调用该对象的write方法。        ((ChannelOutboundHandler) handler()).write(this, msg, promise);    } catch (Throwable t) {        notifyOutboundHandlerException(t, promise);    }}// 重点从对象所在队列的位置开始 ,向队列(链表)上方查找可以执行(OutboundHandler)的Handler对象private AbstractChannelHandlerContext findContextOutbound() {    AbstractChannelHandlerContext ctx = this;    do {        ctx = 
ctx.prev;    } while (!ctx.outbound);    return ctx;}MessageToByteEncoder.java@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {    ByteBuf buf = null;    try {        if (acceptOutboundMessage(msg)) {            @SuppressWarnings("unchecked")            I cast = (I) msg;            buf = allocateBuffer(ctx, cast, preferDirect);            try {                encode(ctx, cast, buf);            } finally {                ReferenceCountUtil.release(cast);            }            if (buf.isReadable()) {                ctx.write(buf, promise);            } else {                buf.release();                ctx.write(Unpooled.EMPTY_BUFFER, promise);            }            buf = null;        } else {            ctx.write(msg, promise);        }    } catch (EncoderException e) {        throw e;    } catch (Throwable e) {        throw new EncoderException(e);    } finally {        if (buf != null) {            buf.release();        }    }}

3.public ChannelPipeline pipeline().write(Object msg);

141533_HTqu_3101476.png

ctx.pipeline().write(msg);DefaultChannelPipeline.java@Overridepublic final ChannelFuture write(Object msg) {	// 从pipeline队列最后的HandlerContext开始,向前遍历写数据到ChannelOutboundHandler中    return tail.write(msg);}AbstractChannelHandlerContext.java@Overridepublic ChannelFuture write(Object msg) {    return write(msg, newPromise());}@Overridepublic ChannelFuture write(final Object msg, final ChannelPromise promise) {    if (msg == null) {        throw new NullPointerException("msg");    }    try {        if (isNotValidPromise(promise, true)) {            ReferenceCountUtil.release(msg);            // cancelled            return promise;        }    } catch (RuntimeException e) {        ReferenceCountUtil.release(msg);        throw e;    }    write(msg, false, promise);    return promise;}private void write(Object msg, boolean flush, ChannelPromise promise) {	// 向前查找ChannelHandlerContext 为Inbound的Handler 处理数据    AbstractChannelHandlerContext next = findContextOutbound();    final Object m = pipeline.touch(msg, next);    EventExecutor executor = next.executor();    if (executor.inEventLoop()) {        if (flush) {            next.invokeWriteAndFlush(m, promise);        } else {        	//调用方法            next.invokeWrite(m, promise);        }    } else {        AbstractWriteTask task;        if (flush) {            task = WriteAndFlushTask.newInstance(next, m, promise);        }  else {            task = WriteTask.newInstance(next, m, promise);        }        safeExecute(executor, task, promise, m);    }}private void invokeWrite0(Object msg, ChannelPromise promise) {    try {        ((ChannelOutboundHandler) handler()).write(this, msg, promise);    } catch (Throwable t) {        notifyOutboundHandlerException(t, promise);    }}// 向前查找ChannelHandlerContext 为Inbound的Handler 处理数据private AbstractChannelHandlerContext findContextOutbound() {    AbstractChannelHandlerContext ctx = this;    do {        ctx = ctx.prev;    } while (!ctx.outbound);    return ctx;}

4. public Channel channel().write(Object msg);

ctx.channel().write(msg);AbstractChannel.java@Overridepublic ChannelFuture write(Object msg) {    return pipeline.write(msg);}DefaultChannelPipeline.java@Overridepublic final ChannelFuture write(Object msg) {    return tail.write(msg);}.... 同上

 

 

 

 

 

 

 

 

 

 

 

转载于:https://my.oschina.net/LucasZhu/blog/1632987

你可能感兴趣的文章
GregorianCalendar日历程序
查看>>
Sublime 中运行 Shell 、Python、Lua、Groovy...等各种脚本
查看>>
【Java集合源码剖析】ArrayList源码剖析
查看>>
linux的目录结构
查看>>
这次逻辑通了,
查看>>
HTMLHelper
查看>>
快速构建Windows 8风格应用29-捕获图片与视频
查看>>
OC语言Block和协议
查看>>
使用xpath时出现noDefClass的错误(找不到某个类)
查看>>
.Net规则引擎介绍 - REngine
查看>>
CSS3 transforms 3D翻开
查看>>
利用传入的Type类型来调用范型方法的解决方案
查看>>
Top命令内存占用剖析
查看>>
转 网络IO模型:同步IO和异步IO,阻塞IO和非阻塞IO
查看>>
求带分数(蓝桥杯)
查看>>
Retrofit 入门学习
查看>>
Spring Boot学习笔记
查看>>
laravel 集合接口
查看>>
java.exe进程来源排查录
查看>>
C++实现KMP模式匹配算法
查看>>