- 1、本文档共13页,可阅读全部内容。
- 2、有哪些信誉好的足球投注网站(book118)网站文档一经付费(服务费),不意味着购买了该文档的版权,仅供个人/单位学习、研究之用,不得用于商业用途,未经授权,严禁复制、发行、汇编、翻译或者网络传播等,侵权必究。
- 3、本站所有内容均由合作方或网友上传,本站不对文档的完整性、权威性及其观点立场正确性做任何保证或承诺!文档内容仅供研究参考,付费前请自行鉴别。如您付费,意味着您自己接受本站规则且自行承担风险,本站不退款、不进行额外附加服务;查看《如何避免下载的几个坑》。如果您已付费下载过本站文档,您可以点击 这里二次下载。
- 4、如文档侵犯商业秘密、侵犯著作权、侵犯人身权等,请点击“版权申诉”(推荐),也可以打举报电话:400-050-0827(电话支持时间:9:00-18:30)。
查看更多
Spark Stre的aming应用启动过程分析
Spark Streaming应用启动过程分析
在调用StreamingContext.start方法后,进入JobScheduler.start方法中,各子元素start方法的调用顺序如下:
private var eventLoop : EventLoop[JobSchedulerEvent] = null
val listenerBus = new StreamingListenerBus()
private val jobGenerator = new JobGenerator(this)
eventLoop.start
listenerBus.start(ssc.sparkContext)
receiverTracker = new ReceiverTracker(ssc)
inputInfoTracker = new InputInfoTracker(ssc)
receiverTracker.start()
jobGenerator.start()
在eventLoop, listenerBus以及jobGenerator中都维持了一个事件队列,以多线程的形式从这些队列中取出事件并处理。一般来说,运行一个生产者消费者系统时, 往往先开始运行的是消费者。所以在上面的代码中,越是早start的对象,越不是Spark Streaming启动事件的入口。理解了这段话对于理解后续的启动过程分析是有帮助的。无法理解的话也可以先理解后续分析再回头想想这一点。
接下来分析上图中的主要对象。
一、JobGenerator类
JobGenerator的构造方法如下,使用到了前面提到的JobScheduler对象。
class JobGenerator(jobScheduler: JobScheduler) extends Logging
1
1
进入JobGenerator类。可以看到其start方法与JobScheduler的start方法结构十分类似。在这里面也有一个EventLoop类型的eventLoop对象,只不过这个对象传入的是JobGeneratorEvent类型的事件。
eventLoop = new EventLoop[JobGeneratorEvent](JobGenerator) {
override protected def onReceive (event: JobGeneratorEvent): Unit = processEvent(event)
override protected def onError (e: Throwable ): Unit = {
jobScheduler.reportError(Error in job generator , e)
}
}
eventLoop.start()
1、eventLoop处理事件
看一眼JobGeneratorEvent,发现JobGenerator中的eventLoop主要处理的是Job生成,metadata以及checkpoint相关的事件。
private[scheduler] sealed trait JobGeneratorEvent
// 生成Jobs
private [scheduler] case class GenerateJobs(time: Time) extends JobGeneratorEvent
// 清除metadata
private [scheduler] case class ClearMetadata(time: Time) extends JobGeneratorEvent
// 设置checkpoint
private [scheduler] case class DoCheckpoint(
time: Time, clearCheckpointDataLater: Boolean) extends JobGeneratorEvent
// 清除checkpoint数据
private [scheduler] case class ClearCheckpointData(time: Time) extends JobGeneratorEvent
当JobGeneratorEvent对象开始执行时,会多线程启动eventLoop对象通过执行JobGcessEvent方法处理JobGenerator事件。
看一下JobGcessEvent方法中调用的JobGenerator.generateJobs方法是如何处理GenerateJobs事件的。
private def generateJobs (time: Time) {
Try {
文档评论(0)