Spring for Apache Pulsar->Reactive Support->Message Production

好消息:Spring for Apache Pulsar这两天刚刚升到2.0.0版本

1. ReactivePulsarTemplate

在Pulsar生产者端,Spring Boot自动配置提供了一个ReactivePulsarTemplate用于发布记录。该模板实现了一个名为ReactivePulse Operations的接口,并提供了通过其合约发布记录的方法。

该模板提供了send方法,可以接受单个消息并返回Mono<MessageId>。它还提供了send方法,可以接受多条消息(以ReactiveStreams Publisher类型的形式)并返回Flux<MessageId>。

对于不包含主题参数的API变体,将使用主题解析过程来确定目标主题。

1.1. Fluent API

该模板提供了一个流畅的构建器来处理更复杂的发送请求。

1.2. Message customization

您可以指定MessageSpecBuilderCustomizer来配置传出消息。例如,以下代码显示了如何发送键控消息:

template.newMessage(msg).withMessageCustomizer((mc) -> mc.key("foo-msg-key")).send();

1.3. Sender customization

您可以指定一个ReactiveMessageSenderBuilderCustomizer来配置底层Pulsar发送器生成器,该生成器最终构建用于发送传出消息的发送器。

请谨慎使用,因为这可以完全访问发送方构建器,调用其某些方法(如create)可能会产生意想不到的副作用。

例如,以下代码显示了如何禁用批处理和启用分块:

template.newMessage(msg).withSenderCustomizer((sc) -> sc.enableChunking(true).enableBatching(false)).send();

另一个示例显示了如何在将记录发布到分区主题时使用自定义路由。在发送方构建器上指定自定义MessageRouter实现,例如:

template.newMessage(msg).withSenderCustomizer((sc) -> sc.messageRouter(messageRouter)).send();

请注意,使用MessageRouter时,spring.pulsar.producter.message-routing-mode的唯一有效设置是自定义。

2. Specifying Schema Information

如果您使用Java基元类型,框架会自动为您检测模式,您不需要指定任何模式类型来发布数据。对于非基元类型,如果在ReactivePulsarTemplate上调用send操作时没有明确指定Schema,则Spring For Apache Pulsar框架将尝试构建Schema。JSON类型。

目前支持的复杂模式类型有JSON、AVRO、PROTOBUF、AUTO_PRODUCE_BYTES和带内联编码的KEY_VALUE。

2.1. Custom Schema Mapping

作为在ReactivePulse Template上为复杂类型调用发送操作时指定模式的替代方法,可以使用类型的映射配置模式解析器。这消除了在框架使用传出消息类型咨询解析器时指定模式的需要。

2.1.1. Configuration properties

模式映射可以使用spring.pulsar.defaults.type-mappings属性进行配置。以下示例使用application.yml分别使用AVRO和JSON模式为User和Address复杂对象添加映射:

spring:pulsar:defaults:type-mappings:- message-type: com.acme.Userschema-info:schema-type: AVRO- message-type: com.acme.Addressschema-info:schema-type: JSON

消息类型是消息类的完全限定名。

2.1.2. Schema resolver customizer

添加映射的首选方法是通过上述属性。但是,如果需要更多的控制,您可以提供一个模式解析器定制器来添加映射。

以下示例使用模式解析器定制器分别使用AVRO和JSON模式为User和Address复杂对象添加映射:

@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {return (schemaResolver) -> {schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));}
}
2.1.3. Type mapping annotation

指定用于特定消息类型的默认模式信息的另一种选择是用@PulsarMessage注释标记消息类。可以通过注释上的schemaType属性指定架构信息。

以下示例将系统配置为在生成或使用Foo类型的消息时使用JSON作为默认模式:

@PulsarMessage(schemaType = SchemaType.JSON)
record Foo(String value) {
}

有了这个配置,就不需要在发送操作上设置或指定模式。

2.2. Producing with AUTO_SCHEMA

如果没有机会提前知道Pulsar主题的模式类型,您可以使用AUTO_PRODUCE模式将原始JSON或Avro有效载荷安全地发布为byte[]。

在这种情况下,生产者会验证出站字节是否与目标主题的模式兼容。

只需指定schema的模式。模板上的AUTO_PRODUCE_BYTES()发送操作如下例所示:

void sendUserAsBytes(ReactivePulsarTemplate<byte[]> template, byte[] userAsBytes) {template.send("user-topic", userAsBytes, Schema.AUTO_PRODUCE_BYTES()).subscribe();
}

这仅支持Avro和JSON模式类型。

3. ReactivePulsarSenderFactory

ReactivePulsarTemplate依赖于ReactivePulse SenderFactory来实际创建底层发送方。

Spring Boot提供了这个发送器工厂,可以配置任何Spring.pulser.producer.*应用程序属性。

如果直接使用发送方工厂API时未指定主题信息,则使用ReactivePulse Template使用的相同主题解析过程,但省略了“消息类型默认”步骤。

3.1. Producer Caching

每个底层Pulsar生产者都会消耗资源。为了提高性能并避免持续创建生产者,底层Apache Pulsar Reactive客户端中的ReactiveMessageSenderCache缓存了它创建的生产者。它们以LRU方式缓存,并在配置的时间段内未被使用时被驱逐。

您可以通过指定任何spring.pulsinger.producer.cache.*应用程序属性来配置缓存设置。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/news/913851.shtml
繁体地址,请注明出处:http://hk.pswp.cn/news/913851.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

AtCoder Beginner Contest 413

比赛链接如下&#xff1a;Denso Create Programming Contest 2025&#xff08;AtCoder Beginner Contest 413&#xff09; - AtCoder A - Content Too Large Problem Statement Takahashi has N items and one bag. The size of the i-th (1≤i≤N) item is Ai​, and the si…

Java学习---JVM(1)

JVM&#xff0c;即Java虚拟机&#xff0c;其是Java程序的运行环境&#xff0c;是Java技术的核心组成部分&#xff0c;本次就JVM的自动内存管理详细展开&#xff1a;JVM的内存区域分为2大类&#xff0c;即线程私有的和线程共享的&#xff0c;前者分为3大块&#xff0c;虚拟机栈、…

Qt去噪面板搭建

建立单选互斥性面板用于选择噪声属性// 创建去噪面板 QWidget* noisePanel new QWidget(); QVBoxLayout* mainLayout new QVBoxLayout(noisePanel); mainLayout->setContentsMargins(10, 10, 10, 10); mainLayout->setSpacing(15);// 去噪方法选择组QGroupBox* methodG…

无需公网IP的文件交互:FileCodeBox容器化部署技术解析

文章目录 前言1.Docker部署2.简单使用演示3. 安装cpolar内网穿透4. 配置公网地址5. 配置固定公网地址 前言 在数字化办公需求日益增长的今天&#xff0c;文件传输已成为职场协作的高频刚需。传统共享方式却饱受诟病&#xff1a;"需要安装哪些臃肿客户端&#xff1f;免费版…

1. http 有哪些版本,你是用的哪个版本,怎么查看

http 有哪些版本&#xff0c;你是用的哪个版本&#xff0c;怎么查看 总结&#xff1a;http 版本有 0.9/1.0/1.1/2.0/3.0&#xff0c;我们常用的是 1.1 和 2.0&#xff0c;使用 window.chrome.loadTimes() 获取 http 版本。 常见的 HTTP 版本 HTTP/0.9&#xff1a;最初的版本&am…

C# IIncrementalGenerator干点啥

生成器项目 得基于.Net Stander 2.0 重要&#xff1a;<IsRoslynComponent>true</IsRoslynComponent>、<IncludeBuildOutput>false</IncludeBuildOutput>、 <PackageReference Include"Microsoft.CodeAnalysis" Version"4.14.0&q…

在徐州网络中服务器租用与托管的优势

一、高性价比&#xff1a;徐州万恒提供多种配置的服务器供租用&#xff0c;满足不同企业和个人的业务需求&#xff0c;无论是初创企业追求低成本高效能&#xff0c;还是对性能有严苛要求的大型项目&#xff0c;都能找到合适的服务器型号&#xff0c;以极具竞争力的价格获取强大…

学习软件测试的第十四天(移动端)

一.常用的abd命令有哪些1.什么是 ADB&#xff1f;通俗解释&#xff1a; ADB 就像一个桥梁&#xff0c;让电脑能控制连接的手机&#xff0c;比如安装APP、抓日志、重启设备等。专业术语总结&#xff1a; ADB&#xff08;Android Debug Bridge&#xff09;是 Android SDK 提供的命…

04-ES6

let和const命令ES6中新增了let命令&#xff0c;用来声明变量&#xff0c;用法类似与varlet和var的不同&#xff1a;1、不存在变量提升 console.log(a); //Cannot access a before initializationlet a 100;2、同一个作用域不能重复定义同一个名称var c 20;let c 30;c…

基于GeographicLib实现测站地平坐标系(东北天)转地心固定坐标系XYZ

一、概述主要内容&#xff1a;本文基于GeographicLib开源库&#xff0c;实现了一个地理空间坐标转换功能&#xff0c;主要用于根据观测站的位置和目标的相对方位信息&#xff0c;计算目标在地球坐标系中的绝对位置。输入&#xff1a;观测站的经纬度坐标(纬度、经度、海拔高度)和…

若依框架去掉Redis

这篇文章全是按照我的实战操作来的&#xff0c;本文一是记录一下这个过程&#xff0c;二是帮助更多的人少走弯路。 接下来我们看实战&#xff1a;第一步毋庸置疑&#xff0c;就是找到配置文件application.yml里面大redis配置部分&#xff0c;直接注释掉 注意这里的data:这是否注…

【会员专享数据】2013-2024年我国省市县三级逐日SO₂数值数据(Shp/Excel格式)

之前我们分享过2013-2024年全国范围逐日SO₂栅格数据&#xff08;可查看之前的文章获悉详情&#xff09;!该数据来源于韦晶博士、李占清教授团队发布在国家青藏高原科学数据中心网站上的中国高分辨率高质量近地表空气污染物数据集。很多小伙伴拿到数据后反馈栅格数据不太方便使…

TCP SYN、UDP、ICMP之DOS攻击

一、实验背景 Dos攻击是指故意的攻击网络协议实现的缺陷或直接通过野蛮手段残忍地耗尽被攻击对象的资源&#xff0c;目的是让目标计算机或网络无法提供正常的服务或资源访问&#xff0c;使目标系统服务系统停止响应甚至崩溃。 二、实验设备 1.一台靶机Windows主机 2.增加一个网…

Ntfs!LfsUpdateLfcbFromRestart函数分析之根据Ntfs!_LFS_RESTART_AREA初始化Ntfs!_LFCB

第一部分&#xff1a;LfsUpdateLfcbFromRestart( ThisLfcb,FileSize,DiskRestartArea,FirstRestar1: kd> p Ntfs!LfsRestartLogFile0x317: f71fc8dd e820e5ffff call Ntfs!LfsUpdateLfcbFromRestart (f71fae02) 1: kd> t Ntfs!LfsUpdateLfcbFromRestart: f71fae0…

Qt开发:QtConcurrent介绍和使用

文章目录一、QtConcurrent 简介二、常用功能分类2.1 异步运行一个函数&#xff08;无返回值&#xff09;2.2 异步运行一个带参数的函数&#xff08;有返回值&#xff09;2.3 绑定类成员函数2.4 容器并行处理&#xff08;map&#xff09;三、线程池控制四、取消任务五、典型应用…

企业数据开发治理平台选型:13款系统优劣对比

本文将深入对比13款主流的数据指标管理平台&#xff1a;1.网易数帆&#xff1b; 2.云徙科技&#xff1b; 3.数澜科技&#xff1b; 4.用友数据中台&#xff1b; 5.龙石数据中台&#xff1b; 6.SelectDB&#xff1b; 7.得帆云 DeHoop 数据中台&#xff1b; 8.Talend&#xff1b; …

Java JDK 下载指南

Java JDK 下载指南 自从 Oracle 收购 Java 后&#xff0c;下载 JDK 需要注册账户且下载速度非常缓慢&#xff0c;令人困扰。 解决方案&#xff1a; 华为云提供了便捷的 JDK 下载镜像&#xff0c;访问速度快且无需注册&#xff1a; https://repo.huaweicloud.com/java/jdk/ 高…

QT数据交互全解析:JSON处理与HTTP通信

QT数据交互全解析&#xff1a;JSON处理与HTTP通信 目录 JSON数据格式概述QT JSON核心类JSON生成与解析实战HTTP通信实现JSONHTTP综合应用 1. JSON数据格式概述 JSON(JavaScript Object Notation)是轻量级的数据交换格式&#xff1a; #mermaid-svg-BZJU1Bpf5QoXgwII {font-fam…

Function Call大模型的理解(大白话版本)

由来---场景设计你雇了一位 超级聪明的百科全书管家&#xff08;就是大模型&#xff0c;比如GPT&#xff09;。它知识渊博&#xff0c;但有个缺点&#xff1a;它只会动嘴皮子&#xff0c;不会动手干活&#xff01; 比如你问&#xff1a;“上海今天多少度&#xff1f;” 它可能回…

【PTA数据结构 | C语言版】求两个正整数的最大公约数

本专栏持续输出数据结构题目集&#xff0c;欢迎订阅。 文章目录题目代码题目 请编写程序&#xff0c;求两个正整数的最大公约数。 输入格式&#xff1a; 输入在一行中给出一对正整数 0<x,y≤10^6&#xff0c;数字间以空格分隔。 输出格式&#xff1a; 在一行中输出 x 和 …