0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
会员中心
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

kafka client在 spring如何实现

科技绿洲 来源:了不起 作者:了不起 2023-09-25 11:21 次阅读

之前写过关于 Apache Pulsar 的简单示例,用来了解如何使用 Pulsar 这个新生代的消息队列中间件,但是如果想要在项目中使用,还会欠缺很多,最明显的就是 集成复杂,如果你用过其他消息中间件,比如 Kafka、RabbitMq,只需要简单的引入 jar,就可以通过注解+配置快速集成到项目中。

开始一个 Pulsar Starter

既然已经了解了 Apache Pulsar,又认识了 spring-boot-starter,今天不妨来看下如何写一个 pulsar-spring-boot-starter 模块。

目标

写一个完整的类似 kafka-spring-boot-starter(springboot 项目已经集成到 spring-boot-starter 中),需要考虑到很多 kafka 的特性, 今天我们主要实现下面几个模板

  • 在项目中够通过引入 jar 依赖快速集成
  • 提供统一的配置入口
  • 能够快速发送消息
  • 能够基于注解实现消息的消费

定义结构

└── pulsar-starter
    ├── pulsar-spring-boot-starter
    ├── pulsar-spring-boot-autoconfigure
    ├── spring-pulsar
    ├── spring-pulsar-xx
    ├── spring-pulsar-sample
└── README.md

整个模块的结构如上其中pulsar-starter作为一个根模块,主要控制子模块依赖的其他 jar 的版本以及使用到的插件版本。类似于 Spring-Bom,这样我们在后续升级 时,就可以解决各个第三方 jar 的可能存在版本冲突导致的问题。

  • pulsar-spring-boot-starter

该模块作为外部项目集成的直接引用 jar,可以认为是 pulsar-spring-boot-starter 组件的入口,里面不需要写任何代码,只需要引入需要的依赖(也就是下面的子模块)即可

  • pulsar-spring-boot-autoconfigure

该模块主要定义了 spring.factories 以及 AutoConfigure、Properties。也就是自动配置的核心(配置项+Bean 配置)

  • spring-pulsar

该模块是核心模块,主要的实现都在这里

  • spring-pulsar-xx

扩展模块,可以对 spring-pulsar 做更细化的划分

  • spring-pulsar-sample

starter 的使用示例项目

实现

上面我们说到实现目标,现在看下各个模块应该包含什么内容,以及怎么实现我们的目标

  • 入口 pulsar-spring-boot-starter

上面说到 starter 主要是引入整个模块基础的依赖即可,里面不用写代码。

< dependencies >
    < dependency >
        < groupId >com.sucl< /groupId >
        < artifactId >spring-pulsar< /artifactId >
        < version >${project.version}< /version >
    < /dependency >

    < dependency >
        < groupId >com.sucl< /groupId >
        < artifactId >pulsar-spring-boot-autoconfigure< /artifactId >
        < version >${project.version}< /version >
    < /dependency >
< /dependencies >
  • pulsar-spring-boot-autoconfigure
  1. 添加 spring-boot 基础的配置
< dependencies >
     < dependency >
         < groupId >org.springframework.boot< /groupId >
         < artifactId >spring-boot< /artifactId >
     < /dependency >

     < dependency >
         < groupId >org.springframework.boot< /groupId >
         < artifactId >spring-boot-starter-logging< /artifactId >
     < /dependency >

     < dependency >
         < groupId >org.springframework.boot< /groupId >
         < artifactId >spring-boot-configuration-processor< /artifactId >
         < optional >true< /optional >
     < /dependency >
< /dependencies >
  1. 定义自动配置类 PulsarAutoConfiguration
    • 引入 Properties ,基于EnableConfigurationPropertiesspring-boot-configuration-processor解析 Properties 生成对应spring-configuration-metadata.json文件,这样编写 application.yml 配置时就可以自动提示配置项的属性和值了。
    • 构建一些必须的 Bean,如 PulsarClient、ConsumerFactory、ConsumerFactory 等
    • Import 配置 PulsarAnnotationDrivenConfiguration,这个主要是一些额外的配置,用来支持后面的功能
@Configuration
@EnableConfigurationProperties({PulsarProperties.class})
@Import({PulsarAnnotationDrivenConfiguration.class})
public class PulsarAutoConfiguration {

    private final PulsarProperties properties;

    public PulsarAutoConfiguration(PulsarProperties properties) {
        this.properties = properties;
    }

    @Bean(destroyMethod = "close")
    public PulsarClient pulsarClient() {
        ClientBuilder clientBuilder = new ClientBuilderImpl(properties);
        return clientBuilder.build();
    }

    @Bean
    @ConditionalOnMissingBean(ConsumerFactory.class)
    public ConsumerFactory pulsarConsumerFactory() {
        return new DefaultPulsarConsumerFactory(pulsarClient(), properties.getConsumer().buildProperties());
    }

    @Bean
    @ConditionalOnMissingBean(ProducerFactory.class)
    public ProducerFactory pulsarProducerFactory() {
        return new DefaultPulsarProducerFactory(pulsarClient(), properties.getProducer().buildProperties());
    }

}
  1. 配置 spring.factory

在目录src/main/resources/META-INF下创建 spring.factories ,内容如下:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=
  com.sucl.pulsar.autoconfigure.PulsarAutoConfiguration
  • spring-pulsar
  1. 添加 pulsar-client 相关的依赖
< dependencies >
     < dependency >
         < groupId >org.apache.pulsar< /groupId >
         < artifactId >pulsar-client< /artifactId >
     < /dependency >

     < dependency >
         < groupId >org.springframework.boot< /groupId >
         < artifactId >spring-boot-autoconfigure< /artifactId >
     < /dependency >

     < dependency >
         < groupId >org.springframework< /groupId >
         < artifactId >spring-messaging< /artifactId >
     < /dependency >
< /dependencies >
  1. 定义 EnablePulsar,之前说到过,@Enable 注解主要是配合 AutoConfigure 来做功能加强,没有了自动配置,我们依然可以使用这些模块的功能。这里做了一件事,向 Spring 容器注册了两个 Bean
  • PulsarListenerAnnotationBeanProcessor 在 Spring Bean 生命周期中解析注解自定义注解 PulsarListener、PulsarHandler,
  • PulsarListenerEndpointRegistry 用来构建 Consumer 执行环境以及对 TOPIC 的监听、触发消费回调等等,可以说是最核心的 Bean
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Import({PulsarListenerConfigurationSelector.class})
public @interface EnablePulsar {

}
  1. 定义注解,参考 RabbitMq,主要针对需要关注的类与方法,分别对应注解@PulsarListener、@PulsarHandler,通过这两个注解配合可以让我们监听到关注的 TOPIC, 当有消息产生时,触发对应的方法进行消费。
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface PulsarListener {

    /**
     *
     * @return TOPIC 支持SPEL
     */
    String[] topics() default {};

    /**
     *
     * @return TAGS 支持SPEL
     */
    String[] tags() default {};
}

@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface PulsarHandler {

}
  1. 注解@PulsarListener 的处理流程比较复杂,这里用一张图描述,或者可以通过下面 github 的源代码查看具体实现

图片

flow

  • spring-pulsar-sample

按照下面的流程,你会发现通过简单的几行代码就能够实现消息的生产与消费,并集成到项目中去。

  1. 简单写一个 SpringBoot 项目,并添加 pulsar-spring-boot-starter
< dependencies >
    < dependency >
        < groupId >com.sucl< /groupId >
        < artifactId >pulsar-spring-boot-starter< /artifactId >
        < version >${project.version}< /version >
    < /dependency >

    < dependency >
        < groupId >org.springframework.boot< /groupId >
        < artifactId >spring-boot-starter-web< /artifactId >
    < /dependency >
< /dependencies >
  1. 添加配置
cycads:
  pulsar:
    service-url: pulsar://localhost:6650
  listener-topics: TOPIC_TEST
  1. 编写对应消费代码
@Slf4j
@Component
@PulsarListener(topics = "#{'${cycads.listener-topics}'.split(',')}")
public class PulsarDemoListener {

    @PulsarHandler
    public void onConsumer(Message message){
        log.info(" >> > 接收到消息:{}", message.getPayload());
    }

}
  1. 向 Pulsar Broker 发送消息进行测试
@Slf4j
@RunWith(SpringRunner.class)
@ContextConfiguration(classes = {ContextConfig.class})
@Import({PulsarAutoConfiguration.class})
public class ProducerTests {

    @Autowired
    private ProducerFactory producerFactory;

    @Test
    public void sendMessage() {
        Producer producer = producerFactory.createProducer("TOPIC_TEST");
        MessageId messageId = producer.send("this is a test message");
        log.info(" >> >> >> > 消息发送完成:{}", messageId);
    }

    @Configuration
    @PropertySource(value = "classpath:application-test.properties")
    static class ContextConfig {
        //
    }
}
  1. 控制台可以看到这样的结果
2023-02-26 19:57:15.572  INFO 26520 --- [pulsar-01] c.s.p.s.listener.PulsarDemoListener : > >> 接收到消息:GenericMessage [payload=this is a test message, headers={id=f861488c-2afb-b2e7-21a1-f15e9759eec5, timestamp=1677412635571}]

知识点

  • Pulsar Client

基于 pulsar-client 提供的 ConfigurationData 扩展 Properties;了解 Pulsar Client 如何连接 Broker 并进行消息消费,包括同步消费、异步消费等等

  • spring.factories

实现 starter 自动配置的关键,基于 SPI 完成配置的自动加载

  • Spring Bean 生命周期

通过 Bean 生命周期相关扩展实现注解的解析与容器的启动,比如 BeanPostProcessor, BeanFactoryAware, SmartInitializingSingleton, InitializingBean, DisposableBean 等

  • Spring Messaging

基于回调与 MethodHandler 实现消息体的封装、参数解析以及方法调用;

源码示例

https://github.com/sucls/pulsar-starter.git

结束语

如果你看过 spring-kafka 的源代码,那么你会发现所有代码基本都是仿造其实现。一方面能够阅读 kafka client 在 spring 具体如何实现;同时通过编写自己的 spring starter 模块,学习 整个 starter 的实现过程。

声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • 模块
    +关注

    关注

    7

    文章

    2670

    浏览量

    47340
  • 模板
    +关注

    关注

    0

    文章

    108

    浏览量

    20554
  • 代码
    +关注

    关注

    30

    文章

    4744

    浏览量

    68344
  • spring
    +关注

    关注

    0

    文章

    338

    浏览量

    14308
  • kafka
    +关注

    关注

    0

    文章

    50

    浏览量

    5211
收藏 人收藏

    评论

    相关推荐

    Spring Boot如何实现异步任务

    Spring Boot 提供了多种方式来实现异步任务,这里介绍三种主要实现方式。 1、基于注解 @Async @Async 注解是 Spring 提供的一种轻量级异步方法
    的头像 发表于 09-30 10:32 1406次阅读

    Spring Boot Starter需要些什么

    前面我们简单介绍了如何使用消息中间件 Apache Pulsar ,但是项目中那样使用,显然是不太好的,不管从易用性和扩展性来看,都是远远不够, 为了和springboot项目集成,写一个
    的头像 发表于 09-25 11:35 727次阅读
    <b class='flag-5'>Spring</b> Boot Starter需要些什么

    Spring状态机的实现原理和使用方法

    说起 Spring 状态机,大家很容易联想到这个状态机和设计模式中状态模式的区别是啥呢?没错,Spring 状态机就是状态模式的一种实现介绍 S
    的头像 发表于 12-26 09:39 1869次阅读
    <b class='flag-5'>Spring</b>状态机的<b class='flag-5'>实现</b>原理和使用方法

    java spring教程

    java spring教程理解Spring 实现原理掌握Spring IOC,AOP掌握Spring的基础配置和用法熟练使用SSH开发项目
    发表于 09-11 11:09

    什么是java spring

    或多个模块联合实现简单来说,Spring是一个轻量级的控制反转(IoC)和面向切面(AOP)的容器框架。■ 轻量——从大小与开销两方面而言Spring都是轻量的。完整的Spring框架
    发表于 09-11 11:16

    Spring笔记分享

    Spring实现了使用简单的组件配置组合成一个复杂的应用。 Spring 中可以使用XML和Java注解组合这些对象。6) 一站式:I
    发表于 11-04 07:51

    Kafka集群环境的搭建

    1、环境版本版本:kafka2.11,zookeeper3.4注意:这里zookeeper3.4也是基于集群模式部署。2、解压重命名tar -zxvf
    发表于 01-05 17:55

    Spring认证」什么是Spring GraphQL?

    这个项目建立 Boot 2.x 上,但它应该与最新的 Boot2.4.x5 相关。 要创建项目,请转到start.spring.io并为要使用的GraphQL传输选择启动器: 启动机 运输 执行
    的头像 发表于 08-10 14:08 797次阅读
    「<b class='flag-5'>Spring</b>认证」什么是<b class='flag-5'>Spring</b> GraphQL?

    Kafka的概念及Kafka的宕机

    很好奇Kafka的高可用实现和保障。从 Kafka 部署后,系统内部使用的 Kafka 一直运行稳定,没有出现不可用的情况。 但最近系统测试人员常反馈偶有
    的头像 发表于 08-27 11:21 2056次阅读
    <b class='flag-5'>Kafka</b>的概念及<b class='flag-5'>Kafka</b>的宕机

    Spring Boot实现各种参数校验

    之前也写过一篇关于Spring Validation使用的文章,不过自我感觉还是浮于表面,本次打算彻底搞懂Spring Validation。本文会详细介绍Spring Validation各种场景下的最佳实践及其
    的头像 发表于 08-14 15:54 931次阅读

    Kafka 的简介

    ,即使对TB级以上数据也能保证常数时间的访问性能 高吞吐率。即使非常廉价的机器上也能做到单机支持每秒100K条消息的传输 支持Kafka Server间的消息分区,及分布式消费,同时保证每个
    的头像 发表于 07-03 11:10 589次阅读
    <b class='flag-5'>Kafka</b> 的简介

    物通博联5G-kafka工业网关实现kafka协议对接到云平台

    Kafka协议是一种基于TCP层的网络协议,用于分布式消息传递系统Apache Kafka中发送和接收消息。Kafka协议定义了客户端和服务器之间的通信方式和数据格式,允许客户端发送
    的头像 发表于 07-11 10:44 482次阅读

    Spring Kafka的各种用法

    Kafka 是不支持消息重试的。但是 Spring Kafka 2.7+ 封装了 Retry Topic 这个功能。 1. @RetryableTopic 使用注解的方式启用 Retry Topic,
    的头像 发表于 09-25 17:04 935次阅读

    Kafka架构技术:Kafka的架构和客户端API设计

    Kafka 给自己的定位是事件流平台(event stream platform)。因此消息队列中经常使用的 "消息"一词, Kafka 中被称为 "事件"。
    的头像 发表于 10-10 15:41 2305次阅读
    <b class='flag-5'>Kafka</b>架构技术:<b class='flag-5'>Kafka</b>的架构和客户端API设计

    如何将Kafka使用到我们的后端设计中

    本文介绍了以下内容: 1.什么是Kafka? 2.为什么我们需要使用Kafka这样的消息系统及使用它的好处 3.如何将Kafka使用到我们的后端设计中。 译自timber.io
    的头像 发表于 10-30 14:30 494次阅读
    如何将<b class='flag-5'>Kafka</b>使用到我们的后端设计中