Action操作

与RDD类似的操作

show、collect、collectAsList、head、first、count、take、takeAsList、reduce

与结构相关

printSchema、explain、columns、dtypes、col

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
EMPNO,ENAME,JOB,MGR,HIREDATE,SAL,COMM,DEPTNO
7369,SMITH,CLERK,7902,2001-01-02 22:12:13,800,,20
7499,ALLEN,SALESMAN,7698,2002-01-02 22:12:13,1600,300,30
7521,WARD,SALESMAN,7698,2003-01-02 22:12:13,1250,500,30
7566,JONES,MANAGER,7839,2004-01-02 22:12:13,2975,,20
7654,MARTIN,SALESMAN,7698,2005-01-02 22:12:13,1250,1400,30
7698,BLAKE,MANAGER,7839,2005-04-02 22:12:13,2850,,30
7782,CLARK,MANAGER,7839,2006-03-02 22:12:13,2450,,10
7788,SCOTT,ANALYST,7566,2007-03-02 22:12:13,3000,,20
7839,KING,PRESIDENT,,2006-03-02 22:12:13,5000,,10
7844,TURNER,SALESMAN,7698,2009-07-02 22:12:13,1500,0,30
7876,ADAMS,CLERK,7788,2010-05-02 22:12:13,1100,,20
7900,JAMES,CLERK,7698,2011-06-02 22:12:13,950,,30
7902,FORD,ANALYST,7566,2011-07-02 22:12:13,3000,,20
7934,MILLER,CLERK,7782,2012-11-02 22:12:13,1300,,10
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// 处理了文件头,得到了字段名称
// 使用自动类型推断,基本得到了合理的值
val df1 = spark.read.
option("header", "true").
option("inferschema","true").
csv("data/emp.dat")

df1.count

// 缺省显示20行
df1.union(df1).show()

// 显示2行
df1.show(2)

// 不截断字符
df1.toJSON.show(false)
// 显示10行,不截断字符
df1.toJSON.show(10, false)
spark.catalog.listFunctions.show(10000, false)

// collect返回的是数组, Array[org.apache.spark.sql.Row]
val c1 = df1.collect()

// collectAsList返回的是List, List[org.apache.spark.sql.Row]
val c2 = df1.collectAsList()

// 返回 org.apache.spark.sql.Row
val h1 = df1.head()
val f1 = df1.first()

// 返回 Array[org.apache.spark.sql.Row],长度为3
val h2 = df1.head(3)
val f2 = df1.take(3)

// 返回 List[org.apache.spark.sql.Row],长度为2
val t2 = df1.takeAsList(2)
1
2
3
4
5
6
// 结构属性
df1.columns // 查看列名
df1.dtypes // 查看列名和类型
df1.explain() // 参看执行计划
df1.col("name") // 获取某个列
df1.printSchema // 常用

Transformation 操作

与RDD类似的操作

map、filter、flatMap、mapPartitions、sample、 randomSplit、 limit、distinct、dropDuplicates、describe

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
df1.map(row=>row.getAs[Int](0)).show

// randomSplit(与RDD类似,将DF、DS按给定参数分成多份)
val df2 = df1.randomSplit(Array(0.5, 0.6, 0.7))
df2(0).count
df2(1).count
df2(2).count

// 取10行数据生成新的DataSet
val df2 = df1.limit(10)

// distinct,去重
val df2 = df1.union(df1)
df2.distinct.count

// dropDuplicates,按列值去重
df2.dropDuplicates.show
df2.dropDuplicates("mgr", "deptno").show
df2.dropDuplicates("mgr").show
df2.dropDuplicates("deptno").show

// 返回全部列的统计(count、mean、stddev、min、max)
ds1.describe().show

// 返回指定列的统计
ds1.describe("sal").show
ds1.describe("sal", "comm").show

存储相关

cacheTable、persist、checkpoint、unpersist、cache

备注:Dataset 默认的存储级别是 MEMORY_AND_DISK

1
2
3
4
5
6
7
8
9
10
11
12
13
import org.apache.spark.storage.StorageLevel
spark.sparkContext.setCheckpointDir("hdfs://linux121:9000/chec kpoint")

df1.show()
df1.checkpoint()
df1.cache()
df1.persist(StorageLevel.MEMORY_ONLY)
df1.count()
df1.unpersist(true)

df1.createOrReplaceTempView("t1")
spark.catalog.cacheTable("t1")
spark.catalog.uncacheTable("t1")

select相关

列的多种表示、select、selectExpr

drop、withColumn、withColumnRenamed、cast(内置函数)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// 列的多种表示方法。使用""、$""、'、col()、ds("")
// 注意:不要混用;必要时使用spark.implicitis._;并非每个表示在所有的 地方都有效
df1.select($"ename", $"hiredate", $"sal").show
df1.select("ename", "hiredate", "sal").show
df1.select('ename, 'hiredate, 'sal).show
df1.select(col("ename"), col("hiredate"), col("sal")).show
df1.select(df1("ename"), df1("hiredate"), df1("sal")).show

// 下面的写法无效,其他列的表示法有效
df1.select("ename", "hiredate", "sal"+100).show
df1.select("ename", "hiredate", "sal+100").show

// 这样写才符合语法
df1.select($"ename", $"hiredate", $"sal"+100).show
df1.select('ename, 'hiredate, 'sal+100).show

// 可使用expr表达式(expr里面只能使用引号)
df1.select(expr("comm+100"), expr("sal+100"), expr("ename")).show
df1.selectExpr("ename as name").show
df1.selectExpr("power(sal, 2)", "sal").show
df1.selectExpr("round(sal, -3) as newsal", "sal", "ename").show

// drop、withColumn、 withColumnRenamed、casting
// drop 删除一个或多个列,得到新的DF
df1.drop("mgr")
df1.drop("empno", "mgr")

// withColumn,修改列值
val df2 = df1.withColumn("sal", $"sal"+1000)
df2.show

// withColumnRenamed,更改列名
df1.withColumnRenamed("sal", "newsal")

// 备注:drop、withColumn、withColumnRenamed返回的是DF
// cast,类型转换
df1.selectExpr("cast(empno as string)").printSchema

import org.apache.spark.sql.types._
df1.select('empno.cast(StringType)).printSchema

where相关

where == filter

1
2
3
4
5
6
7
// where操作
df1.filter("sal>1000").show
df1.filter("sal>1000 and job=='MANAGER'").show

// filter操作
df1.where("sal>1000").show
df1.where("sal>1000 and job=='MANAGER'").show

groupBy相关

groupBy、agg、max、min、avg、sum、count(后面5个为内置函数)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// groupBy、max、min、mean、sum、count(与df1.count不同)
df1.groupBy("Job").sum("sal").show
df1.groupBy("Job").max("sal").show
df1.groupBy("Job").min("sal").show
df1.groupBy("Job").avg("sal").show
df1.groupBy("Job").count.show

// 类似having子句
df1.groupBy("Job").avg("sal").where("avg(sal) > 2000").show
df1.groupBy("Job").avg("sal").where($"avg(sal)" > 2000).show

// agg
df1.groupBy("Job").agg("sal"->"max", "sal"->"min", "sal"- >"avg", "sal"->"sum", "sal"->"count").show
df1.groupBy("deptno").agg("sal"->"max", "sal"->"min", "sal"- >"avg", "sal"->"sum", "sal"->"count").show

// 这种方式更好理解
df1.groupBy("Job").agg(max("sal"), min("sal"), avg("sal"), sum("sal"), count("sal")).show

// 给列取别名
df1.groupBy("Job").agg(max("sal"), min("sal"), avg("sal"), sum("sal"), count("sal")).withColumnRenamed("min(sal)", "min1").show

// 给列取别名,最简便
df1.groupBy("Job").agg(max("sal").as("max1"), min("sal").as("min2"), avg("sal").as("avg3"), sum("sal").as("sum4"), count("sal").as("count5")).show

orderBy相关

orderBy == sort

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// orderBy
df1.orderBy("sal").show
df1.orderBy($"sal").show
df1.orderBy($"sal".asc).show

// 降序
df1.orderBy(-$"sal").show
df1.orderBy('sal).show
df1.orderBy(col("sal")).show
df1.orderBy(df1("sal")).show

df1.orderBy($"sal".desc).show
df1.orderBy(-'sal).show
df1.orderBy(-'deptno, -'sal).show

// sort,以下语句等价
df1.sort("sal").show
df1.sort($"sal").show
df1.sort($"sal".asc).show
df1.sort('sal).show
df1.sort(col("sal")).show
df1.sort(df1("sal")).show

df1.sort($"sal".desc).show
df1.sort(-'sal).show
df1.sort(-'deptno, -'sal).show

join相关

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// 1、笛卡尔积
df1.crossJoin(df1).count

// 2、等值连接(单字段)(连接字段empno,仅显示了一次)
df1.join(df1, "empno").count

// 3、等值连接(多字段)(连接字段empno、ename,仅显示了一次)
df1.join(df1, Seq("empno", "ename")).show

// 定义第一个数据集
case class StudentAge(sno: Int, name: String, age: Int)
val lst = List(StudentAge(1,"Alice", 18), StudentAge(2,"Andy", 19), StudentAge(3,"Bob", 17), StudentAge(4,"Justin", 21), StudentAge(5,"Cindy", 20))
val ds1 = spark.createDataset(lst)
ds1.show()

// 定义第二个数据集
case class StudentHeight(sname: String, height: Int)
val rdd = sc.makeRDD(List(StudentHeight("Alice", 160), StudentHeight("Andy", 159), StudentHeight("Bob", 170), StudentHeight("Cindy", 165), StudentHeight("Rose", 160)))
val ds2 = rdd.toDS

// 备注:不能使用双引号,而且这里是 ===
ds1.join(ds2, $"name"===$"sname").show
ds1.join(ds2, 'name==='sname).show

ds1.join(ds2, ds1("name")===ds2("sname")).show
ds1.join(ds2, ds1("sname")===ds2("sname"), "inner").show

// 多种连接方式
ds1.join(ds2, $"name"===$"sname").show
ds1.join(ds2, $"name"===$"sname", "inner").show

ds1.join(ds2, $"name"===$"sname", "left").show
ds1.join(ds2, $"name"===$"sname", "left_outer").show

ds1.join(ds2, $"name"===$"sname", "right").show
ds1.join(ds2, $"name"===$"sname", "right_outer").show

ds1.join(ds2, $"name"===$"sname", "outer").show
ds1.join(ds2, $"name"===$"sname", "full").show
ds1.join(ds2, $"name"===$"sname", "full_outer").show

备注:DS在join操作之后变成了DF

集合相关

union==unionAll(过期)、intersect、except

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// union、unionAll、intersect、except。集合的交、并、差
val ds3 = ds1.select("name")
val ds4 = ds2.select("sname")

// union 求并集,不去重
ds3.union(ds4).show

// unionAll、union 等价;unionAll过期方法,不建议使用
ds3.unionAll(ds4).show

// intersect 求交
ds3.intersect(ds4).show

// except 求差
ds3.except(ds4).show

空值处理

na.fill、na.drop

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// NaN (Not a Number)
math.sqrt(-1.0)
math.sqrt(-1.0).isNaN()
df1.show

// 删除所有列的空值和NaN
df1.na.drop.show /

/ 删除某列的空值和NaN
df1.na.drop(Array("mgr")).show

// 对全部列填充;对指定单列填充;对指定多列填充
df1.na.fill(1000).show
df1.na.fill(1000, Array("comm")).show
df1.na.fill(Map("mgr"->2000, "comm"->1000)).show

// 对指定的值进行替换
df1.na.replace("comm" :: "deptno" :: Nil, Map(0 -> 100, 10 -> 100)).show

// 查询空值列或非空值列。isNull、isNotNull为内置函数
df1.filter("comm is null").show
df1.filter($"comm".isNull).show
df1.filter(col("comm").isNull).show

df1.filter("comm is not null").show
df1.filter(col("comm").isNotNull).show

窗口函数

一般情况下窗口函数不用 DSL 处理,直接用SQL更方便

参考源码Window.scala、WindowSpec.scala(主要)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import org.apache.spark.sql.expressions.Window

val w1 = Window.partitionBy("cookieid").orderBy("createtime")
val w2 = Window.partitionBy("cookieid").orderBy("pv")
val w3 = w1.rowsBetween(Window.unboundedPreceding, Window.currentRow)
val w4 = w1.rowsBetween(-1, 1)

// 聚组函数【用分析函数的数据集】
df.select($"cookieid", $"pv", sum("pv").over(w1).alias("pv1")).show
df.select($"cookieid", $"pv", sum("pv").over(w3).alias("pv1")).show
df.select($"cookieid", $"pv", sum("pv").over(w4).as("pv1")).show

// 排名
df.select($"cookieid", $"pv", rank().over(w2).alias("rank")).show
df.select($"cookieid", $"pv", dense_rank().over(w2).alias("denserank")).show
df.select($"cookieid", $"pv", row_number().over(w2).alias("rownumber")).show

// lag、lead
df.select($"cookieid", $"pv", lag("pv", 2).over(w2).alias("rownumber")).show
df.select($"cookieid", $"pv", lag("pv", -2).over(w2).alias("rownumber")).show

内建函数

官方文档:http://spark.apache.org/docs/latest/api/sql/index.html