Sending and Receiving Messages


The main producer-side objects are KafkaProducer and ProducerRecord.

KafkaProducer is the class used to send messages; ProducerRecord is the class that wraps a Kafka message.

Parameters that must be specified when creating a KafkaProducer, and their meanings:

| Parameter | Description |
| --- | --- |
| bootstrap.servers | Configures how the producer establishes its initial connection to the brokers. When connecting to a Kafka cluster, list a few of the cluster's brokers here rather than all of them; once the producer has connected to one of the listed brokers, it discovers the other nodes of the cluster through that connection. |
| key.serializer | Serializer class for the key of the messages to be sent. It can be given as the class name or as the Class object. |
| value.serializer | Serializer class for the value of the messages to be sent. It can be given as the class name or as the Class object. |
| acks | Default: all (in older clients, including the 1.0.x client used below, the default is 1).<br>acks=0: the producer does not wait for any acknowledgement from the broker; as soon as the message is placed in the send buffer it is considered sent. There is no guarantee the broker actually received it, the retries setting has no effect, and the offset returned for the message is always -1.<br>acks=1: the message only needs to be written to the leader partition before the client gets a response; the producer does not wait for the follower replicas. If the leader acknowledges the message and then crashes before the followers have replicated it, the message is lost.<br>acks=all: the leader partition waits for all ISR replicas to acknowledge the record. As long as at least one ISR replica is alive, the message is not lost. This is Kafka's strongest durability guarantee and is equivalent to acks=-1. |
| retries | Number of retries. When a send fails with an error, the producer resends the message, just as a client would resend after receiving an error. If retries are enabled and message ordering must be preserved, also set MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1; otherwise, while a failed message is being retried, other messages may already have been sent successfully. |
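To make the acks/retries/ordering discussion concrete, here is a minimal sketch of how these settings are typically passed to the producer using the ProducerConfig constants. The class name ProducerConfigSketch and the broker address node1:9092 are placeholders of mine, not part of the demo project below (which uses plain string keys).

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.HashMap;
import java.util.Map;

public class ProducerConfigSketch {
    public static KafkaProducer<Integer, String> buildProducer() {
        Map<String, Object> configs = new HashMap<>();
        // Initial broker address (placeholder; substitute your own)
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Strongest durability guarantee: wait for all ISR replicas (equivalent to acks=-1)
        configs.put(ProducerConfig.ACKS_CONFIG, "all");
        // Resend on retriable errors
        configs.put(ProducerConfig.RETRIES_CONFIG, 3);
        // With retries enabled, preserve ordering by allowing only one in-flight request
        configs.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
        return new KafkaProducer<>(configs);
    }
}
```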

The other parameters can be found in org.apache.kafka.clients.producer.ProducerConfig; we will cover them later.

After the producer sends a message, it needs an acknowledgement from the broker, which can be obtained either synchronously or asynchronously.

Synchronous acknowledgement is less efficient; asynchronous acknowledgement is more efficient but requires a callback object. Both approaches are shown in the MyProducer1 example below.

    1. Create a Maven project and import the Kafka client dependency in the pom file:
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <!-- Newer client versions are backward compatible; we use the same version as the broker -->
            <version>1.0.2</version>
        </dependency>
    </dependencies>
    1. Producer: create the MyProducer1 class.
    package com.lagou.kafka.demo.producer;

    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.header.Header;
    import org.apache.kafka.common.header.internals.RecordHeader;
    import org.apache.kafka.common.serialization.IntegerSerializer;
    import org.apache.kafka.common.serialization.StringSerializer;

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;

    public class MyProducer1 {
        public static void main(String[] args) throws ExecutionException, InterruptedException {

            Map<String, Object> configs = new HashMap<>();
            // Broker address used for the initial connection
            configs.put("bootstrap.servers", "192.168.92.121:9092");
            // Serializer class for the key
            configs.put("key.serializer", IntegerSerializer.class);
            // Serializer class for the value
            configs.put("value.serializer", StringSerializer.class);

            // configs.put("acks", "all");
            // configs.put("retries", "3");

            KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(configs);

            // User-defined message header fields
            List<Header> headers = new ArrayList<>();
            headers.add(new RecordHeader("biz.name", "producer.demo".getBytes()));

            ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>(
                    "topic_1",
                    0,
                    0,
                    "hello lagou 0",
                    headers
            );

            // Synchronous acknowledgement of the message
            // final Future<RecordMetadata> future = producer.send(record);
            // final RecordMetadata metadata = future.get();
            // System.out.println("Topic of the message: " + metadata.topic());
            // System.out.println("Partition of the message: " + metadata.partition());
            // System.out.println("Offset of the message: " + metadata.offset());

            // Asynchronous acknowledgement of the message
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.println("Topic of the message: " + metadata.topic());
                        System.out.println("Partition of the message: " + metadata.partition());
                        System.out.println("Offset of the message: " + metadata.offset());
                    } else {
                        System.out.println("Exception: " + exception.getMessage());
                    }
                }
            });

            // Close the producer
            producer.close();
        }
    }
    1. Consumer: create the MyConsumer1 class.
    package com.lagou.kafka.demo.consumer;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.IntegerDeserializer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.Consumer;

    public class MyConsumer1 {
        public static void main(String[] args) {

            Map<String, Object> configs = new HashMap<>();
            // Initial broker address; if a hostname such as node1 is used instead of the IP,
            // map it manually in the Windows hosts file
            configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.92.121:9092");
            // Use the ConsumerConfig constants instead of hand-written strings; key deserializer
            configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
            // Value deserializer
            configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            // Consumer group ID
            configs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer_demo1");
            // If no valid offset exists for this consumer, reset to the earliest offset
            // ("latest" would reset to the end of the partition instead)
            configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

            KafkaConsumer<Integer, String> consumer = new KafkaConsumer<Integer, String>(configs);

            // Subscribe first, then consume
            consumer.subscribe(Arrays.asList("topic_1"));

            // while (true) {
            //     final ConsumerRecords<Integer, String> consumerRecords = consumer.poll(3_000);
            // }
            // If the topic has no messages to consume yet, poll() can be placed in a while loop
            // and called repeatedly; the 3-second timeout keeps the loop from calling poll() too tightly.

            // Fetch a batch of messages from the topic's partitions
            final ConsumerRecords<Integer, String> consumerRecords = consumer.poll(3_000);

            // Iterate over the batch of messages fetched in this poll
            consumerRecords.forEach(new Consumer<ConsumerRecord<Integer, String>>() {
                @Override
                public void accept(ConsumerRecord<Integer, String> record) {
                    System.out.println(record.topic() + "\t"
                            + record.partition() + "\t"
                            + record.offset() + "\t"
                            + record.key() + "\t"
                            + record.value());
                }
            });

            consumer.close();
        }
    }
    1. Rework the producer and consumer above so that they produce and consume in a loop.
    package com.lagou.kafka.demo.producer;

    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.header.Header;
    import org.apache.kafka.common.header.internals.RecordHeader;
    import org.apache.kafka.common.serialization.IntegerSerializer;
    import org.apache.kafka.common.serialization.StringSerializer;

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ExecutionException;

    public class MyProducer2 {
        public static void main(String[] args) throws ExecutionException, InterruptedException {

            Map<String, Object> configs = new HashMap<>();
            // Broker address used for the initial connection
            configs.put("bootstrap.servers", "192.168.100.101:9092");
            // Serializer class for the key
            configs.put("key.serializer", IntegerSerializer.class);
            // Serializer class for the value
            configs.put("value.serializer", StringSerializer.class);

            // configs.put("acks", "all");
            // configs.put("retries", "3");

            KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(configs);

            // User-defined message header fields
            List<Header> headers = new ArrayList<>();
            headers.add(new RecordHeader("biz.name", "producer.demo".getBytes()));

            for (int i = 0; i < 100; i++) {

                ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>(
                        "topic_1",
                        0,
                        i,
                        "hello lagou " + i,
                        headers
                );

                // Synchronous acknowledgement of the message
                // final Future<RecordMetadata> future = producer.send(record);
                // final RecordMetadata metadata = future.get();
                // System.out.println("Topic of the message: " + metadata.topic());
                // System.out.println("Partition of the message: " + metadata.partition());
                // System.out.println("Offset of the message: " + metadata.offset());

                // Asynchronous acknowledgement of the message
                producer.send(record, new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception == null) {
                            System.out.println("Topic of the message: " + metadata.topic());
                            System.out.println("Partition of the message: " + metadata.partition());
                            System.out.println("Offset of the message: " + metadata.offset());
                        } else {
                            System.out.println("Exception: " + exception.getMessage());
                        }
                    }
                });
            }

            // Close the producer
            producer.close();
        }
    }
    package com.lagou.kafka.demo.consumer;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.IntegerDeserializer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.Consumer;

    public class MyConsumer2 {
        public static void main(String[] args) {

            Map<String, Object> configs = new HashMap<>();
            // node1 maps to 192.168.100.101; on Windows, configure the mapping manually in the hosts file
            configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
            // Use the ConsumerConfig constants instead of hand-written strings; key deserializer
            configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
            // Value deserializer
            configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            // Consumer group ID
            configs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer_demo2");
            // If no valid offset exists for this consumer, reset to the earliest offset
            // ("latest" would reset to the end of the partition instead)
            configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

            KafkaConsumer<Integer, String> consumer = new KafkaConsumer<Integer, String>(configs);

            // Subscribe first, then consume
            consumer.subscribe(Arrays.asList("topic_1"));

            while (true) {
                // If the topic has no messages to consume yet, poll() is simply called again on the
                // next iteration; the 3-second timeout keeps the loop from calling poll() too tightly.

                // Fetch a batch of messages from the topic's partitions
                final ConsumerRecords<Integer, String> consumerRecords = consumer.poll(3_000);

                // Iterate over the batch of messages fetched in this poll
                consumerRecords.forEach(new Consumer<ConsumerRecord<Integer, String>>() {
                    @Override
                    public void accept(ConsumerRecord<Integer, String> record) {
                        System.out.println(record.topic() + "\t"
                                + record.partition() + "\t"
                                + record.offset() + "\t"
                                + record.key() + "\t"
                                + record.value());
                    }
                });
            }

            // Unreachable while the loop above runs forever
            // consumer.close();
        }
    }

Integrating Spring Boot and Kafka

    1. Create a Spring Boot project and add the Spring Web and Spring for Apache Kafka dependencies. The pom file is as follows:
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.7.1</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>com.lagou.kafka.demo</groupId>
        <artifactId>demo-02-springboot-kafka</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>demo-02-springboot-kafka</name>
        <description>Demo project for Spring Boot</description>
        <properties>
            <java.version>1.8</java.version>
        </properties>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
            </dependency>

            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka-test</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>

        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>

    </project>
    1. Edit the application.properties file:
    spring.application.name=springboot-kafka-02
    server.port=8080

    # Kafka configuration
    spring.kafka.bootstrap-servers=192.168.92.121:9092

    # Producer configuration
    spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.IntegerSerializer
    spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
    # Maximum size of each producer batch, in bytes (16 KB here)
    spring.kafka.producer.batch-size=16384
    # Total send buffer memory available on the producer side, 32 MB here
    spring.kafka.producer.buffer-memory=33554432

    # Consumer configuration
    spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
    spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    spring.kafka.consumer.group-id=springboot-consumer02
    # If Kafka has no offset for this consumer, reset to the earliest offset
    spring.kafka.consumer.auto-offset-reset=earliest
    # Whether consumer offsets are committed automatically or manually; automatic here
    spring.kafka.consumer.enable-auto-commit=true
    # Interval (ms) between automatic offset commits
    spring.kafka.consumer.auto-commit-interval=1000
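    The steps below assume the standard entry-point class generated by Spring Initializr. For completeness, a minimal sketch of such a main class follows; the class name Demo02SpringbootKafkaApplication is assumed, not taken from the original project.

    ```java
    package com.lagou.kafka.demo;

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;

    // Minimal sketch of the Spring Boot entry point assumed by the following steps
    @SpringBootApplication
    public class Demo02SpringbootKafkaApplication {
        public static void main(String[] args) {
            SpringApplication.run(Demo02SpringbootKafkaApplication.class, args);
        }
    }
    ```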
    1. Create KafkaSyncProducerController, which sends messages synchronously:
    package com.lagou.kafka.demo.controller;

    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.support.SendResult;
    import org.springframework.util.concurrent.ListenableFuture;
    import org.springframework.web.bind.annotation.PathVariable;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;

    import java.util.concurrent.ExecutionException;

    @RestController
    public class KafkaSyncProducerController {

        @Autowired
        private KafkaTemplate<Integer, String> template;

        @RequestMapping("send/sync/{message}")
        public String send(@PathVariable String message) {

            final ListenableFuture<SendResult<Integer, String>> future = template.send("topic-spring-01", 0, 0, message);
            // Send synchronously: block on the future until the broker responds
            try {
                final SendResult<Integer, String> sendResult = future.get();
                final RecordMetadata metadata = sendResult.getRecordMetadata();

                System.out.println(metadata.topic() + "\t" + metadata.partition() + "\t" + metadata.offset());

            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }

            return "success";
        }

    }
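    Once the application is running, the synchronous endpoint can be exercised from a browser or curl at http://localhost:8080/send/sync/hello; the topic, partition and offset of the record are printed on the console and the response body is "success".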
    1. Create KafkaAsyncProducerController, which sends messages asynchronously:
    package com.lagou.kafka.demo.controller;

    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.support.SendResult;
    import org.springframework.util.concurrent.ListenableFuture;
    import org.springframework.util.concurrent.ListenableFutureCallback;
    import org.springframework.web.bind.annotation.PathVariable;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;

    @RestController
    public class KafkaAsyncProducerController {

        @Autowired
        private KafkaTemplate<Integer, String> template;


        @RequestMapping("send/async/{message}")
        public String send(@PathVariable String message) {

            final ListenableFuture<SendResult<Integer, String>> future = this.template.send("topic-spring-01", 0, 1, message);

            // Register a callback and wait asynchronously for the broker's response
            future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
                @Override
                public void onFailure(Throwable throwable) {
                    System.out.println("Failed to send the message: " + throwable.getMessage());
                }

                @Override
                public void onSuccess(SendResult<Integer, String> result) {
                    final RecordMetadata metadata = result.getRecordMetadata();

                    System.out.println("Message sent successfully: " + metadata.topic() + "\t" + metadata.partition() + "\t" + metadata.offset());
                }
            });

            return "success";
        }

    }
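    The asynchronous endpoint is available at http://localhost:8080/send/async/{message}; the request returns "success" immediately, and the callback logs the send result once the broker responds.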
    1. Create the MyConsumer listener:
    package com.lagou.kafka.demo.consumer;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;

    @Component
    public class MyConsumer {

        @KafkaListener(topics = "topic-spring-01")
        public void onMessage(ConsumerRecord<Integer, String> record) {
            System.out.println("Message received by the consumer: "
                    + record.topic() + "\t"
                    + record.partition() + "\t"
                    + record.offset() + "\t"
                    + record.key() + "\t"
                    + record.value());
        }

    }

    1. Override parts of what KafkaAutoConfiguration sets up so its configuration can be customized:

    package com.lagou.kafka.demo.config;

    import org.apache.kafka.clients.admin.NewTopic;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.core.KafkaAdmin;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.core.ProducerFactory;

    import java.util.HashMap;
    import java.util.Map;

    @Configuration
    public class KafkaConfig {

        // Declare a topic to be created on application startup
        @Bean
        public NewTopic topic1() {
            return new NewTopic("nptc-01", 3, (short) 1);
        }

        // Declare a topic to be created on application startup
        @Bean
        public NewTopic topic2() {
            return new NewTopic("nptc-02", 5, (short) 1);
        }

        @Bean
        public KafkaAdmin kafkaAdmin() {
            Map<String, Object> configs = new HashMap<>();
            configs.put("bootstrap.servers", "node1:9092");
            KafkaAdmin admin = new KafkaAdmin(configs);
            return admin;
        }

        @Bean
        @Autowired
        public KafkaTemplate<Integer, String> kafkaTemplate(ProducerFactory<Integer, String> producerFactory) {

            // Override part of the ProducerFactory's original settings
            Map<String, Object> configsOverride = new HashMap<>();
            configsOverride.put(ProducerConfig.BATCH_SIZE_CONFIG, 200);

            KafkaTemplate<Integer, String> template = new KafkaTemplate<Integer, String>(
                    producerFactory, configsOverride
            );
            return template;
        }

    }
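    On startup, the KafkaAdmin bean creates the topics declared by the NewTopic beans (nptc-01 with 3 partitions, nptc-02 with 5 partitions, each with 1 replica) if they do not already exist, and the batch-size override passed to this KafkaTemplate takes precedence over the spring.kafka.producer.batch-size value in application.properties for producers created through this template.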