Key-Value RDD Operations

Broadly, RDDs fall into two categories: Value-type RDDs and Key-Value-type RDDs.

The operations introduced so far are for Value-type RDDs; in practice, key-value-type RDDs, also known as Pair RDDs, are used even more often.

Operations on Value-type RDDs are mostly defined in RDD.scala;

operations on key-value-type RDDs are concentrated in PairRDDFunctions.scala.


Most of the operators introduced earlier also work on Pair RDDs. In addition, Pair RDDs have Transformation and Action operators of their own.

Creating a Pair RDD

scala> val arr = (1 to 10).toArray
arr: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> val arr1 = arr.map(x => (x, x*10, x*100))
arr1: Array[(Int, Int, Int)] = Array((1,10,100), (2,20,200), (3,30,300), (4,40,400), (5,50,500), (6,60,600), (7,70,700), (8,80,800), (9,90,900), (10,100,1000))

// rdd1 is not a Pair RDD (its elements are 3-tuples, not key-value pairs)
scala> val rdd1 = sc.makeRDD(arr1)
rdd1: org.apache.spark.rdd.RDD[(Int, Int, Int)] = ParallelCollectionRDD[10] at makeRDD at <console>:26

scala> rdd1.collectAsMap
<console>:26: error: value collectAsMap is not a member of org.apache.spark.rdd.RDD[(Int, Int, Int)]
rdd1.collectAsMap
^

// rdd2 is a Pair RDD; its elements must be key-value pairs
scala> val arr2 = arr.map(x => (x, (x*10, x*100)))
arr2: Array[(Int, (Int, Int))] = Array((1,(10,100)), (2,(20,200)), (3,(30,300)), (4,(40,400)), (5,(50,500)), (6,(60,600)), (7,(70,700)), (8,(80,800)), (9,(90,900)), (10,(100,1000)))

scala> val rdd2 = sc.makeRDD(arr2)
rdd2: org.apache.spark.rdd.RDD[(Int, (Int, Int))] = ParallelCollectionRDD[11] at makeRDD at <console>:26

scala> rdd2.collectAsMap
res19: scala.collection.Map[Int,(Int, Int)] = Map(8 -> (80,800), 2 -> (20,200), 5 -> (50,500), 4 -> (40,400), 7 -> (70,700), 10 -> (100,1000), 1 -> (10,100), 9 -> (90,900), 3 -> (30,300), 6 -> (60,600))

Transformation Operations

map-like operations

mapValues / flatMapValues / keys / values: all of these could be implemented with map; they are convenience shorthands.

scala> val a = sc.parallelize(List((1,2),(3,4),(5,6)))
a: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[12] at parallelize at <console>:24

scala> a.collect
res23: Array[(Int, Int)] = Array((1,2), (3,4), (5,6))

// mapValues is more concise: it transforms only the value
scala> val b = a.mapValues(x=>1 to x)
b: org.apache.spark.rdd.RDD[(Int, scala.collection.immutable.Range.Inclusive)] = MapPartitionsRDD[13] at mapValues at <console>:25

scala> b.collect
res20: Array[(Int, scala.collection.immutable.Range.Inclusive)] = Array((1,Range 1 to 2), (3,Range 1 to 4), (5,Range 1 to 6))

// the same result can be achieved with map
scala> val b = a.map(x => (x._1, 1 to x._2))
b: org.apache.spark.rdd.RDD[(Int, scala.collection.immutable.Range.Inclusive)] = MapPartitionsRDD[14] at map at <console>:25

scala> b.collect
res21: Array[(Int, scala.collection.immutable.Range.Inclusive)] = Array((1,Range 1 to 2), (3,Range 1 to 4), (5,Range 1 to 6))

scala> val b = a.map{case (k, v) => (k, 1 to v)}
b: org.apache.spark.rdd.RDD[(Int, scala.collection.immutable.Range.Inclusive)] = MapPartitionsRDD[15] at map at <console>:25

scala> b.collect
res22: Array[(Int, scala.collection.immutable.Range.Inclusive)] = Array((1,Range 1 to 2), (3,Range 1 to 4), (5,Range 1 to 6))

// flatMapValues flattens the values
scala> val c = a.flatMapValues(x=>1 to x)
c: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[16] at flatMapValues at <console>:25

scala> c.collect
res24: Array[(Int, Int)] = Array((1,1), (1,2), (3,1), (3,2), (3,3), (3,4), (5,1), (5,2), (5,3), (5,4), (5,5), (5,6))

scala> val c = a.mapValues(x=>1 to x).flatMap{case (k, v) => v.map(x => (k, x))}
c: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[18] at flatMap at <console>:25

scala> c.collect
res25: Array[(Int, Int)] = Array((1,1), (1,2), (3,1), (3,2), (3,3), (3,4), (5,1), (5,2), (5,3), (5,4), (5,5), (5,6))

scala> c.keys
res26: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[19] at keys at <console>:26

scala> c.keys.collect
res29: Array[Int] = Array(1, 1, 3, 3, 3, 3, 5, 5, 5, 5, 5, 5)

scala> c.values
res27: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[20] at values at <console>:26

scala> c.values.collect
res30: Array[Int] = Array(1, 2, 1, 2, 3, 4, 1, 2, 3, 4, 5, 6)

scala> c.map{case (k, v) => k}.collect
res31: Array[Int] = Array(1, 1, 3, 3, 3, 3, 5, 5, 5, 5, 5, 5)

scala> c.map{case (k, _) => k}.collect
res32: Array[Int] = Array(1, 1, 3, 3, 3, 3, 5, 5, 5, 5, 5, 5)

scala> c.map{case (_, v) => v}.collect
res33: Array[Int] = Array(1, 2, 1, 2, 3, 4, 1, 2, 3, 4, 5, 6)

Aggregation Operations [important; a common difficulty]

**PairRDD(k, v)** aggregations are used very widely. The aggregation operators:

groupByKey / reduceByKey / foldByKey / aggregateByKey

combineByKey(OLD) / combineByKeyWithClassTag(NEW) => the underlying implementation of the operators above

subtractByKey: similar to subtract; removes the elements of the RDD whose keys also appear in the other RDD

A small case study. Given the data ("spark", 12), ("hadoop", 26), ("hadoop", 23), ("spark", 15), ("scala", 26), ("spark", 25), ("spark", 23), ("hadoop", 16), ("scala", 24), ("spark", 16), where each key is a book title and each value is the number of copies sold on a given day, compute the average value for each key, i.e. the average daily sales of each book.

scala> val rdd = sc.makeRDD(Array(("spark", 12), ("hadoop", 26), ("hadoop", 23), ("spark", 15), ("scala", 26), ("spark", 25), ("spark", 23), ("hadoop", 16), ("scala", 24), ("spark", 16)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[27] at makeRDD at <console>:24

// groupByKey: group by key
scala> rdd.groupByKey().collect
res45: Array[(String, Iterable[Int])] = Array((hadoop,CompactBuffer(23, 16, 26)), (spark,CompactBuffer(12, 15, 25, 23, 16)), (scala,CompactBuffer(26, 24)))

scala> rdd.groupByKey().map(x=>(x._1, x._2.sum.toDouble/x._2.size)).collect
res34: Array[(String, Double)] = Array((hadoop,21.666666666666668), (spark,18.2), (scala,25.0))

scala> rdd.groupByKey().map{case (k, v) => (k, v.sum.toDouble/v.size)}.collect
res35: Array[(String, Double)] = Array((hadoop,21.666666666666668), (spark,18.2), (scala,25.0))

scala> rdd.groupByKey.mapValues(v => v.sum.toDouble/v.size).collect
res36: Array[(String, Double)] = Array((hadoop,21.666666666666668), (spark,18.2), (scala,25.0))

// reduceByKey: aggregate by key
scala> rdd.reduceByKey(_+_).collect
res46: Array[(String, Int)] = Array((hadoop,65), (spark,91), (scala,50))

scala> rdd.mapValues((_, 1)).collect
res47: Array[(String, (Int, Int))] = Array((spark,(12,1)), (hadoop,(26,1)), (hadoop,(23,1)), (spark,(15,1)), (scala,(26,1)), (spark,(25,1)), (spark,(23,1)), (hadoop,(16,1)), (scala,(24,1)), (spark,(16,1)))

scala> rdd.mapValues((_, 1)).reduceByKey((x, y)=> (x._1+y._1, x._2+y._2)).collect
res48: Array[(String, (Int, Int))] = Array((hadoop,(65,3)), (spark,(91,5)), (scala,(50,2)))

scala> rdd.mapValues((_, 1)).reduceByKey((x, y)=> (x._1+y._1, x._2+y._2)).mapValues(x => (x._1.toDouble / x._2)).collect()
res37: Array[(String, Double)] = Array((hadoop,21.666666666666668), (spark,18.2), (scala,25.0))

// foldByKey: aggregate starting from an initial value
scala> rdd.mapValues((_, 1)).foldByKey((0, 0))((x, y) => { (x._1+y._1, x._2+y._2) }).mapValues(x=>x._1.toDouble/x._2).collect
res38: Array[(String, Double)] = Array((hadoop,21.666666666666668), (spark,18.2), (scala,25.0))

// aggregateByKey
// aggregateByKey => an initial value + an intra-partition combine function + an inter-partition combine function
scala> rdd.mapValues((_, 1)).aggregateByKey((0,0))( (x, y) => (x._1 + y._1, x._2 + y._2), (a, b) => (a._1 + b._1, a._2 + b._2) ).mapValues(x=>x._1.toDouble / x._2).collect
res39: Array[(String, Double)] = Array((hadoop,21.666666666666668), (spark,18.2), (scala,25.0))

// the initial value (a tuple) does not have to match the RDD's value type (Int)
scala> rdd.aggregateByKey((0, 0))( (x, y) => {println(s"x=$x, y=$y"); (x._1 + y, x._2 + 1)}, (a, b) => {println(s"a=$a, b=$b"); (a._1 + b._1, a._2 + b._2)} ).mapValues(x=>x._1.toDouble/x._2).collect
res40: Array[(String, Double)] = Array((hadoop,21.666666666666668), (spark,18.2), (scala,25.0))

// intra-partition and inter-partition merging can be done in different ways; this particular approach (collecting values into a buffer) is inefficient!
scala> rdd.aggregateByKey(scala.collection.mutable.ArrayBuffer[Int] ())((x, y) => {x.append(y); x}, (a, b) => {a++b} ).mapValues(v => v.sum.toDouble/v.size).collect
res41: Array[(String, Double)] = Array((hadoop,21.666666666666668), (spark,18.2), (scala,25.0))

// combineByKey (understanding how it works is enough)
scala> rdd.combineByKey( (x: Int) => {println(s"x=$x"); (x,1)}, (x: (Int, Int), y: Int) => {println(s"x=$x, y=$y");(x._1+y, x._2+1)}, (a: (Int, Int), b: (Int, Int)) => {println(s"a=$a, b=$b"); (a._1+b._1, a._2+b._2)} ).mapValues(x=>x._1.toDouble/x._2).collect
res42: Array[(String, Double)] = Array((hadoop,21.666666666666668), (spark,18.2), (scala,25.0))

// subtractByKey
scala> val rdd1 = sc.makeRDD(Array(("spark", 12), ("hadoop", 26), ("hadoop", 23), ("spark", 15)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[49] at makeRDD at <console>:24

scala> val rdd2 = sc.makeRDD(Array(("spark", 100), ("hadoop", 300)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[50] at makeRDD at <console>:24

scala> rdd1.subtractByKey(rdd2).collect()
res43: Array[(String, Int)] = Array()

// subtractByKey
scala> val rdd = sc.makeRDD(Array(("a",1), ("b",2), ("c",3), ("a",5), ("d",5)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[52] at makeRDD at <console>:24

scala> val other = sc.makeRDD(Array(("a",10), ("b",20), ("c",30)))
other: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[53] at makeRDD at <console>:24

scala> rdd.subtractByKey(other).collect()
res44: Array[(String, Int)] = Array((d,5))

Conclusion: when the options are equally efficient, use the approach you know best; groupByKey is generally inefficient, so use it as little as possible.

For beginners: getting a working implementation is what matters most; if you used groupByKey, look for an operator that can replace it.


groupByKey transfers a large amount of data during the shuffle, so it is inefficient.
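As a minimal sketch (word counting on a small hypothetical RDD; words, wc1 and wc2 are names introduced here only for illustration), both pipelines below produce the same counts, but reduceByKey pre-aggregates within each partition before the shuffle, while groupByKey ships every raw pair across the network:

val words = sc.parallelize(Seq("spark", "hadoop", "spark", "scala", "spark"))

// inefficient: every (word, 1) pair is shuffled, then summed on the reduce side
val wc1 = words.map((_, 1)).groupByKey().mapValues(_.sum)

// preferred: partial sums are computed map-side, so far less data is shuffled
val wc2 = words.map((_, 1)).reduceByKey(_ + _)

wc1.collect()   // Array((spark,3), (hadoop,1), (scala,1)); ordering may differ
wc2.collect()   // same counts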


Sorting Operations

sortByKey: operates on a Pair RDD and sorts it by key. It is implemented in org.apache.spark.rdd.OrderedRDDFunctions:


scala> val a = sc.parallelize(List("wyp", "iteblog", "com", "397090770", "test"))
a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[64] at parallelize at <console>:24

scala> val b = sc.parallelize (1 to a.count.toInt)
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[65] at parallelize at <console>:26

scala> val c = a.zip(b)
c: org.apache.spark.rdd.RDD[(String, Int)] = ZippedPartitionsRDD2[66] at zip at <console>:27

scala> c.sortByKey().collect
res51: Array[(String, Int)] = Array((397090770,4), (com,3), (iteblog,2), (test,5), (wyp,1))

scala> c.sortByKey(false).collect
res52: Array[(String, Int)] = Array((wyp,1), (test,5), (iteblog,2), (com,3), (397090770,4))

Join Operations

cogroup / join / leftOuterJoin / rightOuterJoin / fullOuterJoin


scala> val rdd1 = sc.makeRDD(Array((1,"Spark"), (2,"Hadoop"), (3,"Kylin"), (4,"Flink")))
rdd1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[73] at makeRDD at <console>:24

scala> val rdd2 = sc.makeRDD(Array((3,"李四"), (4,"王五"), (5,"赵六"), (6,"冯七")))
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[74] at makeRDD at <console>:24

scala> val rdd3 = rdd1.cogroup(rdd2)
rdd3: org.apache.spark.rdd.RDD[(Int, (Iterable[String], Iterable[String]))] = MapPartitionsRDD[76] at cogroup at <console>:27

scala> rdd3.collect.foreach(println)
(1,(CompactBuffer(Spark),CompactBuffer()))
(2,(CompactBuffer(Hadoop),CompactBuffer()))
(3,(CompactBuffer(Kylin),CompactBuffer(李四)))
(4,(CompactBuffer(Flink),CompactBuffer(王五)))
(5,(CompactBuffer(),CompactBuffer(赵六)))
(6,(CompactBuffer(),CompactBuffer(冯七)))

scala> rdd3.filter{case (_, (v1, v2)) => v1.nonEmpty & v2.nonEmpty}.collect
res54: Array[(Int, (Iterable[String], Iterable[String]))] = Array((3,(CompactBuffer(Kylin),CompactBuffer(李四))), (4,(CompactBuffer(Flink),CompactBuffer(王五))))

// implementing join by imitating the source code
scala> rdd3.flatMapValues( pair => for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w))
res55: org.apache.spark.rdd.RDD[(Int, (String, String))] = MapPartitionsRDD[78] at flatMapValues at <console>:26

scala> val rdd1 = sc.makeRDD(Array(("1","Spark"),("2","Hadoop"), ("3","Scala"),("4","Java")))
rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[79] at makeRDD at <console>:24

scala> val rdd2 = sc.makeRDD(Array(("3","20K"),("4","18K"), ("5","25K"),("6","10K")))
rdd2: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[80] at makeRDD at <console>:24

scala> rdd1.join(rdd2).collect
res56: Array[(String, (String, String))] = Array((3,(Scala,20K)), (4,(Java,18K)))

scala> rdd1.leftOuterJoin(rdd2).collect
res57: Array[(String, (String, Option[String]))] = Array((1,(Spark,None)), (2,(Hadoop,None)), (3,(Scala,Some(20K))), (4,(Java,Some(18K))))

scala> rdd1.rightOuterJoin(rdd2).collect
res58: Array[(String, (Option[String], String))] = Array((3,(Some(Scala),20K)), (4,(Some(Java),18K)), (5,(None,25K)), (6,(None,10K)))

scala> rdd1.fullOuterJoin(rdd2).collect
res59: Array[(String, (Option[String], Option[String]))] = Array((1,(Some(Spark),None)), (2,(Some(Hadoop),None)), (3,(Some(Scala),Some(20K))), (4,(Some(Java),Some(18K))), (5,(None,Some(25K))), (6,(None,Some(10K))))

Action Operations

collectAsMap / countByKey / lookup(key)

countByKey source code:
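Paraphrased from PairRDDFunctions (the exact code may differ between Spark versions), countByKey counts a 1 for every record of each key and collects the result to the driver. A small usage sketch with a hypothetical sales RDD:

// roughly what countByKey does internally:
//   self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap
val sales = sc.makeRDD(Seq(("spark", 12), ("hadoop", 26), ("hadoop", 23), ("spark", 15), ("scala", 26)))

sales.countByKey()     // Map(spark -> 2, hadoop -> 2, scala -> 1): number of records per key
sales.collectAsMap()   // keeps only one value per key (the last one encountered)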


lookup(key): an efficient lookup; it scans only the partition that the key maps to (if the RDD has a partitioner)


scala>   val rdd1 = sc.makeRDD(Array(("1","Spark"),("2","Hadoop"), ("3","Scala"),("1","Java")))
rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[93] at makeRDD at <console>:24

scala> val rdd2 = sc.makeRDD(Array(("3","20K"),("4","18K"), ("5","25K"),("6","10K")))
rdd2: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[94] at makeRDD at <console>:24

scala> rdd1.lookup("1")
res60: Seq[String] = WrappedArray(Spark, Java)

scala> rdd2.lookup("3")
res61: Seq[String] = WrappedArray(20K)

Input and Output

File Input and Output

Text files

Reading: textFile(String). A single file can be specified, and wildcards are supported.

Reading a large number of small files this way is inefficient; wholeTextFiles should be used instead:

def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]

It returns an RDD[(String, String)], where the key is the file name and the value is the file's content.

Writing: saveAsTextFile(String), which takes the output directory.
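A minimal sketch of the three calls, using hypothetical paths under /data:

// read a single file or a glob; each element is one line of text
val lines = sc.textFile("/data/logs/*.txt")

// read a directory of small files at once; key = file path, value = entire file content
val files = sc.wholeTextFiles("/data/small_files/")

// write an RDD back out as text; the argument is an output directory
lines.saveAsTextFile("/data/output/lines")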

CSV files

Reading CSV (Comma-Separated Values) / TSV (Tab-Separated Values) data is similar to reading JSON: read the file as plain text first, then parse each line to obtain the records.

Writing CSV/TSV works the other way around: convert the structured RDD into an RDD of strings with an appropriate library, then write it out with Spark's text file API.
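A minimal sketch, assuming a hypothetical /data/people.csv whose lines look like name,age:

// read: treat the file as plain text, then split each line into fields
val people = sc.textFile("/data/people.csv")
  .map(_.split(","))
  .map(fields => (fields(0), fields(1).trim.toInt))

// write: turn each record back into a comma-separated string, then use the text API
people.map { case (name, age) => s"$name,$age" }
  .saveAsTextFile("/data/output/people_csv")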

JSON files

If each line of a JSON file is one JSON record, the file can be read as a text file and each record parsed with a JSON library.

To write JSON, first convert the RDD of structured data into an RDD of strings, then write it out with Spark's text file API.

Handling JSON files is simplest with Spark SQL.
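A minimal sketch of both approaches, assuming a hypothetical line-delimited /data/events.json; spark here is the SparkSession that spark-shell provides:

// plain-RDD approach: read as text, then hand each line to a JSON library of your choice
val rawLines = sc.textFile("/data/events.json")

// Spark SQL approach (the simplest, as noted above)
val events = spark.read.json("/data/events.json")
events.show()

// writing back out as line-delimited JSON
events.write.json("/data/output/events_json")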

SequenceFile

A SequenceFile is a flat file format designed by Hadoop for storing key-value pairs in binary form. Spark has a dedicated interface for reading SequenceFiles: on the SparkContext, call sequenceFile[keyClass, valueClass].

Call saveAsSequenceFile(path) to save a Pair RDD; the keys and values are converted to Writable types automatically.
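A minimal sketch with hypothetical paths; the key and value types must have Writable conversions (String and Int do):

// save a Pair RDD as a SequenceFile; keys/values are converted to Writables automatically
val pairs = sc.makeRDD(Seq(("spark", 12), ("hadoop", 26), ("scala", 24)))
pairs.saveAsSequenceFile("/data/output/books_seq")

// read it back, specifying the key and value types
val loaded = sc.sequenceFile[String, Int]("/data/output/books_seq")
loaded.collect()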

Object files

An object file stores serialized objects, using Java's serialization mechanism.

objectFile[T] takes a path, reads the object file, and returns the corresponding RDD; saveAsObjectFile() writes an RDD out as an object file. Because the data is serialized, the element type must be specified when reading.
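A minimal sketch with hypothetical paths; note that the element type has to be supplied when reading back:

// write any RDD out using Java serialization
val nums = sc.makeRDD(1 to 10)
nums.saveAsObjectFile("/data/output/nums_obj")

// read it back; the type parameter is required because the data is just serialized bytes
val restored = sc.objectFile[Int]("/data/output/nums_obj")
restored.collect()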

JDBC

For details, see the blog post on the comprehensive Scala operator case study.
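For quick reference, a minimal JdbcRDD sketch (the MySQL URL, table, columns, and credentials below are hypothetical; the SQL must contain two '?' placeholders, which JdbcRDD fills with the partition bounds):

import java.sql.{DriverManager, ResultSet}
import org.apache.spark.rdd.JdbcRDD

// hypothetical connection details
val url  = "jdbc:mysql://localhost:3306/test"
val user = "root"
val pwd  = "secret"

val jdbcRDD = new JdbcRDD(
  sc,
  () => DriverManager.getConnection(url, user, pwd),
  "SELECT id, name FROM users WHERE id >= ? AND id <= ?",   // the two '?' are required
  1, 1000, 3,                                               // lowerBound, upperBound, numPartitions
  (rs: ResultSet) => (rs.getInt(1), rs.getString(2))
)
jdbcRDD.collect()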