事务

事务场景

    1. 如producer发的多条消息组成⼀个事务这些消息需要对consumer同时可⻅或者同时不可⻅ 。
    1. producer可能会给多个topic,多个partition发消息,这些消息也需要能放在⼀个事务⾥⾯,这就形成了⼀个典型的分布式事务。
    1. kafka的应⽤场景经常是应⽤先消费⼀个topic,然后做处理再发到另⼀个topic,这个consume-transform-produce过程需要放到⼀个事务⾥⾯,⽐如在消息处理或者发送的过程中如果失败了,消费偏移量也不能提交。
    1. producer或者producer所在的应⽤可能会挂掉,新的producer启动以后需要知道怎么处理之前未完成的事务。
    1. 在⼀个原⼦操作中,根据包含的操作类型,可以分为三种情况,前两种情况是事务引⼊的场景,最后⼀种没⽤。
      1. 只有Producer⽣产消息。
      1. 消费消息和⽣产消息并存,这个是事务场景中最常⽤的情况,就是我们常说的 consume-transform-produce 模式。
      1. 只有consumer消费消息,这种操作其实没有什么意义,跟使⽤⼿动提交效果⼀样,⽽且也不是事务属性引⼊的⽬的,所以⼀般不会使⽤这种情况。

关键概念和推导

  1. 因为producer发送消息可能是分布式事务,所以引⼊了常⽤的2PC,所以有事务协调者(Transaction Coordinator)。Transaction Coordinator和之前为了解决脑裂和惊群问题引⼊的Group Coordinator在选举上类似。

  2. 事务管理中事务⽇志是必不可少的,kafka使⽤⼀个内部topic来保存事务⽇志,这个设计和之前使⽤内部topic保存偏移量的设计保持⼀致。事务⽇志是Transaction Coordinator管理的状态的持久化,因为不需要回溯事务的历史状态,所以事务⽇志只⽤保存最近的事务状态。 __transaction_state

  3. 因为事务存在commit和abort两种操作,⽽客户端⼜有read committed和read uncommitted两种隔离级别,所以消息队列必须能标识事务状态,这个被称作Control Message。

  4. producer挂掉重启或者漂移到其它机器需要能关联的之前的未完成事务所以需要有⼀个唯⼀标识符来进⾏关联,这个就是TransactionalId,⼀个producer挂了,另⼀个有相同TransactionalId的producer能够接着处理这个事务未完成的状态。kafka⽬前没有引⼊全局序,所以也没有transaction id,这个TransactionalId是⽤户提前配置的。

  5. TransactionalId能关联producer,也需要避免两个使⽤相同TransactionalId的producer同时存在,所以引⼊了producer epoch来保证对应⼀个TransactionalId只有⼀个活跃的producer

事务语义

  • 多分区原⼦写⼊

    事务能够保证Kafka topic下每个分区的原⼦写⼊。事务中所有的消息都将被成功写⼊或者丢弃。

    ⾸先,我们来考虑⼀下原⼦ 读取-处理-写⼊ 周期是什么意思。简⽽⾔之,这意味着如果某个应⽤程序在某个topictp0的偏移量X处读取到了消息A,并且在对消息A进⾏了⼀些处理(如B = F(A))之后将消息B写⼊topic tp1,则只有当消息A和B被认为被成功地消费并⼀起发布,或者完全不发布时,整个读取过程写⼊操作是原⼦的。

    现在,只有当消息A的偏移量X被标记为已消费,消息A才从topic tp0消费,消费到的数据偏移量(record offset)将被标记为提交偏移量(Committing offset)。在Kafka中,我们通过写⼊⼀个名为offsets topic的内部Kafka topic来记录offset commit。消息仅在其offset被提交给offsets topic时才被认为成功消费。

    由于offset commit只是对Kafkatopic的另⼀次写⼊,并且由于消息仅在提交偏移量时被视为成功消费,所以跨多个主题和分区的原⼦写⼊也启⽤原⼦ 读取-处理-写⼊ 循环:提交偏移量X到offset topic和消息B到tp1的写⼊将是单个事务的⼀部分,所以整个步骤都是原⼦的。

  • 粉碎“僵⼫实例”

    我们通过为每个事务Producer分配⼀个称为transactional.id的唯⼀标识符来解决僵⼫实例的问题。在进程重新启动时能够识别相同的Producer实例。

    API要求事务性Producer的第⼀个操作应该是在Kafka集群中显示注册transactional.id。 当注册的时候,Kafka broker⽤给定的transactional.id检查打开的事务并且完成处理。 Kafka也增加了⼀个与transactional.id相关的epoch。Epoch存储每个transactional.id内部元数据。

    ⼀旦epoch被触发,任何具有相同的transactional.id和旧的epoch的⽣产者被视为僵⼫,Kafka拒绝来⾃这些⽣产者的后续事务性写⼊。

    简⽽⾔之:Kafka可以保证Consumer最终只能消费⾮事务性消息或已提交事务性消息。它将保留来⾃未完成事务的消息,并过滤掉已中⽌事务的消息。

  • 事务消息定义

    ⽣产者可以显式地发起事务会话,在这些会话中发送(事务)消息,并提交或中⽌事务。有如下要求:

    1. 原⼦性:消费者的应⽤程序不应暴露于未提交事务的消息中。

    2. 持久性:Broker不能丢失任何已提交的事务。

    3. 排序:事务消费者应在每个分区中以原始顺序查看事务消息。

    4. 交织:每个分区都应该能够接收来⾃事务性⽣产者和⾮事务⽣产者的消息。

    5. 事务中不应有重复的消息。

    如果允许事务性和⾮事务性消息的交织,则⾮事务性和事务性消息的相对顺序将基于附加(对于⾮事务性消息)和最终提交(对于事务性消息)的相对顺序。


    在上图中,分区p0和p1接收事务X1和X2的消息,以及⾮事务性消息。时间线是消息到达Broker的时间。由于⾸先提交了X2,所以每个分区都将在X1之前公开来⾃X2的消息。由于⾮事务性消息在X1和X2的提交之前到达,因此这些消息将在来⾃任⼀事务的消息之前公开。

事务配置

    1. 创建消费者代码,需要:

将配置中的⾃动提交属性(auto.commit)进⾏关闭

⽽且在代码⾥⾯也不能使⽤⼿动提交commitSync( )或者commitAsync( )

设置isolation.level:READ_COMMITTED或READ_UNCOMMITTED

    1. 创建⽣成者,代码如下,需要:

配置transactional.id属性

配置enable.idempotence属性

事务概览

⽣产者将表示事务开始/结束/中⽌状态的事务控制消息发送给使⽤多阶段协议管理事务的⾼可⽤事务协调器。⽣产者将事务控制记录(开始/结束/中⽌)发送到事务协调器,并将事务的消息直接发送到⽬标数据分区。消费者需要了解事务并缓冲每个待处理的事务,直到它们到达其相应的结束(提交/中⽌)记录为⽌。

  • 事务组

    事务组⽤于映射到特定的事务协调器(基于⽇志分区数字的哈希)。该组中的⽣产者需要配置为该组事务⽣产者。由于来⾃这些⽣产者的所有事务都通过此协调器进⾏,因此我们可以在这些事务⽣产者之间实现严格的有序。

  • 事务组中的⽣产者

    事务⽣产者需要两个新参数:⽣产者ID和⽣产组。

    需要将⽣产者的输⼊状态与上⼀个已提交的事务相关联。这使事务⽣产者能够重试事务(通过为该事务重新创建输⼊状态;在我们的⽤例中通常是偏移量的向量)。

    可以使⽤消费者偏移量管理机制来管理这些状态。消费者偏移量管理器将每个键( consumergroup-topic-partition )与该分区的最后⼀个检查点偏移量和元数据相关联。在事务⽣产者中,我们保存消费者的偏移量,该偏移量与事务的提交点关联。此偏移提交记录(在 __consumer_offsets 主题中)应作为事务的⼀部分写⼊。即,存储消费组偏移量的 __consumer_offsets 主题分区将需要参与事务。因此,假定⽣产者在事务中间失败(事务协调器随后到期);当⽣产者恢复时,它可以发出偏移量获取请求,以恢复与最后提交的事务相关联的输⼊偏移量,并从该点恢复事务处理。

    为了⽀持此功能,我们需要对偏移量管理器和压缩的 __consumer_offsets 主题进⾏⼀些增强。

    ⾸先,压缩的主题现在还将包含事务控制记录。我们将需要为这些控制记录提出剔除策略。

    其次,偏移量管理器需要具有事务意识;特别是,如果组与待处理的事务相关联,则偏移量提取请求应返回错误。

  • 事务组的事务协调器

    事务协调器是 __transaction_state 主题特定分区的Leader分区所在的Broker。它负责初始化、提交以及回滚事务。事务协调器在内存管理如下的状态:

    • 对应正在处理的事务的第⼀个消息的HW。事务协调器周期性地将HW写到ZK。

    • 事务控制⽇志中存储对应于⽇志HW的所有正在处理的事务:

      • 事务消息主题分区的列表。

      • 事务的超时时间。

      • 与事务关联的Producer ID。

    需要确保⽆论是什么样的保留策略(⽇志分区的删除还是压缩),都不能删除包含事务HW的⽇志分段。

  • Leader brokers(事务数据所在分区的Broker)

  • 事务的消费者

事务流程


  • 初始阶段(图中步骤1)

    1. Producer:计算哪个Broker作为事务协调器。

    2. Producer:向事务协调器发送BeginTransaction(producerId, generation, partitions… )请求,当然也可以发送另⼀个包含事务过期时间的。如果⽣产者需要将消费者状态作为事务的⼀部分提交事务,则需要在BeginTransaction中包含对应的 __consumer_offsets 主题分区信息。

    3. Broker:⽣成事务ID

    4. Coordinator:向事务协调主题追加BEGIN(TxId, producerId, generation, partitions…)消息,然后发送响应给⽣产者。

    5. Producer:读取响应(包含了事务ID:TxId)

    6. Coordinator (and followers):在内存更新当前事务的待确认事务状态和数据分区信息。

  • 发送阶段 (图中步骤2)

    Producer:发送事务消息给主题Leader分区所在的Broker。每个消息需要包含TxId和TxCtl字段。

    TxCtl仅⽤于标记事务的最终状态(提交还是中⽌)。⽣产者请求也封装了⽣产者ID,但是不追加到⽇志中。

  • 结束阶段 (⽣产者准备提交事务) (图中步骤3、4、5。)

      1. Producer:发送OffsetCommitRequest请求提交与事务结束状态关联的输⼊状态(如下⼀个事务输⼊从哪⼉开始)
      1. Producer:发送CommitTransaction(TxId, producerId, generation)请求给事务协调器并等待响应。(如果响应中没有错误信息,表示将提交事务)
      1. Coordinator:向事务控制主题追加PREPARE_COMMIT(TxId)请求并向⽣产者发送响应。
      1. Coordinator:向事务涉及到的每个Leader分区(事务的业务数据的⽬标主题)的Broker发送⼀个CommitTransaction(TxId, partitions…)请求。
      1. 事务业务数据的⽬标主题相关Leader分区Broker:
        1. 如果是⾮ __consumer_offsets 主题的Leader分区:⼀收到CommitTransaction(TxId, partition1, partition2, …)请求就会向对应的分区Broker发送空(null)消息(没有key/value)并给该消息设置TxId和TxCtl(设置为COMMITTED)字段。Leader分区的Broker给协调器发送响应。
        1. 如果是 __consumer_offsets 主题的Leader分区:追加消息,该消息的key是 G-LAST-COMMIT,value就是 TxId 的值。同时也应该给该消息设置TxId和TxCtl字段。Broker向协调器发送响应。
      1. Coordinator:向事务控制主题发送COMMITTED(TxId)请求。 __transaction_state
      1. Coordinator (and followers):尝试更新HW。

事务的中⽌

当事务⽣产者发送业务消息的时候如果发⽣异常,可以中⽌该事务。如果事务提交超时,事务协调器也会中⽌当前事务。

Producer:向事务协调器发送AbortTransaction(TxId)请求并等待响应。(⼀个没有异常的响应表示事务将会中⽌)。

Coordinator:向事务控制主题追加PREPARE_ABORT(TxId)消息,然后向⽣产者发送响应。

Coordinator:向事务业务数据的⽬标主题的每个涉及到的Leader分区Broker发送AbortTransaction(TxId,partitions…)请求。(收到Leader分区Broker响应后,事务协调器中⽌动作跟上⾯的提交类似。)

事务流程的失败

⽣产者发送BeginTransaction(TxId):的时候超时或响应中包含异常,⽣产者使⽤相同的TxId重试。

⽣产者发送数据时的Broker错误:⽣产者应中⽌(然后重做)事务(使⽤新的TxId)。如果⽣产者没有中⽌事务,则协调器将在事务超时后中⽌事务。仅在可能已将请求数据附加并复制到Follower的错误的情况下才需要重做事务。例如,⽣产者请求超时将需要重做,⽽NotLeaderForPartitionException不需要重做。

⽣产者发送CommitTransaction(TxId)请求超时或响应中包含异常,⽣产者使⽤相同的TxId重试事务。此时需要幂等性。

主题的压缩

压缩主题在压缩过程中会丢弃具有相同键的早期记录。如果这些记录是事务的⼀部分,这合法吗? 这可能有点怪异,但可能不会太有害,因为在主题中使⽤压缩策略的理由是保留关键数据的最新更新。

如果该应⽤程序正在(例如)更新某些表,并且事务中的消息对应于不同的键,则这种情况可能导致数据库视图不⼀致。

事务相关配置

    1. Broker configs
    配置项 说明
    transactional.id.timeout.ms 在ms中,事务协调器在⽣产者TransactionalId提前过期之前等待的最⻓时间,并且没有从该⽣产者TransactionalId接收到任何事务状态更新。默认是604800000(7天)。这允许每周⼀次的⽣产者作业维护它们的id
    max.transaction.timeout.ms 事务允许的最⼤超时。如果客户端请求的事务时间超过此时间,broke将在InitPidRequest中返回InvalidTransactionTimeout错误。这可以防⽌客户机超时过⼤,从⽽导致⽤户⽆法从事务中包含的主题读取内容。默认值为900000(15分钟)。这是消息事务需要发送的时间的保守上限。
    transaction.state.log.replication.factor 事务状态topic的副本数量。默认值:3
    transaction.state.log.num.partitions 事务状态主题的分区数。默认值:50
    transaction.state.log.min.isr 事务状态主题的每个分区ISR最⼩数量。默认值:2
    transaction.state.log.segment.bytes 事务状态主题的segment⼤⼩。默认值:104857600字节
    1. Producer configs
    配置项 说明
    enable.idempotence 开启幂等
    transaction.timeout.ms 事务超时时间。事务协调器在主动中⽌正在进⾏的事务之前等待⽣产者更新事务状态的最⻓时间。
    这个配置值将与InitPidRequest⼀起发送到事务协调器。如果该值⼤于 max.transaction.timeout。在broke中设置ms时,请求将失败,并出现InvalidTransactionTimeout错误。
    默认是60000。这使得交易不会阻塞下游消费超过⼀分钟,这在实时应⽤程序中通常是允许的。
    transactional.id ⽤于事务性交付的TransactionalId。这⽀持跨多个⽣产者会话的可靠性语义,因为它允许客户端确保使⽤相同TransactionalId的事务在启动任何新事务之前已经完成。如果没有提供TransactionalId,则⽣产者仅限于幂等交付。
    1. Consumer configs
    配置项 说明
    isolation.level - read_uncommitted:以偏移顺序使⽤已提交和未提交的消息。
    - read_committed:仅以偏移量顺序使⽤⾮事务性消息或已提交事务性消息。
    为了维护偏移排序,这个设置意味着我们必须在使⽤者中缓冲消息,直到看到给定事务中的所有消息。

幂等性

Kafka在引⼊幂等性之前,Producer向Broker发送消息,然后Broker将消息追加到消息流中后给Producer返回Ack信号值。实现流程如下:


⽣产中,会出现各种不确定的因素,⽐如在Producer在发送给Broker的时候出现⽹络异常。⽐如以下这种异常情况的出现:


上图这种情况,当Producer第⼀次发送消息给Broker时,Broker将消息(x2,y2)追加到了消息流中,但是在返回Ack信号给Producer时失败了(⽐如⽹络异常) 。此时,Producer端触发重试机制,将消息(x2,y2)重新发送给Broker,Broker接收到消息后,再次将该消息追加到消息流中,然后成功返回Ack信号给Producer。这样下来,消息流中就被重复追加了两条相同的(x2,y2)的消息。

概念

保证在消息重发的时候,消费者不会重复处理。即使在消费者收到重复消息的时候,重复处理,也要保证最终结果的⼀致性。

所谓幂等性,数学概念就是: f(f(x)) = f(x) 。f函数表示对消息的处理。

⽐如,银⾏转账,如果失败,需要重试。不管重试多少次,都要保证最终结果⼀定是⼀致的。

幂等性实现

添加唯⼀ID,类似于数据库的主键,⽤于唯⼀标记⼀个消息。

Kafka为了实现幂等性,它在底层设计架构中引⼊了ProducerID和SequenceNumber。

ProducerID:在每个新的Producer初始化时,会被分配⼀个唯⼀的ProducerID,这个ProducerID对客户端使⽤者是不可⻅的。

SequenceNumber:对于每个ProducerID,Producer发送数据的每个Topic和Partition都对应⼀个从0开始单调递增的SequenceNumber值。

同样,这是⼀种理想状态下的发送流程。实际情况下,会有很多不确定的因素,⽐如Broker在发送Ack信号给Producer时出现⽹络异常,导致发送失败。异常情况如下图所示:


当Producer发送消息(x2,y2)给Broker时,Broker接收到消息并将其追加到消息流中。此时,Broker返回Ack信号给Producer时,发⽣异常导致Producer接收Ack信号失败。对于Producer来说,会触发重试机制,将消息(x2,y2)再次发送,但是,由于引⼊了幂等性,在每条消息中附带了PID(ProducerID)和SequenceNumber。相同的PID和SequenceNumber发送给Broker,⽽之前Broker缓存过之前发送的相同的消息,那么在消息流中的消息就只有⼀条(x2,y2),不会出现重复发送的情况。

在org.apache.kafka.clients.producer.internals.Sender类中,在run()中有⼀个maybeWaitForPid()⽅法,⽤来⽣成⼀个ProducerID,实现代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private void maybeWaitForPid() {
if (transactionState == null) return;
while (!transactionState.hasPid()) {
try {
Node node = awaitLeastLoadedNodeReady(requestTimeout);
if (node != null) {
ClientResponse response = sendAndAwaitInitPidRequest(node);
if (response.hasResponse() && (response.responseBody() instanceof InitPidResponse)) {
InitPidResponse initPidResponse = (InitPidResponse) response.responseBody();
transactionState.setPidAndEpoch(initPidResponse.producerId(), initPidResponse.epoch());
} else {
log.error("Received an unexpected response type for an InitPidRequest from {}. " + "We will back off and try again.", node);
}
} else {
log.debug("Could not find an available broker to send InitPidRequest to. " + "We will back off and try again.");
}
} catch (Exception e) {
log.warn("Received an exception while trying to get a pid. Will back off and retry.", e);
}
log.trace("Retry InitPidRequest in {}ms.", retryBackoffMs);
time.sleep(retryBackoffMs);
metadata.requestUpdate();
}
}

事务操作

在Kafka事务中,⼀个原⼦性操作,根据操作类型可以分为3种情况。情况如下:

  • 只有Producer⽣产消息,这种场景需要事务的介⼊;

  • 消费消息和⽣产消息并存,⽐如Consumer&Producer模式,这种场景是⼀般Kafka项⽬中⽐较常⻅的模式,需要事务介⼊;

  • 只有Consumer消费消息,这种操作在实际项⽬中意义不⼤,和⼿动Commit Offsets的结果⼀样,⽽且这种场景不是事务的引⼊⽬的;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 初始化事务,需要注意确保transation.id属性被分配
void initTransactions();

// 开启事务
void beginTransaction() throws ProducerFencedException;

// 为Consumer提供的在事务内Commit Offsets的操作
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException;

// 提交事务
void commitTransaction() throws ProducerFencedException;

// 放弃事务,类似于回滚事务的操作
void abortTransaction() throws ProducerFencedException;
  • 案例1:单个Producer,使⽤事务保证消息的仅⼀次发送:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    package com.lagou.kafka.demo.producer;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    import java.util.HashMap;
    import java.util.Map;

    public class MyTransactionalProducer {

    public static void main(String[] args) {
    Map<String, Object> configs = new HashMap<>();
    configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.91.121:9092");
    configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

    // 提供生产者client.id
    configs.put(ProducerConfig.CLIENT_ID_CONFIG, "tx_producer");
    // 设置事务ID
    configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my_tx_id_1");
    // 需要ISR全体确认消息
    configs.put(ProducerConfig.ACKS_CONFIG, "all");

    KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs);

    // 初始化事务
    producer.initTransactions();
    try {
    // 开启事务
    producer.beginTransaction();
    // 发送事务消息
    producer.send(new ProducerRecord<>("tp_tx_01", "txkey1", "tx_msg_4"));
    producer.send(new ProducerRecord<>("tp_tx_01", "txkey2", "tx_msg_5"));
    producer.send(new ProducerRecord<>("tp_tx_01", "txkey3", "tx_msg_6"));
    int i = 1 / 0;
    // 提交事务
    producer.commitTransaction();
    } catch (Exception e) {
    e.printStackTrace();
    // 事务回滚
    producer.abortTransaction();
    } finally {
    // 关闭生产者
    producer.close();
    }
    }
    }
  • 案例2:在 消费-转换-⽣产 模式,使⽤事务保证仅⼀次发送。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    package com.lagou.kafka.demo.producer;

    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    public class MyTransactional {

    public static KafkaProducer<String, String> getProducer() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.91.121:9092");
    configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

    // 设置client.id
    configs.put(ProducerConfig.CLIENT_ID_CONFIG, "tx_producer_01");
    // 设置事务id
    configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx_id_02");
    // 需要所有的ISR副本确认
    configs.put(ProducerConfig.ACKS_CONFIG, "all");
    // 启用幂等性
    configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

    KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs);

    return producer;
    }

    public static KafkaConsumer<String, String> getConsumer(String consumerGroupId) {
    Map<String, Object> configs = new HashMap<>();
    configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
    configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

    // 设置消费组ID
    configs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer_grp_02");
    // 不启用消费者偏移量的自动确认,也不要手动确认
    configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    configs.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer_client_02");
    configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    // 只读取已提交的消息
    // configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configs);

    return consumer;
    }


    public static void main(String[] args) {

    String consumerGroupId = "consumer_grp_id_101";
    KafkaProducer<String, String> producer = getProducer();
    KafkaConsumer<String, String> consumer = getConsumer(consumerGroupId);

    // 事务的初始化
    producer.initTransactions();
    //订阅主题
    consumer.subscribe(Collections.singleton("tp_tx_01"));
    final ConsumerRecords<String, String> records = consumer.poll(1_000);
    // 开启事务
    producer.beginTransaction();
    try {
    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();

    for (ConsumerRecord<String, String> record : records) {
    System.out.println(record);
    producer.send(new ProducerRecord<String, String>("tp_tx_out_01", record.key(), record.value()));
    offsets.put(
    new TopicPartition(record.topic(), record.partition()),
    new OffsetAndMetadata(record.offset() + 1)); // 偏移量表示下一条要消费的消息
    }

    // 将该消息的偏移量提交作为事务的一部分,随事务提交和回滚(不提交消费偏移量)
    producer.sendOffsetsToTransaction(offsets, consumerGroupId);

    // int i = 1 / 0;
    // 提交事务
    producer.commitTransaction();
    } catch (Exception e) {
    e.printStackTrace();
    // 回滚事务
    producer.abortTransaction();
    } finally {
    // 关闭资源
    producer.close();
    consumer.close();
    }
    }
    }