This section is excerpted from Chapter 3, Section 3.11, "Creation and Startup of ContextCleaner," of the Huazhang community book 《深入理解Spark:核心思想与源码分析》 by 耿嘉安. More chapters are available through the "华章社区" public account of the Yunqi (云栖) community.
3.11 Creation and Startup of ContextCleaner
ContextCleaner is responsible for cleaning up RDD, ShuffleDependency, and Broadcast objects that have gone out of application scope. Because the configuration property spark.cleaner.referenceTracking defaults to true, a ContextCleaner is constructed and started, as shown in the following code.

private[spark] val cleaner: Option[ContextCleaner] = {
  if (conf.getBoolean("spark.cleaner.referenceTracking", true)) {
    Some(new ContextCleaner(this))
  } else {
    None
  }
}
cleaner.foreach(_.start())

ContextCleaner consists of the following components:
referenceQueue: caches the references to the tracked top-level AnyRef objects;
referenceBuffer: caches the weak references (of type CleanupTaskWeakReference) to those AnyRef objects;
listeners: an array of listeners that are notified of cleanup work;
cleaningThread: the thread that performs the actual cleanup.
ContextCleaner works on the same principle as listenerBus: it also follows the listener pattern and hands the work to a thread, and this thread in fact does nothing more than call the keepCleaning method. The implementation of keepCleaning is shown in Listing 3-48; a self-contained sketch of the underlying reference-tracking pattern follows the listing.
Listing 3-48 Implementation of keepCleaning
private def keepCleaning(): Unit = Utils.logUncaughtExceptions {
  while (!stopped) {
    try {
      val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
        .map(_.asInstanceOf[CleanupTaskWeakReference])
      // Synchronize here to avoid being interrupted on stop()
      synchronized {
        reference.map(_.task).foreach { task =>
          logDebug("Got cleaning task " + task)
          referenceBuffer -= reference.get
          task match {
            case CleanRDD(rddId) =>
              doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
            case CleanShuffle(shuffleId) =>
              doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
            case CleanBroadcast(broadcastId) =>
              doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
          }
        }
      }
    } catch {
      case ie: InterruptedException if stopped => // ignore
      case e: Exception => logError("Error in cleaning thread", e)
    }
  }
}
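Each case in the match expression dispatches to one of the doCleanup* methods of ContextCleaner. As a point of reference, the following is a rough, simplified sketch of what doCleanupRDD does in the Spark 1.x source that this book analyzes (details vary across versions): it unpersists the RDD through SparkContext and then notifies the registered CleanerListeners.

def doCleanupRDD(rddId: Int, blocking: Boolean): Unit = {
  try {
    logDebug("Cleaning RDD " + rddId)
    // Remove the RDD's cached blocks via SparkContext.
    sc.unpersistRDD(rddId, blocking)
    // Notify the registered CleanerListeners.
    listeners.foreach(_.rddCleaned(rddId))
    logInfo("Cleaned RDD " + rddId)
  } catch {
    case e: Exception => logError("Error cleaning RDD " + rddId, e)
  }
}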
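To see the reference-tracking pattern in isolation, here is a minimal, self-contained sketch (not Spark code) of what referenceQueue, referenceBuffer, and the cleaning loop accomplish together. Each tracked object is wrapped in a WeakReference subclass that also carries its cleanup task (the role of CleanupTaskWeakReference); the wrapper itself is held strongly in a buffer so that only the tracked object can be garbage collected; once the collector reclaims the tracked object, the JVM enqueues the wrapper on the ReferenceQueue, where a polling loop picks it up and executes the task. The names Tracker, TrackedRef, and CleanupTask are invented for this illustration.

import java.lang.ref.{ReferenceQueue, WeakReference}
import scala.collection.mutable

object ReferenceTrackingSketch {
  // Describes what to do once the referent is gone,
  // analogous to CleanRDD / CleanShuffle / CleanBroadcast.
  final case class CleanupTask(description: String)

  // A weak reference that also carries its cleanup task, analogous to
  // CleanupTaskWeakReference. When the referent is collected, the JVM
  // enqueues this wrapper on the given ReferenceQueue.
  final class TrackedRef(val task: CleanupTask, referent: AnyRef, queue: ReferenceQueue[AnyRef])
    extends WeakReference[AnyRef](referent, queue)

  class Tracker {
    private val referenceQueue = new ReferenceQueue[AnyRef]
    // Keeps the wrappers strongly reachable (the role of referenceBuffer).
    private val referenceBuffer = mutable.Set.empty[TrackedRef]

    def register(obj: AnyRef, task: CleanupTask): Unit = synchronized {
      referenceBuffer += new TrackedRef(task, obj, referenceQueue)
    }

    // One polling step of the cleaning loop (the role of keepCleaning's body).
    def pollOnce(timeoutMs: Long): Unit = {
      val ref = Option(referenceQueue.remove(timeoutMs)).map(_.asInstanceOf[TrackedRef])
      synchronized {
        ref.foreach { r =>
          referenceBuffer -= r
          println("Cleaning: " + r.task.description)
        }
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val tracker = new Tracker
    var data: AnyRef = new Array[Byte](1024)
    tracker.register(data, CleanupTask("fake RDD 1"))
    data = null          // drop the only strong reference to the tracked object
    System.gc()          // suggest a GC so the weak reference may be enqueued
    tracker.pollOnce(timeoutMs = 1000)
  }
}

In Spark's ContextCleaner, registration is performed by methods such as registerRDDForCleanup, registerShuffleForCleanup, and registerBroadcastForCleanup, and the polling step corresponds to one iteration of the while (!stopped) loop in Listing 3-48.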