SQL Statements

Overall, Spark SQL is compatible with HQL; compared with HQL, Spark SQL is usually more concise.

Key APIs: createTempView, createOrReplaceTempView, spark.sql("SQL")
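
The difference between the two view APIs: createTempView throws an exception if a view with the same name already exists, while createOrReplaceTempView silently replaces it. A minimal sketch, assuming a DataFrame df is already defined:

// Throws an AnalysisException if a view named "t1" already exists
df.createTempView("t1")

// Safe to call repeatedly: replaces any existing definition
df.createOrReplaceTempView("t1")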


package cn.lagou.sparksql

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, SparkSession}

case class Info(id: String, tags: String)

object SQLDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName(this.getClass.getCanonicalName)
      .master("local[*]")
      .getOrCreate()
    spark.sparkContext.setLogLevel("warn")
    import spark.implicits._

    // Prepare the data: each line is "id tag1,tag2,..."
    val arr = Array("1 1,2,3", "2 2,3", "3 1,2")
    val rdd: RDD[Info] = spark.sparkContext.makeRDD(arr)
      .map { line =>
        val fields: Array[String] = line.split("\\s+")
        Info(fields(0), fields(1))
      }
    val ds: Dataset[Info] = spark.createDataset(rdd)
    ds.createOrReplaceTempView("t1")
    ds.show

    // SQL processing, HQL style (lateral view)
    spark.sql(
      """
        |select id, tag
        |  from t1
        |  lateral view explode(split(tags, ",")) t2 as tag
        |""".stripMargin
    ).show

    // Spark SQL style: explode directly in the select list
    spark.sql(
      """
        |select id, explode(split(tags, ",")) tag
        |  from t1
        |""".stripMargin
    ).show

    spark.close()
  }
}
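
Both queries flatten the comma-separated tags column and should produce the same result:

+---+---+
| id|tag|
+---+---+
|  1|  1|
|  1|  2|
|  1|  3|
|  2|  2|
|  2|  3|
|  3|  1|
|  3|  2|
+---+---+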

Input and Output

The data sources Spark SQL supports out of the box include Parquet, JSON, CSV, Avro, images, and binary files (Spark 3.0). Parquet is the default data source.

// Internal usage
DataFrameReader.format(args).option("key", "value").schema(args).load()

// Developer-facing API
SparkSession.read

For the available options, see: https://spark.apache.org/docs/2.4.5/api/java/org/apache/spark/sql/DataFrameReader.html


val df1 = spark.read.format("parquet").load("data/users.parquet")

// Parquet is the default data source, so format("parquet") can be omitted
val df2 = spark.read.load("data/users.parquet")

// CSV
val df3 = spark.read.format("csv")
  .option("inferSchema", "true")
  .option("header", "true")
  .load("data/people1.csv")

// JSON
val df4 = spark.read.format("json")
  .load("data/emp.json")
// Internal usage
DataFrameWriter.format(args)
  .option(args)
  .bucketBy(args)
  .partitionBy(args)
  .save(path)

// Developer-facing API
DataFrame.write
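
partitionBy writes one output subdirectory per distinct value of the given column, while bucketBy hashes rows into a fixed number of buckets; note that bucketBy requires saveAsTable (a catalog table) and does not work with save(path). A minimal sketch, with hypothetical column names:

// One subdirectory per distinct value of "dt"
df.write
  .partitionBy("dt") // hypothetical partition column
  .format("parquet")
  .mode("overwrite")
  .save("data/partitioned")

// Bucketed output must go through the catalog, i.e. saveAsTable
df.write
  .bucketBy(4, "id") // hypothetical bucket column
  .sortBy("id")
  .saveAsTable("bucketed_table")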

Parquet Files

spark.sql(
  """
    |CREATE OR REPLACE TEMPORARY VIEW users
    |USING parquet
    |OPTIONS (path "data/users.parquet")
    |""".stripMargin
)

spark.sql("select * from users").show

df.write.format("parquet")
  .mode("overwrite")
  .option("compression", "snappy")
  .save("data/parquet")

JSON Files

val fileJson = "data/emp.json"
val df6 = spark.read.format("json").load(fileJson)

spark.sql(
  """
    |CREATE OR REPLACE TEMPORARY VIEW emp
    |USING json
    |OPTIONS (path "data/emp.json")
    |""".stripMargin
)

spark.sql("SELECT * FROM emp").show()

spark.sql("SELECT * FROM emp").write
  .format("json")
  .mode("overwrite")
  .save("data/json")

CSV Files

val fileCSV = "data/people1.csv"
val df = spark.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load(fileCSV)

spark.sql(
  """
    |CREATE OR REPLACE TEMPORARY VIEW people
    |USING csv
    |OPTIONS (path "data/people1.csv",
    |         header "true",
    |         inferSchema "true")
    |""".stripMargin
)

spark.sql("select * from people")
  .write
  .format("csv")
  .mode("overwrite")
  .save("data/csv")

JDBC

val jdbcDF = spark
  .read
  .format("jdbc")
  // extra URL parameters such as &useUnicode=true can be appended here
  .option("url", "jdbc:mysql://linux123:3306/ebiz?useSSL=false")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "lagou_product_info")
  .option("user", "hive")
  .option("password", "12345678")
  .load()

jdbcDF.show()
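
By default the JDBC source reads the whole table through a single connection. For large tables the read can be parallelized by telling Spark how to split a numeric column into ranges. A sketch, assuming the table has a numeric column, here hypothetically product_id:

val jdbcParallelDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://linux123:3306/ebiz?useSSL=false")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "lagou_product_info")
  .option("user", "hive")
  .option("password", "12345678")
  .option("partitionColumn", "product_id") // hypothetical numeric column
  .option("lowerBound", "1")
  .option("upperBound", "100000")
  .option("numPartitions", "8") // up to 8 concurrent JDBC connections
  .load()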

jdbcDF.write
  .format("jdbc")
  .option("url", "jdbc:mysql://linux123:3306/ebiz?useSSL=false&characterEncoding=utf8")
  .option("user", "hive")
  .option("password", "12345678")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "lagou_product_info_back")
  .mode("append")
  .save()

Note: if the data contains Chinese text, mind the table's character set, or the stored text will be garbled (the SQL at the end of this section shows how to check and fix it).

  • SaveMode.ErrorIfExists (default): if the table already exists, an exception is thrown and no data is written.

  • SaveMode.Append: if the table exists, the data is appended to it; if not, the table is created first and the data then inserted.

  • SaveMode.Overwrite: the existing table and its data are dropped first, then the table is recreated and the new data inserted.

  • SaveMode.Ignore: if the table does not exist, it is created and the data written; if it does exist, the write is skipped silently without an error (see the sketch after this list).
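The mode can be set either with the string names used above ("append", "overwrite", ...) or with the SaveMode enum. A minimal sketch, reusing jdbcDF from above:

import org.apache.spark.sql.SaveMode

jdbcDF.write
  .format("jdbc")
  .option("url", "jdbc:mysql://linux123:3306/ebiz?useSSL=false&characterEncoding=utf8")
  .option("user", "hive")
  .option("password", "12345678")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "lagou_product_info_back")
  .mode(SaveMode.Ignore) // skip silently if the table already exists
  .save()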

-- Create the backup table
create table lagou_product_info_back as
select * from lagou_product_info;

-- Check the tables' character sets
show create table lagou_product_info_back;
show create table lagou_product_info;

-- Change the table's character set
alter table lagou_product_info_back convert to character set utf8;