网站大量收购闲置独家精品文档,联系QQ:2885784924

Spark Stre的aming应用启动过程分析.doc

  1. 1、本文档共13页,可阅读全部内容。
  2. 2、有哪些信誉好的足球投注网站(book118)网站文档一经付费(服务费),不意味着购买了该文档的版权,仅供个人/单位学习、研究之用,不得用于商业用途,未经授权,严禁复制、发行、汇编、翻译或者网络传播等,侵权必究。
  3. 3、本站所有内容均由合作方或网友上传,本站不对文档的完整性、权威性及其观点立场正确性做任何保证或承诺!文档内容仅供研究参考,付费前请自行鉴别。如您付费,意味着您自己接受本站规则且自行承担风险,本站不退款、不进行额外附加服务;查看《如何避免下载的几个坑》。如果您已付费下载过本站文档,您可以点击 这里二次下载
  4. 4、如文档侵犯商业秘密、侵犯著作权、侵犯人身权等,请点击“版权申诉”(推荐),也可以打举报电话:400-050-0827(电话支持时间:9:00-18:30)。
查看更多
Spark Stre的aming应用启动过程分析

Spark Streaming应用启动过程分析 在调用StreamingContext.start方法后,进入JobScheduler.start方法中,各子元素start方法的调用顺序如下: private var eventLoop : EventLoop[JobSchedulerEvent] = null val listenerBus = new StreamingListenerBus() private val jobGenerator = new JobGenerator(this) eventLoop.start listenerBus.start(ssc.sparkContext) receiverTracker = new ReceiverTracker(ssc) inputInfoTracker = new InputInfoTracker(ssc) receiverTracker.start() jobGenerator.start()      在eventLoop, listenerBus以及jobGenerator中都维持了一个事件队列,以多线程的形式从这些队列中取出事件并处理。一般来说,运行一个生产者消费者系统时, 往往先开始运行的是消费者。所以在上面的代码中,越是早start的对象,越不是Spark Streaming启动事件的入口。理解了这段话对于理解后续的启动过程分析是有帮助的。无法理解的话也可以先理解后续分析再回头想想这一点。   接下来分析上图中的主要对象。 一、JobGenerator类   JobGenerator的构造方法如下,使用到了前面提到的JobScheduler对象。 class JobGenerator(jobScheduler: JobScheduler) extends Logging 1 1   进入JobGenerator类。可以看到其start方法与JobScheduler的start方法结构十分类似。在这里面也有一个EventLoop类型的eventLoop对象,只不过这个对象传入的是JobGeneratorEvent类型的事件。 eventLoop = new EventLoop[JobGeneratorEvent](JobGenerator) { override protected def onReceive (event: JobGeneratorEvent): Unit = processEvent(event) override protected def onError (e: Throwable ): Unit = { jobScheduler.reportError(Error in job generator , e) } } eventLoop.start() 1、eventLoop处理事件   看一眼JobGeneratorEvent,发现JobGenerator中的eventLoop主要处理的是Job生成,metadata以及checkpoint相关的事件。 private[scheduler] sealed trait JobGeneratorEvent // 生成Jobs private [scheduler] case class GenerateJobs(time: Time) extends JobGeneratorEvent // 清除metadata private [scheduler] case class ClearMetadata(time: Time) extends JobGeneratorEvent // 设置checkpoint private [scheduler] case class DoCheckpoint( time: Time, clearCheckpointDataLater: Boolean) extends JobGeneratorEvent // 清除checkpoint数据 private [scheduler] case class ClearCheckpointData(time: Time) extends JobGeneratorEvent   当JobGeneratorEvent对象开始执行时,会多线程启动eventLoop对象通过执行JobGcessEvent方法处理JobGenerator事件。   看一下JobGcessEvent方法中调用的JobGenerator.generateJobs方法是如何处理GenerateJobs事件的。 private def generateJobs (time: Time) { Try {

文档评论(0)

liwenhua00 + 关注
实名认证
内容提供者

该用户很懒,什么也没介绍

1亿VIP精品文档

相关文档