官网:http://spark.apache.org/docs/2.4.5/streaming-kafka-integration.html

针对不同的spark、kafka版本,集成处理数据的方式分为两种:Receiver Approach和Direct Approach,不同集成版本处理方式的支持,可参考下图:



Kafka-08 接口

Receiver based Approach

基于 Receiver 的方式使用 Kafka 旧版消费者高阶API实现。

对于所有的 Receiver,通过 Kafka 接收的数据被存储于 Spark 的 Executors上,底层是写入BlockManager中,默认200ms生成一个block(spark.streaming.blockInterval)。然后由 Spark Streaming 提交的 job 构 建BlockRDD,最终以 Spark Core任务的形式运行。对应 Receiver方式,有以下几点需要注意:

  • Receiver 作为一个常驻线程调度到 Executor上运行,占用一个cpu

  • Receiver 个数由KafkaUtils.createStream调用次数决定,一次一个 Receiver

  • kafka中的topic分区并不能关联产生在spark streaming中的rdd分区。增加在KafkaUtils.createStream()中的指定的topic分区数,仅仅增加了单个receiver消费的topic的线程数,它不会增加处理数据中的并行的spark的数量。【 即:topicMap[topic,num_threads]中,value对应的数值是每个topic对应的消费线程数】

  • receiver默认200ms生成一个block,可根据数据量大小调整block生成周期。一个block对应RDD一个分区。

  • receiver接收的数据会放入到BlockManager,每个 Executor 都会有一个BlockManager实例,由于数据本地性,那些存在 Receiver 的 Executor 会被调度执行更多的 Task,就会导致某些executor比较空闲

  • 默认情况下,Receiver是可能丢失数据的。可以通过设置spark.streaming.receiver.writeAheadLog.enable为true开启预写日志机制,将数据先写入一个可靠地分布式文件系统(如HDFS),确保数据不丢失,但会损失一定性能


Kafka-08 接口(Receiver方式):

  • Offset保存在ZK中,系统管理

  • 对应Kafka的版本 0.8.2.1+

  • 接口底层实现使用 Kafka 旧版消费者高阶API

  • DStream底层实现为BlockRDD


Kafka-08 接口(Receiver with WAL):

  • 增强了故障恢复的能力

  • 接收的数据与Dirver的元数据保存到HDFS

  • 增加了流式应用处理的延迟

Direct Approach

Direct Approach是 Spark Streaming不使用Receiver集成kafka的方式,在企业生产环境中使用较多。相较于Receiver,有以下特点:

  • 不使用 Receiver。减少不必要的CPU占用;减少了 Receiver接收数据写入BlockManager,然后运行时再通过blockId、网络传输、磁盘读取等来获取数据的整个过程,提升了效率;无需WAL,进一步减少磁盘IO;

  • Direct方式生的RDD是KafkaRDD,它的分区数与 Kafka 分区数保持一致,便于把控并行度

    注意:在 Shuffle 或 Repartition 操作后生成的RDD,这种对应关系会失效

  • 可以手动维护offset,实现 Exactly Once 语义


Kafka-010 接口

Spark Streaming与kafka 0.10的整合,和0.8版本的 Direct 方式很像。Kafka的分区和Spark的RDD分区是一一对应的,可以获取 offsets 和元数据,API 使用起来没有显著的区别。

添加依赖:

1
2
3
4
5
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
<version>${spark.version}</version>
</dependency>

不要手动添加 org.apache.kafka 相关的依赖,如kafka-clients。spark-streamingkafka-0-10已经包含相关的依赖了,不同的版本会有不同程度的不兼容。

使用kafka010接口从 Kafka 中获取数据:

  • Kafka集群

  • kafka生产者发送数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package cn.lagou.streaming.kafka

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

object KafkaProducer {
def main(args: Array[String]): Unit = {
// 定义 kafka 参数
val brokers = "linux121:9092,linux122:9092,linux123:9092"
val topic1 = "topicB"
val prop = new Properties()

prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])

// KafkaProducer
val producer = new KafkaProducer[String, String](prop)

for (i <- 1 to 1000000){
val msg1 = new ProducerRecord[String, String](topic1, i.toString, i.toString)
// 发送消息
producer.send(msg1)
println(s"i = $i")
Thread.sleep(100)
}

producer.close()
}
}
  • Spark Streaming程序接收数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package cn.lagou.streaming.kafka

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafkaDStream1 {
def main(args: Array[String]): Unit = {
// 初始化
Logger.getLogger("org").setLevel(Level.ERROR)
val conf: SparkConf = new SparkConf()
.setMaster("local[2]")
.setAppName(this.getClass.getCanonicalName)
val ssc = new StreamingContext(conf, Seconds(2))

// 定义kafka相关参数
val groupId: String = "mygroup01"
val kafkaParams: Map[String, Object] = getKafkaConsumerParameters(groupId)
val topics: Array[String] = Array("topicB")

// 从 kafka 中获取数据
val dstream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)

// DStream输出
dstream.foreachRDD{(rdd, time) =>
if (!rdd.isEmpty()) {
println(s"*********** rdd.count = ${rdd.count()}; time = $time ***********")
}
}

ssc.start()
ssc.awaitTermination()
}

def getKafkaConsumerParameters(groupId: String): Map[String, Object] = {
Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "linux121:9092,linux122:9092,linux123:9092",
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
ConsumerConfig.GROUP_ID_CONFIG -> groupId,
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest",
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)
)
}

}

LocationStrategies(本地策略)

  • LocationStrategies.PreferBrokers:如果 Executor 在 kafka 集群中的某些节点上,可以使用这种策略。此时Executor 中的数据会来自当前broker节点

  • LocationStrategies.PreferConsistent:大多数情况下使用的策略,将Kafka分区均匀的分布在Spark集群的 Executor上

  • LocationStrategies.PreferFixed:如果节点之间的分区有明显的分布不均,使用这种策略。通过一个map指定将 topic 分区分布在哪些节点中

ConsumerStrategies(消费策略)

  • ConsumerStrategies.Subscribe,用来订阅一组固定topic

  • ConsumerStrategies.SubscribePattern,使用正则来指定感兴趣的topic

  • ConsumerStrategies.Assign,指定固定分区的集合

这三种策略都有重载构造函数,允许指定特定分区的起始偏移量;使用 Subscribe 或 SubscribePattern 在运行时能实现分区自动发现。

Offset 管理

Spark Streaming集成Kafka,允许从Kafka中读取一个或者多个 topic 的数据。一个Kafka Topic包含一个或多个分区,每个分区中的消息顺序存储,并使用 offset 来标记消息的位置。开发者可以在 Spark Streaming 应用中通过 offset 来控制数据的读取位置。

Offsets 管理对于保证流式应用在整个生命周期中数据的连贯性是非常重要的。如果在应用停止或报错退出之前没有将 offset 持久化保存,该信息就会丢失,那么Spark Streaming就没有办法从上次停止或报错的位置继续消费Kafka中的消息。

获取偏移量(Obtaining Offsets)

Spark Streaming与kafka整合时,允许获取其消费的 offset ,具体方法如下:

1
2
3
4
5
6
7
8
stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

rdd.foreachPartition { iter =>
val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
}
}

注意:对HasOffsetRanges的类型转换只有在对 createDirectStream 调用的第一个方法中完成时才会成功,而不是在随后的方法链中。RDD分区和Kafka分区之间的对应关系在 shuffle 或 重分区后会丧失,如reduceByKey 或 window。

存储偏移量(Storing Offsets)

在Streaming程序失败的情况下,Kafka交付语义取决于如何以及何时存储偏移量。Spark输出操作的语义为 at-least-once。

如果要实现EOS语义(Exactly Once Semantics),必须在幂等的输出之后存储偏移量或者将存储偏移量与输出放在一个事务中。可以按照增加可靠性(和代码复杂度)的顺序使用以下选项来存储偏移量:

  • Checkpoint

    Checkpoint是对Spark Streaming运行过程中的元数据和每RDDs的数据状态保存到一个持久化系统中,当然这里面也包含了offset,一般是HDFS、S3,如果应用程序或集群挂了,可以迅速恢复。

    如果Streaming程序的代码变了,重新打包执行就会出现反序列化异常的问题。

    这是因为Checkpoint首次持久化时会将整个 jar 包序列化,以便重启时恢复。重新打包之后,新旧代码逻辑不同,就会报错或仍然执行旧版代码。

    要解决这个问题,只能将HDFS上的checkpoint文件删除,但这样也会同时删除Kafka 的offset信息。

  • Kafka

    默认情况下,消费者定期自动提交偏移量,它将偏移量存储在一个特殊的Kafka主题中(__consumer_offsets)。但在某些情况下,这将导致问题,因为消息可能已经被消费者从Kafka拉去出来,但是还没被处理。

    可以将 enable.auto.commit 设置为 false ,在 Streaming 程序输出结果之后,手动提交偏移到kafka。

    与检查点相比,使用Kafka保存偏移量的优点是无论应用程序代码如何更改,偏移量仍然有效。

    1
    2
    3
    4
    stream.foreachRDD { rdd =>
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    // 在输出操作完成之后,手工提交偏移量;此时将偏移量提交到 Kafka 的消息队列中 stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }

    与HasOffsetRanges一样,只有在createDirectStream的结果上调用时,转换到CanCommitOffsets才会成功,而不是在转换之后。commitAsync调用是线程安全的,但必须在输出之后执行。

  • 自定义存储

    • Offsets可以通过多种方式来管理,但是一般来说遵循下面的步骤: 在 DStream 初始化的时候,需要指定每个分区的offset用于从指定位置读取数据

    • 读取并处理消息

    • 处理完之后存储结果数据

    • 用虚线圈存储和提交offset,强调用户可能会执行一系列操作来满足他们更加严格的语义要求。这包括幂等操作和通过原子操作的方式存储offset

    • 将 offsets 保存在外部持久化数据库如 HBase、Kafka、HDFS、ZooKeeper、Redis、MySQL … …


    可以将 Offsets 存储到HDFS中,但这并不是一个好的方案。因为HDFS延迟有点高,此外将每批次数据的offset存储到HDFS中还会带来小文件问题;

    可以将 Offset 存储到保存ZK中,但是将ZK作为存储用,也并不是一个明智的选择,同时ZK也不适合频繁的读写操作;

Redis管理的Offset

要想将Offset保存到外部存储中,关键要实现以下几个功能:

  • Streaming程序启动时,从外部存储获取保存的Offsets(执行一次)

  • 在foreachRDD中,每个批次数据处理之后,更新外部存储的offsets(多次执行)

  • 案例一:使用自定义的offsets,从kafka读数据;处理完数据后打印offsets

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    package cn.lagou.streaming.kafka

    import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.{SparkConf, TaskContext}
    import org.apache.spark.streaming.dstream.InputDStream
    import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object KafkaDStream2 {
    def main(args: Array[String]): Unit = {
    // 初始化
    Logger.getLogger("org").setLevel(Level.ERROR)
    val conf = new SparkConf().setAppName("FileDStream").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(5))

    // 定义kafka相关参数
    val groupId: String = "mygroup01"
    val topics: Array[String] = Array("topicB")
    val kafkaParams: Map[String, Object] = getKafkaConsumerParameters(groupId)
    val offsets: Map[TopicPartition, Long] = Map(
    new TopicPartition("topicB", 0) -> 1000,
    new TopicPartition("topicB", 1) -> 1000,
    new TopicPartition("topicB", 2) -> 1000
    )

    // 创建DStream
    val dstream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
    ssc,
    LocationStrategies.PreferConsistent,
    ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsets)
    )

    // DStream转换&输出
    dstream.foreachRDD{ (rdd, time) =>
    // 处理消息
    println(s"*********** rdd.count = ${rdd.count()}; time = $time *************")

    // 显示offsets的信息
    val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    rdd.foreachPartition{ iter =>
    val range: OffsetRange = offsetRanges(TaskContext.get.partitionId)
    println(s"${range.topic} ${range.partition} ${range.fromOffset} ${range.untilOffset} ")
    }
    }

    // 启动作业
    ssc.start()
    ssc.awaitTermination()
    }

    def getKafkaConsumerParameters(groupid: String): Map[String, Object] = {
    Map[String, Object](
    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "linux121:9092,linux122:9092,linux123:9092",
    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
    ConsumerConfig.GROUP_ID_CONFIG -> groupid,
    ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)
    // ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"
    )
    }
    }
  • 案例二:根据 key 从 Redis 获取offsets,根据该offsets从kafka读数据;处理完数据后将offsets保存到 Redis

    Redis管理的Offsets:

    1
    2
    3
    4
    5
    6
    7
    8
    1、数据结构选择:Hash;key、field、value
    Key:kafka:topic:TopicName:groupid
    Field:partition
    Value:offset

    2、从 Redis 中获取保存的offsets

    3、消费数据后将offsets保存到redis

    引入依赖

    1
    2
    3
    4
    5
    <dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.9.0</version>
    </dependency>

    主程序(从kafka获取数据,使用 Redis 保存offsets)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    package cn.lagou.streaming.kafka

    import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.streaming.dstream.InputDStream
    import org.apache.spark.streaming.kafka010._
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.{SparkConf, TaskContext}

    object KafkaDStream3 {
    def main(args: Array[String]): Unit = {
    // 初始化
    Logger.getLogger("org").setLevel(Level.ERROR)
    val conf = new SparkConf().setAppName(this.getClass.getCanonicalName).setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(5))

    // 定义kafka相关参数
    val groupId: String = "mygroup01"
    val topics: Array[String] = Array("topicB")
    val kafkaParams: Map[String, Object] = getKafkaConsumerParameters(groupId)

    // 从 Redis中获取offsets
    val offsets: Map[TopicPartition, Long] = OffsetsWithRedisUtils.getOffsetsFromRedis(topics, groupId)
    offsets.foreach(println)

    // 创建DStream
    val dstream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
    ssc,
    LocationStrategies.PreferConsistent,
    ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsets)
    )

    // DStream转换&输出
    dstream.foreachRDD{ (rdd, time) =>
    if (! rdd.isEmpty()) {
    // 处理消息
    println(s"*********** rdd.count = ${rdd.count()}; time = $time *************")

    // 将offsets信息打印到控制台
    val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    rdd.foreachPartition{ iter =>
    val range: OffsetRange = offsetRanges(TaskContext.get.partitionId)
    println(s"${range.topic} ${range.partition} ${range.fromOffset} ${range.untilOffset} ")
    }

    // 将offsets保存到Redis
    OffsetsWithRedisUtils.saveOffsetsToRedis(offsetRanges, groupId)
    }
    }

    // 启动作业
    ssc.start()
    ssc.awaitTermination()
    }

    def getKafkaConsumerParameters(groupid: String): Map[String, Object] = {
    Map[String, Object](
    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "linux121:9092,linux122:9092,linux123:9092",
    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
    ConsumerConfig.GROUP_ID_CONFIG -> groupid,
    ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)
    // ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"
    )
    }
    }

    工具类(Redis读取/保存offsets)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    package cn.lagou.streaming.kafka

    import java.util

    import org.apache.kafka.common.TopicPartition
    import org.apache.spark.streaming.kafka010.OffsetRange
    import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

    import scala.collection.mutable

    object OffsetsWithRedisUtils {
    // 定义Redis参数
    private val redisHost = "linux123"
    private val redisPort = 6379

    // 获取Redis的连接
    private val config = new JedisPoolConfig
    // 最大空闲数
    config.setMaxIdle(5)
    // 最大连接数
    config.setMaxTotal(10)

    private val pool = new JedisPool(config, redisHost, redisPort, 10000)
    private def getRedisConnection: Jedis = pool.getResource

    private val topicPrefix = "kafka:topic"

    // Key:kafka:topic:TopicName:groupid
    private def getKey(topic: String, groupid: String) = s"$topicPrefix:$topic:$groupid"

    // 根据 key 获取offsets
    def getOffsetsFromRedis(topics: Array[String], groupId: String): Map[TopicPartition, Long] = {
    val jedis: Jedis = getRedisConnection

    val offsets: Array[mutable.Map[TopicPartition, Long]] = topics.map { topic =>
    val key = getKey(topic, groupId)

    import scala.collection.JavaConverters._

    jedis.hgetAll(key)
    .asScala
    .map { case (partition, offset) => new TopicPartition(topic, partition.toInt) -> offset.toLong }
    }

    // 归还资源
    jedis.close()

    offsets.flatten.toMap
    }

    // 将offsets保存到Redis中
    def saveOffsetsToRedis(offsets: Array[OffsetRange], groupId: String): Unit = {
    // 获取连接
    val jedis: Jedis = getRedisConnection

    // 组织数据
    offsets.map{range => (range.topic, (range.partition.toString, range.untilOffset.toString))}
    .groupBy(_._1)
    .foreach{case (topic, buffer) =>
    val key: String = getKey(topic, groupId)

    import scala.collection.JavaConverters._
    val maps: util.Map[String, String] = buffer.map(_._2).toMap.asJava

    // 保存数据
    jedis.hmset(key, maps)
    }

    jedis.close()
    }

    }