你好,我是吴磊。
一提到 Shuffle,你能想到什么?我想很多人的第一反应都是应用中最顽固、最难解决的性能瓶颈。
在之前的课程中,我们也不止一次地提到 Shuffle,尤其是在开发原则那一讲,我还建议你遵循“能省则省、能拖则拖”的原则,在应用中尽量去避免 Shuffle,如果受业务逻辑所限确实不能避免,就尽可能地把 Shuffle 往后拖。
那么,为什么我们一谈 Shuffle 就色变,提到它就避之唯恐不及呢?今天这一讲,我就通过实现一个仙女散花游戏的过程,来和你深入探讨 Shuffle 是如何工作的,说说它为什么是分布式应用一时无两的性能杀手。毕竟,只有了解 Shuffle 的工作原理,我们才能更好地避免它。
假设,你的老板今天给你安排一个开发任务,让你用 Spark 去实现一个游戏需求。这个实现需求来自一个小故事:仙女散花。
很久以前,燕山脚下有个小村庄,村子里有所“七色光”小学,方圆百里的孩子都来这里上学。这一天,一年级 2 班的黄老师和班里的五个孩子正在做一个游戏,叫做“仙女散花”。
黄老师的背包里装满了五种不同颜色的花朵,五种颜色分别是红、橙、黄、紫、青。她把背包里的花朵随机地分发给五个小同学:小红、橙橙、黄小乙、阿紫和小青。花朵发完之后,每个同学分到的花朵数量差不多,颜色各有不同。
接着,黄老师开始宣布游戏规则:“你们要一起协作,在最短的时间内,把花朵按照颜色收集在一起。游戏完成的标志是,课桌上有 5 堆花朵,每堆花朵的颜色都是一样的。”
大家跃跃欲试,黄小乙说:“先别急,我们来制定个策略。先在前面摆上 5 张课桌,然后每个人需要做两件事情,先把自己手里的花朵按照颜色归类分为 5 堆,再把分好颜色的花朵,分别放到相应的课桌上!”于是,几个小同学按照黄小乙说的策略,不一会儿就完成了游戏。
事实上,仙女散花的游戏过程和 Shuffle 的工作流程大同小异。当然,Shuffle 过程中涉及的步骤和环节,要比“仙女散花”复杂一些。
Shuffle 的本意是“洗牌”,在分布式计算环境中,它有两个阶段。一般来说,前一个阶段叫做“Map 阶段”,后一个阶段叫做“Reduce 阶段”。当然,也有人把它们叫做 Shuffle Write 阶段和 Shuffle Read 阶段。
在仙女散花的游戏中,从老师分发花朵,到 5 个小同学把花朵按照颜色归类,对应的是 Shuffle 的 Map 阶段,而大家把归类的花朵分发到相应的课桌,这个过程类似于 Shuffle 的 Reduce 阶段。
接下来,我们就借助这个故事来深入探讨 Shuffle 的两个阶段:Map 阶段和 Reduce 阶段。
自 2.0 版本之后,Spark 将 Shuffle 操作统一交由 Sort shuffle manager 来管理。因此,今天这一讲,我们专注于 Sort shuffle manager 实现的 Shuffle 分发过程。
以终为始、以结果为导向的学习往往效率更高,在深入探讨 Map 阶段如何生产数据之前,我们不妨先来明确:Map 阶段的输出到底是什么?
之前我们也说过,Map 阶段最终生产的数据会以中间文件的形式物化到磁盘中,这些中间文件就存储在 spark.local.dir 设置的文件目录里。中间文件包含两种类型:一类是后缀为 data 的数据文件,存储的内容是 Map 阶段生产的待分发数据;另一类是后缀为 index 的索引文件,它记录的是数据文件中不同分区的偏移地址。这里的分区是指 Reduce 阶段的分区,因此,分区数量与 Reduce 阶段的并行度保持一致。
这样一来,我们就可以把问题进一步聚焦在,Spark 在 Map 阶段是如何生产这些中间文件的。不过,我们首先需要明确的是,Map 阶段每一个 Task 的执行流程都是一样的,每个 Task 最终都会生成一个数据文件和一个索引文件。因此,中间文件的数量与 Map 阶段的并行度保持一致。换句话说,有多少个 Task,Map 阶段就会生产相应数量的数据文件和索引文件。
接下来,我带你用 Spark 来实现“仙女散花”的游戏,咱们一边做游戏,一边来分析 Map 阶段的中间文件是如何产生的。
在“仙女散花”的游戏中,黄老师要求大家把同一种花色的花朵收集到一起。那么,在 Spark 的分布式开发框架内,与这个游戏最相仿的计算过程非 groupByKey 莫属,所以我们不妨用 groupByKey 来实现游戏。
首先是 flowers.txt 文件:
黄色花朵
紫色花朵
红色花朵
橙色花朵
青色花朵
黄色花朵
紫色花朵
橙色花朵
青色花朵
......
其次是同学小 A 接到需求后,用 groupByKey 实现“仙女散花”游戏的代码:
val flowers = spark.sparkContext.textFile("flowers.txt")
//黄老师给5个小同学分发花朵
val flowersForKids = flowers.coalesce(5)
val flowersKV = flowersForKids.map((_, 1))
//黄小乙的两个步骤:大家先各自按颜色归类,然后再把归类后的花朵放到相应的课桌上
flowersKV.groupByKey.collect
我们可以看到,代码步骤与游戏过程基本上一一对应。但是,读取完花朵文件之后,由于 groupByKey 是 pairRDD 算子,需要消费(Key,Value)形式的数据,因此我们需要对原始花朵数据做一次转换。以数据分区 0 为例,数据的转换过程如下图所示,你不妨把数据分区 0 理解为是黄老师分发给小红的花朵。
基于 pairRDD 的 Key,也就是花朵的颜色,Map Task 就可以计算每条数据记录在 Reduce 阶段的目标分区,目标分区也就是游戏中的课桌。在黄小乙制定的策略中,哪种花放到哪张桌子是大家事先商定好的,但在 Spark 中,每条数据记录应该分发到哪个目标分区,是由 Key 的哈希值决定的。
目标分区计算好之后,Map Task 会把每条数据记录和它的目标分区,放到一个特殊的数据结构里,这个数据结构叫做“PartitionedPairBuffer”,它本质上就是一种数组形式的缓存结构。它是怎么存储数据记录的呢?
每条数据记录都会占用数组中相邻的两个元素空间,第一个元素是(目标分区,Key),第二个元素是 Value。假设 PartitionedPairBuffer 的大小是 4,也就是最多只能存储 4 条数据记录。那么,如果我们还以数据分区 0 为例,小红的前 4 枚花朵在 PartitionedPairBuffer 中的存储状态就会如下所示。
对我们来说,最理想的情况当然是 PartitionedPairBuffer 足够大,大到足以容纳 Map Task 所需处理的所有数据。不过,现实总是很骨感,每个 Task 分到的内存空间是有限的,PartitionedPairBuffer 自然也不能保证能容纳分区中的所有数据。因此,Spark 需要一种计算机制,来保障在数据总量超出可用内存的情况下,依然能够完成计算。这种机制就是:排序、溢出、归并。
就拿大小为 4 的 PartitionedPairBuffer 来说,数据分区 0 里面有 16 朵花,对应着 16 条数据记录,它们至少要分 4 批才能依次完成处理。在处理下一批数据之前,Map Task 得先把 PartitionedPairBuffer 中已有的数据腾挪出去,腾挪的方式简单粗暴,Map Task 直接把数据溢出到磁盘中的临时文件。
不过,在溢出之前,对于 PartitionedPairBuffer 中已有的数据,Map Task 会先按照数据记录的第一个元素,也就是目标分区 + Key 进行排序。也就是说,尽管数据暂时溢出到了磁盘,但是临时文件中的数据也是有序的。
就这样,PartitionedPairBuffer 腾挪了一次又一次,数据分区 0 里面的花朵处理了一批又一批,直到所有的花朵都被处理完。分区 0 有 16 朵花,PartitionedPairBuffer 的大小是 4,因此,PartitionedPairBuffer 总共被腾挪了 3 次,生成了 3 个临时文件,每个临时文件中包含 4 条数据记录。16 条数据,有 12 条分散在 3 个文件中,还有 4 条缓存在 PartitionedPairBuffer 里。
到此为止,我们离 Map 阶段生产的、用于在网络中分发数据的中间文件仅有一步之遥了。还记得吗?Map 阶段生产的中间文件有两类,一类是数据文件,另一类是索引文件。分散在 3 个临时文件和 PartitionedPairBuffer 里的数据记录,就是生成这两类文件的输入源。最终,Map Task 用归并排序的算法,将 4 个输入源中的数据写入到数据文件和索引文件中去,如下图所示。
好了,到目前为止,我们用 groupByKey 实现了“仙女散花”的游戏,详细讲解了 Map 阶段生产中间文件的过程。虽然 Map 阶段的计算步骤很多,但其中最主要的环节可以归结为 4 步:
1. 对于分片中的数据记录,逐一计算其目标分区,并将其填充到 PartitionedPairBuffer;
2. PartitionedPairBuffer 填满后,如果分片中还有未处理的数据记录,就对 Buffer 中的数据记录按(目标分区 ID,Key)进行排序,将所有数据溢出到临时文件,同时清空缓存;
3. 重复步骤 1、2,直到分片中所有的数据记录都被处理;
4. 对所有临时文件和 PartitionedPairBuffer 归并排序,最终生成数据文件和索引文件。
不难发现,仙女散花其实就是个分组、收集的游戏。应该说,用 Spark 来实现分组、收集类的游戏还是比较简单的,那么,如果把仙女散花变成是“分组、统计”的游戏,我们该怎么做呢?
5 个小同学完成游戏之后,离下课时间还早。因此,黄老师调整了游戏规则:“你们五个人还是一起协作,这次要在最短的时间内,统计不同花色花朵的数量。”
小红迫不及待地说:“很简单!还是按照刚才的策略,先把花朵分好堆,然后我们五个人分别去课桌上数数就好啦!”
黄小乙皱了皱眉,说道:“别急,新的游戏规则也是有时间限制的,我想了一个和你差不多的办法,一共分三步:第一步,每个人把手里不同花色花朵的数量先算出来;第二步,我们只需要把花朵的数量写到相应的桌子上;第三步,我们分别对五张课桌上的数字求和。这样就能完成得更快了”
如果我们想用 Spark 来实现升级后的游戏,该怎么办呢?其实很简单,只要把 groupByKey 换成 reduceByKey 就好了。
val flowers = spark.sparkContext.textFile("flowers.txt")
//黄老师给5个小同学分发花朵
val flowersForKids = flowers.coalesce(5)
val flowersKV = flowersForKids.map((_, 1))
//黄小乙的两个步骤:大家先各自按颜色计数,然后再按照课桌统一计数
flowersKV.reduceByKey(_ + _).collect
接下来,我们来分析一下 reduceByKey 的 Map 阶段计算,相比 groupByKey 有何不同。就 Map 端的计算步骤来说,reduceByKey 与刚刚讲的 groupByKey 一样,都是先填充内存数据结构,然后排序溢出,最后归并排序。
区别在于,在计算的过程中,reduceByKey 采用一种叫做 PartitionedAppendOnlyMap 的数据结构来填充数据记录。这个数据结构是一种 Map,而 Map 的 Value 值是可累加、可更新的。因此,PartitionedAppendOnlyMap 非常适合聚合类的计算场景,如计数、求和、均值计算、极值计算等等。
在上图中,4 个 KV 对的 Value 值,是扫描到数据分区 0 当中青色花朵之前的状态。在 PartitionedAppendOnlyMap 中,由于 Value 是可累加、可更新的,因此这种数据结构可以容纳的花朵数量一定比 4 大。因此,相比 PartitionedPairBuffer,PartitionedAppendOnlyMap 的存储效率要高得多,溢出数据到磁盘文件的频率也要低得多。
以此类推,最终合并的数据文件也会小很多。依靠高效的内存数据结构、更少的磁盘文件、更小的文件尺寸,我们就能大幅降低了 Shuffle 过程中的磁盘和网络开销。
事实上,相比 groupByKey、collect_list 这些收集类算子,聚合类算子(reduceByKey、aggregateByKey 等)在执行性能上更占优势。因此,我们要避免在聚合类的计算需求中,引入收集类的算子。虽然这种做法不妨碍业务逻辑实现,但在性能调优上可以说是大忌。
最后,我们再来说说 Reduce 阶段,在“仙女散花”的游戏里,每个人把自己的花朵归好类之后,主动地把不同颜色的花朵放到相应的课桌上,这个过程实际上就是 Shuffle 过程中的数据分发。不过,与课桌被动地接收花朵不同的是,Shuffle 在 Reduce 阶段是主动地从 Map 端的中间文件中拉取数据。
刚刚讲过,每个 Map Task 都会生成如上图所示的中间文件,文件中的分区数与 Reduce 阶段的并行度一致。换句话说,每个 Map Task 生成的数据文件,都包含所有 Reduce Task 所需的部分数据。因此,任何一个 Reduce Task 要想完成计算,必须先从所有 Map Task 的中间文件里去拉取属于自己的那部分数据。索引文件正是用于帮助判定哪部分数据属于哪个 Reduce Task。
Reduce Task 通过网络拉取中间文件的过程,实际上就是不同 Stages 之间数据分发的过程。在“仙女散花”的游戏中,5 个孩子与 5 张课桌之间,需要往返 25 人次。如果让 100 个孩子把 100 种颜色的花朵,分别收集到 100 张课桌上,那么这 100 个孩子与 100 张课桌之间,就需要 10000 人次的往返!显然,Shuffle 中数据分发的网络开销,会随着 Map Task 与 Reduce Task 的线性增长,呈指数级爆炸。
Reduce Task 将拉取到的数据块填充到读缓冲区,然后按照任务的计算逻辑不停地消费、处理缓冲区中的数据记录,如下图所示。
我们可以看到,Reduce 阶段用圆圈标记的 1、2、3、4 与 Map 阶段的四个步骤一模一样。没错,因即是果、果即是因,当我们说某个 Stage 是 Map 阶段或是 Reduce 阶段的时候,我们的出发点或者说锚点就是 Shuffle。对于上图的 Shuffle 0 来说,Stage 0 是 Map 阶段,Stage 1 是 Reduce 阶段。但是,对于后面的 Shuffle 1 来说,Stage 1 就变成了 Map 阶段。因此你看,当我们把视角拉宽,Map 和 Reduce 这两个看似对立的东西,其实有很多地方都是相通的。
想必经过上面两个阶段的分析,你已经对 Shuffle 为何会成为性能瓶颈,有了比较直观的感受。这里,我再带你总结一下。
首先,对于 Shuffle 来说,它需要消耗所有的硬件资源:
其次,Shuffle 消耗的不同硬件资源之间很难达到平衡。磁盘和网络的消耗是 Shuffle 中必需的环节。但是,磁盘与网络的处理延迟相比 CPU 和内存要相差好几个数量级。以下表为例,如果以 CPU L1 缓存的处理延迟为基准,把单位从纳秒校准到秒,我们就会惊讶地发现,当 CPU、内存以秒为单位处理数据时,磁盘和网络的处理延迟是以天、月为单位的!
正是基于 Shuffle 的这些特点,我们才会“谈虎色变”,一提到 Shuffle 就避之唯恐不及,强调能省则省、能拖则拖。
这一讲,我借助实现仙女散花这个游戏的需求,带你直观地认识 Shuffle 的计算过程。Shuffle 有两个计算阶段,Map 阶段和 Reduce 阶段。我们要重点掌握 Map 阶段的计算流程,我把它总结为 4 步:
在 Reduce 阶段我们要注意,Reduce Task 通过网络拉取中间文件的过程,实际上就是不同 Stages 之间数据分发的过程。并且,Shuffle 中数据分发的网络开销,会随着 Map Task 与 Reduce Task 的线性增长,呈指数级爆炸。
最后,从硬件资源的角度来看,Shuffle 对每一种硬件资源都非常地渴求,尤其是内存、磁盘和网络。由于不同硬件资源之间的处理延迟差异巨大,我们很难在 Shuffle 过程中平衡 CPU、内存、磁盘和网络之间的计算开销。因此,对于 Shuffle 我们避之唯恐不及,要能省则省、能拖则拖。
期待在留言区看到你的思考和讨论,我们下一讲见!