客户端接入后,下面一步操作就是读取客户端传输过来的数据,这一节我们就来分析下服务端读取客户端数据流程。从前面分析来看,channel
的事件轮询、事件处理是在NioEventLoop
的run
方法中,从这里我们就很容易找我服务端读流程的入口方法:processSelectedKeys()
。
从processSelectedKeys()
一直追踪下去,可以看到OP_READ
处理逻辑分支:
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read();}
可能你会比较奇怪:为什么OP_READ
和OP_ACCEPT
都会走这个分支?
(相关资料图)
OP_ACCEPT
是NioServerSocketChannel
处理的事件,而OP_READ
是NioSocketChannel
处理的事件,所以,虽然它们都走这个分支,但是channel类型确是不一样的,即这里的unsafe
类型也不一样,一个是:NioMessageUnsafe
,另一个是:NioSocketChannelUnsafe
。NioServerSocketChannel
负责监听客户端连接,当有客户端连接进入时,对它来说就是有个读入消息需要被处理。这里我们是处理client channle
的OP_READ
,所以,unsafe
是NioSocketChannelUnsafe
类型实例。
AbstractNioByteChannel.NioByteUnsafe#read
方法代码如下:
public final void read() { final ChannelConfig config = config(); if (shouldBreakReadReady(config)) { clearReadPending(); return; } final ChannelPipeline pipeline = pipeline(); final ByteBufAllocator allocator = config.getAllocator(); final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); allocHandle.reset(config); ByteBuf byteBuf = null; boolean close = false; try { do { // 申请ByteBuf对象 byteBuf = allocHandle.allocate(allocator); //doReadBytes(byteBuf):将数据读取到ByteBuf中 //lastBytesRead()将读取的字节数设置到lastBytesRead allocHandle.lastBytesRead(doReadBytes(byteBuf)); if (allocHandle.lastBytesRead() <= 0) { byteBuf.release(); byteBuf = null; close = allocHandle.lastBytesRead() < 0; if (close) { readPending = false; } break; } allocHandle.incMessagesRead(1); readPending = false; //触发pipeline channelRead事件,将读入数据ByteBuf传入到handler中 pipeline.fireChannelRead(byteBuf); byteBuf = null; } while (allocHandle.continueReading());//判断是否继续读取 allocHandle.readComplete(); //触发pipeline channelReadComplete pipeline.fireChannelReadComplete(); if (close) { closeOnRead(pipeline); } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close, allocHandle); } finally { if (!readPending && !config.isAutoRead()) { removeReadOp(); } }}
这个方法刨除其它逻辑,关于客户端数据处理逻辑主要包括3个步骤:
allocHandle.lastBytesRead(doReadBytes(byteBuf))
:调用java api
,从channel
中读取字节数据到ByteBuf
缓存中;pipeline.fireChannelRead(byteBuf)
:触发pipeline
的channelRead
事件,并将带有读入数据的ByteBuf
通过参数传入;pipeline.fireChannelReadComplete()
:触发pipeline
的channelReadComplete
事件;调用pipeline
的fireChannelRead()
就可触发channelRead
事件在handler
之间传播,事件传播这块代码比较绕,给人感觉不停的来回调用容易绕晕,下面通过图可以更加直观的看出调用流程,再配合代码就很好理解了。
关键点就在于HandlerContext
中提供了一个静态方法:invokeChannelRead(final AbstractChannelHandlerContext next, Object msg)
,第一个是在哪个handler
上触发事件,第二个参数就是数据本身,通过这个方法就可以指定在哪个handler
上触发channelRead
事件。由于pipeline
中的handler
是被包装成HandlerContext
放入的,所以,可以通过handler()
方法找到真正的handler
对象进行触发。
比如pipeline
的fireChannelRead()
就是触发head
的channelRead
事件,如果处理完成需要把事件继续传播给下一个handler
,就需要调用ctx.fireChannelRead(msg)
方法即可,该方法中通过next
属性获取到下一个节点,然后执行static invokeChannelRead(next, msg)
这个方法就可以将事件传播到下一个节点上。
pipeline.fireChannelRead(byteBuf)
运行完成后会调用pipeline.fireChannelReadComplete()
方法,触发channelReadComplete
事件,执行机制和channelRead
事件一样,就不再赘述。
搞清楚上面原理,就很容易理解
ctx.fireChannelRead()
和ctx.pipeline().fireChannelRead()
之间的区别了,避免误用。
上面分析的都是常规模式,没有给handler
指定额外线程情况下channelRead
和channelReadComplete
传播机制,大致如下图:
先触发channelRead
事件,按照pipeline
中顺序依次触发,当所有handler
都触发完后,再触发channelReadComplete
事件,按照pipeline
中的顺序依次触发。这些所有流程采用的都是同步方式,在同一个线程中执行,这个线程就是channel
注册的NioEventLoop
。
我们来看下static void invokeChannelRead()
这个方法:
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRead(m); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRead(m); } }); }}
在执行next.invokeChannelRead(m)
方法前有个executor.inEventLoop()
判断,判断当前执行线程是不是就是handler
执行所需的线程。执行handler
方法是不能随便线程都可以去执行的,必须使用handler
内部指定的executor
线程执行器中执行才行。如下图,也就是说红色框框中的内容必须在executor
线程执行器中执行,如果当前线程和handler
执行线程不是同一个,就需要进行线程切换:则调用封装成一个任务,提交到executor
的任务队列中让其执行。
executor
线程执行器是通过next.executor()
方法获取到的,从这个方法源码中可以看到获取逻辑:如果HandlerContext
中executor
有值则直接返回;否则返回channel
注册的NioEventLoop
作为线程执行器。
在添加handler
时可以指定一个EventGroup
:pipeline.addLast( bizGroup, "handler2", new OtherTest02());
,这样,再把handler
包装成HandlerContext
过程中会从这个EventGroup
根据chooser
选取策略获得一个EventLoop
赋值给executor
。
所以,从上面分析,默认情况下handler
都是在channel
注册的NioEventLoop
线程中执行的,除非在addLast
添加handloer
时特别指定。
下面我们通过一个案例分析下pipeline
线程模型,如下,给handler02
添加一个额外的线程池:
EventLoopGroup bizGroup = new NioEventLoopGroup(10, new ThreadFactoryBuilder().setNameFormat("biz-%s").build());protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast( "handler01", new OtherTest01()); pipeline.addLast( bizGroup, "handler02", new OtherTest02()); pipeline.addLast( "handler03", new OtherTest03());}
这时,channelRead
和channelReadComplete
事件触发流程见下图:
channelRead
事件执行流程说明:
handler01
的channelRead
事件,本身当前线程和handler01
是同一个线程,所以,直接调用handler#channelRead()
方法;handler01#channelRead()
方法执行完成后,事件继续向下传播,需要调用handler02#channelRead()
方法,但是handler02
执行线程并不是默认的channel
的注册线程,而是额外设置的biz
线程,需要将调用包装成一个任务提交到biz
线程的任务队列taskQueue
中,然后直接返回;biz线程执行器内部线程会一直循环从taskQueue
中获取任务执行,这样就完成了线程切换效果;当handler02#channelRead()
方法执行完成后,需要执行handler03#channelRead()
,它们又不在同一个线程中执行,这时有需要切换线程,所以会把handler03#channelRead()
的调用封装成一个任务提交到register eventLoop的taskQueue
中,待其内部线程提取执行;下面再来看下channelReadComplete
事件执行流程:
a1
将任务提交给taskQueue
任务队列后直接返回了,而不是等其执行完成再返回;a1
返回后,从源码分析来看,会立即触发channelReadComplete
事件,涉及到线程切换,同理b1
这里也是将handler02#channelReadComplete()
调用封装成任务放入到biz eventLoop
的taskQueue
中的,然后也直接返回了;这样,biz eventLoop
线程执行器taskQueue
中就有两个任务,会按照顺序依次执行:先执行channelRead()
调用,再执行channelReadComplete()
调用;执行a3、b3
时同理;从上面可以看出,Pipeline
中handler
可以在不同线程间切换得到关键是:taskQueue
。还要一点非常重要:handler
线程池执行器默认使用的channel
注册的NioEventLoop
这个,NioEventLoop
采用的是单线程工作模式,同时还需要处理selector.select()
事件轮询,所以,handler
里肯定不能有耗时、特别是IO
阻塞等操作,不然卡在handler
中,selector#select()
执行不到,无法及时接收到客户端传送过来的数据。
标签:
最新推荐
创业者、河长、村支书,这是重庆市合川区三庙镇戴花村青年何波的不同头衔,但几个身份都指向了同一个使...
律师兼任调解员,不打官司也能化解纠纷,代理调解受指派的公益性案件还免费。这是兵团第十师北屯市探索...
根据教育部教育考试院统一安排,2022年上半年全国大学英语四、六级口语考试将于5月21日-22日举行,笔试...
学生代购的“苦”与“乐” “你问的这个产品现在做促销活动,买一件包邮,还送小样和面膜……”...
11月11日大连市新冠肺炎疫情防控总指挥部发布,11月10日0时至24时,大连市新增21例本土新冠肺炎确诊...
纤维素制成闪光材料无毒可降解 或彻底改变化妆品行业 科技日报北京11月11日电 (实习记者张...
海洋中或堆积了2 8万吨新冠废物 科技日报北京11月11日电 (记者刘霞)据美国趣味科学网站10日报...
开屏广告又现新花招,换个马甲就重来? ■ 来论 据媒体报道,“双十一”期间,一些App的开屏...
对不合理教师资格认定标准,该全面清理了 ■ 来论 针对网友留言反映的“糖尿病无法通过教师...
虚假宣传、以次充好、售卖临期产品不提示直播间商家“放水”让消费者闹心 关注“双11” 今年...
“扫码抽手机”实则是广告 快递单能“领红包”面单广告是谁发的? “双11”之际,消费者被商...
中新网11月12日电 据北京市疾病预防控制中心微信公众号消息,2021年11月10日北京市接报1例在京存在...
(抗击新冠肺炎)辽宁大连本轮疫情病毒为德尔塔变异株 24个区域划定为中风险地区 中新社大连11月1...
中新网西安11月11日电 (梅镱泷 杨起超)记者11日从西安市鄠邑区秦保局获悉,太平国有生态林场架设...
(抗击新冠肺炎)成都停业整顿56家零售药店 买感冒药需提供身份证 中新网成都11月11日电 (记者 ...
中新网大连11月11日电 (记者 杨毅) 11月11日,大连市政府秘书长衣庆焘在大连疫情防控新闻发布会...
新华社杭州11月11日电(记者冯源)在商周时期,如今的浙江中西部活跃着一个名为“姑蔑”的族群,但是...
中新网大连11月11日电 (记者 杨毅)11月11日,大连市政府秘书长衣庆焘在大连疫情防控工作新闻发布...
中新网大连11月11日电 (记者 杨毅)11月11日,大连市政府秘书长衣庆焘在大连疫情防控工作新闻发布...
中新网昆明11月11日电(记者 缪超)云南“最美政法干警”发布仪式11日在昆明举行。会上,授予昆明市...
(抗击新冠肺炎)甘肃凝聚“她力量”:互助抗疫,女人更懂女人心 中新网兰州11月11日电 (记者 徐...
中新网兰州11月11日电 (史静静)在甘肃金川公司,27年来葛小海始终在生产一线,他参与的“渣罐车制...
中新网乌鲁木齐11月11日电 (王小军 罗宣政 廖超)11月11日,一批来自浙江嘉兴的爱心物资,跨越...
中新网兰州11月11日电 (邬凡 朱学成)11月10日5时30分,位于敦煌车站旁的敦煌综合工区,钢轨探伤车...
中新网重庆11月11日电 (梁钦卿)“我今年上小学三年级了,我不怕疼,打疫苗是为了抵抗新冠病毒。”1...
中新网绵阳11月11日电 (岳波 李远梅)四川绵阳警方11日通报称,一男子酒后无聊多次报警称自己的...
中新网11月11日电 据中国民航局网站消息,11月11日,民航局再发熔断指令,对德国汉莎航空公司LH728...
中新网成都11月11日电 题:疫情中轮椅上的“逆行者”:想为大家做力所能及的事 作者 祝欢 ...
中新网上海11月11日电 (记者 李姝徵)上海警方11日召开发布会披露,在近期“砺剑”行动中破获了一...
中新网太原11月11日电 (记者 李庭耀)记者11日从山西省政府新闻办举行的新闻发布会上获悉,山西推...
中新网乌鲁木齐11月11日电 (刘雨珊 牛雨萌 艾尼)11日,记者从新疆水产科研所获悉,新疆博湖县将...
中新网大连11月11日电 (记者 杨毅)大连市新冠肺炎疫情防控总指挥部 11日发布公告,大连市将庄河...
中新网西安11月11日电 (记者 党田野)身穿白色“礼服”,摇晃着酒杯,时不时浅酌一口啤酒,然后与...
11月11日大连市新冠肺炎疫情防控总指挥部发布,按照国务院应对新冠肺炎疫情联防联控机制关于科学划...
中新网南京11月11日电 题:这个“双十一”南京的猪都“脱单”了 其实还有更让人嫉妒的…… ...
中新网呼伦贝尔11月11日电 (记者 张林虎)11日,记者从内蒙古自治区呼伦贝尔市公安局获悉,该局将...
中新网广州11月11日电 (记者 程景伟)“寻味帅府邂逅甜蜜——2021年帅府之夜”暨“海外拾珠——孙...
中新网徐州11月11日电 题:江苏徐州“家门口车管所”便民服务驶入“高速路” 作者 朱志庚 ...
中新网重庆11月11日电 (梁钦卿)为加强秋冬季空气污染应对,重庆市生态环境局11日发出2021年第九次...
新华社重庆11月11日电 题:深藏功名三十载 化作春蚕报乡亲——一名抗美援朝老兵的人生选择 新...
中新网益阳11月11日电 (王鹏 王庆庆)爱花本是修身养性、陶冶情操之事,湖南益阳市桃江县桃花江镇...
中新网成都11月11日电 题:成都25位民辅警的“封闭”生活:有人“转行”送外卖 有人变身“仓鼠管...
中新网南京11月11日电 (徐珊珊)江苏省教育厅体育卫生与艺术教育处处长张鲤鲤11日在南京表示,到202...
中新网成都11月11日电 (记者 吕杨)成都市公园城市建设管理局11日正式发布公园城市银杏观叶指数,...
中新网南京11月11日电 (徐珊珊)11日,江苏省教育厅召开新闻发布会,发布2020年江苏省学生体质健康...
中新网宜昌11月11日电 (江雅丽 董晓斌)17年前,四川广安一夫妇的6岁儿子被人拐走,夫妻俩寻找多年...
中新网宁德11月11日电 (林榕生)福建宁德市柘荣县应对新型冠状病毒感染肺炎疫情工作领导小组(指挥部...
11月11日,内蒙古额济纳旗新冠肺炎防控工作指挥部发布《关于调整额济纳旗达来呼布镇风险等级的公告...
(抗击新冠肺炎)内蒙古现有本土确诊病例32例 伊金霍洛旗确诊病例清零 中新网呼和浩特11月11日电...
中新网呼和浩特11月11日电 (记者 张林虎)11日,记者从内蒙古自治区通辽市相关部门获悉,从10日下...
以“高利息、高回报”为诱饵 200余名老人被骗1000余万元
成都抗疫的外籍志愿者:愿为城市“康复”贡献力量
Copyright © 2015-2022 太平洋律师网版权所有 备案号:豫ICP备2022016495号-17 联系邮箱:93 96 74 66 9@qq.com