ReceiverTracker怎么实现

发布时间:2025-11-02 点击:9
本篇内容主要讲解“receivertracker怎么实现”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“receivertracker怎么实现”吧!
receivertacker类如下,从源码注释可以看出该类的作用。
管理receiverinputdstreams的执行,记录receiver发来的元数据信息。receivertacker类构造时必须传入streamingcontext对象。
receivertacker类内部有receivertackerendpoint这个消息通信体,用于和运行在executor端的receiversupervisorimpl进行通信,包括receiver的注册,重启receiver,清除之前的block数据,更新限流值,添加block元数据信息等消息。
接下来以接收到来自executor端的receiversupervisorimpl发来添加元数据信息的addblock消息,进行讲解具体的处理流程。
receivedblockinfo类包含了streamid,block中记录条数,元数据metadata,接收block的存储结果(blockid和记录数量)
receiverblocktracker类是addblock方法的具体实现。
1.调用receiverblocktracker的writetolog方法
2.调用receiverblocktracker的getreceivedblockqueue方法,其中streamidtounallocatedblockqueues为hashmap,key为streamid,value为receivedblockqueue。而receivedblockqueue 的定义为private type receivedblockqueue = mutable.queue[receivedblockinfo]
receiverblocktracker类,可以从源码中看出,他会记录所有接收到的block信息,根据需要把block分配给batch。如果设置了checkpoint,开启wal,则会把所有的操作保存到预写日志中,因此当driver失败后就可以从checkpoint和wal中恢复receivertracker的状态。
receiverblocktracker类中重要的方法,allocateblockstobatch。private val timetoallocatedblocks = new mutable.hashmap[time, allocatedblocks]存储批处理时刻,分配到的blocks数据。
该方法是被receivertracker调用的。
而receivertracker的allocateblockstobatch方法是被jobgenerator的generatejobs方法调用的。
receiverblocktracker类中重要的方法,getblocksofbatch。
该方法是被receivertracker的getblocksofbatch调用。
receivertracker的getblocksofbatch方法是被receiverinputdstream的compute方法调用的。
到此,相信大家对“receivertracker怎么实现”有了更深的了解,不妨来实际操作一番吧!这里是箭头云网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

word2010怎么删除空白页_word2010怎么删除不要的空白页
阿里云域名备案服务器信息修改失败
请问现在不能提交备案吗-备案平台
无线互联网广告盈利模式盘点
ecs阿里云服务器能干嘛
虚拟主机选择什么系统最好?用户选择虚拟主机几大技巧须知
免费域名注册哪里申请?
备案过程中网站的虚拟主机里需要放置内容吗