大数据:Spark Shuffle(三)Executor是如何fetch shuffle的数据文件.doc

大数据:Spark Shuffle(三)Executor是如何fetch shuffle的数据文件.doc

  1. 1、本文档共11页,可阅读全部内容。
  2. 2、有哪些信誉好的足球投注网站(book118)网站文档一经付费(服务费),不意味着购买了该文档的版权,仅供个人/单位学习、研究之用,不得用于商业用途,未经授权,严禁复制、发行、汇编、翻译或者网络传播等,侵权必究。
  3. 3、本站所有内容均由合作方或网友上传,本站不对文档的完整性、权威性及其观点立场正确性做任何保证或承诺!文档内容仅供研究参考,付费前请自行鉴别。如您付费,意味着您自己接受本站规则且自行承担风险,本站不退款、不进行额外附加服务;查看《如何避免下载的几个坑》。如果您已付费下载过本站文档,您可以点击 这里二次下载
  4. 4、如文档侵犯商业秘密、侵犯著作权、侵犯人身权等,请点击“版权申诉”(推荐),也可以打举报电话:400-050-0827(电话支持时间:9:00-18:30)。
查看更多
大数据:Spark Shuffle(三)Executor是如何fetch shuffle的数据文件

大数据:Spark Shuffle(三)Executor是如何fetch shuffle的数据文件 1. 前言 Executor是如何获取到Shuffle的数据文件进行Action的算子的计算呢?在ResultTask中,Executor通过MapOutPutTracker向Driver获取了ShuffID的Shuffle数据块的结构,整理成以BlockManangerId为Key的结构,这样可以更容易区分究竟是本地的Shuffle还是远端executor的Shuffle 2. Fetch数据 在MapOutputTracker中获取到的BlockID的地址,是以BlockManagerId的seq数组 [plain] view plain copy Seq[(BlockManagerId, Seq[(BlockId, Long)])] BlockManagerId结构 [plain] view plain copy class BlockManagerId private ( private var executorId_ : String, private var host_ : String, private var port_ : Int, private var topologyInfo_ : Option[String]) extends Externalizable 是以ExecutorId,Executor Host IP, Executor Port 标示从哪个Executor获取Shuffle的数据文件,通过Seq[BlockManagerId, Seq(BlockID,Long)]的结构,当前executor很容易区分究竟哪些是本地的数据文件,哪些是远端的数据,本地的数据可以直接本地读取,而需要不通过网络来获取。 2.1 读取本Executor文件 如何认为是本地数据? Spark认为区分是通过相同的ExecutorId来区别的,如果ExecutorId和自己的ExecutorId相同,认为是本地Local,可以直接读取文件。 [plain] view plain copy for ((address, blockInfos) - blocksByAddress) { totalBlocks += blockInfos.size if (address.executorId == blockManager.blockManagerId.executorId) { // Filter out zero-sized blocks localBlocks ++= blockInfos.filter(_._2 != 0).map(_._1) numBlocksToFetch += localBlocks.size } } 这里有两种情况: 同一个Executor会生成多个Task,单个Executor里的Task运行可以直接获取本地文件,不需要通过网络 同一台机器多个Executor,在这种情况下,不同的Executor获取相同机器下的其他的Executor的文件,需要通过网络 2.2 读取非本Executor文件 2.2.1 构造FetchRequest请求 获取非本Executor的文件,在Spark里会生成一个FetchRequest,为了避免单个Executor的MapId过多发送多个FetchRequest请求,会合并同一个Executor的多个请求,合并的规则由最大的请求参数控制 [plain] view plain copy spark.reducer.maxSizeInFlight val targetRequestSize = math.max(maxBytesInFlight / 5, 1L) 对同一个Executor,如果请求多个Block请求的数据大小未超过targetRequestSize,将会被分配到同一个FetchRequest中,以避免多次FetchRequest的请求 [plain] view plain copy val iterator = blockInfos.iterator var curRequestSize = 0L var curBlocks = new ArrayBuffer[(BlockId, Long)] while (iterator.hasNext) { val (blockId, size) = iterator.next(

文档评论(0)

dajuhyy + 关注
实名认证
内容提供者

该用户很懒,什么也没介绍

1亿VIP精品文档

相关文档