第12课:Spark Streaming源码解读之Execu
admin
2023-01-31 12:07:14
0

Receiver接收到的数据交由ReceiverSupervisorImpl来管理。

ReceiverSupervisorImpl接收到数据后,会数据存储并且将数据的元数据报告给ReceiverTracker 。

Executor的数据容错可以有三种方式:

  1. WAL日志

  2. 数据副本

  3. 接收receiver的数据流回放

/** Store block and report it to driver */
def pushAndReportBlock(
    receivedBlock: ReceivedBlock,
    metadataOption: Option[Any],
    blockIdOption: Option[StreamBlockId]
  ) {
  val blockId = blockIdOption.getOrElse(nextBlockId)
  val time = System.currentTimeMillis
  val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
  logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
  val numRecords = blockStoreResult.numRecords
  val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
  trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
  logDebug(s"Reported block $blockId")
}

数据的存储,是借助receiverBlockHandler,它的实现有两种方式:

private val receivedBlockHandler: ReceivedBlockHandler = {
  if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
    if (checkpointDirOption.isEmpty) {
      throw new SparkException(
        "Cannot enable receiver write-ahead log without checkpoint directory set. " +
          "Please use streamingContext.checkpoint() to set the checkpoint directory. " +
          "See documentation for more details.")
    }
    new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,
      receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
  } else {
    new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)
  }
}


WriteAheadLogBaseBlockHandler 一方面将数据交由BlockManager管理,另一方面会写WAL日志。

一旦节点崩溃,可以由WAL日志恢复内存中的数据。在WAL开始时,就不在建议数据存储多个副本。

private val effectiveStorageLevel = {
  if (storageLevel.deserialized) {
    logWarning(s"Storage level serialization ${storageLevel.deserialized} is not supported when" +
      s" write ahead log is enabled, change to serialization false")
  }
  if (storageLevel.replication > 1) {
    logWarning(s"Storage level replication ${storageLevel.replication} is unnecessary when " +
      s"write ahead log is enabled, change to replication 1")
  }

  StorageLevel(storageLevel.useDisk, storageLevel.useMemory, storageLevel.useOffHeap, false, 1)
}


而BlockManagerBaseBlockHandler直接将数据交由BlockManager管理。

如果不写WAL,当节点崩溃了一定会数据丢失吗? 这个也不一定。因为在构建WriteAheadLogBaseBlockHandler,和BlockManagerBaseBlockHandler的时候会将receiver的storageLevel传入。storageLevel用来描述数据保存的地方(内存、磁盘)以及副本个数。

class StorageLevel private(
    private var _useDisk: Boolean,
    private var _useMemory: Boolean,
    private var _useOffHeap: Boolean,
    private var _deserialized: Boolean,
    private var _replication: Int = 1)
  extends Externalizable

公有如下种类的StorageLevel:

val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(false, false, true, false)


默认情况,数据采用MEMORY_AND_DISK_2,也就是说数据会产生两个副本,并且内存不足时会写入磁盘。


数据的最终存储是由BlockManager完成并管理的:

def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {

  var numRecords = None: Option[Long]

  val putResult: Seq[(BlockId, BlockStatus)] = block match {
    case ArrayBufferBlock(arrayBuffer) =>
      numRecords = Some(arrayBuffer.size.toLong)
      blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel,
        tellMaster = true)
    case IteratorBlock(iterator) =>
      val countIterator = new CountingIterator(iterator)
      val putResult = blockManager.putIterator(blockId, countIterator, storageLevel,
        tellMaster = true)
      numRecords = countIterator.count
      putResult
    case ByteBufferBlock(byteBuffer) =>
      blockManager.putBytes(blockId, byteBuffer, storageLevel, tellMaster = true)
    case o =>
      throw new SparkException(
        s"Could not store $blockId to block manager, unexpected block type ${o.getClass.getName}")
  }
  if (!putResult.map { _._1 }.contains(blockId)) {
    throw new SparkException(
      s"Could not store $blockId to block manager with storage level $storageLevel")
  }
  BlockManagerBasedStoreResult(blockId, numRecords)
}


对于从kafka中直接读取数据,可以通过记录数据offset的方法来进行容错。如果程序崩溃,下次启动时,从上次未处理数据的offset再次读取数据即可。



备注:

1、DT大数据梦工厂微信公众号DT_Spark 
2、IMF晚8点大数据实战YY直播频道号:68917580
3、新浪微博: http://www.weibo.com/ilovepains


相关内容

热门资讯

重磅消息“大庆划水麻将.到底有... 重磅消息“大庆划水麻将.到底有挂吗?”其实是有挂您好,大庆划水麻将这个游戏其实有挂的,确实是有挂的,...
终于了解“十胡卡.开挂器?”其... 网上科普关于“十胡卡有没有挂”话题很是火热,小编也是针对十胡卡作*弊开挂的方法以及开挂对应的知识点,...
【第一资讯】“桃花庄.怎么装挂... 有 亲,根据资深记者爆料桃花庄是可以开挂的,确实有挂(咨询软件无需打开直...
今日重大通报“火神牛牛.怎么开... 网上科普关于“火神牛牛有没有挂”话题很是火热,小编也是针对火神牛牛作*弊开挂的方法以及开挂对应的知识...
【第一消息】“约战丹东麻将.可... 网上科普关于“约战丹东麻将有没有挂”话题很是火热,小编也是针对约战丹东麻将作*弊开挂的方法以及开挂对...
玩家攻略科普“新绍兴麻将.究竟... 您好:新绍兴麻将这款游戏可以开挂,确实是有挂的,需要了解加客服微信【9752949】很多玩家在这款游...
俞敏洪发声再谈董宇辉离职:比较... 据看看新闻报道,12月21日,俞敏洪在讲话中谈及董宇辉离职,俞敏洪仍亲切地喊他“孩子”,表示对目前结...
美重建加沙计划被批“幻灯片项目... 【环球时报特约记者 梁由之】据《以色列时报》20日报道,美国政府正推出一项名为“日出计划”的投资设想...
今日重大通报“微信十三水.是不... 今日重大通报“微信十三水.是不是有挂?”透视曝光猫腻您好,微信十三水这个游戏其实有挂的,确实是有挂的...
今日重大通报“传送屋激k.究竟... 家人们!今天小编来为大家解答传送屋激k透视挂怎么安装这个问题咨询软件客服徽4282891的挂在哪里买...