0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
会员中心
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

Spring Boot整合分布式消息平台Pulsar介绍

jf_ro2CN3Fa 来源:君哥聊技术 2023-07-12 09:58 次阅读

作为优秀的消息流平台,Pulsar 的使用越来越多,这篇文章讲解 Pulsar 的 Java 客户端。

部署 Pulsar

Pulsar 的部署方式主要有 3 种,本地安装二进制文件、docker 部署、在 Kubernetes 上部署。

本文采用 docker 部署一个单节点的 Pulsar 集群。实验环境是 2 核 CPU4G 内存。

部署命令如下:

dockerrun-it-p6650:6650-p8080:8080--mountsource=pulsardata,target=/pulsar/data--mountsource=pulsarconf,target=/pulsar/confapachepulsar/pulsar:2.9.1bin/pulsarstandalone

安装过程可能会出现下面的错误:

unknownflag:--mount
See'dockerrun--help'.

这是因为 docker 版本低,不支持 mount 参数,把 docker 版本升级到 17.06 以上就可以了。

部署过程中可能会因为网络的原因失败,多试几次就可以成功了。如果看到下面的日志,就说明启动成功了。

2022-01-08T22:27:58,726+0000[main]INFOorg.apache.pulsar.broker.PulsarService-messagingserviceisready,bootstrapserviceport=8080,brokerurl=pulsar://localhost:6650,cluster=standalone

本地单节点集群启动后,会创建一个 namespace,名字叫 public/default

Pulsar 客户端

目前 Pulsar 支持多种语言的客户端,包括:

Java 客户端

Go 客户端

Python 客户端

C++ 客户端

Node.js 客户端

WebSocket 客户端

C# 客户端

SpringBoot 配置

使用 SpringBoot 整合 Pulsar 客户端,首先引入 Pulsar 客户端依赖,代码如下:


org.apache.pulsar
pulsar-client
2.9.1

然后在 properties 文件中添加配置:

# Pulsar 地址
pulsar.url=pulsar://192.168.59.155:6650
# topic
pulsar.topic=testTopic
# consumer group
pulsar.subscription=topicGroup

创建 Client

创建客户端非常简单,代码如下:

client=PulsarClient.builder()
.serviceUrl(url)
.build();

上面的 url 就是 properties 文件中定义的 pulsar.url 。

创建 Client 时,即使集群没有启成功,程序也不会报错,因为这时还没有真正地去连接集群。

创建 Producer

producer=client.newProducer()
.topic(topic)
.compressionType(CompressionType.LZ4)
.sendTimeout(0,TimeUnit.SECONDS)
.enableBatching(true)
.batchingMaxPublishDelay(10,TimeUnit.MILLISECONDS)
.batchingMaxMessages(1000)
.maxPendingMessages(1000)
.blockIfQueueFull(true)
.roundRobinRouterBatchingPartitionSwitchFrequency(10)
.batcherBuilder(BatcherBuilder.DEFAULT)
.create();

创建 Producer,会真正的连接集群,这时如果集群有问题,就会报连接错误。

下面解释一下创建 Producer 的参数:

topic :Producer 要写入的 topic。

compressionType :压缩策略,目前支持 4 种策略 (NONE、LZ4、ZLIB、ZSTD),从 Pulsar2.3 开始,只有 Consumer 的版本在 2.3 以上,这个策略才会生效。

sendTimeout :超时时间,如果 Producer 在超时时间为收到 ACK,会进行重新发送。

enableBatching :是否开启消息批量处理,这里默认 true,这个参数只有在异步发送 (sendAsync) 时才能生效,选择同步发送会失效。

batchingMaxPublishDelay :批量发送消息的时间段,这里定义的是 10ms,需要注意的是,设置了批量时间,就不会受消息数量的影响。批量发送会把要发送的批量消息放在一个网络包里发送出去,减少网络 IO 次数,大大提高网卡的发送效率。

batchingMaxMessages :批量发送消息的最大数量。

maxPendingMessages :等待从 broker 接收 ACK 的消息队列最大长度。如果这个队列满了,producer 所有的 sendAsync 和 send 都会失败,除非设置了 blockIfQueueFull 值是 true。

blockIfQueueFull :Producer 发送消息时会把消息先放入本地 Queue 缓存,如果缓存满了,就会阻塞消息发送。

roundRobinRouterBatchingPartition-SwitchFrequency :如果发送消息时没有指定 key,那默认采用 round robin 的方式发送消息,使用 round robin 的方式,切换 partition 的周期是 (frequency * batchingMaxPublishDelay)。

创建 Consumer

Pulsar 的消费模型如下图:

f5c11b2e-2053-11ee-962d-dac502259ad0.png图片

从图中可以看到,Consumer 要绑定一个 subscription 才能进行消费。

consumer=client.newConsumer()
.topic(topic)
.subscriptionName(subscription)
.subscriptionType(SubscriptionType.Shared)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.negativeAckRedeliveryDelay(60,TimeUnit.SECONDS)
.receiverQueueSize(1000)
.subscribe();

下面解释一下创建 Consumer 的参数:

topic :Consumer 要订阅的 topic。

subscriptionNam e:consumer 要关联的 subscription 名字。

subscriptionType :订阅类型,Pulsar 支持四种类型订阅:

Exclusive :独占模式,同一个 Topic 只能有一个消费者,如果多个消费者,就会出错。

Failover :灾备模式,同一个 Topic 可以有多个消费者,但是只能有一个消费者消费,其他消费者作为故障转移备用,如果当前消费者出了故障,就从备用消费者中选择一个进行消费。如下图:

f5de2fde-2053-11ee-962d-dac502259ad0.png图片

Shared :共享模式,同一个 Topic 可以由多个消费者订阅和消费。消息通过 round robin 轮询机制分发给不同的消费者,并且每个消息仅会被分发给一个消费者。当消费者断开,如果发送给它消息没有被消费,这些消息会被重新分发给其它存活的消费者。如下图:

f6008dcc-2053-11ee-962d-dac502259ad0.png

Key_Shared :消息和消费者都会绑定一个key,消息只会发送给绑定同一个key的消费者。如果有新消费者建立连接或者有消费者断开连接,就需要更新一些消息的 key。跟 Shared 模式相比,Key_Shared 的好处是既可以让消费者并发地消费消息,又能保证同一Key下的消息顺序。如下图:

f6357d02-2053-11ee-962d-dac502259ad0.png

subscriptionInitialPosition :创建新的 subscription 时从哪里开始消费,有两个选项:

Latest :从最新的消息开始消费

Earliest :从最早的消息开始消费

negativeAckRedeliveryDelay :消费失败后间隔多久 broker 重新发送。

receiverQueueSize :在调用 receive 方法之前,最多能累积多少条消息。可以设置为 0,这样每次只从 broker 拉取一条消息。在 Shared 模式下,receiverQueueSize 设置为 0,可以防止批量消息多发给一个 Consumer 而导致其他 Consumer 空闲。

Consumer 接收消息有四种方式:同步单条、同步批量、异步单条和异步批量,代码如下:

Messagemessage=consumer.receive()
CompletableFuturemessage=consumer.receiveAsync();
Messagesmessage=consumer.batchReceive();
CompletableFuturemessage=consumer.batchReceiveAsync();

对于批量接收,也可以设置批量接收的策略,代码如下:

consumer=client.newConsumer()
.topic(topic)
.subscriptionName(subscription)
.batchReceivePolicy(BatchReceivePolicy.builder()
.maxNumMessages(100)
.maxNumBytes(1024*1024)
.timeout(200,TimeUnit.MILLISECONDS)
.build())
.subscribe();

代码中的参数说明如下:

maxNumMessages :批量接收的最大消息数量。

maxNumBytes :批量接收消息的大小,这里是 1MB。

测试

首先编写 Producer 发送消息的代码,如下:

publicvoidsendMsg(Stringkey,Stringdata){
CompletableFuturefuture=producer.newMessage()
.key(key)
.value(data.getBytes()).sendAsync();
future.handle((v,ex)->{
if(ex==null){
logger.info("发送消息成功,key:{},msg:{}",key,data);
}else{
logger.error("发送消息失败,key:{},msg:{}",key,data);
}
returnnull;
});
future.join();
logger.info("发送消息完成,key:{},msg:{}",key,data);
}

然后编写一个 Consumer 消费消息的代码,如下:

publicvoidstart()throwsException{
while(true){
Messagemessage=consumer.receive();
Stringkey=message.getKey();
Stringdata=newString(message.getData());
Stringtopic=message.getTopicName();
if(StringUtils.isNotEmpty(data)){
try{
logger.info("收到消息,topic:{},key:{},data:{}",topic,key,data);
}catch(Exceptione){
logger.error("接收消息异常,topic:{},key:{},data:{}",topic,key,data,e);
}
}
consumer.acknowledge(message);
}
}

最后编写一个 Controller 类,调用 Producer 发送消息,代码如下:

@RequestMapping("/send")
@ResponseBody
publicStringsend(@RequestParamStringkey,@RequestParamStringdata){
logger.info("收到消息发送请求,key:{},value:{}",key,data);
pulsarProducer.sendMsg(key,data);
return"success";
}

调用 Producer 发送一条消息,key=key1,data=data1,具体操作为在浏览器中输入下面的 url 后回车

可以看到控制台输出下面日志:

2022-01-0822:42:33,199[pulsar-client-io-6-1][INFO]boot.pulsar.PulsarProducer-发送消息成功,key:key1,msg:data1
2022-01-0822:42:33,200[http-nio-8083-exec-1][INFO]boot.pulsar.PulsarProducer-发送消息完成,key:key1,msg:data1
2022-01-0822:42:33,232[Thread-22][INFO]boot.pulsar.PulsarConsumer-收到消息,topic//public/default/testTopic,key:key1,data:data1
2022-01-0822:43:14,498[pulsar-timer-5-1][INFO]org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl-[testTopic][topicGroup][7def6]Prefetchedmessages:0---Consumethroughputreceived:0.02msgs/s---0.00Mbit/s---Acksentrate:0.02ack/s---Failedmessages:0---batchmessages:0---Failedacks:0
2022-01-0822:43:14,961[pulsar-timer-9-1][INFO]org.apache.pulsar.client.impl.ProducerStatsRecorderImpl-[testTopic][standalone-9-0]Pendingmessages:0---Publishthroughput:0.02msg/s---0.00Mbit/s---Latency:med:69.000ms-95pct:69.000ms-99pct:69.000ms-99.9pct:69.000ms-max:69.000ms---Ackreceivedrate:0.02ack/s---Failedmessages:0

从日志中看到,这里使用的 namespace 就是创建集群时生成的public/default。

总结

从 SpringBoot 整合 Java 客户端使用来看,Pulsar 的 api 是非常友好的,使用起来方便简洁。Consumer 的使用需要考虑多一些,需要考虑到批量、异步以及订阅类型。






审核编辑:刘清

声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • ACK
    ACK
    +关注

    关注

    0

    文章

    28

    浏览量

    11144
  • URL
    URL
    +关注

    关注

    0

    文章

    139

    浏览量

    15326
  • python
    +关注

    关注

    56

    文章

    4792

    浏览量

    84613

原文标题:Spring Boot 整合分布式消息平台 Pulsar

文章出处:【微信号:芋道源码,微信公众号:芋道源码】欢迎添加关注!文章转载请注明出处。

收藏 人收藏

    评论

    相关推荐

    Spring Boot Starter需要些什么

    pulsar-spring-boot-starter是非常有必要的,在此之前,我们先看看一个starter需要些什么。 Spring Boot Starter spring-boot
    的头像 发表于 09-25 11:35 754次阅读
    <b class='flag-5'>Spring</b> <b class='flag-5'>Boot</b> Starter需要些什么

    HarmonyOS应用开发-分布式设计

    设计理念HarmonyOS 是面向未来全场景智慧生活方式的分布式操作系统。对消费者而言,HarmonyOS 将生活场景中的各类终端进行能力整合,形成“One Super Device”,以实现
    发表于 09-22 17:11

    启动Spring Boot项目应用的三种方法

    ,从而使开发人员不再需要定义样板化的配置。用我的话来理解,就是spring boot其实不是什么新的框架,它默认配置了很多框架的使用方式,就像maven整合了所有的jar包,spring
    发表于 01-14 17:33

    如何高效完成HarmonyOS分布式应用测试?

    及云测平台接入Portal共5项测试服务,详见图2。针对分布式应用测试面临的挑战,我们接下来将重点介绍分布式UI测试框架和评分工具。(1)分布式
    发表于 12-13 18:07

    Spring Boot嵌入Web容器原理是什么

    Spring Boot嵌入Web容器原理Spring Boot的目标是构建“非常容易创建、独立、产品级别的基于
    发表于 12-16 07:57

    Spring Boot Web相关的基础知识

    上一篇文章我们已经学会了如何通过IDEA快速建立一个Spring Boot项目,还介绍Spring Boot项目的结构,
    的头像 发表于 03-17 15:03 647次阅读

    kafka client在 spring如何实现

    认识了 spring-boot-starter ,今天不妨来看下如何写一个 pulsar-spring-boot-starter 模块。 目标 写一个完整的类似 kafka-spring-boot-st
    的头像 发表于 09-25 11:21 483次阅读
    kafka client在 <b class='flag-5'>spring</b>如何实现

    Spring Boot Actuator快速入门

    不知道大家在写 Spring Boot 项目的过程中,使用过 Spring Boot Actuator 吗?知道 Spring
    的头像 发表于 10-09 17:11 630次阅读

    Spring Boot的启动原理

    可能很多初学者会比较困惑,Spring Boot 是如何做到将应用代码和所有的依赖打包成一个独立的 Jar 包,因为传统的 Java 项目打包成 Jar 包之后,需要通过 -classpath 属性
    的头像 发表于 10-13 11:44 638次阅读
    <b class='flag-5'>Spring</b> <b class='flag-5'>Boot</b>的启动原理

    Spring Boot 的设计目标

    什么是Spring Boot Spring BootSpring 开源组织下的一个子项目,也是 S
    的头像 发表于 10-13 14:56 574次阅读
    <b class='flag-5'>Spring</b> <b class='flag-5'>Boot</b> 的设计目标

    spring分布式框架有哪些

    Spring分布式框架是一套基于Spring框架的解决方案,用于构建分布式系统。它提供了一系列的组件和模块,可以帮助开发人员轻松地构建可扩展、高可用、高性能的
    的头像 发表于 11-16 10:58 771次阅读

    springclould分布式教程

    Spring Cloud是一个基于Spring Boot分布式系统开发工具,它提供了一系列的分布式系统解决方案,可以帮助开发者快速构建和部
    的头像 发表于 11-16 10:59 501次阅读

    springcloud如何实现分布式

    Spring Cloud是基于Spring Boot开发的一套分布式系统解决方案,它主要包括了多个子项目,如服务注册与发现、配置中心、负载均衡、断路器、路由等等。通过使用
    的头像 发表于 11-16 11:01 676次阅读

    springcloud 分布式事务解决方案实例

    么都执行成功,要么都执行失败。本文将介绍如何使用Spring Cloud来实现分布式事务。 在分布式系统中,使用数据库事务来保证数据一致性是常见的做法。
    的头像 发表于 12-03 16:32 1135次阅读

    基于分布式对象存储WDS的信托非结构化数据整合平台

    基于分布式对象存储WDS的信托非结构化数据整合平台
    的头像 发表于 08-28 09:56 320次阅读
    基于<b class='flag-5'>分布式</b>对象存储WDS的信托非结构化数据<b class='flag-5'>整合</b><b class='flag-5'>平台</b>