PublishSubject、ReplaySubject、BehaviorSubject、AsyncSubject的区别

python容易编辑,因此用pyrx代替rxjava3做演示会比较快捷。

pyrx安装命令: pip install rx

一、Subject(相当于 RxJava 的 PublishSubject

PublishSubject

PublishSubject 将对观察者发送订阅后产生的元素,而在订阅前发出的元素将不会发送给观察者。如果你希望观察者接收到所有的元素,你可以通过使用 Observable 的 create 方法来创建 Observable,或者使用 ReplaySubject。

如果源 Observable 因为产生了一个 error 事件而中止, PublishSubject 就不会发出任何元素,而是将这个 error 事件发送出来。

特性

  • 只发送订阅后产生的事件,不保留历史值。
  • 新订阅者只能收到订阅后发射的元素。

适用场景
实时数据流(如用户输入、网络事件)。

示例代码
from rx.subject import Subjectsubject = Subject()# 订阅1在事件发射前订阅
subject.subscribe(on_next=lambda value: print("订阅1:", value),on_error=lambda error: print("错误:", error),on_completed=lambda: print("完成")
)subject.on_next("🐶")  # 订阅1收到: 🐶# 订阅2在事件发射后订阅
subject.subscribe(on_next=lambda value: print("订阅2:", value),on_error=lambda error: print("错误:", error),on_completed=lambda: print("完成")
)subject.on_next("🐱")  # 订阅1收到: 🐱,订阅2收到: 🐱

二、ReplaySubject

ReplaySubject

ReplaySubject 将对观察者发送全部的元素,无论观察者是何时进行订阅的。

这里存在多个版本的 ReplaySubject,有的只会将最新的 n 个元素发送给观察者,有的只会将限制时间段内最新的元素发送给观察者。

如果把 ReplaySubject 当作观察者来使用,注意不要在多个线程调用 onNextonError 或 onCompleted。这样会导致无序调用,将造成意想不到的结果。

特性

  • 缓存所有发射过的事件,新订阅者会收到全部历史事件。
  • 可通过 buffer_size 参数限制缓存数量。

适用场景
需要回放历史数据的场景(如配置变更、初始化数据)。

示例代码
from rx.subject import ReplaySubjectsubject = ReplaySubject(buffer_size=2)  # 只缓存最近2个事件subject.on_next("🐶")
subject.on_next("🐱")
subject.on_next("🐭")# 订阅时会收到缓存的最后2个事件: 🐱, 🐭
subject.subscribe(on_next=lambda value: print("订阅1:", value))subject.on_next("🐹")  # 订阅1收到: 🐹

三、BehaviorSubject

BehaviorSubject

当观察者对 BehaviorSubject 进行订阅时,它会将源 Observable 中最新的元素发送出来(如果不存在最新的元素,就发出默认元素)。然后将随后产生的元素发送出来。

如果源 Observable 因为产生了一个 error 事件而中止, BehaviorSubject 就不会发出任何元素,而是将这个 error 事件发送出来。

特性

  • 缓存最后一个发射的事件,新订阅者会立即收到该值。
  • 创建时必须提供初始值。

适用场景
状态管理(如用户登录状态、系统配置)。

示例代码
from rx.subject import BehaviorSubjectsubject = BehaviorSubject("初始值")subject.on_next("🐶")# 订阅时会收到最后一个值: 🐶
subject.subscribe(on_next=lambda value: print("订阅1:", value))subject.on_next("🐱")  # 订阅1收到: 🐱

四、AsyncSubject

AsyncSubject

AsyncSubject 将在源 Observable 产生完成事件后,发出最后一个元素(仅仅只有最后一个元素),如果源 Observable 没有发出任何元素,只有一个完成事件。那 AsyncSubject 也只有一个完成事件。

它会对随后的观察者发出最终元素。如果源 Observable 因为产生了一个 error 事件而中止, AsyncSubject 就不会发出任何元素,而是将这个 error 事件发送出来。

特性

  • 只发射最后一个事件,且仅在 on_completed() 后发射。
  • 如果未调用 on_completed(),订阅者不会收到任何值。

适用场景
只关心最终结果的场景(如计算完成后的结果)。

示例代码
from rx.subject import AsyncSubjectsubject = AsyncSubject()subject.subscribe(on_next=lambda value: print("订阅1:", value),on_error=lambda error: print("错误:", error),on_completed=lambda: print("完成")
)subject.on_next("🐶")
subject.on_next("🐱")
subject.on_completed()  # 订阅1收到: 🐱(最后一个值)并触发完成

五、对比表格

Subject 类型历史值处理新订阅者行为触发条件
Subject不保留历史值只接收订阅后的事件无特殊条件
ReplaySubject缓存所有或部分历史值接收全部缓存的历史事件无特殊条件
BehaviorSubject缓存最后一个值立即接收最后一个值无特殊条件
AsyncSubject缓存最后一个值仅在 on_completed() 后接收必须调用 on_completed()

六、注意事项

  1. 内存管理
    ReplaySubject 和 BehaviorSubject 会持有历史值,需注意避免内存泄漏。

  2. 线程安全
    RxPY 的 Subject 默认非线程安全,多线程环境下需自行处理同步。

  3. 生命周期管理
    使用 dispose() 方法释放资源,避免不必要的事件处理。

rxjava3具体实例:

在引入rxjava3之前的写法:通过监听器,实现register、unregister,代码逻辑臃肿、结构复杂、过一段时间之后自己写的代码都看起来很费劲。

引入rxjava3之后,activity、fragment、service之间解除了强耦合,代码嵌套深度降低、逻辑交叉点减少,代码清爽很多。

rx是响应式编程框架的集大成者,相当于应用内部的轻量级的ASMQ(高级消息队列),前端是ui和逻辑分离的特点,需要大量的数据双向多层传递。  用rx可以从出发点直达终点,数据不需要层层传递,比如说原来的传递路径是6层,你修改一次数据类,你就需要修改6个地方的代码,用rx只需要修改前后紧挨着的2个数据管道之间的代码。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.pswp.cn/web/82594.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

BLE中心与外围设备MTU协商过程详解

一、MTU基础概念​​ 1. ​​MTU定义​​ ​​最大传输单元(MTU)​​ 指单次数据传输中允许的最大字节数,包含协议头部(3字节)和有效载荷(最多517字节)。BLE默认MTU为​​23字节​​&a…

【华为云Astro-服务编排】服务编排使用全攻略

目录 概述 为什么使用服务编排 服务编排基本能力 拖拉拽式编排流程 逻辑处理 对象处理 服务单元组合脚本、原生服务、BO、第三方服务 服务编排与模块间调用关系 脚本 对象 标准页面 BPM API接口 BO 连接器 如何创建服务编排 创建服务编排 如何开发服务编排 服…

centos实现SSH远程登录

1. 生成SSH密钥对 首先,你需要在客户端机器上生成一个SSH密钥对。打开终端,执行以下命令 ssh-keygen 或ssh-keygen -t rsa -b 2048(效果相同) 按照提示操作,可以按回车键接受默认的文件名(通常是~/.ssh/id_…

定制开发开源AI智能名片S2B2C商城小程序在无界零售中的应用与行业智能升级示范研究

摘要:本文聚焦无界零售背景下京东从零售产品提供者向零售基础设施提供者的转变,探讨定制开发开源AI智能名片S2B2C商城小程序在这一转变中的应用。通过分析该小程序在商业运营成本降低、效率提升、用户体验优化等方面的作用,以及其与京东AI和冯…

ZooKeeper 安装教程(Windows + Linux 双平台)

ZooKeeper 安装教程(Windows + Linux 双平台) Zookeeper 和 Kafka 版本与 JDK 要求 一、安装前准备 系统要求 Java 环境(JDK17+)开放端口:2181(客户端),2888(集群通信),3888(选举)安装 Java Linux(Ubuntu/CentOS) # Ubuntu

【Git系列】如何同步原始仓库的更新到你的fork仓库?

🎉🎉🎉欢迎来到我们的博客!无论您是第一次访问,还是我们的老朋友,我们都由衷地感谢您的到来。无论您是来寻找灵感、获取知识,还是单纯地享受阅读的乐趣,我们都希望您能在这里找到属于…

Could not obtain transaction-synchronized Session for current thread

背景 写了一个函数,分别支持手动调用和定时任务调用。 测试的时候一直用手动点击按钮触发函数,功能可用 等到了测试定时任务的时候,后台报错 Could not obtain transaction-synchronized Session for current thread错误分析 事务管理不匹…

linux nm/objdump/readelf/addr2line命令详解

我们在开发过程中通过需要反汇编查看问题,那么我们这里使用rk3568开发板来举例nm/objdump/readelf/addr2line 分析动态库和可执行文件以及.o文件。 1,我们举例nm/objdump/readelf/addr2line解析linux 内核文件vmlinux (1),addr2…

C++自定义简单的内存池

内存池简述 在C的STL的容器中的容器如vector、deque等用的默认分配器(allocator)都是从直接从系统的堆中申请内存,用一点申请一点,效率极低。这就是设计内存池的意义,所谓内存池,就是一次性向系统申请一大片内存(预分…

【极客日常】分享go开发中wire和interface配合的一些经验

在先前一篇文章中,笔者给大家提到了go语言后端编程可以用wire依赖注入模块去简化单例服务的初始化,同时也可以解决服务单例之间复杂依赖的问题。但实事求是来讲,用wire也是有一些学习成本的,wire在帮助解决复杂依赖的问题同时&…

20250605车充安服务器受木马攻击导致服务不可用

https://mp.weixin.qq.com/s/2JyxmDIDBa9_owNjIJ6UIg 因业务服务器受木马攻击,服务器网络资源损耗,业务负载能力受损

web3-虚拟合约 vs 现实合同:权利、义务与资产的链上新秩序

web3-虚拟合约 vs 现实合同:权利、义务与资产的链上新秩序 一、智能合约vs真实世界合约 传统合约:基础要素 如下图,现实世界的合约,会有一个条款,然后下面还有一个“Alice”的签名 提出合约和接受合约; …

【面经分享】京东

线程池核心参数 7 个参数。 coreSize maxSize 阻塞队列 时间 时间 线程工厂 拒绝策略 核心参数的话,有 coreSize、阻塞队列、拒绝策略。 JVM 组成 内存上划分: 线程私有:Java 虚拟机栈,本地方法栈、Tlab、程序计数器 …

工作流引擎-11-开源 BPM 项目 jbpm

工作流引擎系列 工作流引擎-00-流程引擎概览 工作流引擎-01-Activiti 是领先的轻量级、以 Java 为中心的开源 BPMN 引擎,支持现实世界的流程自动化需求 工作流引擎-02-BPM OA ERP 区别和联系 工作流引擎-03-聊一聊流程引擎 工作流引擎-04-流程引擎 activiti 优…

深度学习在非线性场景中的核心应用领域及向量/张量数据处理案例,结合工业、金融等领域的实际落地场景分析

一、工业场景:非线性缺陷检测与预测 1. ‌半导体晶圆缺陷检测‌ ‌问题‌:微米级划痕、颗粒污染等缺陷形态复杂,与正常纹理呈非线性关系。‌解决方案‌: ‌输入张量‌:高分辨率晶圆图像 → 三维张量 (Batch, Height,…

Python-线程同步

多线程 案例 说明: 唱歌方法 sing()跳舞方法 dance()启用两个线程调用主线程结束 代码 # 导入线程模块 import threading import timedef sing(name,age):time.sleep(2)print(唱歌者姓名: name ,年龄: str(age))print(正在唱…

前端八股之JS的原型链

1.原型的定义 每一个对象从被创建开始就和另一个对象关联,从另一个对象上继承其属性,这个另一个对象就是 原型。 当访问一个对象的属性时,先在对象的本身找,找不到就去对象的原型上找,如果还是找不到,就去…

kafka命令

kafka安装先安装zookeeper,jdk 确保jdk版本与kafka版本匹配: 先启动zookeeper: # 启动独立安装的zookeeper ./zkServer.sh start # 也可以自动kafka自带的zookerper ./zookeeper-server-start.sh ../config/zookeeper.pr…

微服务面试(分布式事务、注册中心、远程调用、服务保护)

1.分布式事务 分布式事务,就是指不是在单个服务或单个数据库架构下,产生的事务,例如: 跨数据源的分布式事务跨服务的分布式事务综合情况 我们之前解决分布式事务问题是直接使用Seata框架的AT模式,但是解决分布式事务…

Linux --进程优先级

概念 什么是进程优先级,为什么需要进程优先级,怎么做到进程优先级这是本文需要解释清楚的。 优先级的本质其实就是排队,为了去争夺有限的资源,比如cpu的调度。cpu资源分配的先后性就是指进程的优先级。优先级高的进程有优先执行的…