Spark原理初探
Standalone模式作业提交
Standalone 模式下有四个重要组成部分,分别是:
-
Driver:用户编写的 Spark 应用程序就运行在 Driver 上,由Driver 进程执行
-
Master:主要负责资源的调度和分配,并进行集群的监控等职责
-
Worker:Worker 运行在集群中的一台服务器上。负责管理该节点上的资源,负责启动启动节点上的 Executor
-
Executor:一个 Worker 上可以运行多个 Executor,Executor通过启动多个线程(task)对 RDD 的分区进行并行计算
SparkContext 中的三大组件:
-
DAGScheduler:负责将DAG划分成若干个Stage
-
TaskScheduler:将DAGScheduler提交的 Stage(Taskset)进行优先级排序,再将 task 发送到 Executor
-
SchedulerBackend:定义了许多与Executor事件相关的处理,包括:新的executor注册进来的时候记录executor的信息,增加全局的资源量(核数);executor更新状态,若任务完成的话,回收core;其他停止executor、remove executor等事件
Standalone模式下作业提交步骤:
-
1、启动应用程序,完成SparkContext的初始化
-
2、Driver向Master注册,申请资源
-
3、Master检查集群资源状况。若集群资源满足,通知Worker启动Executor
-
4、Executor启动后向Driver注册(称为反向注册)
-
5、Driver完成DAG的解析,得到Tasks,然后向Executor发送Task
-
6、Executor 向Driver汇总任务的执行情况
-
7、应用程序执行完毕,回收资源
RDD编程优化
-
1、RDD复用
避免创建重复的RDD。在开发过程中要注意:对于同一份数据,只应该创建一个RDD,不要创建多个RDD来代表同一份数据。
-
2、RDD缓存/持久化
-
当多次对同一个RDD执行算子操作时,每一次都会对这个RDD以之前的父RDD重新计算一次,这种情况是必须要避免的,对同一个RDD的重复计算是对资源的极大浪费
-
对多次使用的RDD进行持久化,通过持久化将公共RDD的数据缓存到内存/磁盘中,之后对于公共RDD的计算都会从内存/磁盘中直接获取RDD数据
-
RDD的持久化是可以进行序列化的,当内存无法将RDD的数据完整的进行存放的时候,可以考虑使用序列化的方式减小数据体积,将数据完整存储在内存中
-
-
3、巧用 filter
-
尽可能早的执行filter操作,过滤无用数据
-
在filter过滤掉较多数据后,使用 coalesce 对数据进行重分区
-
-
4、使用高性能算子
-
1、避免使用groupByKey,根据场景选择使用高性能的聚合算子 reduceByKey、aggregateByKey
-
2、coalesce、repartition,在可能的情况下优先选择没有shuffle的操作
-
3、foreachPartition 优化输出操作
-
4、map、mapPartitions,选择合理的选择算子
mapPartitions性能更好,但数据量大时容易导致OOM
-
5、用 repartitionAndSortWithinPartitions 替代 repartition + sort 操作
-
6、合理使用 cache、persist、checkpoint,选择合理的数据存储级别
-
7、filter的使用
-
8、减少对数据源的扫描(算法复杂了)
-
-
5、设置合理的并行度
-
Spark作业中的并行度指各个stage的task的数量
-
设置合理的并行度,让并行度与资源相匹配。简单来说就是在资源允许的前提下,并行度要设置的尽可能大,达到可以充分利用集群资源。合理的设置并行度,可以提升整个Spark作业的性能和运行速度
-
-
6、广播大变量
-
默认情况下,task中的算子中如果使用了外部变量,每个task都会获取一份变量的复本,这会造多余的网络传输和内存消耗
-
使用广播变量,只会在每个Executor保存一个副本,Executor的所有task共用此广播变量,这样就节约了网络及内存资源
-
Shuffle原理
Shuffle的本意是洗牌,目的是为了把牌弄乱。
Spark、Hadoop中的shuffle可不是为了把数据弄乱,而是为了将随机排列的数据转换成具有一定规则的数据。
Shuffle是MapReduce计算框架中的一个特殊的阶段,介于 Map 和 Reduce 之间。当Map的输出结果要被Reduce使用时,输出结果需要按key排列,并且分发到Reducer上去,这个过程就是shuffle。
shuffle涉及到了本地磁盘(非hdfs)的读写和网络的传输,大多数Spark作业的性能主要就是消耗在了shuffle环节。因此shuffle性能的高低直接影响到了整个程序的运行效率
在Spark Shuffle的实现上,经历了Hash Shuffle、Sort Shuffle、Unsafe Shuffle三阶段:
-
Spark 0.8及以前 Hash Based Shuffle
-
Spark 0.8.1 为Hash Based Shuffle引入File Consolidation机制
-
Spark 0.9 引入ExternalAppendOnlyMap
-
Spark 1.1 引入Sort Based Shuffle,但默认仍为Hash Based Shuffle
-
Spark 1.2 默认的Shuffle方式改为Sort Based Shuffle
-
Spark 1.4 引入Tungsten-Sort Based Shuffle
-
Spark 1.6 Tungsten-sort并入Sort Based Shuffle
-
Spark 2.0 Hash Based Shuffle退出历史舞台
简单的说:
-
Spark 1.1 以前是Hash Shuffle
-
Spark 1.1 引入了Sort Shuffle
-
Spark 1.6 将Tungsten-sort并入Sort Shuffle
-
Spark 2.0 Hash Shuffle退出历史舞台
-
1、Hash Base Shuffle V1
-
每个Shuffle Map Task需要为每个下游的Task创建一个单独的文件
-
Shuffle过程中会生成海量的小文件。同时打开过多文件、低效的随机IO
-
-
2、Hash Base Shuffle V2
Hash Base Shuffle V2 核心思想:允许不同的task复用同一批磁盘文件,有效将多个task的磁盘文件进行一定程度上的合并,从而大幅度减少磁盘文件的数量,进而提升shuffle write的性能。一定程度上解决了Hash V1中的问题,但不彻底。
Hash Shuffle 规避了排序,提高了性能;总的来说在Hash Shuffle过程中生成海量的小文件(Hash Base Shuffle V2生成海量小文件的问题得到了一定程度的缓解)。
-
3、Sort Base Shuffle
Sort Base Shuffle大大减少了shuffle过程中产生的文件数,提高Shuffle的效率;
Spark Shuffle 与 Hadoop Shuffle 从目的、意义、功能上看是类似的,实现(细节)上有区别。