- 1、本文档共11页,可阅读全部内容。
- 2、有哪些信誉好的足球投注网站(book118)网站文档一经付费(服务费),不意味着购买了该文档的版权,仅供个人/单位学习、研究之用,不得用于商业用途,未经授权,严禁复制、发行、汇编、翻译或者网络传播等,侵权必究。
- 3、本站所有内容均由合作方或网友上传,本站不对文档的完整性、权威性及其观点立场正确性做任何保证或承诺!文档内容仅供研究参考,付费前请自行鉴别。如您付费,意味着您自己接受本站规则且自行承担风险,本站不退款、不进行额外附加服务;查看《如何避免下载的几个坑》。如果您已付费下载过本站文档,您可以点击 这里二次下载。
- 4、如文档侵犯商业秘密、侵犯著作权、侵犯人身权等,请点击“版权申诉”(推荐),也可以打举报电话:400-050-0827(电话支持时间:9:00-18:30)。
查看更多
大数据:Spark Shuffle(三)Executor是如何fetch shuffle的数据文件
大数据:Spark Shuffle(三)Executor是如何fetch shuffle的数据文件
1. 前言
Executor是如何获取到Shuffle的数据文件进行Action的算子的计算呢?在ResultTask中,Executor通过MapOutPutTracker向Driver获取了ShuffID的Shuffle数据块的结构,整理成以BlockManangerId为Key的结构,这样可以更容易区分究竟是本地的Shuffle还是远端executor的Shuffle
2. Fetch数据
在MapOutputTracker中获取到的BlockID的地址,是以BlockManagerId的seq数组
[plain] view plain copy
Seq[(BlockManagerId, Seq[(BlockId, Long)])]
BlockManagerId结构
[plain] view plain copy
class BlockManagerId private (
private var executorId_ : String,
private var host_ : String,
private var port_ : Int,
private var topologyInfo_ : Option[String])
extends Externalizable
是以ExecutorId,Executor Host IP, Executor Port 标示从哪个Executor获取Shuffle的数据文件,通过Seq[BlockManagerId, Seq(BlockID,Long)]的结构,当前executor很容易区分究竟哪些是本地的数据文件,哪些是远端的数据,本地的数据可以直接本地读取,而需要不通过网络来获取。
2.1 读取本Executor文件
如何认为是本地数据?
Spark认为区分是通过相同的ExecutorId来区别的,如果ExecutorId和自己的ExecutorId相同,认为是本地Local,可以直接读取文件。
[plain] view plain copy
for ((address, blockInfos) - blocksByAddress) {
totalBlocks += blockInfos.size
if (address.executorId == blockManager.blockManagerId.executorId) {
// Filter out zero-sized blocks
localBlocks ++= blockInfos.filter(_._2 != 0).map(_._1)
numBlocksToFetch += localBlocks.size
}
}
这里有两种情况:
同一个Executor会生成多个Task,单个Executor里的Task运行可以直接获取本地文件,不需要通过网络
同一台机器多个Executor,在这种情况下,不同的Executor获取相同机器下的其他的Executor的文件,需要通过网络
2.2 读取非本Executor文件
2.2.1 构造FetchRequest请求
获取非本Executor的文件,在Spark里会生成一个FetchRequest,为了避免单个Executor的MapId过多发送多个FetchRequest请求,会合并同一个Executor的多个请求,合并的规则由最大的请求参数控制
[plain] view plain copy
spark.reducer.maxSizeInFlight
val targetRequestSize = math.max(maxBytesInFlight / 5, 1L)
对同一个Executor,如果请求多个Block请求的数据大小未超过targetRequestSize,将会被分配到同一个FetchRequest中,以避免多次FetchRequest的请求
[plain] view plain copy
val iterator = blockInfos.iterator
var curRequestSize = 0L
var curBlocks = new ArrayBuffer[(BlockId, Long)]
while (iterator.hasNext) {
val (blockId, size) = iterator.next(
您可能关注的文档
- 大学英语综合教程翻译课文句子.doc
- 大学英语网测2.doc
- 大学英语综合教程新理念第一册课件Unit8.ppt
- 大学英语综合教程新理念第一册课件Unit9.ppt
- 大学英语翻译技巧(新2-).ppt
- 大学英语考试精读:第五册(UNIT6).doc
- 大学英语考试精读:第六册(UNIT7).doc
- 大学英语翻译教程-第八讲.ppt
- 大学英语英国文学.ppt
- 大学英语翻译教程-第五讲.ppt
- 甘肃省酒泉市金塔县等4地2024-2025学年高一上学期11月期中考试数学试题【含解析】.pdf
- 浙江省宁波市余姚中学2024-2025学年度高二上学期10月月考数学试题【含解析】.docx
- 河南省商开大联考2022-2023学年高一上学期期中考试数学试卷【含解析】.pdf
- 重庆市开州中学2024-2025学年高二上学期第一次月考数学试题【含解析】.docx
- 云南省昆明仁泽中学2024-2025学年度高二上学期10月月考数学试卷【含解析】.docx
- 《环境文本数据加工处理技术规范》.pdf
- 《区域大气环境承载力监测预警技术规范》.pdf
- 《环境空气温室气体光声光谱法连续自动监测仪技术要求及检测方法》.pdf
- 《美丽城市建设数据分级分类规范》.pdf
- 《大数据优化区域空气质量模拟排放输入数据技术规范》.pdf
文档评论(0)