Standalone模式作业提交

Standalone 模式下有四个重要组成部分,分别是:

  • Driver:用户编写的 Spark 应用程序就运行在 Driver 上,由Driver 进程执行

  • Master:主要负责资源的调度和分配,并进行集群的监控等职责

  • Worker:Worker 运行在集群中的一台服务器上。负责管理该节点上的资源,负责启动启动节点上的 Executor

  • Executor:一个 Worker 上可以运行多个 Executor,Executor通过启动多个线程(task)对 RDD 的分区进行并行计算

SparkContext 中的三大组件:

  • DAGScheduler:负责将DAG划分成若干个Stage

  • TaskScheduler:将DAGScheduler提交的 Stage(Taskset)进行优先级排序,再将 task 发送到 Executor

  • SchedulerBackend:定义了许多与Executor事件相关的处理,包括:新的executor注册进来的时候记录executor的信息,增加全局的资源量(核数);executor更新状态,若任务完成的话,回收core;其他停止executor、remove executor等事件


Standalone模式下作业提交步骤:

  • 1、启动应用程序,完成SparkContext的初始化

  • 2、Driver向Master注册,申请资源

  • 3、Master检查集群资源状况。若集群资源满足,通知Worker启动Executor

  • 4、Executor启动后向Driver注册(称为反向注册)

  • 5、Driver完成DAG的解析,得到Tasks,然后向Executor发送Task

  • 6、Executor 向Driver汇总任务的执行情况

  • 7、应用程序执行完毕,回收资源


RDD编程优化

  • 1、RDD复用

    避免创建重复的RDD。在开发过程中要注意:对于同一份数据,只应该创建一个RDD,不要创建多个RDD来代表同一份数据。

  • 2、RDD缓存/持久化

    • 当多次对同一个RDD执行算子操作时,每一次都会对这个RDD以之前的父RDD重新计算一次,这种情况是必须要避免的,对同一个RDD的重复计算是对资源的极大浪费

    • 对多次使用的RDD进行持久化,通过持久化将公共RDD的数据缓存到内存/磁盘中,之后对于公共RDD的计算都会从内存/磁盘中直接获取RDD数据

    • RDD的持久化是可以进行序列化的,当内存无法将RDD的数据完整的进行存放的时候,可以考虑使用序列化的方式减小数据体积,将数据完整存储在内存中

  • 3、巧用 filter

    • 尽可能早的执行filter操作,过滤无用数据

    • 在filter过滤掉较多数据后,使用 coalesce 对数据进行重分区

  • 4、使用高性能算子

    • 1、避免使用groupByKey,根据场景选择使用高性能的聚合算子 reduceByKey、aggregateByKey

    • 2、coalesce、repartition,在可能的情况下优先选择没有shuffle的操作

    • 3、foreachPartition 优化输出操作

    • 4、map、mapPartitions,选择合理的选择算子

      mapPartitions性能更好,但数据量大时容易导致OOM

    • 5、用 repartitionAndSortWithinPartitions 替代 repartition + sort 操作

    • 6、合理使用 cache、persist、checkpoint,选择合理的数据存储级别

    • 7、filter的使用

    • 8、减少对数据源的扫描(算法复杂了)

  • 5、设置合理的并行度

    • Spark作业中的并行度指各个stage的task的数量

    • 设置合理的并行度,让并行度与资源相匹配。简单来说就是在资源允许的前提下,并行度要设置的尽可能大,达到可以充分利用集群资源。合理的设置并行度,可以提升整个Spark作业的性能和运行速度

  • 6、广播大变量

    • 默认情况下,task中的算子中如果使用了外部变量,每个task都会获取一份变量的复本,这会造多余的网络传输和内存消耗

    • 使用广播变量,只会在每个Executor保存一个副本,Executor的所有task共用此广播变量,这样就节约了网络及内存资源

Shuffle原理

Shuffle的本意是洗牌,目的是为了把牌弄乱。

Spark、Hadoop中的shuffle可不是为了把数据弄乱,而是为了将随机排列的数据转换成具有一定规则的数据。

Shuffle是MapReduce计算框架中的一个特殊的阶段,介于 Map 和 Reduce 之间。当Map的输出结果要被Reduce使用时,输出结果需要按key排列,并且分发到Reducer上去,这个过程就是shuffle。

shuffle涉及到了本地磁盘(非hdfs)的读写和网络的传输,大多数Spark作业的性能主要就是消耗在了shuffle环节。因此shuffle性能的高低直接影响到了整个程序的运行效率

在Spark Shuffle的实现上,经历了Hash Shuffle、Sort Shuffle、Unsafe Shuffle三阶段:

  • Spark 0.8及以前 Hash Based Shuffle

  • Spark 0.8.1 为Hash Based Shuffle引入File Consolidation机制

  • Spark 0.9 引入ExternalAppendOnlyMap

  • Spark 1.1 引入Sort Based Shuffle,但默认仍为Hash Based Shuffle

  • Spark 1.2 默认的Shuffle方式改为Sort Based Shuffle

  • Spark 1.4 引入Tungsten-Sort Based Shuffle

  • Spark 1.6 Tungsten-sort并入Sort Based Shuffle

  • Spark 2.0 Hash Based Shuffle退出历史舞台

简单的说:

  • Spark 1.1 以前是Hash Shuffle

  • Spark 1.1 引入了Sort Shuffle

  • Spark 1.6 将Tungsten-sort并入Sort Shuffle

  • Spark 2.0 Hash Shuffle退出历史舞台


  • 1、Hash Base Shuffle V1

    • 每个Shuffle Map Task需要为每个下游的Task创建一个单独的文件

    • Shuffle过程中会生成海量的小文件。同时打开过多文件、低效的随机IO


  • 2、Hash Base Shuffle V2

    Hash Base Shuffle V2 核心思想:允许不同的task复用同一批磁盘文件,有效将多个task的磁盘文件进行一定程度上的合并,从而大幅度减少磁盘文件的数量,进而提升shuffle write的性能。一定程度上解决了Hash V1中的问题,但不彻底。


    Hash Shuffle 规避了排序,提高了性能;总的来说在Hash Shuffle过程中生成海量的小文件(Hash Base Shuffle V2生成海量小文件的问题得到了一定程度的缓解)。

  • 3、Sort Base Shuffle

    Sort Base Shuffle大大减少了shuffle过程中产生的文件数,提高Shuffle的效率;


    Spark Shuffle 与 Hadoop Shuffle 从目的、意义、功能上看是类似的,实现(细节)上有区别。