当前位置: 首页 >  新闻中心  >   >  正文

【Netty源码分析】04 服务端读流程_今亮点

  • 2023-03-29 10:47:41 来源:腾讯云

读流程

客户端接入后,下面一步操作就是读取客户端传输过来的数据,这一节我们就来分析下服务端读取客户端数据流程。从前面分析来看,channel的事件轮询、事件处理是在NioEventLooprun方法中,从这里我们就很容易找我服务端读流程的入口方法:processSelectedKeys()

processSelectedKeys()一直追踪下去,可以看到OP_READ处理逻辑分支:

if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read();}

可能你会比较奇怪:为什么OP_READOP_ACCEPT都会走这个分支?


(相关资料图)

OP_ACCEPTNioServerSocketChannel处理的事件,而OP_READNioSocketChannel处理的事件,所以,虽然它们都走这个分支,但是channel类型确是不一样的,即这里的unsafe类型也不一样,一个是:NioMessageUnsafe,另一个是:NioSocketChannelUnsafeNioServerSocketChannel负责监听客户端连接,当有客户端连接进入时,对它来说就是有个读入消息需要被处理。

这里我们是处理client channleOP_READ,所以,unsafeNioSocketChannelUnsafe类型实例。

AbstractNioByteChannel.NioByteUnsafe#read方法代码如下:

public final void read() { final ChannelConfig config = config();    if (shouldBreakReadReady(config)) {     clearReadPending();         return;    }    final ChannelPipeline pipeline = pipeline();    final ByteBufAllocator allocator = config.getAllocator();    final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();    allocHandle.reset(config);    ByteBuf byteBuf = null;    boolean close = false;    try {        do {            // 申请ByteBuf对象            byteBuf = allocHandle.allocate(allocator);            //doReadBytes(byteBuf):将数据读取到ByteBuf中            //lastBytesRead()将读取的字节数设置到lastBytesRead            allocHandle.lastBytesRead(doReadBytes(byteBuf));            if (allocHandle.lastBytesRead() <= 0) {                byteBuf.release();                byteBuf = null;                close = allocHandle.lastBytesRead() < 0;                if (close) {                    readPending = false;                }                break;            }            allocHandle.incMessagesRead(1);            readPending = false;            //触发pipeline channelRead事件,将读入数据ByteBuf传入到handler中            pipeline.fireChannelRead(byteBuf);            byteBuf = null;        } while (allocHandle.continueReading());//判断是否继续读取          allocHandle.readComplete();        //触发pipeline channelReadComplete        pipeline.fireChannelReadComplete();        if (close) {            closeOnRead(pipeline);        }    } catch (Throwable t) {        handleReadException(pipeline, byteBuf, t, close, allocHandle);    } finally {        if (!readPending && !config.isAutoRead()) {            removeReadOp();        }    }}

这个方法刨除其它逻辑,关于客户端数据处理逻辑主要包括3个步骤:

allocHandle.lastBytesRead(doReadBytes(byteBuf)):调用java api,从channel中读取字节数据到ByteBuf缓存中;pipeline.fireChannelRead(byteBuf):触发pipelinechannelRead事件,并将带有读入数据的ByteBuf通过参数传入;pipeline.fireChannelReadComplete():触发pipelinechannelReadComplete事件;

事件传播

调用pipelinefireChannelRead()就可触发channelRead事件在handler之间传播,事件传播这块代码比较绕,给人感觉不停的来回调用容易绕晕,下面通过图可以更加直观的看出调用流程,再配合代码就很好理解了。

关键点就在于HandlerContext中提供了一个静态方法:invokeChannelRead(final AbstractChannelHandlerContext next, Object msg),第一个是在哪个handler上触发事件,第二个参数就是数据本身,通过这个方法就可以指定在哪个handler上触发channelRead事件。由于pipeline中的handler是被包装成HandlerContext放入的,所以,可以通过handler()方法找到真正的handler对象进行触发。

比如pipelinefireChannelRead()就是触发headchannelRead事件,如果处理完成需要把事件继续传播给下一个handler,就需要调用ctx.fireChannelRead(msg)方法即可,该方法中通过next属性获取到下一个节点,然后执行static invokeChannelRead(next, msg)这个方法就可以将事件传播到下一个节点上。

pipeline.fireChannelRead(byteBuf)运行完成后会调用pipeline.fireChannelReadComplete()方法,触发channelReadComplete事件,执行机制和channelRead事件一样,就不再赘述。

搞清楚上面原理,就很容易理解ctx.fireChannelRead()ctx.pipeline().fireChannelRead()之间的区别了,避免误用。

Pipeline线程模型

上面分析的都是常规模式,没有给handler指定额外线程情况下channelReadchannelReadComplete传播机制,大致如下图:

先触发channelRead事件,按照pipeline中顺序依次触发,当所有handler都触发完后,再触发channelReadComplete事件,按照pipeline中的顺序依次触发。这些所有流程采用的都是同步方式,在同一个线程中执行,这个线程就是channel注册的NioEventLoop

我们来看下static void invokeChannelRead()这个方法:

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);    EventExecutor executor = next.executor();    if (executor.inEventLoop()) {        next.invokeChannelRead(m);    } else {        executor.execute(new Runnable() {            @Override            public void run() {                next.invokeChannelRead(m);            }        });    }}

在执行next.invokeChannelRead(m)方法前有个executor.inEventLoop()判断,判断当前执行线程是不是就是handler执行所需的线程。执行handler方法是不能随便线程都可以去执行的,必须使用handler内部指定的executor线程执行器中执行才行。如下图,也就是说红色框框中的内容必须在executor线程执行器中执行,如果当前线程和handler执行线程不是同一个,就需要进行线程切换:则调用封装成一个任务,提交到executor的任务队列中让其执行。

executor线程执行器是通过next.executor()方法获取到的,从这个方法源码中可以看到获取逻辑:如果HandlerContextexecutor有值则直接返回;否则返回channel注册的NioEventLoop作为线程执行器。

在添加handler时可以指定一个EventGrouppipeline.addLast( bizGroup, "handler2", new OtherTest02());,这样,再把handler包装成HandlerContext过程中会从这个EventGroup根据chooser选取策略获得一个EventLoop赋值给executor

所以,从上面分析,默认情况下handler都是在channel注册的NioEventLoop线程中执行的,除非在addLast添加handloer时特别指定。

下面我们通过一个案例分析下pipeline线程模型,如下,给handler02添加一个额外的线程池:

EventLoopGroup bizGroup = new NioEventLoopGroup(10, new ThreadFactoryBuilder().setNameFormat("biz-%s").build());protected void initChannel(SocketChannel ch) throws Exception {    ChannelPipeline pipeline = ch.pipeline();    pipeline.addLast( "handler01", new OtherTest01());    pipeline.addLast( bizGroup, "handler02", new OtherTest02());    pipeline.addLast( "handler03", new OtherTest03());}

这时,channelReadchannelReadComplete事件触发流程见下图:

channelRead事件执行流程说明:

上下两部分代表两个线程,上面是channel注册的eventLoop,下面是添加handler02指定的eventLoop;首先触发handler01channelRead事件,本身当前线程和handler01是同一个线程,所以,直接调用handler#channelRead()方法;handler01#channelRead()方法执行完成后,事件继续向下传播,需要调用handler02#channelRead()方法,但是handler02执行线程并不是默认的channel的注册线程,而是额外设置的biz线程,需要将调用包装成一个任务提交到biz线程的任务队列taskQueue中,然后直接返回;biz线程执行器内部线程会一直循环从taskQueue中获取任务执行,这样就完成了线程切换效果;当handler02#channelRead()方法执行完成后,需要执行handler03#channelRead(),它们又不在同一个线程中执行,这时有需要切换线程,所以会把handler03#channelRead()的调用封装成一个任务提交到register eventLoop的taskQueue中,待其内部线程提取执行;

下面再来看下channelReadComplete事件执行流程:

上图a1将任务提交给taskQueue任务队列后直接返回了,而不是等其执行完成再返回;a1返回后,从源码分析来看,会立即触发channelReadComplete事件,涉及到线程切换,同理b1这里也是将handler02#channelReadComplete()调用封装成任务放入到biz eventLooptaskQueue中的,然后也直接返回了;这样,biz eventLoop线程执行器taskQueue中就有两个任务,会按照顺序依次执行:先执行channelRead()调用,再执行channelReadComplete()调用;执行a3、b3时同理;

总结

从上面可以看出,Pipelinehandler可以在不同线程间切换得到关键是:taskQueue。还要一点非常重要:handler线程池执行器默认使用的channel注册的NioEventLoop这个,NioEventLoop采用的是单线程工作模式,同时还需要处理selector.select()事件轮询,所以,handler里肯定不能有耗时、特别是IO阻塞等操作,不然卡在handler中,selector#select()执行不到,无法及时接收到客户端传送过来的数据。

标签:

上一篇 :

下一篇 :

最新推荐

让河流回到曾经的模样——“河小青”助力河长制

创业者、河长、村支书,这是重庆市合川区三庙镇戴花村青年何波的不同头衔,但几个身份都指向了同一个使...

律师兼任调解员 不打官司解纠纷

律师兼任调解员,不打官司也能化解纠纷,代理调解受指派的公益性案件还免费。这是兵团第十师北屯市探索...

青海2022年上半年英语四级报名时间:3月15日至3月18日

根据教育部教育考试院统一安排,2022年上半年全国大学英语四、六级口语考试将于5月21日-22日举行,笔试...

人生第一桶金、与学业冲突……学生代购的“苦”与“乐”

  学生代购的“苦”与“乐”  “你问的这个产品现在做促销活动,买一件包邮,还送小样和面膜……”...

辽宁大连发布10日新增21例本土确诊病例行程轨迹

  11月11日大连市新冠肺炎疫情防控总指挥部发布,11月10日0时至24时,大连市新增21例本土新冠肺炎确诊...

纤维素制成闪光材料无毒可降解 或彻底改变化妆品行业

  纤维素制成闪光材料无毒可降解  或彻底改变化妆品行业  科技日报北京11月11日电 (实习记者张...

新冠疫情期间 海洋中或堆积2.8万吨相关塑料废物

  海洋中或堆积了2 8万吨新冠废物  科技日报北京11月11日电 (记者刘霞)据美国趣味科学网站10日报...

开屏广告又现新花招,换个马甲就重来?

  开屏广告又现新花招,换个马甲就重来?  ■ 来论  据媒体报道,“双十一”期间,一些App的开屏...

对不合理教师资格认定标准,该全面清理了

  对不合理教师资格认定标准,该全面清理了  ■ 来论  针对网友留言反映的“糖尿病无法通过教师...

虚假宣传、以次充好 直播间商家“放水”让消费者闹心

  虚假宣传、以次充好、售卖临期产品不提示直播间商家“放水”让消费者闹心  关注“双11”  今年...

“扫码抽手机”实则是广告 快递单广告是谁发的?

  “扫码抽手机”实则是广告  快递单能“领红包”面单广告是谁发的?  “双11”之际,消费者被商...

北京市本起疫情部分确诊病例病毒全基因组测序已完成

  中新网11月12日电 据北京市疾病预防控制中心微信公众号消息,2021年11月10日北京市接报1例在京存在...

辽宁大连本轮疫情病毒为德尔塔变异株 24个区域划定为中风险地区

  (抗击新冠肺炎)辽宁大连本轮疫情病毒为德尔塔变异株 24个区域划定为中风险地区  中新社大连11月1...

西安市鄠邑区太平峪首次发现野生川金丝猴踪迹

  中新网西安11月11日电 (梅镱泷 杨起超)记者11日从西安市鄠邑区秦保局获悉,太平国有生态林场架设...

成都停业整顿56家零售药店 买感冒药需提供身份证

  (抗击新冠肺炎)成都停业整顿56家零售药店 买感冒药需提供身份证  中新网成都11月11日电 (记者 ...

大连市启动第二轮全员核酸检测

  中新网大连11月11日电 (记者 杨毅) 11月11日,大连市政府秘书长衣庆焘在大连疫情防控新闻发布会...

浙江衢州发现的西周墓葬群或为“姑蔑王陵”

  新华社杭州11月11日电(记者冯源)在商周时期,如今的浙江中西部活跃着一个名为“姑蔑”的族群,但是...

大连本轮疫情为德尔塔变异株

  中新网大连11月11日电 (记者 杨毅)11月11日,大连市政府秘书长衣庆焘在大连疫情防控工作新闻发布...

大连:本轮疫情病毒载量高、传染性强 代际传播为2天左右

  中新网大连11月11日电 (记者 杨毅)11月11日,大连市政府秘书长衣庆焘在大连疫情防控工作新闻发布...

25名干警获云南“最美政法干警”表彰

  中新网昆明11月11日电(记者 缪超)云南“最美政法干警”发布仪式11日在昆明举行。会上,授予昆明市...

甘肃凝聚“她力量”:互助抗疫,女人更懂女人心

  (抗击新冠肺炎)甘肃凝聚“她力量”:互助抗疫,女人更懂女人心  中新网兰州11月11日电 (记者 徐...

甘肃戈壁的“明星劳模”:自编自导让每一个工人唱“主角”

  中新网兰州11月11日电 (史静静)在甘肃金川公司,27年来葛小海始终在生产一线,他参与的“渣罐车制...

跨越千山万水 浙江爱心“温暖”新疆沙雅孩子

  中新网乌鲁木齐11月11日电 (王小军 罗宣政 廖超)11月11日,一批来自浙江嘉兴的爱心物资,跨越...

“钢轨神探”启5大铁路大动脉9300公里冬季“诊疗”

  中新网兰州11月11日电 (邬凡 朱学成)11月10日5时30分,位于敦煌车站旁的敦煌综合工区,钢轨探伤车...

重庆把接种点“搬”进小学,8岁学生:“我不怕疼,打疫苗是为抵抗病毒”

  中新网重庆11月11日电 (梁钦卿)“我今年上小学三年级了,我不怕疼,打疫苗是为了抵抗新冠病毒。”1...

四川绵阳:一男子酒后报警谎称自己是“黄码”被行拘10天

  中新网绵阳11月11日电 (岳波 李远梅)四川绵阳警方11日通报称,一男子酒后无聊多次报警称自己的...

民航局对多个入境航班发出熔断指令

  中新网11月11日电 据中国民航局网站消息,11月11日,民航局再发熔断指令,对德国汉莎航空公司LH728...

疫情中轮椅上的“逆行者”:想为大家做力所能及的事

  中新网成都11月11日电 题:疫情中轮椅上的“逆行者”:想为大家做力所能及的事  作者 祝欢  ...

上海警方:“三无产品”借短视频平台引流诈骗近千名受害人

  中新网上海11月11日电 (记者 李姝徵)上海警方11日召开发布会披露,在近期“砺剑”行动中破获了一...

山西搭建交通事故重伤员无差别急救绿色通道 累计救助7727个受困家庭

  中新网太原11月11日电 (记者 李庭耀)记者11日从山西省政府新闻办举行的新闻发布会上获悉,山西推...

三万余尾“水中大熊猫”放流新疆博斯腾湖(图)

  中新网乌鲁木齐11月11日电 (刘雨珊 牛雨萌 艾尼)11日,记者从新疆水产科研所获悉,新疆博湖县将...

大连新增11个中风险地区

  中新网大连11月11日电 (记者 杨毅)大连市新冠肺炎疫情防控总指挥部 11日发布公告,大连市将庄河...

陕西科技大学开设“酿酒课” 培养学生知行合一

  中新网西安11月11日电 (记者 党田野)身穿白色“礼服”,摇晃着酒杯,时不时浅酌一口啤酒,然后与...

辽宁大连:今日新增十一个中风险地区

  11月11日大连市新冠肺炎疫情防控总指挥部发布,按照国务院应对新冠肺炎疫情联防联控机制关于科学划...

“双十一”南京的猪都“脱单”了?其实还有更让人嫉妒的……

  中新网南京11月11日电 题:这个“双十一”南京的猪都“脱单”了 其实还有更让人嫉妒的……  ...

内蒙古警方悬赏20万缉凶扎兰屯重大刑事案嫌疑人

  中新网呼伦贝尔11月11日电 (记者 张林虎)11日,记者从内蒙古自治区呼伦贝尔市公安局获悉,该局将...

“帅府主题系列甜品”发布:孙中山喜食“牛奶煮苹果” 宋庆龄爱吃蛋糕

  中新网广州11月11日电 (记者 程景伟)“寻味帅府邂逅甜蜜——2021年帅府之夜”暨“海外拾珠——孙...

江苏徐州“家门口车管所”便民服务驶入“高速路”

  中新网徐州11月11日电 题:江苏徐州“家门口车管所”便民服务驶入“高速路”  作者 朱志庚 ...

重庆市发出今年秋冬季首次空气污染应对工作预警

  中新网重庆11月11日电 (梁钦卿)为加强秋冬季空气污染应对,重庆市生态环境局11日发出2021年第九次...

全国道德模范|深藏功名三十载 化作春蚕报乡亲——一名抗美援朝老兵的人生选择

  新华社重庆11月11日电 题:深藏功名三十载 化作春蚕报乡亲——一名抗美援朝老兵的人生选择  新...

湖南桃江“花痴”男子盗窃六盆盆栽落法网

  中新网益阳11月11日电 (王鹏 王庆庆)爱花本是修身养性、陶冶情操之事,湖南益阳市桃江县桃花江镇...

成都25位民辅警的“封闭”生活:有人“转行”送外卖 有人变身“仓鼠管理员”

  中新网成都11月11日电 题:成都25位民辅警的“封闭”生活:有人“转行”送外卖 有人变身“仓鼠管...

2022年江苏中小学:体育教师师生比将达1:220

  中新网南京11月11日电 (徐珊珊)江苏省教育厅体育卫生与艺术教育处处长张鲤鲤11日在南京表示,到202...

国内首次!成都发布公园城市银杏观叶指数

  中新网成都11月11日电 (记者 吕杨)成都市公园城市建设管理局11日正式发布公园城市银杏观叶指数,...

江苏学生体质健康监测报告:“小眼镜”“小胖墩”问题突出

  中新网南京11月11日电 (徐珊珊)11日,江苏省教育厅召开新闻发布会,发布2020年江苏省学生体质健康...

被拐17年 湖北宜昌警方助男子与家人团圆

  中新网宜昌11月11日电 (江雅丽 董晓斌)17年前,四川广安一夫妇的6岁儿子被人拐走,夫妻俩寻找多年...

福建柘荣发现1例境外输入复阳病例 四名密接者核酸检测为阴性

  中新网宁德11月11日电 (林榕生)福建宁德市柘荣县应对新型冠状病毒感染肺炎疫情工作领导小组(指挥部...

内蒙古额济纳旗达来呼布镇调整为低风险地区

  11月11日,内蒙古额济纳旗新冠肺炎防控工作指挥部发布《关于调整额济纳旗达来呼布镇风险等级的公告...

内蒙古现有本土确诊病例32例 伊金霍洛旗确诊病例清零

  (抗击新冠肺炎)内蒙古现有本土确诊病例32例 伊金霍洛旗确诊病例清零  中新网呼和浩特11月11日电...

交通陆续恢复 内蒙古通辽生产生活秩序逐步恢复正常

  中新网呼和浩特11月11日电 (记者 张林虎)11日,记者从内蒙古自治区通辽市相关部门获悉,从10日下...

X 广告
X 广告

精彩放送

选矿工技师的创新路:从废旧场“寻宝”到多技术攻关

海外传播官体验宁波“十里红妆” 赞其“让人震撼”

山西中小学幼儿园基本实现“4个100%”达标建设

走近张家界“奇峰守护者”

新华全媒+丨黑龙江黑河:他们在寒冬中战“疫”

山东日照第一批五莲县外解除隔离人员返家 将继续进行7天居家健康监测

中央气象台:未来几天我国大部天气将趋稳 东北雨雪进入尾声

云南西双版纳:巾帼护村队化身边境村寨守护者

乘客自发合力推车 “119”为“120”抢出一条急救通道

中老联合考古队探索青铜冶炼遗址奥秘 结下“深厚情谊”

黑龙江:抢修人员彻夜冒雪“破冰” 高铁列车逐步恢复运行

内蒙古通辽:交通陆续恢复 生产生活秩序逐步恢复正常

中国石油在京系统全面进入应急状态

记者手记:采访他们10分钟,我的手都快冻僵了

云南瑞丽畹町镇国防街片区调整为中风险地区

女儿与父母失散18年 黔滇两地警方携手让爱“团圆”

北京朝阳新增2名确诊病例 152名密接均已管控

听冬奥制服设计师贺阳讲述“水墨长城”灵感从何而来

纸短情长:一封道歉信“温暖”一座小城

能订货还给免费代购 便民菜车如何办成“五年老店”

买的床收货变清洁膏 小心收钱不发货的“跑路”电商

北京:在京单位加强会议活动管理 提倡视频开会

调查显示甘肃民众对战疫有信心

手机APP自动续费这个“坑”该咋填?专家支招

奔跑在无声世界的“李慢跑”:不走捷径才是最快的捷径

北京海淀1地升为中风险 确诊病例轨迹涉地铁、小学

“疫”下城市的邻里生活,不再是“相顾无言”

未售出“空置房”需交物业费吗?法院判决开发商应按合同支付

甘肃妇联发挥“联”字优势 近20万女性化身抗疫志愿者

双十一快件“爆仓”来袭 投递服务成“投诉大户”

辽宁:雪灾致640.50万头(只)畜禽死亡

中国青少年抑郁检出率超2成 学生健康体检要筛查!

清华贫困生“树洞里”装着励志之外的三重价值

我国境内首宗个人破产清算案裁定

夫妻档盗版黑手伸向《扫黑风暴》 警方:莫把盗版视商机

深圳“医保药价通”上线 医保药品价格一键可查

29岁父亲“割肝救女” 8月龄“小黄人”重获新生

452支团队参加第二届海峡两岸(福州)职工创新创业创造大赛

守护孤残儿童25年 她为2000多个孩子撑起一片天

“双十一”来临 快递强市浙江金华加强“人物并防”

江苏实施全生命周期监管将危废源头“一网打尽”

不拘一格降人才 摘下有色眼镜看“第一学历”

确诊患者曾就诊 北京东直门医院东城院区和国际部停诊

云南10日新增2例本土确诊病例

大数据分析表明昆虫多样性与碳排放密切关联

将传统“家事”上升为重要“国事” 筑好育人的起点和基点

档案人才培养现状:精通全流程的复合型人才较紧缺

“献身航空事业,不是一阵子,而是一辈子”

进博会:05后走上台前【最年轻“小叶子”仅16岁】

高山峡谷里的“背包队”再出发

Copyright ©  2015-2022 太平洋律师网版权所有  备案号:豫ICP备2022016495号-17   联系邮箱:93 96 74 66 9@qq.com