SQL Statements
Overall, SparkSQL is compatible with HQL; compared with HQL, SparkSQL is more concise.
The key APIs are createTempView, createOrReplaceTempView, and spark.sql("SQL").
package cn.lagou.sparksql

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, SparkSession}

case class Info(id: String, tags: String)

object SQLDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName(this.getClass.getCanonicalName)
      .master("local[*]")
      .getOrCreate()
    spark.sparkContext.setLogLevel("warn")
    import spark.implicits._

    // Build a Dataset[Info] and register it as a temp view
    val arr = Array("1 1,2,3", "2 2,3", "3 1,2")
    val rdd: RDD[Info] = spark.sparkContext.makeRDD(arr)
      .map { line =>
        val fields: Array[String] = line.split("\\s+")
        Info(fields(0), fields(1))
      }
    val ds: Dataset[Info] = spark.createDataset(rdd)
    ds.createOrReplaceTempView("t1")
    ds.show

    // HQL style: lateral view + explode
    spark.sql(
      """
        |select id, tag
        |  from t1
        |  lateral view explode(split(tags, ",")) t2 as tag
        |""".stripMargin
    ).show

    // SparkSQL style: explode directly in the select list
    spark.sql(
      """
        |select id, explode(split(tags, ",")) tag
        |  from t1
        |""".stripMargin
    ).show

    spark.close()
  }
}
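For reference, both queries flatten tags into one row per (id, tag) pair, so each prints roughly:

+---+---+
| id|tag|
+---+---+
|  1|  1|
|  1|  2|
|  1|  3|
|  2|  2|
|  2|  3|
|  3|  1|
|  3|  2|
+---+---+

The second form shows the conciseness gain: SparkSQL allows explode directly in the select list, where HQL needs the full lateral view clause.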
Input and Output
SparkSQL's built-in data sources include Parquet, JSON, CSV, Avro, Images, and BinaryFiles (Spark 3.0). Parquet is the default data source.
// Used internally
DataFrameReader.format(args).option("key", "value").schema(args).load()

// Developer-facing API
SparkSession.read
The available options are documented at: https://spark.apache.org/docs/2.4.5/api/java/org/apache/spark/sql/DataFrameReader.html
// Parquet, with the format given explicitly
val df1 = spark.read.format("parquet").load("data/users.parquet")

// format("parquet") can be omitted because Parquet is the default
val df2 = spark.read.load("data/users.parquet")

// CSV
val df3 = spark.read.format("csv")
  .option("inferSchema", "true")
  .option("header", "true")
  .load("data/people1.csv")

// JSON
val df4 = spark.read.format("json")
  .load("data/emp.json")
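DataFrameReader also exposes shorthand methods for the built-in formats; a minimal sketch equivalent to the reads above:

// Shorthand readers, equivalent to format(...).load(...)
val dfP = spark.read.parquet("data/users.parquet")
val dfC = spark.read.option("header", "true").option("inferSchema", "true").csv("data/people1.csv")
val dfJ = spark.read.json("data/emp.json")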
// Used internally
DataFrameWriter.format(args)
  .option(args)
  .bucketBy(args)
  .partitionBy(args)
  .save(path)

// Developer-facing API
DataFrame.write
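The bucketBy and partitionBy calls in the chain above are easy to gloss over; here is a minimal sketch of both, assuming a DataFrame df with id and gender columns (note that bucketBy only works with saveAsTable, not save):

// partitionBy writes one subdirectory per distinct value of the column
df.write
  .format("parquet")
  .mode("overwrite")
  .partitionBy("gender")
  .save("data/parquet_partitioned")

// bucketBy hashes rows into a fixed number of buckets; requires saveAsTable
df.write
  .format("parquet")
  .mode("overwrite")
  .bucketBy(4, "id")
  .sortBy("id")
  .saveAsTable("users_bucketed")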
Parquet Files
spark.sql(
  """
    |CREATE OR REPLACE TEMPORARY VIEW users
    |USING parquet
    |OPTIONS (path "data/users.parquet")
    |""".stripMargin
)

spark.sql("select * from users").show

// df: a DataFrame produced earlier in the session
df.write.format("parquet")
  .mode("overwrite")
  .option("compression", "snappy")
  .save("data/parquet")
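As a quick sanity check (not part of the original example), the written directory can be read straight back:

// Read back the snappy-compressed Parquet written above
val dfBack = spark.read.parquet("data/parquet")
dfBack.show()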
JSON Files
val fileJson = "data/emp.json"
val df6 = spark.read.format("json").load(fileJson)

spark.sql(
  """
    |CREATE OR REPLACE TEMPORARY VIEW emp
    |USING json
    |OPTIONS (path "data/emp.json")
    |""".stripMargin
)

spark.sql("SELECT * FROM emp").show()

spark.sql("SELECT * FROM emp").write
  .format("json")
  .mode("overwrite")
  .save("data/json")
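One caveat: by default the JSON source expects one JSON object per line (JSON Lines). For a single pretty-printed document spanning multiple lines, enable multiLine; a sketch with a hypothetical file path:

// multiLine lets Spark parse a JSON document that spans several lines
val dfMulti = spark.read
  .option("multiLine", "true")
  .json("data/emp_multiline.json") // hypothetical path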
CSV Files
val fileCSV = "data/people1.csv"
val df = spark.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load(fileCSV)

spark.sql(
  """
    |CREATE OR REPLACE TEMPORARY VIEW people
    |USING csv
    |OPTIONS (path "data/people1.csv",
    |         header "true",
    |         inferSchema "true")
    |""".stripMargin
)

spark.sql("select * from people")
  .write
  .format("csv")
  .mode("overwrite")
  .save("data/csv")
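If the file is delimited by something other than a comma, the sep option handles it; a sketch assuming a semicolon-delimited file:

// sep overrides the default comma delimiter
val dfSemi = spark.read
  .option("header", "true")
  .option("sep", ";")
  .csv("data/people2.csv") // hypothetical file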
JDBC
val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://linux123:3306/ebiz?useSSL=false") // append &useUnicode=true if needed
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "lagou_product_info")
  .option("user", "hive")
  .option("password", "12345678")
  .load()

jdbcDF.show()

jdbcDF.write
  .format("jdbc")
  .option("url", "jdbc:mysql://linux123:3306/ebiz?useSSL=false&characterEncoding=utf8")
  .option("user", "hive")
  .option("password", "12345678")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "lagou_product_info_back")
  .mode("append")
  .save
Note: if the data contains Chinese characters, mind the character set of the MySQL table, or the stored text will be garbled.
SaveMode controls what happens when the target table already exists (a usage sketch follows this list):

- SaveMode.ErrorIfExists (default): if the table already exists, an exception is thrown and no data is written.
- SaveMode.Append: if the table exists, the data is appended to it; if it does not, the table is created first and the data then inserted.
- SaveMode.Overwrite: the existing table and its data are dropped, the table is recreated, and the new data is inserted.
- SaveMode.Ignore: if the table does not exist, it is created and the data written; if it exists, the write is silently skipped with no error.
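The modes map directly onto the writer API; a minimal sketch against the JDBC target above (mode accepts a SaveMode constant or its string name):

import org.apache.spark.sql.SaveMode

jdbcDF.write
  .format("jdbc")
  .option("url", "jdbc:mysql://linux123:3306/ebiz?useSSL=false&characterEncoding=utf8")
  .option("user", "hive")
  .option("password", "12345678")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "lagou_product_info_back")
  .mode(SaveMode.Overwrite) // equivalent to .mode("overwrite")
  .save()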
-- Create the backup table
create table lagou_product_info_back as select * from lagou_product_info;

-- Check the character set of both tables
show create table lagou_product_info_back;
show create table lagou_product_info;

-- Change the backup table's character set
alter table lagou_product_info_back convert to character set utf8;