0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
会员中心
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

Spring Kafka的各种用法

科技绿洲 来源:Java技术指北 作者:Java技术指北 2023-09-25 17:04 次阅读

最近业务上用到了Spring Kafka,所以系统性的探索了下Spring Kafka的各种用法,发现了很多实用的特性,下面介绍下Spring Kafka的消息重试机制。

0. 前言

原生 Kafka 是不支持消息重试的。但是 Spring Kafka 2.7+ 封装了 Retry Topic 这个功能。

1. @RetryableTopic

使用注解的方式启用 Retry Topic,在 @KafkaListener 方法上添加 @RetryableTopic 即可:

@Slf4j
@Component
public class DemoConsumer {
    @RetryableTopic
    @KafkaListener(topics = "topic1", groupId = "group1")
    public void onMsg(ConsumerRecord< String, String > record) {
        log.info("topic: {}", record.topic());
        throw new RuntimeException("kafka exception");
    }

}

这样就开启了 Spring Kafka 的消息重试机制:默认重试 3 次,间隔为 1 秒。

我们在方法里模拟了抛出异常,运行后可以发现打印了 3 条日志,间隔时间大约为 1 秒,重试的topic为原topic加上后缀“-retry”

2022-11-12 12:14:10.230  INFO 1023 --- [ner#3-dlt-0-C-1] c.b.b.demo.retrytopic.KafkaListener: topic: topic1
2022-11-12 12:14:11.315  INFO 1023 --- [ner#3-dlt-0-C-1] c.b.b.demo.retrytopic.KafkaListener: topic: topic1-retry-0  
2022-11-12 12:14:12.310  INFO 1023 --- [ner#3-dlt-0-C-1] c.b.b.demo.retrytopic.KafkaListener: topic: topic1-retry-1

2. DLT死信队列

如果 3 次重试后依旧失败,会将消息发送到 DLT,

默认情况,消息被发送到死信队列后,会输出一条日志。

2022-11-12 12:14:13.324  INFO 1023 --- [ner#3-dlt-0-C-1] o.s.k.retrytopic.RetryTopicConfigurer    : Received message in dlt listener: topic1-dlt@233

DLT的topic为原topic加上后缀“-dlt”

我们可以使用@DltHandler注解来定义进入死信队列后的操作:

@DltHandler
public void dltHandler(ConsumerRecord< String, String > record) {
    log.info("topic:{}, key:{}, value:{}", record.topic(), record.key(), record.value());
}

3. 自定义@RetryableTopic

可以自定义重试次数、延迟时间、topic命名策略等等,支持使用 Spring EL 表达式读取配置。

@Slf4j
@Component
public class DemoConsumer {
    @RetryableTopic(
            attempts = "4",
            backoff = @Backoff(delay = "5000", multiplier = "2"),
            fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC
    )
    @KafkaListener(topics = "topic2", groupId = "group1")
    public void onMsg2(ConsumerRecord< String, String > record) {
        log.info("topic: {}", record.topic());
        throw new RuntimeException("kafka exception");
    }

}

注解属性说明:

attempts :重试次数,默认为3。

@Backoff delay :消费延迟时间,单位为毫秒。

@Backoff multiplier :延迟时间系数,此例中 attempts = 4, delay = 5000, multiplier = 2 ,则间隔时间依次为5s、10s、20s、40s,最大延迟时间受 maxDelay 限制。

fixedDelayTopicStrategy :可选策略包括:SINGLE_TOPIC 、MULTIPLE_TOPICS

4. 配置类

以上介绍的是注解的方式,只对注解下的方法有效。如果想让多个方法都用相同的消息重试配置,那么可以使用配置类方式:

@Bean
public RetryTopicConfiguration retryTopic(KafkaTemplate< String, String > template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .maxAttempts(4)
            .fixedBackOff(5000)
            .includeTopic("topic1")
            .create(template);
}

小结

以上就是Spring Kafka消息重试机制的简单应用~希望能够帮助那些正在使用Spring Kafka或即将使用的人少走一些弯路、少踩一点坑。

声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • spring
    +关注

    关注

    0

    文章

    338

    浏览量

    14307
  • 日志
    +关注

    关注

    0

    文章

    138

    浏览量

    10631
  • 机制
    +关注

    关注

    0

    文章

    24

    浏览量

    9774
  • DLT
    DLT
    +关注

    关注

    0

    文章

    16

    浏览量

    5293
收藏 人收藏

    评论

    相关推荐

    Spring Boot Starter需要些什么

    pulsar-spring-boot-starter是非常有必要的,在此之前,我们先看看一个starter需要些什么。 Spring Boot Starter spring-boot的强大之处在于其提供的大量
    的头像 发表于 09-25 11:35 727次阅读
    <b class='flag-5'>Spring</b> Boot Starter需要些什么

    java spring教程

    java spring教程理解Spring 实现原理掌握Spring IOC,AOP掌握Spring的基础配置和用法熟练使用SSH开发项目
    发表于 09-11 11:09

    什么是java spring

    、并且更易于测试的代码。它们也为Spring中的各种模块提供了基础支持。Spring带给我们什么◆方便解耦,简化开发      
    发表于 09-11 11:16

    基于发布与订阅的消息系统Kafka

    Kafka权威指南》——初识 Kafka
    发表于 03-05 13:46

    Kafka基础入门文档

    kafka系统入门教程(原理、配置、集群搭建、Java应用、Kafka-manager)
    发表于 03-12 07:22

    Spring笔记分享

    Spring实现了使用简单的组件配置组合成一个复杂的应用。在 Spring 中可以使用XML和Java注解组合这些对象。6) 一站式:在IOC和AOP的基础上可以整合各种企业应用的开源框架和优秀的第三方类
    发表于 11-04 07:51

    Kafka集群环境的搭建

    1、环境版本版本:kafka2.11,zookeeper3.4注意:这里zookeeper3.4也是基于集群模式部署。2、解压重命名tar -zxvf
    发表于 01-05 17:55

    spring定时器用法详解

    Spring是于2003年兴起的一个轻量级的Java开发框架,由RodJohnson创建。简单来说,Spring是一个分层的JavaSE/EEfull-stack(一站式)轻量级开源框架。下文为大家介绍spring定时器
    发表于 01-28 10:16 5685次阅读
    <b class='flag-5'>spring</b>定时器<b class='flag-5'>用法</b>详解

    Kafka的概念及Kafka的宕机

    问题要从一次Kafka的宕机开始说起。 笔者所在的是一家金融科技公司,但公司内部并没有采用在金融支付领域更为流行的 RabbitMQ ,而是采用了设计之初就为日志处理而生的 Kafka ,所以我一直
    的头像 发表于 08-27 11:21 2050次阅读
    <b class='flag-5'>Kafka</b>的概念及<b class='flag-5'>Kafka</b>的宕机

    Spring Boot实现各种参数校验

    之前也写过一篇关于Spring Validation使用的文章,不过自我感觉还是浮于表面,本次打算彻底搞懂Spring Validation。本文会详细介绍Spring Validation
    的头像 发表于 08-14 15:54 930次阅读

    Spring Validation的使用

    之前也写过一篇关于Spring Validation使用的文章,不过自我感觉还是浮于表面,本次打算彻底搞懂Spring Validation。本文会详细介绍Spring Validation
    的头像 发表于 09-08 10:31 867次阅读

    Kafka 的简介

      1 kafka简介 2 为什么要用消息系统 3 kafka基础知识 4 kafka集群架构 5 总结   1 kafka简介 其主要设计目标如下: 以时间复杂度为O(1)的方式提供
    的头像 发表于 07-03 11:10 587次阅读
    <b class='flag-5'>Kafka</b> 的简介

    监控Kafka集群的常用的方法和工具介绍

    Control等工具连接到Kafka Broker的JMX端口,并监控各种关键指标,如吞吐量、延迟、磁盘使用率、网络连接数等。
    发表于 08-30 10:05 2048次阅读
    监控<b class='flag-5'>Kafka</b>集群的常用的方法和工具介绍

    kafka client在 spring如何实现

    认识了 spring-boot-starter ,今天不妨来看下如何写一个 pulsar-spring-boot-starter 模块。 目标 写一个完整的类似 kafka-spring-boot-st
    的头像 发表于 09-25 11:21 462次阅读
    <b class='flag-5'>kafka</b> client在 <b class='flag-5'>spring</b>如何实现

    Kafka架构技术:Kafka的架构和客户端API设计

    Kafka 给自己的定位是事件流平台(event stream platform)。因此在消息队列中经常使用的 "消息"一词,在 Kafka 中被称为 "事件"。
    的头像 发表于 10-10 15:41 2302次阅读
    <b class='flag-5'>Kafka</b>架构技术:<b class='flag-5'>Kafka</b>的架构和客户端API设计