消息队列的使用

使用内存队列来处理基于内存的【生产者-消费者】场景

思考和使用Disruptor

  • Disruptor可以实现单个或多个生产者生产消息,单个或多个消费者消息,且消费者之间可以存在消费消息的依赖关系

  • 使用Disruptor需要结合业务特性,设计要灵活

  • 什么业务场景适合使用Disruptor

    • Disruptor核心优势在于极致的低延迟和极高吞吐量,且通信发生在单个JVM进程内部的场景

    • 高频交易系统 (HFT - High-Frequency Trading):

      • 场景描述: 金融市场中的算法交易,需要在微秒甚至纳秒级别对市场数据做出反应,并快速下单。延迟每降低一点,都可能带来巨大的竞争优势。

      • 为何适合: Disruptor 最初就是为 LMAX 交易所设计的,用于处理海量的订单和行情数据。其低延迟特性对于捕捉转瞬即逝的交易机会至关重要。它可以用于订单处理流水线、市场数据分发、风险控制计算等。

    • 实时风控与反欺诈系统:

      • 场景描述: 在支付、交易、登录等关键操作发生时,需要实时分析用户行为、交易模式等,快速识别潜在的风险或欺诈行为,并在毫秒级内做出决策(如阻止交易、要求额外验证)。

      • 为何适合: 需要处理高并发的事件流,并进行复杂的规则匹配和计算,同时对响应时间有极高要求。Disruptor 可以作为事件处理引擎的核心,确保快速处理和决策。

    • 高性能日志处理框架:

      • 场景描述: 应用程序产生大量日志,需要异步地、高效地将日志事件从业务线程传递给日志写入线程,同时尽量减少对业务线程性能的影响。

      • 为何适合: Log4j2 的 Async Loggers 就是基于 Disruptor 实现的。它可以显著降低日志记录操作对应用主线程的阻塞时间,提高应用的整体吞吐量。

    • 游戏服务器事件处理:

      • 场景描述: 大型多人在线游戏(MMO)服务器需要处理来自成千上万玩家的并发操作(移动、攻击、聊天等),并实时更新游戏世界状态,广播给其他相关玩家。

      • 为何适合: 游戏服务器对延迟非常敏感,任何卡顿都会严重影响玩家体验。Disruptor 可以用来构建高效的事件处理循环,快速响应玩家输入并分发状态更新。

    • 实时数据分析与复杂事件处理 (CEP - Complex Event Processing):

      • 场景描述: 从各种数据源(如传感器、网络流量、用户行为日志)接收高速数据流,实时识别特定模式、趋势或异常,并触发相应动作。

      • 为何适合: 需要在大量数据涌入时,以极低的延迟进行匹配和分析。Disruptor 可以作为CEP引擎内部事件排队和分发的骨干。

    • 网络数据包处理/高性能网络应用:

      • 场景描述: 构建需要处理大量并发连接和高速网络数据包的服务器应用,如自定义的应用层网关、高性能代理服务器等。

      • 为何适合: 当网络 I/O 线程接收到数据包后,需要快速地将这些数据包(或解析后的事件)分发给工作线程进行处理。Disruptor 可以作为 I/O 线程和业务逻辑处理线程之间的高效桥梁。

    • 任务调度与并行计算的内部协调:

      • 场景描述: 在一个复杂的计算任务中,可以将任务分解为多个阶段,由不同的线程组处理。阶段之间的数据传递需要高效且低延迟。

      • 为何适合: 如果这些阶段都在同一个JVM内部,并且对性能要求极高,Disruptor 可以作为这些并行处理单元之间的数据交换通道,避免传统队列的锁竞争开销。

Kafka

消息队列的设计意图

当消费不均衡(生产者生产的过快消费者消费的过快)时,就在生产者和消费者中间加一个缓冲层,这个缓冲层就是消息队列

消息队列是分布式系统中的重要组件

消息队列的作用

  • 异步:提升吞吐量

  • 解耦:减少依赖,生产者和消费者之间没有直接的依赖,一个系统的故障不会影响另一个系统,保证系统的稳定性和健壮性

  • 削峰填谷:消除短时负载过高

    • 削峰:生产者的速度非常的高,并发流量非常的大,此时可以增加消费者线程,提高并发处理能力,来达到生产和消费的平衡

    • 填谷:生产的频率降低,流量变小,此时可以减少一些消费者线程,来达到生产和消费的平衡

  • 顺序性保证

  • 可靠性保证:数据持久化

从整体的角度来看Kafka

Kafka分区再均衡(Rebalance, 平衡)

Kafka数据存储

  • 日志文件消息格式

消息丢失和重复消费

Kafka消息丢失

从Kafka生产,消息持久化,消费过程看消息丢失

生产,消息持久化,消费过程丢失的解决方案

Kafka重复消费

  • 重复消费的根本原因在于:已经消费了数据,但是offset没有成功提交,很大一部分原因是再均衡

    • 消费者宕机,重启,消费了消息但是没有提交offset

    • 还没有提交offset时,发生了rebalance

    • 消息处理耗时太大,超过了(max.poll.interval.ms),发生了rebalance

  • 重复消费的解决方案

    • 最根本的解决方案是消费消息保证幂等性

      • 记录消息表,使用唯一索引

      • 缓存消费过的消息id(位图)

使用好Kafka

集成使用Kafka

常见的两种方法使用Kafka
  • 使用@KafkaListener把消费过程(poll和提交offset)交给框架

  • 自己管理消息的拉取(poll)和消息偏移量(offset)的提交

生产者发送消息有三种方式

  • 发送之后什么都不管

  • 同步发送

  • 异步发送

消费者消费消息

  • 消费者主动拉取消息消费

  • 通过注解实现消息的监听消费(@KafkaListener)

延迟队列和优先级队列

RabbitMQ架构模型

通过RabbitMQ实现延迟队列和优先级队列

死信队列

死信:如果队列中消息出现以下两种情况,则消息变为死信状态

  • 如果消息在队列中的时间超过了我设置的ttl(过期时间)

  • 消息队列的消息数量超过了最大的队列长度

优先级队列:最大值是255,最小值是0,值越大,优先级越高

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.pswp.cn/web/81299.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

《帝国时代1》游戏秘籍

资源类 PEPPERONI PIZZA:获得 1000 食物。COINAGE:获得 1000 金。WOODSTOCK:获得 1000 木头。QUARRY:获得 1000 石头。 建筑与生产类 STEROIDS:快速建筑。 地图类 REVEAL MAP:显示所有地图。NO FOG&#xf…

使用JSP踩过的坑

虽然说jsp已经过时了,但是有时维护比较老的项目还是需要的。 下面说下,我使用jsp踩过的坑: 1.关于打印输出 在jsp中输出使用 out.println("hello");而不是 System.out.println("hello");如果在定义函数部分需要打印…

redis集群创建时手动指定主从关系的方法

适用场景: 创建主从关系时默认参数 --cluster-replicas 1 会自动分配从节点。 为了能精确控制 Redis Cluster 的主从拓扑结构,我们通过 Redis Cluster 的手动分片功能来实现 一、手动指定主从关系的方法 使用 redis-cli --cluster-replicas 0 先创建纯…

ROS合集(七)SVIn2声呐模块分析

文章目录 一、整体思想二、具体误差建模流程三、总结明确(预测值与观测值)四、选点逻辑五、Sonar 数据处理流水线1. ROS Launch 配置(imagenex831l.launch)2. SonarNode 节点(sonar_node.py)3. Subscriber …

Python爬虫实战:研究PySpider框架相关技术

1. 引言 1.1 研究背景与意义 网络爬虫作为互联网数据采集的重要工具,在信息检索、舆情分析、市场调研等领域发挥着重要作用。随着互联网信息的爆炸式增长,如何高效、稳定地获取所需数据成为了一个关键挑战。PySpider 作为一款功能强大的 Python 爬虫框架,提供了丰富的功能…

《大模型开源与闭源的深度博弈:科技新生态下的权衡与抉择》

开源智能体大模型的核心魅力,在于它构建起了一个全球开发者共同参与的超级协作网络。想象一下,来自世界各个角落的开发者、研究者,无论身处繁华都市还是偏远小镇,只要心怀对技术的热爱与追求,就能加入到这场技术狂欢中…

大数据模型对陌生场景图像的识别能力研究 —— 以 DEEPSEEK 私有化部署模型为例

摘要 本研究聚焦于已训练的大数据模型能否识别未包含在样本数据集中的陌生场景图像这一问题,以 DEEPSEEK 私有化部署模型为研究对象,结合机器学习理论,分析模型识别陌生场景图像的影响因素,并通过理论探讨与实际应用场景分析&…

STM32——从点灯到传感器控制

STM32基础外设开发:从点灯到传感器控制 一、前言 本篇文章总结STM32F10x系列基础外设开发实例,涵盖GPIO控制、按键检测、传感器应用等。所有代码基于标准库开发,适合STM32初学者参考。 二、硬件准备 STM32F10x系列开发板LED模块有源蜂鸣器…

[特殊字符] 使用增量同步+MQ机制将用户数据同步到Elasticsearch

在开发用户搜索功能时,我们通常会将用户信息存储到 Elasticsearch(简称 ES) 中,以提高搜索效率。本篇文章将详细介绍我们是如何实现 MySQL 到 Elasticsearch 的增量同步,以及如何通过 MQ 消息队列实现用户信息实时更新…

MyBatis缓存机制全解析

在MyBatis中,缓存分为一级缓存和二级缓存,它们的主要目的是减少数据库的访问次数,提高查询效率。下面简述这两种缓存的工作原理: 一、 一级缓存(SqlSession级别的缓存) 一级缓存是MyBatis默认开启的缓存机…

【短距离通信】【WiFi】WiFi7关键技术之4096-QAM、MRU

目录 3. 4096-QAM 3.1 4096-QAM 3.2 QAM 的阶数越高越好吗? 4. MRU 4.1 OFDMA 和 RU 4.2 MRU 资源分配 3. 4096-QAM 摘要 本章主要介绍了Wi-Fi 7引入的4096-QAM对数据传输速率的提升。 3.1 4096-QAM 对速率的提升 Wi-Fi 标准一直致力于提升数据传输速率&a…

【二刷力扣】【力扣热题100】今天的题目是:283.移动零

题目: 给定一个数组 nums,编写一个函数将所有 0 移动到数组的末尾,同时保持非零元素的相对顺序。 请注意 ,必须在不复制数组的情况下原地对数组进行操作。 示例 1: 输入: nums [0,1,0,3,12] 输出: [1,3,12,0,0] 示例 2: 输…

机器学习中的多GPU训练模式

文章目录 一、数据并行(Data Parallelism)二、模型并行(Model Parallelism)1. 模型并行2. 张量并行(Tensor Parallelism) 三、流水线并行(Pipeline Parallelism)四、混合并行&#x…

《JavaScript 性能优化:从原理到实战的全面指南》

《JavaScript 性能优化:从原理到实战的全面指南》 一、JavaScript 性能优化基础理论 在深入探讨 JavaScript 性能优化技术之前,我们需要明白JavaScript 的执行机制和性能瓶颈产生的根本原因。JavaScript 是一种单线程、非阻塞的脚本语言,其…

选择合适的Azure数据库监控工具

Azure云为组织提供了众多服务,使其能够无缝运行应用程序、Web服务和服务器部署,其中包括云端数据库部署。Azure数据库能够与云应用程序实现无缝集成,具备可靠、易扩展和易管理的特性,不仅能提升数据库可用性与性能,同时…

9.4在 VS Code 中配置 Maven

在 VS Code 中配置 Maven 需要完成 Maven 环境安装 一、安装 Maven(如果未安装) 下载 Maven 访问 Apache Maven 官网,下载最新版本的 Maven(如apache-maven-3.9.9-bin.zip)。 解压文件 将下载的 ZIP 文件解压到本地目…

影刀自动化流程复用技巧:流程复用

草莓时刻会创建一个新的空白流程。但是很多时候需要复用过往基础流程,在此基础上进行修改即可。而而不是重新创建基础流程。 为了解决这个问题,我们需要了解一下影刀流程的基础结构。 影刀流程基础结构概览 影刀自动化流程的基础结构主要包括几个关键组…

理论篇六:如何在Webpack中实现持久化缓存?

在 Webpack 中实现持久化缓存可以显著提升构建速度,尤其是在大型项目中。以下是 7 种核心策略 及其详细配置方法: 一、文件哈希命名(Content Hash) 确保文件内容变化时哈希值才改变,利用浏览器缓存。 // webpack.config.js output: {filename: [name].[contenthash:8].j…

C++单例模式与线程安全

C单例模式的线程安全实践与优化-CSDN博客 https://www.zhihu.com/question/56527586/answer/2344903391 C11中的单例模式 在C11及更高版本中,可以使用std::call_once和std::once_flag来确保单例实例的线程安全初始化。这种方法不需要显式地使用互斥锁&#xff0c…

UE5 图片导入,拖到UI上变色

UE5会自动把蓝色的图片当成法线贴图处理,非常傻逼 双击出问题的图片,右侧面板将压缩设置从法线改回默认