博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
《深入理解Spark:核心思想与源码分析》——3.11节ContextCleaner的创建与启动
阅读量:6980 次
发布时间:2019-06-27

本文共 1791 字,大约阅读时间需要 5 分钟。

本节书摘来自华章社区《深入理解Spark:核心思想与源码分析》一书中的第3章,第3.11节ContextCleaner的创建与启动,作者耿嘉安,更多章节内容可以访问云栖社区“华章社区”公众号查看

3.11 ContextCleaner的创建与启动

ContextCleaner用于清理那些超出应用范围的RDD、ShuffleDependency和Broadcast对象。由于配置属性spark.cleaner.referenceTracking默认是true,所以会构造并启动ContextCleaner,代码如下。

private[spark] val cleaner: Option[ContextCleaner] = {    if (conf.getBoolean("spark.cleaner.referenceTracking", true)) {        Some(new ContextCleaner(this))    } else {        None    }}cleaner.foreach(_.start())``ContextCleaner的组成如下:referenceQueue:缓存顶级的AnyRef引用;referenceBuffer:缓存AnyRef的虚引用;listeners:缓存清理工作的监听器数组;cleaningThread:用于具体清理工作的线程。ContextCleaner的工作原理和listenerBus一样,也采用监听器模式,由线程来处理,此线程实际只是调用keepCleaning方法。keepCleaning的实现见代码清单3-48。代码清单3-48 keep Cleaning的实现

private def keepCleaning(): Unit = Utils.logUncaughtExceptions {

while (!stopped) {    try {        val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))            .map(_.asInstanceOf[CleanupTaskWeakReference])        // Synchronize here to avoid being interrupted on stop()        synchronized {            reference.map(_.task).foreach { task =>            logDebug("Got cleaning task " + task)            referenceBuffer -= reference.get            task match {                case CleanRDD(rddId) =>                    doCleanupRDD(rddId, blocking = blockOnCleanupTasks)                case CleanShuffle(shuffleId) =>                    doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)                case CleanBroadcast(broadcastId) =>                    doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)                }            }        }    } catch {        case ie: InterruptedException if stopped => // ignore        case e: Exception => logError("Error in cleaning thread", e)    }}

}

转载地址:http://neypl.baihongyu.com/

你可能感兴趣的文章
CI报Disallowed Key Characters的解决
查看>>
关于手机已处理里重复单据的处理办法
查看>>
IIS 7.5 + FastCGI + PHP + Drupal 7 + Oracle
查看>>
我的友情链接
查看>>
英文版PDF不能显示中文PDF文件的解决方法
查看>>
linux 内核 出错-HP 方案
查看>>
Linux
查看>>
HashSet的使用
查看>>
WSFC 仲裁模型选择
查看>>
nginx安装 问题 1
查看>>
MST配置详解
查看>>
linux下用phpize给PHP动态添加扩展
查看>>
任意排列、组合终极Shell脚本
查看>>
★核心关注点_《信息系统项目管理师考试考点分析与真题详解》
查看>>
Go处理百万每分钟的请求
查看>>
你以为自己在填验证码,其实你是在给Google义务劳动
查看>>
linux实战考试题:批量创建用户和密码(不能使用循环)
查看>>
一个基于J2EE的web应用程序运行起来需要什么?
查看>>
Docker配置指南系列(二):指令集(二)
查看>>
nginx 开发一个简单的 HTTP 模块
查看>>