Serialization

In real-world development you will often define custom operations on RDDs. Keep in mind that:

  • Initialization happens on the Driver

  • The actual computation runs on the Executors

This involves inter-process communication, which requires serialization.

You can roughly think of the SparkContext as representing the Driver.

package cn.lagou.sparkcore

import org.apache.spark.{SparkConf, SparkContext}

// Not serializable
class MyClass1(x: Int) {
  val num: Int = x
}

// Solution 1: a case class is serializable by default
case class MyClass2(num: Int)

// Solution 2: extend Serializable
class MyClass3(x: Int) extends Serializable {
  val num: Int = x
}

object SerializableDemo {
  def main(args: Array[String]): Unit = {
    // Initialization
    val conf = new SparkConf()
      .setAppName(this.getClass.getCanonicalName.init)
      .setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val o1 = new MyClass1(8)
    // println(s"o1.num = ${o1.num}")

    val rdd1 = sc.makeRDD(1 to 20)
    // Method
    def add1(x: Int) = x + 100
    // Function
    val add2 = add1 _

    // Both methods and functions can be serialized and deserialized
    // rdd1.map(add1(_)).foreach(println)
    // println("****************************************************")
    // rdd1.map(add2(_)).foreach(println)

    // Capturing a non-serializable object in the closure fails with
    // "org.apache.spark.SparkException: Task not serializable"
    val object1 = new MyClass1(20)
    val i = 20
    // rdd1.map(x => object1.num + x).foreach(println)

    // Solution 1: use a case class
    val object2 = MyClass2(20)
    // rdd1.map(x => object2.num + x).foreach(println)

    // Solution 2: use a class that extends Serializable (MyClass3)
    val object3 = new MyClass3(20)
    rdd1.map(x => object3.num + x).foreach(println)

    sc.stop()
  }
}

Notes:

  • If a non-serializable object is referenced in the definition of a method or function, the task cannot be serialized either

  • The "delayed creation" solution (constructing the object inside the closure) is simple and widely applicable — see the sketch below
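The delayed-creation idea can be sketched as follows (a minimal example that reuses MyClass1 and rdd1 from the demo above; it is not part of the original code): rather than building the non-serializable object on the Driver and capturing it in the closure, build it inside the closure so each Executor constructs its own local copy.

// Delayed creation: MyClass1 is instantiated on the Executor, so it never
// needs to be serialized and shipped from the Driver.
rdd1.mapPartitions { iter =>
  val localObject = new MyClass1(20)   // one instance per partition, created on the Executor
  iter.map(x => localObject.num + x)
}.foreach(println)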

RDD Dependencies

RDDs support only coarse-grained transformations, i.e., a single operation applied to a large number of records. The sequence of transformations used to create an RDD is recorded as its Lineage, so that lost partitions can be recovered.

An RDD's Lineage records its metadata and the transformations that produced it. When some of the RDD's partitions are lost, this information can be used to recompute and recover the lost partitions.


An RDD relates to its parent RDD(s) through two different kinds of dependencies: narrow dependencies and wide dependencies. Dependencies serve two purposes: fault tolerance (recomputing lost data) and dividing the computation into stages.

Narrow dependency: 1:1 or n:1

Wide dependency: n:m, which means a shuffle is involved

You should be able to tell, quickly and accurately, which operators introduce a wide dependency.
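One way to check this programmatically (a minimal sketch, not from the original text) is to look for a ShuffleDependency among an RDD's dependencies:

import org.apache.spark.ShuffleDependency
import org.apache.spark.rdd.RDD

// An RDD on the child side of a wide dependency has a ShuffleDependency among
// its dependencies; narrow dependencies appear as OneToOneDependency / RangeDependency.
def hasWideDependency(rdd: RDD[_]): Boolean =
  rdd.dependencies.exists(_.isInstanceOf[ShuffleDependency[_, _, _]])

// hasWideDependency(sc.makeRDD(1 to 10).map(_ + 1))                      // false: map is narrow
// hasWideDependency(sc.makeRDD(1 to 10).map((_, 1)).reduceByKey(_ + _))  // true: reduceByKey shuffles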



DAG (Directed Acyclic Graph). The original RDDs form a DAG through a series of transformations, and the DAG is divided into different Stages according to the dependencies between the RDDs:

  • For narrow dependencies, the partition-level transformations are computed within a single Stage

  • For wide dependencies, because a shuffle is involved, the downstream computation can only start after the parent RDD has been fully processed

  • Wide dependencies are the basis for dividing Stages


RDD task decomposition breaks down into: Driver program, Job, Stage (TaskSet) and Task

  • Driver program: initializing a SparkContext creates one Spark application

  • Job: each Action operator triggers one Job

  • Stage: a Job is divided into Stages according to the dependencies between its RDDs; a new Stage boundary is introduced at every wide dependency

  • Task: a Stage corresponds to a TaskSet; each piece of the Stage's work sent to a different Executor for execution is one Task

  • A Task is the smallest unit of scheduling in Spark; each Stage contains many Tasks that run the same computation logic on different slices of the data

Note: Driver program -> Job -> Stage -> Task is a 1-to-n relationship at every level; the sketch below makes these counts observable.
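As an illustration (a rough sketch, not part of the original text), a SparkListener can tally Jobs, Stages and Tasks for an application; sc is assumed to be an existing SparkContext, and the counters are updated asynchronously as scheduler events arrive:

import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart, SparkListenerStageCompleted, SparkListenerTaskEnd}

val jobCount   = new AtomicInteger(0)
val stageCount = new AtomicInteger(0)
val taskCount  = new AtomicInteger(0)

// Register a listener that counts scheduler events
sc.addSparkListener(new SparkListener {
  override def onJobStart(jobStart: SparkListenerJobStart): Unit = jobCount.incrementAndGet()
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = stageCount.incrementAndGet()
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = taskCount.incrementAndGet()
})

// Run an action, e.g.:
//   sc.makeRDD(1 to 100, 2).map((_, 1)).reduceByKey(_ + _).count()
// then inspect jobCount.get(), stageCount.get() and taskCount.get()
// (1 Job, 2 Stages, 4 Tasks for this example, since the RDD has 2 partitions).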

// Narrow dependency
scala> val rdd1 = sc.parallelize(1 to 10, 1)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(11 to 20, 1)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24

scala> val rdd3 = rdd1.union(rdd2)
rdd3: org.apache.spark.rdd.RDD[Int] = UnionRDD[2] at union at <console>:27

scala> rdd3.dependencies.size
res0: Int = 2

scala> rdd3.dependencies
res1: Seq[org.apache.spark.Dependency[_]] = ArrayBuffer(org.apache.spark.RangeDependency@55940257, org.apache.spark.RangeDependency@6bd87866)

// Print the data of rdd1
scala> rdd3.dependencies(0).rdd.collect
res2: Array[_] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

// Print the data of rdd2
scala> rdd3.dependencies(1).rdd.collect
res3: Array[_] = Array(11, 12, 13, 14, 15, 16, 17, 18, 19, 20)

// Wide dependency
scala> val random = new scala.util.Random
random: scala.util.Random = scala.util.Random@3b887e2b

scala> val arr = (1 to 100).map(idx => random.nextInt(100))
arr: scala.collection.immutable.IndexedSeq[Int] = Vector(87, 45, 51, 14, 70, 11, 96, 88, 14, 86, 63, 65, 91, 65, 11, 37, 81, 98, 40, 9, 81, 82, 46, 23, 88, 39, 31, 42, 49, 53, 26, 59, 55, 68, 55, 51, 89, 14, 97, 70, 33, 0, 43, 92, 52, 64, 64, 87, 99, 95, 99, 64, 61, 40, 77, 71, 37, 94, 62, 70, 7, 86, 30, 93, 80, 77, 34, 66, 89, 72, 92, 48, 58, 69, 91, 62, 63, 52, 77, 98, 20, 35, 66, 34, 51, 28, 89, 84, 31, 61, 23, 50, 93, 21, 64, 78, 14, 18, 70, 53)

scala> val rdd1 = sc.makeRDD(arr).map((_, 1))
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[4] at map at <console>:26

scala> val rdd2 = rdd1.reduceByKey(_+_)
rdd2: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[5] at reduceByKey at <console>:25

// Inspect the dependencies
scala> rdd2.dependencies
res4: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.ShuffleDependency@45050433)

scala> rdd2.dependencies(0).rdd.collect
res5: Array[_] = Array((87,1), (45,1), (51,1), (14,1), (70,1), (11,1), (96,1), (88,1), (14,1), (86,1), (63,1), (65,1), (91,1), (65,1), (11,1), (37,1), (81,1), (98,1), (40,1), (9,1), (81,1), (82,1), (46,1), (23,1), (88,1), (39,1), (31,1), (42,1), (49,1), (53,1), (26,1), (59,1), (55,1), (68,1), (55,1), (51,1), (89,1), (14,1), (97,1), (70,1), (33,1), (0,1), (43,1), (92,1), (52,1), (64,1), (64,1), (87,1), (99,1), (95,1), (99,1), (64,1), (61,1), (40,1), (77,1), (71,1), (37,1), (94,1), (62,1), (70,1), (7,1), (86,1), (30,1), (93,1), (80,1), (77,1), (34,1), (66,1), (89,1), (72,1), (92,1), (48,1), (58,1), (69,1), (91,1), (62,1), (63,1), (52,1), (77,1), (98,1), (20,1), (35,1), (66,1), (34,1), (51,1), (28,1), (89,1), (84,1), (31,1), (61,1), (23,1), (50,1), (93,1), (21,1),...

scala> rdd2.dependencies(0).rdd.dependencies(0).rdd.collect
res6: Array[_] = Array(87, 45, 51, 14, 70, 11, 96, 88, 14, 86, 63, 65, 91, 65, 11, 37, 81, 98, 40, 9, 81, 82, 46, 23, 88, 39, 31, 42, 49, 53, 26, 59, 55, 68, 55, 51, 89, 14, 97, 70, 33, 0, 43, 92, 52, 64, 64, 87, 99, 95, 99, 64, 61, 40, 77, 71, 37, 94, 62, 70, 7, 86, 30, 93, 80, 77, 34, 66, 89, 72, 92, 48, 58, 69, 91, 62, 63, 52, 77, 98, 20, 35, 66, 34, 51, 28, 89, 84, 31, 61, 23, 50, 93, 21, 64, 78, 14, 18, 70, 53)
  • WordCount revisited

    scala> val rdd1 = sc.textFile("/wcinput/wc.txt")
    rdd1: org.apache.spark.rdd.RDD[String] = /wcinput/wc.txt MapPartitionsRDD[11] at textFile at <console>:24

    scala> val rdd2 = rdd1.flatMap(_.split("\\s+"))
    rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[12] at flatMap at <console>:25

    scala> val rdd3 = rdd2.map((_, 1))
    rdd3: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[13] at map at <console>:25

    scala> val rdd4 = rdd3.reduceByKey(_+_)
    rdd4: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[14] at reduceByKey at <console>:25

    scala> val rdd5 = rdd4.sortByKey()
    rdd5: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[17] at sortByKey at <console>:25

    scala> rdd5.count
    res7: Long = 5

    // Inspect the RDD lineage
    scala> rdd1.toDebugString
    res8: String =
    (2) /wcinput/wc.txt MapPartitionsRDD[11] at textFile at <console>:24 []
    | /wcinput/wc.txt HadoopRDD[10] at textFile at <console>:24 []

    scala> rdd5.toDebugString
    res9: String =
    (2) ShuffledRDD[17] at sortByKey at <console>:25 []
    +-(2) ShuffledRDD[14] at reduceByKey at <console>:25 []
    +-(2) MapPartitionsRDD[13] at map at <console>:25 []
    | MapPartitionsRDD[12] at flatMap at <console>:25 []
    | /wcinput/wc.txt MapPartitionsRDD[11] at textFile at <console>:24 []
    | /wcinput/wc.txt HadoopRDD[10] at textFile at <console>:24 []

    // Inspect the dependencies
    scala> rdd1.dependencies
    res10: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@793ede88)

    scala> rdd1.dependencies(0).rdd
    res11: org.apache.spark.rdd.RDD[_] = /wcinput/wc.txt HadoopRDD[10] at textFile at <console>:24

    scala> rdd5.dependencies
    res12: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.ShuffleDependency@76ca2fa7)

    scala> rdd5.dependencies(0).rdd
    res13: org.apache.spark.rdd.RDD[_] = ShuffledRDD[14] at reduceByKey at <console>:25

    // Check the preferred locations (data locality)
    scala> val hadoopRDD = rdd1.dependencies(0).rdd
    hadoopRDD: org.apache.spark.rdd.RDD[_] = /wcinput/wc.txt HadoopRDD[10] at textFile at <console>:24

    scala> hadoopRDD.preferredLocations(hadoopRDD.partitions(0))
    res14: Seq[String] = ArraySeq(Linux121, Linux123, Linux122)

    # Use the hdfs command to inspect the file's blocks and locations
    [root@Linux121 ~]# hdfs fsck /wcinput/wc.txt -files -blocks -locations
    Connecting to namenode via http://Linux121:50070/fsck?ugi=root&files=1&blocks=1&locations=1&path=%2Fwcinput%2Fwc.txt
    FSCK started by root (auth:SIMPLE) from /192.168.91.121 for path /wcinput/wc.txt at Tue Aug 30 03:17:24 WEST 2022
    /wcinput/wc.txt 108 bytes, 1 block(s): OK
    0. BP-364958193-192.168.91.121-1654312458375:blk_1073743340_2518 len=108 Live_repl=3 [DatanodeInfoWithStorage[192.168.91.122:50010,DS-544bf1f7-21d4-4544-aa02-97ea359d927c,DISK], DatanodeInfoWithStorage[192.168.91.123:50010,DS-03aedb89-d6e4-4a26-b1e2-85c8d2991d4f,DISK], DatanodeInfoWithStorage[192.168.91.121:50010,DS-422e34a0-a422-443a-87d8-a46d92e8ab90,DISK]]

    Question: in the WordCount above, how many Jobs, Stages and Tasks are there?


    In this example the whole process consists of 1 Job, 3 Stages and 6 Tasks.


    Why does the Spark UI show 2 Jobs here? See the blog post "RDD编程之分区与分区器" on partitions and partitioners.