客户端接入后,下面一步操作就是读取客户端传输过来的数据,这一节我们就来分析下服务端读取客户端数据流程。从前面分析来看,channel的事件轮询、事件处理是在NioEventLoop的run方法中,从这里我们就很容易找我服务端读流程的入口方法:processSelectedKeys()。
从processSelectedKeys()一直追踪下去,可以看到OP_READ处理逻辑分支:
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read();}可能你会比较奇怪:为什么OP_READ和OP_ACCEPT都会走这个分支?
(相关资料图)
OP_ACCEPT是NioServerSocketChannel处理的事件,而OP_READ是NioSocketChannel处理的事件,所以,虽然它们都走这个分支,但是channel类型确是不一样的,即这里的unsafe类型也不一样,一个是:NioMessageUnsafe,另一个是:NioSocketChannelUnsafe。NioServerSocketChannel负责监听客户端连接,当有客户端连接进入时,对它来说就是有个读入消息需要被处理。这里我们是处理client channle的OP_READ,所以,unsafe是NioSocketChannelUnsafe类型实例。
AbstractNioByteChannel.NioByteUnsafe#read方法代码如下:
public final void read() { final ChannelConfig config = config(); if (shouldBreakReadReady(config)) { clearReadPending(); return; } final ChannelPipeline pipeline = pipeline(); final ByteBufAllocator allocator = config.getAllocator(); final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); allocHandle.reset(config); ByteBuf byteBuf = null; boolean close = false; try { do { // 申请ByteBuf对象 byteBuf = allocHandle.allocate(allocator); //doReadBytes(byteBuf):将数据读取到ByteBuf中 //lastBytesRead()将读取的字节数设置到lastBytesRead allocHandle.lastBytesRead(doReadBytes(byteBuf)); if (allocHandle.lastBytesRead() <= 0) { byteBuf.release(); byteBuf = null; close = allocHandle.lastBytesRead() < 0; if (close) { readPending = false; } break; } allocHandle.incMessagesRead(1); readPending = false; //触发pipeline channelRead事件,将读入数据ByteBuf传入到handler中 pipeline.fireChannelRead(byteBuf); byteBuf = null; } while (allocHandle.continueReading());//判断是否继续读取 allocHandle.readComplete(); //触发pipeline channelReadComplete pipeline.fireChannelReadComplete(); if (close) { closeOnRead(pipeline); } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close, allocHandle); } finally { if (!readPending && !config.isAutoRead()) { removeReadOp(); } }}这个方法刨除其它逻辑,关于客户端数据处理逻辑主要包括3个步骤:
allocHandle.lastBytesRead(doReadBytes(byteBuf)):调用java api,从channel中读取字节数据到ByteBuf缓存中;pipeline.fireChannelRead(byteBuf):触发pipeline的channelRead事件,并将带有读入数据的ByteBuf通过参数传入;pipeline.fireChannelReadComplete():触发pipeline的channelReadComplete事件;调用pipeline的fireChannelRead()就可触发channelRead事件在handler之间传播,事件传播这块代码比较绕,给人感觉不停的来回调用容易绕晕,下面通过图可以更加直观的看出调用流程,再配合代码就很好理解了。
关键点就在于HandlerContext中提供了一个静态方法:invokeChannelRead(final AbstractChannelHandlerContext next, Object msg),第一个是在哪个handler上触发事件,第二个参数就是数据本身,通过这个方法就可以指定在哪个handler上触发channelRead事件。由于pipeline中的handler是被包装成HandlerContext放入的,所以,可以通过handler()方法找到真正的handler对象进行触发。
比如pipeline的fireChannelRead()就是触发head的channelRead事件,如果处理完成需要把事件继续传播给下一个handler,就需要调用ctx.fireChannelRead(msg)方法即可,该方法中通过next属性获取到下一个节点,然后执行static invokeChannelRead(next, msg)这个方法就可以将事件传播到下一个节点上。
pipeline.fireChannelRead(byteBuf)运行完成后会调用pipeline.fireChannelReadComplete()方法,触发channelReadComplete事件,执行机制和channelRead事件一样,就不再赘述。
搞清楚上面原理,就很容易理解
ctx.fireChannelRead()和ctx.pipeline().fireChannelRead()之间的区别了,避免误用。
上面分析的都是常规模式,没有给handler指定额外线程情况下channelRead和channelReadComplete传播机制,大致如下图:
先触发channelRead事件,按照pipeline中顺序依次触发,当所有handler都触发完后,再触发channelReadComplete事件,按照pipeline中的顺序依次触发。这些所有流程采用的都是同步方式,在同一个线程中执行,这个线程就是channel注册的NioEventLoop。
我们来看下static void invokeChannelRead()这个方法:
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRead(m); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRead(m); } }); }}在执行next.invokeChannelRead(m)方法前有个executor.inEventLoop()判断,判断当前执行线程是不是就是handler执行所需的线程。执行handler方法是不能随便线程都可以去执行的,必须使用handler内部指定的executor线程执行器中执行才行。如下图,也就是说红色框框中的内容必须在executor线程执行器中执行,如果当前线程和handler执行线程不是同一个,就需要进行线程切换:则调用封装成一个任务,提交到executor的任务队列中让其执行。
executor线程执行器是通过next.executor()方法获取到的,从这个方法源码中可以看到获取逻辑:如果HandlerContext中executor有值则直接返回;否则返回channel注册的NioEventLoop作为线程执行器。
在添加handler时可以指定一个EventGroup:pipeline.addLast( bizGroup, "handler2", new OtherTest02());,这样,再把handler包装成HandlerContext过程中会从这个EventGroup根据chooser选取策略获得一个EventLoop赋值给executor。
所以,从上面分析,默认情况下handler都是在channel注册的NioEventLoop线程中执行的,除非在addLast添加handloer时特别指定。
下面我们通过一个案例分析下pipeline线程模型,如下,给handler02添加一个额外的线程池:
EventLoopGroup bizGroup = new NioEventLoopGroup(10, new ThreadFactoryBuilder().setNameFormat("biz-%s").build());protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast( "handler01", new OtherTest01()); pipeline.addLast( bizGroup, "handler02", new OtherTest02()); pipeline.addLast( "handler03", new OtherTest03());}这时,channelRead和channelReadComplete事件触发流程见下图:
channelRead事件执行流程说明:
handler01的channelRead事件,本身当前线程和handler01是同一个线程,所以,直接调用handler#channelRead()方法;handler01#channelRead()方法执行完成后,事件继续向下传播,需要调用handler02#channelRead()方法,但是handler02执行线程并不是默认的channel的注册线程,而是额外设置的biz线程,需要将调用包装成一个任务提交到biz线程的任务队列taskQueue中,然后直接返回;biz线程执行器内部线程会一直循环从taskQueue中获取任务执行,这样就完成了线程切换效果;当handler02#channelRead()方法执行完成后,需要执行handler03#channelRead(),它们又不在同一个线程中执行,这时有需要切换线程,所以会把handler03#channelRead()的调用封装成一个任务提交到register eventLoop的taskQueue中,待其内部线程提取执行;下面再来看下channelReadComplete事件执行流程:
a1将任务提交给taskQueue任务队列后直接返回了,而不是等其执行完成再返回;a1返回后,从源码分析来看,会立即触发channelReadComplete事件,涉及到线程切换,同理b1这里也是将handler02#channelReadComplete()调用封装成任务放入到biz eventLoop的taskQueue中的,然后也直接返回了;这样,biz eventLoop线程执行器taskQueue中就有两个任务,会按照顺序依次执行:先执行channelRead()调用,再执行channelReadComplete()调用;执行a3、b3时同理;从上面可以看出,Pipeline中handler可以在不同线程间切换得到关键是:taskQueue。还要一点非常重要:handler线程池执行器默认使用的channel注册的NioEventLoop这个,NioEventLoop采用的是单线程工作模式,同时还需要处理selector.select()事件轮询,所以,handler里肯定不能有耗时、特别是IO阻塞等操作,不然卡在handler中,selector#select()执行不到,无法及时接收到客户端传送过来的数据。
标签:
最新推荐
创业者、河长、村支书,这是重庆市合川区三庙镇戴花村青年何波的不同头衔,但几个身份都指向了同一个使...
律师兼任调解员,不打官司也能化解纠纷,代理调解受指派的公益性案件还免费。这是兵团第十师北屯市探索...
根据教育部教育考试院统一安排,2022年上半年全国大学英语四、六级口语考试将于5月21日-22日举行,笔试...
学生代购的“苦”与“乐” “你问的这个产品现在做促销活动,买一件包邮,还送小样和面膜……”...
11月11日大连市新冠肺炎疫情防控总指挥部发布,11月10日0时至24时,大连市新增21例本土新冠肺炎确诊...
纤维素制成闪光材料无毒可降解 或彻底改变化妆品行业 科技日报北京11月11日电 (实习记者张...
海洋中或堆积了2 8万吨新冠废物 科技日报北京11月11日电 (记者刘霞)据美国趣味科学网站10日报...
开屏广告又现新花招,换个马甲就重来? ■ 来论 据媒体报道,“双十一”期间,一些App的开屏...
对不合理教师资格认定标准,该全面清理了 ■ 来论 针对网友留言反映的“糖尿病无法通过教师...
虚假宣传、以次充好、售卖临期产品不提示直播间商家“放水”让消费者闹心 关注“双11” 今年...
“扫码抽手机”实则是广告 快递单能“领红包”面单广告是谁发的? “双11”之际,消费者被商...
中新网11月12日电 据北京市疾病预防控制中心微信公众号消息,2021年11月10日北京市接报1例在京存在...
(抗击新冠肺炎)辽宁大连本轮疫情病毒为德尔塔变异株 24个区域划定为中风险地区 中新社大连11月1...
中新网西安11月11日电 (梅镱泷 杨起超)记者11日从西安市鄠邑区秦保局获悉,太平国有生态林场架设...
(抗击新冠肺炎)成都停业整顿56家零售药店 买感冒药需提供身份证 中新网成都11月11日电 (记者 ...
中新网大连11月11日电 (记者 杨毅) 11月11日,大连市政府秘书长衣庆焘在大连疫情防控新闻发布会...
新华社杭州11月11日电(记者冯源)在商周时期,如今的浙江中西部活跃着一个名为“姑蔑”的族群,但是...
中新网大连11月11日电 (记者 杨毅)11月11日,大连市政府秘书长衣庆焘在大连疫情防控工作新闻发布...
中新网大连11月11日电 (记者 杨毅)11月11日,大连市政府秘书长衣庆焘在大连疫情防控工作新闻发布...
中新网昆明11月11日电(记者 缪超)云南“最美政法干警”发布仪式11日在昆明举行。会上,授予昆明市...
(抗击新冠肺炎)甘肃凝聚“她力量”:互助抗疫,女人更懂女人心 中新网兰州11月11日电 (记者 徐...
中新网兰州11月11日电 (史静静)在甘肃金川公司,27年来葛小海始终在生产一线,他参与的“渣罐车制...
中新网乌鲁木齐11月11日电 (王小军 罗宣政 廖超)11月11日,一批来自浙江嘉兴的爱心物资,跨越...
中新网兰州11月11日电 (邬凡 朱学成)11月10日5时30分,位于敦煌车站旁的敦煌综合工区,钢轨探伤车...
中新网重庆11月11日电 (梁钦卿)“我今年上小学三年级了,我不怕疼,打疫苗是为了抵抗新冠病毒。”1...
中新网绵阳11月11日电 (岳波 李远梅)四川绵阳警方11日通报称,一男子酒后无聊多次报警称自己的...
中新网11月11日电 据中国民航局网站消息,11月11日,民航局再发熔断指令,对德国汉莎航空公司LH728...
中新网成都11月11日电 题:疫情中轮椅上的“逆行者”:想为大家做力所能及的事 作者 祝欢 ...
中新网上海11月11日电 (记者 李姝徵)上海警方11日召开发布会披露,在近期“砺剑”行动中破获了一...
中新网太原11月11日电 (记者 李庭耀)记者11日从山西省政府新闻办举行的新闻发布会上获悉,山西推...
中新网乌鲁木齐11月11日电 (刘雨珊 牛雨萌 艾尼)11日,记者从新疆水产科研所获悉,新疆博湖县将...
中新网大连11月11日电 (记者 杨毅)大连市新冠肺炎疫情防控总指挥部 11日发布公告,大连市将庄河...
中新网西安11月11日电 (记者 党田野)身穿白色“礼服”,摇晃着酒杯,时不时浅酌一口啤酒,然后与...
11月11日大连市新冠肺炎疫情防控总指挥部发布,按照国务院应对新冠肺炎疫情联防联控机制关于科学划...
中新网南京11月11日电 题:这个“双十一”南京的猪都“脱单”了 其实还有更让人嫉妒的…… ...
中新网呼伦贝尔11月11日电 (记者 张林虎)11日,记者从内蒙古自治区呼伦贝尔市公安局获悉,该局将...
中新网广州11月11日电 (记者 程景伟)“寻味帅府邂逅甜蜜——2021年帅府之夜”暨“海外拾珠——孙...
中新网徐州11月11日电 题:江苏徐州“家门口车管所”便民服务驶入“高速路” 作者 朱志庚 ...
中新网重庆11月11日电 (梁钦卿)为加强秋冬季空气污染应对,重庆市生态环境局11日发出2021年第九次...
新华社重庆11月11日电 题:深藏功名三十载 化作春蚕报乡亲——一名抗美援朝老兵的人生选择 新...
中新网益阳11月11日电 (王鹏 王庆庆)爱花本是修身养性、陶冶情操之事,湖南益阳市桃江县桃花江镇...
中新网成都11月11日电 题:成都25位民辅警的“封闭”生活:有人“转行”送外卖 有人变身“仓鼠管...
中新网南京11月11日电 (徐珊珊)江苏省教育厅体育卫生与艺术教育处处长张鲤鲤11日在南京表示,到202...
中新网成都11月11日电 (记者 吕杨)成都市公园城市建设管理局11日正式发布公园城市银杏观叶指数,...
中新网南京11月11日电 (徐珊珊)11日,江苏省教育厅召开新闻发布会,发布2020年江苏省学生体质健康...
中新网宜昌11月11日电 (江雅丽 董晓斌)17年前,四川广安一夫妇的6岁儿子被人拐走,夫妻俩寻找多年...
中新网宁德11月11日电 (林榕生)福建宁德市柘荣县应对新型冠状病毒感染肺炎疫情工作领导小组(指挥部...
11月11日,内蒙古额济纳旗新冠肺炎防控工作指挥部发布《关于调整额济纳旗达来呼布镇风险等级的公告...
(抗击新冠肺炎)内蒙古现有本土确诊病例32例 伊金霍洛旗确诊病例清零 中新网呼和浩特11月11日电...
中新网呼和浩特11月11日电 (记者 张林虎)11日,记者从内蒙古自治区通辽市相关部门获悉,从10日下...
以“高利息、高回报”为诱饵 200余名老人被骗1000余万元
成都抗疫的外籍志愿者:愿为城市“康复”贡献力量
Copyright © 2015-2022 太平洋律师网版权所有 备案号:豫ICP备2022016495号-17 联系邮箱:93 96 74 66 9@qq.com