DStream转换操作

DStream上的操作与RDD的类似,分为 Transformations(转换)和 Output Operations(输出)两种,此外转换操作中还有一些比较特殊的方法,如:updateStateByKey、transform 以及各种 Window 相关的操作。

Transformation Meaning
map(func) 将源DStream中的每个元素通过一个函数func从而得到新的DStreams
flatMap(func) 和map类似,但是每个输入的项可以被映射为0或更多项
filter(func) 选择源DStream中函数func判为true的记录作为新DStreams
repartition(numPartitions) 通过创建更多或者更少的partition来改变此DStream的并行级别
union(otherStream) 联合源DStreams和其他DStreams来得到新DStream
count() 统计源DStreams中每个RDD所含元素的个数得到单元素RDD的新DStreams
reduce(func) 通过函数func(两个参数一个输出)来整合源 DStreams中每个RDD元素得到单元素RDD的DStreams。这个函数需要关联从而可以被并行计算
countByValue() 对于DStreams中元素类型为K调用此函数,得到包含(K,Long)对的新DStream,其中Long值表明相应的K在源DStream中每个RDD出现的频率
reduceByKey(func,[numTasks]) 对(K,V)对的DStream调用此函数,返回同样(K,V)的新DStream,新DStream中的对应V为使用reduce函数整合而来。默认情况下,这个操作使用Spark默认数量的并行任务(本地模式为2,集群模式中的数量取决于配置参数spark.default.parallelism)。也可以传入可选的参数numTasks来设置不同数量的任务
join(otherStream,[numTasks]) 两DStream分别为(K,V)和(K,W)对,返回(K,(V,W))对的新DStream
cogroup(otherStream,[numTasks]) 两DStream分别为(K,V)和(K,W)对,返回(K,(Seq[V],Seq[W])对新DStreams
transform(func) 将RDD到RDD映射的函数func作用于源DStream中每个RDD上得到新DStream。这个可用于在DStream的RDD上做任意操作
updateStateByKey(func) 得到”状态”DStream,其中每个key状态的更新是通过将给定函数用于此key的上一个状态和新值而得到。这个可用于保存每个key值的任意状态数据

备注:

  • 在DStream与RDD上的转换操作非常类似(无状态的操作)

  • DStream有自己特殊的操作(窗口操作、追踪状态变化操作)

  • 在DStream上的转换操作比RDD上的转换操作少

DStream 的转化操作可以分为 无状态(stateless) 和 有状态(stateful) 两种:

  • 无状态转化操作。每个批次的处理不依赖于之前批次的数据。常见的 RDD 转化操作,例如 map、filter、reduceByKey 等

  • 有状态转化操作。需要使用之前批次的数据 或者是 中间结果来计算当前批次的数据。有状态转化操作包括:基于滑动窗口的转化操作 或 追踪状态变化的转化操作

无状态转换

无状态转化操作就是把简单的 RDD 转化操作应用到每个批次上,也就是转化 DStream 中的每一个 RDD。

常见的无状态转换包括:map、flatMap、filter、repartition、reduceByKey、groupByKey;直接作用在DStream上

重要的转换操作:transform。通过对源DStream的每个RDD应用RDD-to-RDD函数,创建一个新的DStream。支持在新的DStream中做任何RDD操作


这是一个功能强大的函数,它可以允许开发者直接操作其内部的RDD。也就是说开发者,可以提供任意一个RDD到RDD的函数,这个函数在数据流每个批次中都被调用,生成一个新的流。

示例:黑名单过滤

1
2
3
4
5
6
7
8
9
10
11
假设:arr1为黑名单数据(自定义),true表示数据生效,需要被过滤掉;false表示数据未生效
val arr1 = Array(("spark", true), ("scala", false))

假设:流式数据格式为"time word",需要根据黑名单中的数据对流式数据执行过滤操 作。如"2 spark"要被过滤掉
1 hadoop
2 spark
3 scala
4 java
5 hive

结果:"2 spark" 被过滤

方法一:使用外连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package cn.lagou.streaming.basic

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.ConstantInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

// transform
// ConstantInputDStream 主要用于测试
object BlackListFilter1 {
def main(args: Array[String]): Unit = {
// 初始化
Logger.getLogger("org").setLevel(Level.ERROR)
val conf = new SparkConf().setAppName("FileDStream").setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(5))

// 自定义黑名单数据
val blackList = Array(("spark", true), ("scala", false), ("hello", true), ("world", true))
.map(elem => (elem._1.toLowerCase, elem._2))
val blackListRDD: RDD[(String, Boolean)] = ssc.sparkContext.makeRDD(blackList)

// 创建DStream。使用 ConstantInputDStream 用于测试
val strArray: Array[String] = "Hello World Hello Hadoop Hello spark kafka hive zookeeper hbase flume sqoop scala"
.split("\\s+")
.zipWithIndex
.map { case (word, timestamp) => s"$timestamp $word" }
val rdd: RDD[String] = ssc.sparkContext.makeRDD(strArray)
val wordDStream: ConstantInputDStream[String] = new ConstantInputDStream(ssc, rdd)

// 流式数据的处理
wordDStream.transform{rdd =>
rdd.map{line => (line.split("\\s+")(1).toLowerCase, line)}
.leftOuterJoin(blackListRDD)
.filter{case (_, (_, rightValue)) => ! rightValue.getOrElse(false)}
.map{case (_, (leftValue, _)) => leftValue}
}.print(20)

// 流式数据的输出

// 启动作业
ssc.start()
ssc.awaitTermination()
}
}

方法二:使用SQL

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package cn.lagou.streaming.basic

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.streaming.dstream.ConstantInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

// ConstantInputDStream 主要用于测试
// transform + SQL
object BlackListFilter2 {
def main(args: Array[String]): Unit = {
// 初始化
Logger.getLogger("org").setLevel(Level.ERROR)
val conf = new SparkConf().setAppName("FileDStream").setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(2))

// 自定义黑名单数据
val blackList = Array(("spark", true), ("scala", false), ("hello", true), ("world", true))
.map(elem => (elem._1.toLowerCase, elem._2))
val blackListRDD: RDD[(String, Boolean)] = ssc.sparkContext.makeRDD(blackList)

// 创建DStream。使用 ConstantInputDStream 用于测试
val strArray: Array[String] = "Hello World Hello Hadoop Hello spark kafka hive zookeeper hbase flume sqoop scala"
.split("\\s+")
.zipWithIndex
.map { case (word, timestamp) => s"$timestamp $word" }
val rdd: RDD[String] = ssc.sparkContext.makeRDD(strArray)
val wordDStream: ConstantInputDStream[String] = new ConstantInputDStream(ssc, rdd)

// 流式数据的处理和输出,可以使用 SQL/DSL
wordDStream.map(line => (line.split("\\s+")(1).toLowerCase, line))
.transform{rdd =>
val spark = SparkSession.builder()
.config(rdd.sparkContext.getConf)
.getOrCreate()

import spark.implicits._
val wordDF: DataFrame = rdd.toDF("word", "line")
val blackListDF: DataFrame = blackListRDD.toDF("word", "flag")
wordDF.join(blackListDF, Seq("word"), "left_outer")
.filter("flag is null or flag = false")
.select("line")
.rdd
}.print(20)

// 启动作业
ssc.start()
ssc.awaitTermination()
}
}

方法三:直接过滤

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package cn.lagou.streaming.basic

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.streaming.dstream.ConstantInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

// 直接过滤数据,效率最高,没有shuffle
object BlackListFilter3 {
def main(args: Array[String]): Unit = {
// 初始化
Logger.getLogger("org").setLevel(Level.ERROR)
val conf = new SparkConf().setAppName("FileDStream").setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(2))

// 自定义黑名单数据
val blackList = Array(("spark", true), ("scala", false), ("hello", true), ("world", true))
.filter(_._2)
.map(_._1.toLowerCase)


// 创建DStream。使用 ConstantInputDStream 用于测试
val strArray: Array[String] = "Hello World Hello Hadoop Hello spark kafka hive zookeeper hbase flume sqoop scala"
.split("\\s+")
.zipWithIndex
.map { case (word, timestamp) => s"$timestamp $word" }
val rdd: RDD[String] = ssc.sparkContext.makeRDD(strArray)
val wordDStream: ConstantInputDStream[String] = new ConstantInputDStream(ssc, rdd)

// 流式数据的处理和输出
wordDStream.map(line => (line.split("\\s+")(1).toLowerCase, line))
.filter{case (word, _) => ! blackList.contains(word)}
.map(_._2)
.print(20)

// 启动作业
ssc.start()
ssc.awaitTermination()
}
}

有状态转换

有状态的转换主要有两种:窗口操作、状态跟踪操作

窗口操作

Window Operations可以设置窗口大小和滑动窗口间隔来动态的获取当前Streaming的状态。

基于窗口的操作会在一个比 StreamingContext 的 batchDuration(批次间隔)更长的时间范围内,通过整合多个批次的结果,计算出整个窗口的结果。


基于窗口的操作需要两个参数:

  • 窗口长度(windowDuration)。控制每次计算最近的多少个批次的数据

  • 滑动间隔(slideDuration)。用来控制对新的 DStream 进行计算的间隔

两者都必须是 StreamingContext 中批次间隔(batchDuration)的整数倍。

每秒发送1个数字:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package cn.lagou.streaming.basic

import java.io.PrintWriter
import java.net.{ServerSocket, Socket}


object SocketLikeNCWithWindow {
def main(args: Array[String]): Unit = {
val port: Int = 9999

val server = new ServerSocket(port)
val socket: Socket = server.accept()
println("成功连接到本地主机:" + socket.getInetAddress)

var i = 0
// 每秒发送1个数
while(true) {
i += 1
val out = new PrintWriter(socket.getOutputStream)
out.println(i)
out.flush()
Thread.sleep(1000)
}
}
}
  • 案例一

    • 观察窗口的数据;

    • 观察 batchDuration、windowDuration、slideDuration 三者之间的关系;

    • 使用窗口相关的操作;

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    package cn.lagou.streaming.basic

    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.dstream.DStream

    object WindowDemo {
    def main(args: Array[String]): Unit = {
    // 初始化
    Logger.getLogger("org").setLevel(Level.ERROR)
    val conf = new SparkConf().setAppName("FileDStream").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(5))

    // 创建DStream
    val lines: DStream[String] = ssc.socketTextStream("localhost", 9999)

    // DStream转换和输出
    // foreachRDD输出
    // lines.foreachRDD{(rdd, time) =>
    // println(s"rdd = ${rdd.id}; time = $time")
    // rdd.foreach(println)
    // }

    // 窗口操作
    val res1: DStream[String] = lines.reduceByWindow(_ + " " + _, Seconds(20), Seconds(10))
    res1.print()

    val res2: DStream[String] = lines.window(Seconds(20), Seconds(10))
    res2.print()

    val res3: DStream[Int] = res2.map(_.toInt).reduce(_ + _)
    res3.print()

    val res4: DStream[Int] = lines.map(_.toInt).reduceByWindow(_+_, Seconds(20), Seconds(10))
    res4.print()

    // 启动作业
    ssc.start()
    ssc.awaitTermination()
    }
    }
  • 案例二:热点搜索词实时统计。每隔 10 秒,统计最近20秒的词出现的次数

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    package cn.lagou.streaming.basic

    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.dstream.DStream

    object HotWordStats {
    def main(args: Array[String]): Unit = {
    // 初始化
    Logger.getLogger("org").setLevel(Level.ERROR)
    val conf = new SparkConf().setAppName("FileDStream").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(5))
    // 设置检查点,保存状态。在生产中目录应该设置到HDFS
    ssc.checkpoint("data/checkpoint")

    // 创建DStream
    val lines: DStream[String] = ssc.socketTextStream("localhost", 9999)

    // DStream转换&输出
    // 每隔 10 秒,统计最近20秒的词出现的次数
    // window1 = t1 + t2 + t3
    // window2 = t3 + t4 + t5
    val wordCounts1: DStream[(String, Int)] = lines.flatMap(_.split("\\s+"))
    .map((_, 1))
    .reduceByKeyAndWindow((x: Int, y: Int) => x+y, Seconds(20), Seconds(10))
    wordCounts1.print()

    // window2 = w1 - t1 - t2 + t4 + t5
    // 需要checkpoint支持
    val wordCounts2: DStream[(String, Int)] = lines.flatMap(_.split("\\s+"))
    .map((_, 1))
    .reduceByKeyAndWindow(_+_, _-_, Seconds(20), Seconds(10))
    wordCounts2.print()

    // 启动作业
    ssc.start()
    ssc.awaitTermination()
    }
    }

updateStateByKey(状态追踪操作)

UpdateStateByKey的主要功能:

  • 为Streaming中每一个Key维护一份state状态,state类型可以是任意类型的,可以是自定义对象;更新函数也可以是自定义的

  • 通过更新函数对该key的状态不断更新,对于每个新的batch而言,Spark Streaming会在使用updateStateByKey 的时候为已经存在的key进行state的状态更新

  • 使用 updateStateByKey 时要开启 checkpoint 功能


流式程序启动后计算wordcount的累计值,将每个批次的结果保存到文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package cn.lagou.streaming.basic

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

object StateTracker1 {
def main(args: Array[String]): Unit = {
// 初始化
Logger.getLogger("org").setLevel(Level.ERROR)
val conf = new SparkConf().setAppName("FileDStream").setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(2))
ssc.checkpoint("data/checkpoint/")

// 创建DStream
val lines: DStream[String] = ssc.socketTextStream("localhost", 9999)

// DStream转换
val pairsDStream: DStream[(String, Int)] = lines.flatMap(_.split("\\s+"))
.map((_, 1))

// updateFunc: (Seq[V], Option[S]) => Option[S]
val updateFunc: (Seq[Int], Option[Int]) => Some[Int] = (currValues: Seq[Int], prevValues: Option[Int]) => {
val currentSum = currValues.sum
val prevSum: Int = prevValues.getOrElse(0)
Some(currentSum + prevSum)
}

val resultDStream: DStream[(String, Int)] = pairsDStream.updateStateByKey[Int](updateFunc)
resultDStream.cache()

// DStream输出
resultDStream.print()
resultDStream
.saveAsTextFiles("data/output1/")

// 启动作业
ssc.start()
ssc.awaitTermination()
}
}

统计全局的key的状态,但是就算没有数据输入,也会在每一个批次的时候返回之前的key的状态。

这样的缺点:如果数据量很大的话,checkpoint 数据会占用较大的存储,而且效率也不高。

mapWithState:也是用于全局统计key的状态。如果没有数据输入,便不会返回之前的key的状态,有一点增量的感觉。

这样做的好处是,只关心那些已经发生的变化的key,对于没有数据输入,则不会返回那些没有变化的key的数据。即使数据量很大,checkpoint也不会像updateStateByKey那样,占用太多的存储。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package cn.lagou.streaming.basic

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}

object StateTracker2 {
def main(args: Array[String]): Unit = {
// 初始化
Logger.getLogger("org").setLevel(Level.ERROR)
val conf = new SparkConf().setAppName("FileDStream").setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(2))
ssc.checkpoint("data/checkpoint/")

// 创建DStream
val lines: DStream[String] = ssc.socketTextStream("localhost", 9999)

// DStream转换
val pairsDStream: DStream[(String, Int)] = lines.flatMap(_.split("\\s+"))
.map((_, 1))

// (KeyType, Option[ValueType], State[StateType]) => MappedType
def mappingFunction(key: String, one: Option[Int], state: State[Int]): (String, Int) = {
// 计算value
val sum = one.getOrElse(0) + state.getOption().getOrElse(0)
// 保存状态
state.update(sum)
// 输出值
(key, sum)
}
val spec = StateSpec.function(mappingFunction _)
val resultDStream: DStream[(String, Int)] = pairsDStream.mapWithState[Int, (String, Int)](spec)
// 显示快照(显示所有相关的key-value)
.stateSnapshots()
resultDStream.cache()

// DStream输出
resultDStream.print()
resultDStream.saveAsTextFiles("data/output2/")

// 启动作业
ssc.start()
ssc.awaitTermination()
}
}

DStream输出操作

输出操作定义 DStream 的输出操作。

与 RDD 中的惰性求值类似,如果一个 DStream 及其派生出的 DStream 都没有被执行输出操作,那么这些 DStream 就都不会被求值。

如果 StreamingContext 中没有设定输出操作,整个流式作业不会启动。

Output Operation Meaning
print() 在运行流程序的Driver上,输出DStream中每一批次数据的最开始10个元素。用于开发和调试
saveAsTextFiles(prefix, [suffix]) 以text文件形式存储 DStream 的内容。每一批次的存储文件名基于参数中的prefix和suffix
saveAsObjectFiles(prefix, [suffix]) 以 Java 对象序列化的方式将Stream中的数据保存为 Sequence Files。每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]"
saveAsHadoopFiles(prefix, [suffix]) 将Stream中的数据保存为 Hadoop files。每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]"
foreachRDD(func) 最通用的输出操作。将函数 func 应用于DStream 的每一个RDD上

通用的输出操作 foreachRDD,用来对 DStream 中的 RDD 进行任意计算。在foreachRDD中,可以重用 Spark RDD 中所有的 Action 操作。需要注意的:

  • 连接不要定义在 Driver 中

  • 连接定义在 RDD的 foreach 算子中,则遍历 RDD 的每个元素时都创建连接,得不偿失

  • 应该在 RDD的 foreachPartition 中定义连接,每个分区创建一个连接可以考虑使用连接池