Java 17 下 Spring Boot 与 Pulsar 队列集成实战:生产者与消费者实现指南

Pulsar队列与Springboot集成有2种模式:官方pulsar-client 或社区Starter(如pulsar-spring-boot-starter)

  • 如果考虑最新、最快、最齐全的功能,使用官方pulsar-client
  • 如果考虑快速低成本接入,使用社区Starter(如pulsar-spring-boot-starter)

环境依赖:

  • SpringBoot 3.3.12

  • Java 17

  • 官方pulsar-client

    • 引入依赖
    • 配置Pulsar连接
    • 创建生产者
    • 创建消费者
  • 社区Starter

    • 引入依赖
    • 发送消息
    • 接收消息

在这里插入图片描述
在这里插入图片描述

官方pulsar-client

官方 pulsar-client 提供了最全面的 Pulsar 功能,适合对功能完整性有较高要求的项目。下面我们一步步实现生产者和消费者的功能。

引入依赖

首先,需要在项目中引入 pulsar-client 的依赖,这能帮助我们在 Spring Boot 项目里使用 Pulsar 客户端功能。

<dependency><groupId>org.apache.pulsar</groupId><artifactId>pulsar-client</artifactId><version>3.3.1</version>
</dependency>

配置Pulsar连接

引入依赖后,我们需要对 Pulsar 进行连接配置,指定 Pulsar 服务的地址。可以在配置文件里添加相关配置,同时创建一个配置类来初始化 Pulsar 客户端。

spring:pulsar:service-url: pulsar://127.0.0.1:6650
@Configuration
public class PulsarConfig {@Value("${spring.pulsar.client.service-url}")private String serviceUrl;@Beanpublic PulsarClient pulsarClient() throws PulsarClientException {ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(serviceUrl).operationTimeout(30, java.util.concurrent.TimeUnit.SECONDS).connectionTimeout(10, java.util.concurrent.TimeUnit.SECONDS);// 可以添加认证等其他配置// clientBuilder.authentication(AuthenticationFactory.token("your-token"));return clientBuilder.build();}
}

创建生产者

完成连接配置后,就可以创建 Pulsar 生产者来发送消息了。下面的代码实现了同步和异步发送消息的功能。

@Service
public class PulsarMessageProducer {private static final String TOPIC = "persistent://public/default/messages";@Autowiredprivate PulsarClient pulsarClient;public void sendMessage(String content) throws PulsarClientException {// 创建生产者Producer<Message> producer = pulsarClient.newProducer(Schema.JSON(Message.class)).topic(TOPIC).producerName("message-producer").create();// 创建消息对象Message message = new Message(UUID.randomUUID().toString(),content,LocalDateTime.now());// 发送消息(同步)MessageId messageId = producer.send(message);System.out.println("Message sent successfully. Message ID: " + messageId);// 关闭生产者producer.close();}public CompletableFuture<MessageId> sendMessageAsync(String content) throws PulsarClientException {// 创建生产者Producer<Message> producer = pulsarClient.newProducer(Schema.JSON(Message.class)).topic(TOPIC).producerName("async-message-producer").create();// 创建消息对象Message message = new Message(UUID.randomUUID().toString(),content,LocalDateTime.now());// 异步发送消息CompletableFuture<MessageId> future = producer.sendAsync(message);future.thenAccept(messageId -> {System.out.println("Async message sent successfully. Message ID: " + messageId);try {producer.close();} catch (PulsarClientException e) {e.printStackTrace();}}).exceptionally(throwable -> {System.err.println("Failed to send message: " + throwable.getMessage());try {producer.close();} catch (PulsarClientException e) {e.printStackTrace();}return null;});return future;}
}

创建消费者

创建完生产者后,还需要创建消费者来接收消息。下面的代码展示了如何启动一个消费者并异步接收消息。

@Service
public class PulsarMessageConsumer implements CommandLineRunner {private static final String TOPIC = "persistent://public/default/messages";private static final String SUBSCRIPTION = "message-subscription";@Autowiredprivate PulsarClient pulsarClient;@Overridepublic void run(String... args) throws Exception {// 启动消费者startConsumer();}public void startConsumer() throws PulsarClientException {// 创建消费者Consumer<Message> consumer = pulsarClient.newConsumer(Schema.JSON(Message.class)).topic(TOPIC).subscriptionName(SUBSCRIPTION).subscriptionType(SubscriptionType.Shared).subscribe();// 异步消费消息new Thread(() -> {while (true) {try {// 等待接收消息,超时时间为10秒Message<Message> msg = consumer.receive(10, TimeUnit.SECONDS);if (msg != null) {try {// 处理消息Message message = msg.getValue();System.out.println("Received message: " + message);// 确认消息已消费consumer.acknowledge(msg);} catch (Exception e) {// 处理消息失败,重新放回队列consumer.negativeAcknowledge(msg);}}} catch (PulsarClientException e) {if (e.getCause() instanceof java.util.concurrent.TimeoutException) {// 超时异常,继续等待System.out.println("No message received within timeout period, waiting again...");} else {e.printStackTrace();}}}}).start();}
}

社区Starter

社区提供的 pulsar-spring-boot-starter 简化了 Pulsar 与 Spring Boot 的集成过程,适合需要快速接入的项目。下面我们来看看如何使用它。

引入依赖

首先,在配置文件中添加 Pulsar 服务的配置信息,这能帮助我们连接到 Pulsar 服务。

# Pulsar 服务
spring:pulsar:client:serviceUrl: pulsar://127.0.0.1:6650

发送消息

完成配置后,就可以使用 PulsarTemplate 来发送消息了。下面的代码实现了同步和异步发送消息的功能。

@Service
public class MyProducer {private final PulsarTemplate<String> pulsarTemplate;public MyProducer(PulsarTemplate<String> pulsarTemplate) {this.pulsarTemplate = pulsarTemplate;}public void sendMessage(String message) {
// 由于 convertAndSend(String, String) 方法未定义,可能需要使用正确的方法
// 假设使用 send 方法来替代,具体根据 PulsarTemplate 的实际方法决定pulsarTemplate.send("my-topic", message);System.out.println("Sent: " + message);}public CompletableFuture<MessageId> sendMessageAsync(String message) {return pulsarTemplate.sendAsync("my-topic", message);}
}

接收消息

发送消息后,还需要创建消费者来接收消息。使用 @PulsarListener 注解可以方便地监听消息。下面的代码展示了如何接收消息。

@Service
public class MyConsumer {@PulsarListener(topics = "my-topic")public void receive(Message<String> message) {System.out.println("Received in Spring Boot: " + message.getValue());}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/pingmian/86837.shtml
繁体地址,请注明出处:http://hk.pswp.cn/pingmian/86837.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

《Go语言高级编程》RPC 入门

《Go语言高级编程》RPC 入门 一、什么是 RPC&#xff1f; RPC&#xff08;Remote Procedure Call&#xff0c;远程过程调用&#xff09;是分布式系统中不同节点间的通信方式&#xff0c;允许程序像调用本地函数一样调用远程服务的方法。 Go 语言的标准库 net/rpc 提供了基础的…

第N5周:Pytorch文本分类入门

&#x1f368; 本文为&#x1f517;365天深度学习训练营中的学习记录博客 &#x1f356; 原作者&#xff1a;K同学啊 一、前期准备 1.加载数据 import torch import torch.nn as nn import torchvision from torchvision import transforms,datasets import os,PIL,p…

uniappx 安卓app项目本地打包运行,腾讯地图报错:‘鉴权失败,请检查你的key‘

根目录下添加 AndroidManifest.xml 文件&#xff0c; <application><meta-data android:name"TencentMapSDK" android:value"腾讯地图申请的key" /> </application> manifest.json 文件中添加&#xff1a; "app": {"…

【向上教育】结构化面试开口秘籍.pdf

向 上 教 育 XI A N G S H A N G E D U C A T I O N 结构化 面试 开口秘笈 目 录 第一章 自我认知类 ........................................................................................................................... 2 第二章 工作关系处理类 .......…

Webpack 热更新(HMR)原理详解

&#x1f525; Webpack 热更新&#xff08;HMR&#xff09;原理详解 &#x1f4cc; 本文适用于 Vue、React 等使用 Webpack 的项目开发者&#xff0c;适配 Vue CLI / 自定义 Webpack 项目。 &#x1f3af; 一、什么是 HMR&#xff1f; Hot Module Replacement 是 Webpack 提供的…

MySQL索引完全指南

一、索引是什么&#xff1f;为什么这么重要&#xff1f; 索引就像字典的目录 想象一下&#xff0c;你要在一本1000页的字典里找"程序员"这个词&#xff0c;你会怎么做&#xff1f; 没有目录&#xff1a;从第1页开始一页一页翻&#xff0c;可能要翻500页才能找到有…

学习使用dotnet-dump工具分析.net内存转储文件(2)

运行ShenNiusModularity项目&#xff0c;使用createdump工具dump完整的进程内存映射文件&#xff0c;然后运行dotnet-dump analyze命令加载dump文件。   可以先使用dumpheap命令显示有关垃圾回收堆的信息和有关对象的收集统计信息。dumpheap支持多类参数&#xff08;如下所示…

Oracle BIEE 交互示例(一)同一分析内

Oracle BIEE 交互示例(一)同一分析内 1 示例背景2 实践目标3 实操步骤3.1 创建数据集3.1.1 TEST_TABLE3.1.2 保存名字为【01 TEST_TABLE】3.2 创建分析3.2.1 创建列3.2.2 创建视图3.2.2.1 数据透视表3.2.2.2 图形3.2.2.3 表3.3 设置交互4 结果示例1 示例背景 版本:OBIEE 12…

使用API有效率地管理Dynadot域名,出售账户中的域名

关于Dynadot Dynadot是通过ICANN认证的域名注册商&#xff0c;自2002年成立以来&#xff0c;服务于全球108个国家和地区的客户&#xff0c;为数以万计的客户提供简洁&#xff0c;优惠&#xff0c;安全的域名注册以及管理服务。 Dynadot平台操作教程索引&#xff08;包括域名邮…

Vite 打包原理详解 + Webpack 对比

&#x1f680; Vite 打包原理详解 Webpack 对比 &#x1f44b; 本文适合&#xff1a;Vite 使用者、Vue/React 工程师、希望搞清楚打包流程及与 Webpack 区别的开发者 &#x1f310; 技术背景&#xff1a;Vite 采用 ES Modules 原生浏览器能力驱动开发体验&#xff0c;Webpack…

区块链RWA(Real World Assets)系统开发全栈技术架构与落地实践指南

一、技术架构设计&#xff1a;分层架构与模块协同 1. 核心区块链层 区块链选型策略&#xff1a; 公链&#xff1a;以太坊主网&#xff08;安全性高&#xff0c;DeFi生态完备&#xff09; Polygon CDK&#xff08;Layer2定制化合规链&#xff0c;Gas费低至$0.003&#xff09;…

GBDT:梯度提升决策树——集成学习中的预测利器

核心定位&#xff1a;一种通过串行集成弱学习器&#xff08;决策树&#xff09;、以梯度下降方式逐步逼近目标函数的机器学习算法&#xff0c;在结构化数据预测任务中表现出色。 本文由「大千AI助手」原创发布&#xff0c;专注用真话讲AI&#xff0c;回归技术本质。拒绝神话或妖…

Redis持久化机制深度解析:RDB与AOF全面指南

摘要 本文深入剖析Redis的持久化机制&#xff0c;全面讲解RDB和AOF两种持久化方式的原理、配置与应用场景。通过详细的操作步骤和原理分析&#xff0c;您将掌握如何配置Redis持久化策略&#xff0c;确保数据安全性与性能平衡。文章包含思维导图概览、命令实操演示、核心原理图…

CentOS7升级openssh10.0p2和openssl3.5.0详细操作步骤

背景 近期漏洞扫描时&#xff0c;发现有很多关于openssh的相关高危漏洞&#xff0c;因此需要升级openssh的版本 升级步骤 由于openssh和openssl的版本是需要相匹配的&#xff0c;这次计划将openssh升级至10.0p2版本&#xff0c;将openssl升级至3.5.0版本&#xff0c;都是目前…

fishbot随身系统安装nvidia显卡驱动

小鱼的fishbot是已经配置好的ubuntu22.04,我听说在预先配置系统时需要勾选安装第三方图形化软件&#xff0c;不然直接安装会有进不去图形化界面的风险&#xff0c;若没有勾选&#xff0c;建议使用其他安装方法&#xff0c;比如禁用系统自带的驱动那套安装流程 1.打开设置->关…

学习昇腾开发的第十天--ffmpeg推拉流

1、FFmpeg推流 注意&#xff1a;在推流之前先运行rtsp-simple-server&#xff08;mediamtx&#xff09; ./mediamtx 1.1 UDP推流 ffmpeg -re -i input.mp4 -c copy -f rtsp rtsp://127.0.0.1:8554/stream 1.2 TCP推流 ffmpeg -re -i input.mp4 -c copy -rtsp_transport t…

成为一名月薪 2 万的 web 安全工程师需要掌握哪些技能??

现在 web 安全工程师比较火&#xff0c;岗位比较稀缺&#xff0c;现在除了一些大公司对学历要求严格&#xff0c;其余公司看中的大部分是能力。 有个亲戚的儿子已经工作 2 年了……当初也是因为其他的行业要求比较高&#xff0c;所以才选择的 web 安全方向。 资料免费分享给你…

Pytorch8实现CNN卷积神经网络

CNN卷积神经网络 本章提供一个对CNN卷积网络的快速实现 全连接网络 VS 卷积网络 全连接神经网络之所以不太适合图像识别任务&#xff0c;主要有以下几个方面的问题&#xff1a; 参数数量太多 考虑一个输入10001000像素的图片(一百万像素&#xff0c;现在已经不能算大图了)&…

平地起高楼: 环境搭建

技术选型 本小册是采用纯前端的技术栈模拟实现小程序架构的系列文章&#xff0c;所以主要以前端技术栈为主&#xff0c;但是为了模拟一个App应用的效果&#xff0c;以及小程序包下载管理流程的实现&#xff0c;我们还是需要搭建一个基础的App应用。这里我们将选择 Tauri2.0 来…

langgraph学习2 - MCP编程

3中通信方式&#xff1a; 目前sse用的很少 3.开发mcp框架 主流框架2个&#xff1a; MCP skd 官方 Fast Mcp V2 &#xff0c;&#xff08;V1捐给MCP 官方&#xff09; 大模型如何识别用哪个tools&#xff0c; 以及如何使用tools&#xff1a;