(版本定制)第11课:Spark Streaming源码解读
admin
2023-01-31 11:48:21
0

本期内容:

    1、ReceiverTracker的架构设计

    2、消息循环系统

    3、ReceiverTracker具体实现


上节课讲到了Receiver是如何不断的接收数据的,并且接收到的数据的元数据会汇报给ReceiverTracker,下面我们看看ReceiverTracker具体的功能及实现。

ReceiverTracker主要的功能:

  1. 在Executor上启动Receivers。

  2. 停止Receivers 。

  3. 更新Receiver接收数据的速度(也就是限流)

  4. 不断的等待Receivers的运行状态,只要Receivers停止运行,就重新启动Receiver,也就是Receiver的容错功能。

  5. 接受Receiver的注册。

  6. 借助ReceivedBlockTracker来管理Receiver接收数据的元数据。

  7. 汇报Receiver发送过来的错误信息


ReceiverTracker 管理了一个消息通讯体ReceiverTrackerEndpoint,用来与Receiver或者ReceiverTracker 进行消息通信。

在ReceiverTracker的start方法中,实例化了ReceiverTrackerEndpoint,并且在Executor上启动Receivers。

启动Receivr,其实是ReceiverTracker给ReceiverTrackerEndpoint发送了一个本地消息,ReceiverTrackerEndpoint将Receiver封装成RDD以job的方式提交给集群运行。

Receiver启动后,会向ReceiverTracker注册,注册成功才算正式启动了。

当Receiver端接收到数据,达到一定的条件需要将数据写入BlockManager,并且将数据的元数据汇报给ReceiverTracker。

/** Store block and report it to driver */
def pushAndReportBlock(
    receivedBlock: ReceivedBlock,
metadataOption: Option[Any],
blockIdOption: Option[StreamBlockId]
  ) {
val blockId = blockIdOption.getOrElse(nextBlockId)
val time = System.currentTimeMillis
val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
  logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
val numRecords = blockStoreResult.numRecords
val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
  logDebug(s"Reported block $blockId")
}

当ReceiverTracker收到元数据后,会在线程池中启动一个线程来写数据

case AddBlock(receivedBlockInfo) =>
if (WriteAheadLogUtils.isBatchingEnabled(ssc.conf, isDriver = true)) {
walBatchingThreadPool.execute(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
if (active) {
          context.reply(addBlock(receivedBlockInfo)) 
        } else {
throw new IllegalStateException("ReceiverTracker RpcEndpoint shut down.")
        }
      }
    })
  } else {
    context.reply(addBlock(receivedBlockInfo))
  }

数据的元数据是交由ReceivedBlockTracker管理的

数据最终被写入到streamIdToUnallocatedBlockQueues中,一个流对应一个数据块信息的队列。

每当Streaming 触发job时,会将队列中的数据分配成一个batch,并将数据写入timeToAllocatedBlocks数据结构。

下面是简单的流程图:

(版本定制)第11课:Spark Streaming源码解读

相关内容

热门资讯

今日重大通报“微乐安庆麻将.有... 网上科普关于“微乐安庆麻将有没有挂”话题很是火热,小编也是针对微乐安庆麻将作*弊开挂的方法以及开挂对...
多家优衣库门店,被曝偷拍 近日,优衣库部分门店被指“偷拍”消费者来防盗。在社交媒体上,部分消费者反映,在优衣库因“被当小偷”引...
玩家最新攻略“红中麻将.怎么开... 有 亲,根据资深记者爆料红中麻将是可以开挂的,确实有挂(咨询软件无需打开...
最新引进“授权大厅炸/金/花.... 最新引进“授权大厅炸/金/花.辅助器?”果然有透视挂您好,授权大厅炸/金/花这个游戏其实有挂的,确实...
现货黄金首次站上4400美元/... 【大河财立方消息】 北京时间12月22日午间,现货黄金首次站上4400美元/盎司,今年以来累涨近68...
今日重大消息“同城游贵阳捉鸡麻... 家人们!今天小编来为大家解答同城游贵阳捉鸡麻将透视挂怎么安装这个问题咨询软件客服徽4282891的挂...
MiniMax通过港交所聆讯,... 2025年12月21日晚,国内大模型公司稀宇科技MiniMax(以下简称:MiniMax)在港交所公...
科创合肥,亮出新招 作 者:风铃 来 源:正和岛(ID:zhenghedao) 最近这一周,合肥在科创圈内频频刷屏。 具...
终于明白“花花生活圈.是不是有... 有 亲,根据资深记者爆料花花生活圈是可以开挂的,确实有挂(咨询软件无需打...
重磅消息“一喜棋牌.可以开挂吗... 网上科普关于“一喜棋牌有没有挂”话题很是火热,小编也是针对一喜棋牌作*弊开挂的方法以及开挂对应的知识...