Redis内存队列Stream

本文为个人学习笔记整理,仅供交流参考,非专业教学资料,内容请自行甄别

文章目录

  • 概述
  • 一、生产者端操作
  • 二、消费者端操作
  • 三、消费组操作
  • 四、状态查询操作
  • 五、确认消息
  • 六、消息队列的选择


概述

  Stream是Redis5.0推出的支持多播的可持久化的消息队列
在这里插入图片描述
图片来源:图灵学院

  如上图所示,Stream依旧是key,value的形式,key对应的是队列的名称,而value的结构则是上图的链表,其主要的结构:

  • ID:每条消息都有一个唯一的ID,如果没有指定,则使用Redis自带的生成策略,格式为当前的毫秒级别时间戳-该毫秒时间点内的消息序号,是单调递增的。
  • Consumer Group:消费组,一个Stream中可以包含多个消费组,而每个消费组又由多个消费者组成。每个消费组是互相独立的,共同消费队列中的消息。
    • last_delivered_id:是一个游标,表示当前消费组已经消费到哪条消息了。同一个消费组中的任何一个消费者读取了消息都会使last_delivered_id往前移动。
  • Consumer:消费者,同一个消费组中的消费者是竞争关系,并能在组内由唯一的名称。
    • pending_ids[]:用于记录当前客户端已经读取,但是尚未ACK的消息。如果客户端没有 ack,这个变量里面的消息 ID 会越来越多。是Stream的ACK机制的实现,保证消息的可靠投递。
  • Message Content:消息内容,和其他类型的存储格式类似,都是key-value的格式。

一、生产者端操作

  向队列中添加一条消息,会返回消息的ID,队列不存在则会创建。

  • xadd stream:创建队列。
  • streamtest1:队列的名称,前缀需要加上stream。
  • *:自动生成队列中消息的ID 一定是单调有序自增的。
127.0.0.1:6381> xadd streamtest1 * name zhangsan age 18 
"1751714750056-0"

  查看队列的长度:

127.0.0.1:6381> xlen streamtest1

  xrange streamtest1 - + 查看队列中所有的元素:

  • xrange:关键字
  • streamtest1 :队列名称
  • -:表示查询范围的最小值,可以指定消息ID
  • +:表示查询范围的最大值,可以指定消息ID
1) 1) "1751714750056-0"2) 1) "name"2) "zhangsan"3) "age"4) "18"
2) 1) "1751714996108-0"2) 1) "name"2) "lisi"3) "age"4) "20"
3) 1) "1751715003212-0"2) 1) "name"2) "wanger"3) "age"4) "25"

  xdel 删除指定ID的消息。

127.0.0.1:6381> xdel streamtest1 1751715003212-0
(integer) 1

二、消费者端操作

  从队列中读取消息:

  • xread:从队列读取消息
  • count:读取消息的条数
  • streams:关键字
  • streamtest1:队列名称
  • 0-0:指定读取消息的范围,可以指定ID
127.0.0.1:6381> xread count 1 streams streamtest1 0-0
1) 1) "streamtest1"2) 1) 1) "1751714750056-0"2) 1) "name"2) "zhangsan"3) "age"4) "18"

  可以指定读取的范围:

127.0.0.1:6381> xread count 1 streams streamtest1 1751714750056-0
1) 1) "streamtest1"2) 1) 1) "1751714996108-0"2) 1) "name"2) "lisi"3) "age"4) "20"

  从队列的尾部读取数据,加$,但是默认不返回数据:

127.0.0.1:6381> xread count 1 streams streamtest1 $
(nil)

  需要配合使用block阻塞 + 另一个生产者写入一个新消息,:
  生产者发送消息:

127.0.0.1:6381> xadd streamtest1 * name zhaoliu age 17
"1751716196234-0"

  消费者阻塞等待生产者发送消息:

127.0.0.1:6381> xread block 0 count 1 streams streamtest1 $
1) 1) "streamtest1"2) 1) 1) "1751716196234-0"2) 1) "name"2) "zhaoliu"3) "age"4) "17"
(44.14s)

  一般来说客户端如果想要使用 xread 进行顺序消费,一定要记住当前消费到哪里了,也就是返回的消息
ID。下次继续调用 xread 时,将上次返回的最后一个消息 ID 作为参数传递进去,就可以继续消费后续的
消息。

三、消费组操作

  创建消费者群组:

  • XGROUP create:创建消费者群组
  • streamtest1:队列名称
  • group1:群组名称
  • 0-0:从消息队列头部进行消费,如果是$ 从尾部消费
127.0.0.1:6381> XGROUP create streamtest1 group1 0-0
OK
127.0.0.1:6381> XGROUP create streamtest1 group2 $
OK

  消费者群组读取消息,同样支持阻塞读取

  • XREADGROUP group:关键字,群组从队列读取消息
  • group1:群组名称
  • g1:具体的消费者名称,群组内唯一
  • count:关键字,读取的数量
  • streams:关键字
  • streamtest1:队列名称
127.0.0.1:6381> XREADGROUP group group1 g1 count 1 streams streamtest1 >
1) 1) "streamtest1"2) 1) 1) "1751714750056-0"2) 1) "name"2) "zhangsan"3) "age"4) "18"

  在读取前,队列中的状态,group1中的consumers和pending都是0。

127.0.0.1:6381> XINFO groups streamtest1
1) 1) "name"2) "group1"3) "consumers"4) (integer) 05) "pending"6) (integer) 07) "last-delivered-id"8) "0-0"

  读取之后,有了一个消费者,并且待确认的数量为1。

127.0.0.1:6381> XINFO groups streamtest1
1) 1) "name"2) "group1"3) "consumers"4) (integer) 15) "pending"6) (integer) 17) "last-delivered-id"8) "1751714750056-0"

四、状态查询操作

  查询队列的状态:

  • XINFO stream:关键字
  • streamtest1:队列的名称
127.0.0.1:6381> XINFO stream streamtest11) "length" 		# 长度2) (integer) 33) "radix-tree-keys" 4) (integer) 15) "radix-tree-nodes"6) (integer) 27) "groups"	# 队列中群组个数8) (integer) 29) "last-generated-id" # 最后生成消息的ID
10) "1751716196234-0"
11) "first-entry"	# 队列中第一个元素
12) 1) "1751714750056-0"2) 1) "name"2) "zhangsan"3) "age"4) "18"
13) "last-entry" # 队列中最后一个元素
14) 1) "1751716196234-0"2) 1) "name"2) "zhaoliu"3) "age"4) "17"

  查询消费者组的信息:

  • XINFO groups:关键字
  • streamtest1:队列的名称
127.0.0.1:6381> XINFO groups streamtest1
1) 1) "name"	# 群组名称2) "group1"3) "consumers" # 消费者4) (integer) 05) "pending" # 待消费数量6) (integer) 07) "last-delivered-id" 8) "0-0"
2) 1) "name"2) "group2"3) "consumers"4) (integer) 05) "pending"6) (integer) 07) "last-delivered-id"8) "1751716196234-0"

  查询某个消费者组中消费者的信息:

127.0.0.1:6381> XINFO consumers streamtest1 group1
1) 1) "name"2) "g1"3) "pending"	#代表查询了消息,但是没有确认的数量4) (integer) 15) "idle"6) (integer) 398240

五、确认消息

  当我使用group1消费者组中的g1消费者对streamtest1队列中的元素进行消费时,会返回一个Id和具体的元素信息:
在这里插入图片描述
  此时我还没有手动ack,查询消费者的信息,pending为1代表待确认。
在这里插入图片描述
  队列中所有的元素:
在这里插入图片描述
  确认消息的命令:

  • XACK:消息确认
  • streamtest1:队列名称
  • group1:分组名称
  • 1751714996108-0:待确认的消息的ID
127.0.0.1:6381> XACK streamtest1 group1 1751714996108-0
(integer) 1

  再去查询消费者的信息,发现pending变成了0:
在这里插入图片描述
  再次获取消息,获取的是已消费元素的下一个消息,当队列中所有的消息都ack之后,再次尝试获取消息,获取到的是nil。在这里插入图片描述
  消息的ack,仅仅是将消息从 消费者组的 Pending Entries List中移除,消息仍保留在 Stream 主体中,直到被主动删除。

六、消息队列的选择

  基于Redis实现消息队列通常有以下的方式:

  • List结构的lpush + brpop。
  • PUB/SUB模式。
  • Stream消息队列。

  如果一定需要使用Redis实现消息队列的功能,推荐使用Stream实现。前两者都有比较明显的弊端:

  • lpush + brpop的方案,如果线程一直阻塞,超过了一定的时间,客户端会断开连接,那么执行POP命令的线程就会抛出异常。并且消息的消费是点到点的,不支持分组消费,以及广播模式,重复消费。
  • PUB/SUB模式的方案,如果发布者发送消息时,订阅者不在线,那么这条消息就会丢失。并且无法存储消息。Pub/Sub 模式不适合做消息存储,消息积压类的业务,而是擅长处理广播,即时通讯,即时反馈的业务。

  使用Redis stream的注意点:

  1. Stream的消息过多怎么办?限制队列的长度,xadd 可以指定参数maxLen 防止队列爆满。
  2. 消费者没有对消息ack怎么办?消费组中的消费者有一个pending_ids的集合,没有ack,这个集合会越变越长。尽可能快速消费并ack。
  3. 出现死信问题怎么办?某个消息任意消费者消费都会出现异常,无法ack。通过xpending查询投递次数,超过一定的次数就认为是死信,执行xdel命令删除消息。
  4. 如何保证高可用?集群部署Redis,或者使用主从 + 哨兵迷失
  5. 如何进行消息分区?自己通过一致性hash算法实现。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/news/913382.shtml
繁体地址,请注明出处:http://hk.pswp.cn/news/913382.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Minio安装配置,桶权限设置,nginx代理 https minio

**起因:因为用到ruoyi-vue-plus框架中遇到生产环境是https,但是http的minio上传的文件不能在后台系统中访问**安装配置minio1. 下载安装2. 赋文件执行权限3.创建配置文件4.创建minio.service新版minio创建桶需要配置桶权限1.下载客户端2.设置访问权限3.连…

数论基础知识和模板

质数筛 用于快速处理 1~n 中所有素数的算法 因为依次遍历判断每一个数是否质数太慢,所以把一些明显不能质数的筛出来 普通筛法,对于每个整数,删除掉其倍数。 bool vis[N];//0表示是质数 int pri[N],o; //质数表 void get(int n…

Ubuntu20.04.6桌面版系统盘制作与安装

概述 本教程讲述Ubuntu20.04.6桌面版的系统U盘制作与安装,所需工具为一台电脑、大于4G的U盘、一个需要安装Ubuntu系统的主机。 步骤1:下载系统镜像与rufus 在ubuntu官网下载 ubuntu-20.04.6-desktop-amd64.iso,如图 下载rufus工具&#xf…

【C++复习3】类和对象

1.3.1.简述一下什么是面向对象回答:1. 面向对象是一种编程思想,把一切东西看成是一个个对象,比如人、耳机、鼠标、水杯等,他们各 自都有属性,比如:耳机是白色的,鼠标是黑色的,水杯是…

数据结构之二叉平衡树

系列文章目录 数据结构之ArrayList_arraylist o(1) o(n)-CSDN博客 数据结构之LinkedList-CSDN博客 数据结构之栈_栈有什么方法-CSDN博客 数据结构之队列-CSDN博客 数据结构之二叉树-CSDN博客 数据结构之优先级队列-CSDN博客 常见的排序方法-CSDN博客 数据结构之Map和Se…

Maven引入第三方JAR包实战指南

要将第三方提供的 JAR 包引入本地 Maven 仓库,可通过以下步骤实现(以 Oracle JDBC 驱动为例):🔧 方法 1:使用 install:install-file 命令(推荐)定位 JAR 文件 将第三方 JAR 包&#…

JavaSE -- 泛型详细介绍

泛型 简介 集合存储数据底层是利用 Object 来接收的,意思是说如果不对类型加以限制,所有数据类型柔和在一起,这时如何保证数据的安全性呢(如果不限制存入的数据类型,任何数据都能存入,当我们取出数据进行强…

使用 Python 实现 ETL 流程:从文本文件提取到数据处理的全面指南

文章大纲: 引言:什么是 ETL 以及其重要性 ETL(提取-转换-加载)是数据处理领域中的核心概念,代表了从源数据到目标系统的三个关键步骤:**提取(Extract)**数据、**转换(Tra…

selenium基础知识 和 模拟登录selenium版本

前言 selenium框架是Python用于控制浏览器的技术,在Python爬虫获取页面源代码的时候,是最重要的技术之一,通过控制浏览器,更加灵活便捷的获取浏览器中网页的源代码。 还没有安装启动selenium的同志请先看我的上一篇文章进行配置启动 和 XPath基础 对selenium进行浏览器和驱动…

JS 网页全自动翻译v3.17发布,全面接入 GiteeAI 大模型翻译及自动部署

两行 js 实现 html 全自动翻译。 无需改动页面、无语言配置文件、无 API Key、对 SEO 友好! 升级说明 translate.service 深度绑定 GiteeAI 作为公有云翻译大模型算力支持translate.service 增加shell一键部署后通过访问自助完成GiteeAI的开通及整个接入流程。增加…

数据结构:数组:插入操作(Insert)与删除操作(Delete)

目录 插入操作(Inserting in an Array) 在纸上模拟你会怎么做? 代码实现 复杂度分析 删除操作(Deleting from an Array) 在纸上模拟一下怎么做? 代码实现 复杂度分析 插入操作(Inserti…

Qt之修改纯色图片的颜色

这里以修改QMenu图标颜色为例,效果如下: MyMenu.h #ifndef MYMENU_H #define MYMENU_H#include <QMenu>class MyMenu : public QMenu { public:explicit MyMenu(QWidget *parent = nullptr);protected:void mouseMoveEvent(QMouseEvent *event) override; };#endif /…

uni-app实现单选,多选也能搜索,勾选,选择,回显

前往插件市场安装插件下拉搜索选择框 - DCloud 插件市场&#xff0c;该插件示例代码有vue2和vue3代码 是支持微信小程序和app的 示例代码&#xff1a; <template><view><!-- 基础用法 --><cuihai-select-search:options"options"v-model&quo…

【机器学习深度学习】 微调的十种形式全解析

目录 一、为什么要微调&#xff1f; 二、微调的 10 种主流方式 ✅ 1. 全参数微调&#xff08;Full Fine-tuning&#xff09; ✅ 2. 冻结部分层微调&#xff08;Partial Fine-tuning&#xff09; ✅ 3. 参数高效微调&#xff08;PEFT&#xff09; &#x1f538; 3.1 LoRA&…

信刻光盘安全隔离与文件单向导入/导出系统

北京英特信网络科技有限公司成立于2005年&#xff0c;是专业的数据光盘摆渡、刻录分发及光盘存储备份领域的科技企业&#xff0c;专注为军队、军工、司法、保密等行业提供数据光盘安全摆渡、跨网交换、档案归档检测等专业解决方案。 公司立足信创产业&#xff0c;产品国产安全可…

Python-标准库-os

1 需求 2 接口 3 示例 4 参考资料 在 Python 中&#xff0c;os&#xff08;Operating System&#xff09;模块是一个非常重要的内置标准库&#xff0c;提供了许多与操作系统进行交互的函数和方法&#xff0c;允许开发者在 Python 程序中执行常见的操作系统任务&#xff0c;像文…

OpenCV CUDA模块设备层-----在 GPU 上执行类似于 std::copy 的操作函数warpCopy()

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 OpenCV 的 CUDA 模块&#xff08;cudev&#xff09; 中的一个设备端内联模板函数&#xff0c;用于在 GPU 上执行类似于 std::copy 的操作&#xff…

Vue Router 中$route.path与 params 的关系

1. params 参数的本质&#xff1a;路径的动态片段在 Vue Router 中&#xff0c;params 参数是通过路由配置的动态路径片段定义的&#xff0c;例如&#xff1a;// 路由配置{ path: /user/:id, component: User }当访问/user/123时&#xff0c;/user/123是完整的路径&#xff0c;…

React 极简响应式滑块验证组件实现,随机滑块位置

&#x1f3af; 滑块验证组件 (Slider Captcha) 一个现代化、响应式的滑块验证组件&#xff0c;专为 React 应用设计&#xff0c;提供流畅的用户体验和强大的安全验证功能。 ✨ 功能特性 &#x1f3ae; 核心功能 智能滑块拖拽 – 支持鼠标和触摸屏操作&#xff0c;响应灵敏随…

STM32第十六天蓝牙模块

一&#xff1a;蓝牙模块HC-05 1&#xff1a;硬件引脚配置&#xff1a; | 标号 | PIN | 说明 | |------|-------|---------------------------------------| | 1 | START | 状态引出引脚&#xff08;未连接/连接输出信号时&#xff09; |…