深入解析EventPoller:Disruptor的轮询式事件处理机制

EventPoller 是什么?

EventPoller 是 Disruptor 框架中一种 基于轮询(poll-based) 的事件消费机制。它与我们更常见的 BatchEventProcessor(基于独立的消费者线程)形成了对比。核心区别在于:

  • BatchEventProcessor (推模式): Disruptor 会为你创建一个专门的线程。一旦有事件发布,BatchEventProcessor 会在一个无限循环中自动、持续地处理事件,并推送给你的事件处理器(EventHandler)。你只需要提供处理逻辑,不用关心线程管理。
  • EventPoller (拉模式)EventPoller 不会自己创建线程。它提供一个 poll() 方法,让你可以在任何你选择的线程中,主动地去“拉取”和处理事件。控制权完全在你手中。

这种设计使得 EventPoller 非常适合与那些生命周期不受 Disruptor 控制的现有线程进行集成。

我们来看一下 EventPoller.java 中的关键部分:

 内部组件(核心字段)

EventPoller 的实现依赖于四个核心的 final 字段,它们在构造时被注入,定义了 Poller 的行为。

// ... existing code ...
public class EventPoller<T>
{private final DataProvider<T> dataProvider;private final Sequencer sequencer;private final Sequence sequence;private final Sequence gatingSequence;
// ... existing code ...
  • dataProvider: 事件的提供者,通常就是 RingBufferEventPoller 通过它来获取指定序号(sequence)的事件对象。
  • sequencer: 序列号管理器。这是 Disruptor 的核心组件,负责协调生产者和消费者之间的进度。EventPoller 用它来查询当前可消费的事件序列范围。
  • sequenceEventPoller 自身的消费进度序列。它记录了当前 Poller 已经成功处理到的事件的 sequence。每次 poll 调用成功后,这个 sequence 会被更新。
  • gatingSequence: 门控序列。这是一个非常重要的概念。EventPoller 在消费事件前,必须确保其依赖的前置消费者已经处理完这些事件。gatingSequence 就代表了这些前置依赖的进度。EventPoller 能消费到的最大序列号,不能超过 gatingSequence 的当前值。如果没有其他消费者依赖,它通常会直接依赖生产者的游标(cursor)。

Handler<T> 接口

这是 EventPoller 的核心回调接口,你需要实现它来定义事件处理逻辑。

// ... existing code ...public interface Handler<T>{/*** Called for each event to consume it** @param event the event* @param sequence the sequence of the event* @param endOfBatch whether this event is the last in the batch* @return whether to continue consuming events. If {@code false}, the poller will not feed any more events*         to the handler until {@link EventPoller#poll(Handler)} is called again* @throws Exception any exceptions thrown by the handler will be propagated to the caller of {@code poll}*/boolean onEvent(T event, long sequence, boolean endOfBatch) throws Exception;}
// ... existing code ...
  • onEvent(T event, long sequence, boolean endOfBatch):
    • event: 当前需要处理的事件对象。
    • sequence: 事件在 Ring Buffer 中的序号。
    • endOfBatch: 标志这是否是当前 poll() 调用所能获取到的一批事件中的最后一个。
    • 返回值 (boolean): 这是关键!
      • 返回 trueEventPoller 会继续尝试处理下一个可用的事件(如果存在的话)。
      • 返回 falsepoll() 方法会立即停止处理并返回,即使后面还有可用的事件。

 在 PullWithPoller.java 示例中,handler 总是返回 false,实现了每次 poll 只处理一个事件的效果。

// ... existing code ...private static Object getNextValue(final EventPoller<DataEvent<Object>> poller) throws Exception{final Object[] out = new Object[1];poller.poll((event, sequence, endOfBatch) ->{out[0] = event.copyOfData();// Return false so that only one event is processed at a time.return false;});return out[0];}
// ... existing code ...

poll(Handler<T> eventHandler) 方法

这是 EventPoller 的“引擎”,你会在你的线程循环中反复调用它。

// ... existing code ...public PollState poll(final Handler<T> eventHandler) throws Exception{final long currentSequence = sequence.get();long nextSequence = currentSequence + 1;final long availableSequence = sequencer.getHighestPublishedSequence(nextSequence, gatingSequence.get());if (nextSequence <= availableSequence){
// ... existing code ...try{do{final T event = dataProvider.get(nextSequence);processNextEvent = eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);processedSequence = nextSequence;nextSequence++;}while (nextSequence <= availableSequence && processNextEvent);}
// ... existing code ...return PollState.PROCESSING;}else if (sequencer.getCursor() >= nextSequence){return PollState.GATING;}else{return PollState.IDLE;}}
// ... existing code ...

它返回一个 PollState 枚举,告诉你轮询的结果:

  • PollState.PROCESSING: 本次 poll() 调用成功处理了一个或多个事件。
  • PollState.GATING: 有新的事件已经被发布了,但是被前置的消费者(gatingSequence)阻塞了。你需要等待前置消费者处理完,才能继续。
  • PollState.IDLE: Ring Buffer 中没有任何新的、可供处理的事件。

PollState 枚举

poll 方法的返回值,用于告诉调用者当前轮询的结果。

// ... existing code ...public enum PollState{/*** The poller processed one or more events*/PROCESSING,/*** The poller is waiting for gated sequences to advance before events become available*/GATING,/*** No events need to be processed*/IDLE}
// ... existing code ...
  • PROCESSING: 本次 poll 成功处理了至少一个事件。
  • GATING: 当前没有可处理的事件,因为被 gatingSequence 阻塞了。意思是生产者已经发布了新的事件,但是前置依赖的消费者还没跟上。调用者看到这个状态,通常会选择 Thread.yield() 或短暂休眠,等待依赖方前进。
  • IDLE: 当前没有可处理的_任何_事件。意思是连生产者都还没有发布新的事件。调用者看到这个状态,可以认为工作队列是空的。

EventPoller 的用法

想象一个场景:你正在开发一个游戏,你有一个主游戏循环(Game Loop)线程。你希望在这个主循环中处理来自网络模块的事件(例如玩家移动、聊天消息等),而这些事件是通过 Disruptor 传递的。你不能为了处理事件而阻塞游戏循环,也不想再创建一个新线程。这时 EventPoller 就是完美的解决方案。

步骤 1: 创建 EventPoller

通常,我们不直接调用 new EventPoller(...),而是使用 RingBuffer 的工厂方法 newPoller()

// 假设你已经设置好了 Disruptor 和 RingBuffer
RingBuffer<MyEvent> ringBuffer = disruptor.getRingBuffer();// 如果你的 Poller 消费需要依赖其他消费者,你需要提供它们的 Sequence
// 如果没有依赖,可以不传参数
// 例如,依赖 consumer1 和 consumer2
Sequence[] gatingSequences = new Sequence[]{consumer1.getSequence(), consumer2.getSequence()};
EventPoller<MyEvent> poller = ringBuffer.newPoller(gatingSequences);// 重要: 创建 Poller 后,需要将它的 Sequence 添加回 RingBuffer 的 Gating 列表中
// 这样,生产者(Publisher)才会等待你的 Poller,避免覆盖还未处理的事件
ringBuffer.addGatingSequences(poller.getSequence());
步骤 2: 实现 Handler

定义你的事件处理逻辑。

public class MyGameEventHandler implements EventPoller.Handler<MyEvent> {@Overridepublic boolean onEvent(MyEvent event, long sequence, boolean endOfBatch) {// 在这里处理游戏事件,例如更新玩家位置System.out.printf("处理事件: %s, 序号: %d, 是否是批次末尾: %b%n",event.toString(), sequence, endOfBatch);// 通常返回 true,让 poll() 处理完所有可用事件return true;}
}
步骤 3: 在你的线程中轮询

在你的主线程(例如游戏循环)中,调用 poll()

// 在你的游戏循环线程中
MyGameEventHandler handler = new MyGameEventHandler();
boolean running = true;while (running) {// --- 游戏逻辑的其他部分:渲染、物理计算等 ---updateGamePhysics();renderGraphics();// --- 从 Disruptor 中拉取并处理事件 ---try {// 调用 poll(),它会处理当前所有可用的事件,然后立即返回EventPoller.PollState state = poller.poll(handler);// 如果没有事件,可以做一些其他工作或短暂休眠,避免CPU空转if (state == EventPoller.PollState.IDLE || state == EventPoller.PollState.GATING) {// yield or sleepThread.yield();}} catch (Exception e) {// 处理异常e.printStackTrace();}
}

 

    poll详解

    这是 EventPoller 最核心的方法,我们来逐行分析它的逻辑。

    // ... existing code ...public PollState poll(final Handler<T> eventHandler) throws Exception{final long currentSequence = sequence.get();long nextSequence = currentSequence + 1;final long availableSequence = sequencer.getHighestPublishedSequence(nextSequence, gatingSequence.get());if (nextSequence <= availableSequence){boolean processNextEvent;long processedSequence = currentSequence;try{do{final T event = dataProvider.get(nextSequence);processNextEvent = eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);processedSequence = nextSequence;nextSequence++;}while (nextSequence <= availableSequence && processNextEvent);}finally{sequence.set(processedSequence);}return PollState.PROCESSING;}else if (sequencer.getCursor() >= nextSequence){return PollState.GATING;}else{return PollState.IDLE;}}
    // ... existing code ...
    
    1. 获取序列

      • currentSequence = sequence.get(): 获取当前 Poller 的消费进度。
      • nextSequence = currentSequence + 1: 确定我们想要消费的下一个事件的序列号。
      • availableSequence = sequencer.getHighestPublishedSequence(...): 这是关键一步。它向 sequencer 查询,在 nextSequence 和 gatingSequence.get() 之间,生产者已经发布的最大可用序列号是多少。这个返回值 availableSequence 就是本次 poll 调用可以处理的事件序列号的上限。
    2. 处理可用事件 (if 分支)

      • if (nextSequence <= availableSequence): 如果为 true,说明至少有一个事件是可用的。
      • do-while 循环:
        • 循环处理从 nextSequence 到 availableSequence 的所有事件。
        • dataProvider.get(nextSequence): 从 RingBuffer 中获取事件。
        • eventHandler.onEvent(...): 调用用户提供的 Handler 来处理事件。
        • processNextEvent = ...Handler 的返回值决定是否继续循环。
        • processedSequence = nextSequence: 在事件被成功传递给 handler 后,更新 processedSequence 变量。
      • finally { sequence.set(processedSequence); }至关重要。无论 do-while 循环是正常结束还是因为 handler 抛出异常而中断,finally 块都会执行。它将 Poller 自身的 sequence 更新为最后一个已成功处理的事件的序列号。这保证了消费进度的正确性和持久化,下次调用 poll 时能从正确的位置开始。
      • return PollState.PROCESSING: 返回“处理中”状态。
    3. 等待 (else if 和 else 分支)

      • else if (sequencer.getCursor() >= nextSequence): 如果没有可用事件(即 nextSequence > availableSequence),但生产者的游标 sequencer.getCursor() 已经超过了我们想消费的 nextSequence,这说明我们是被 gatingSequence 卡住了。返回 GATING
      • else: 如果连生产者的游标都还没到 nextSequence,说明根本没有新事件。返回 IDLE

     

    总结

      EventPoller 是 Disruptor 提供的一个强大而灵活的工具。它牺牲了 BatchEventProcessor 的易用性和自动化的线程管理,换来了对消费流程的完全控制。

      适用场景

      • 当你的消费逻辑需要在某个现有线程(例如,游戏主循环、网络IO线程)中执行时。
      • 当你需要实现比 Disruptor 内置等待策略更复杂的消费调度逻辑时。
      • 当你需要以非阻塞的方式检查是否有新事件,并根据结果执行不同逻辑分支时。

      注意事项

      • 你需要自己管理轮询循环。
      • 你需要自己处理当没有事件时(IDLE 或 GATING 状态)的等待策略,以防止 CPU 100% 忙等。可以使用 Thread.yield()Thread.sleep() 或更高级的等待策略。
      • 别忘了将 poller.getSequence() 添加回 ringBuffer 的 gatingSequences 中。

      使用模式: 典型的使用方式是在你自己的循环中调用 poll(),并根据返回的 PollState 决定下一步行动:

      // 伪代码
      while (isRunning) {PollState state = poller.poll(myHandler);switch (state) {case IDLE:case GATING:// 没有事件或被阻塞,可以出让CPU或做点别的事Thread.yield(); break;case PROCESSING:// 事件被处理了,可能还有更多,可以立即再次尝试break;}
      }
      

      本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
      如若转载,请注明出处:http://www.pswp.cn/pingmian/93718.shtml
      繁体地址,请注明出处:http://hk.pswp.cn/pingmian/93718.shtml

      如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

      相关文章

      K8S-Secret资源对象

      目录 一、Secret概述 二、Secret 类型 kubectl 创建类型 三、Secret 使用 Opaque 类型 Secret 的使用 创建 yaml 一、Secret概述 k8s secrets用于存储和管理一些敏感数据&#xff0c;比如密码&#xff0c;token&#xff0c;密钥等敏感信息。它把 Pod 想要访问的加密数据…

      lua入门以及在Redis中的应用

      1.基本语法1.1变量lua的变量有&#xff1a;无效值nil&#xff0c;布尔值boolean&#xff0c;数字number、字符串string、函数function、自定义类型userdata、线程thread、表table&#xff08;key-value结构&#xff09;1.2循环数值循环for i起始值, 结束值 ,间隔值 do---option…

      淘宝电商大数据采集【采集内容||采集方法|工具||合规性||应用】

      淘宝电商大数据采集是指通过技术手段、工具或平台&#xff0c;系统性收集淘宝&#xff08;及旗下天猫等&#xff09;生态内的各类数据&#xff0c;用于分析市场趋势、用户行为、商品表现、竞品动态等&#xff0c;为电商运营、决策提供数据支持。以下从采集内容、工具方法、合规…

      ROS2核心模块

      1.创建工作空间先创建工作空间ws01_plumbing&#xff0c;终端下进入工作空间的src目录&#xff0c;执行如下命令&#xff1a;ros2 pkg create --build-type ament_cmake base_interfaces_demo2.话题通信话题通信是ROS中使用频率最高的一种通信模式&#xff0c;话题通信是基于发…

      Mac 上安装并使用 frpc(FRP 内网穿透客户端)指南

      一、先装好 Homebrew&#xff08;macOS 的包管理器&#xff09;打开终端&#xff08;Terminal&#xff09;&#xff0c;先装命令行开发工具 xcode-select --install弹窗点“安装”&#xff0c;等待 3~5 分钟。一键安装 Homebrew /bin/bash -c "$(curl -fsSL https://raw.g…

      04_接口与包管理

      第4课:接口与包管理 课程目标 深入理解Go语言接口的概念和用法 掌握接口的组合和空接口 学会使用Go Modules进行包管理 理解包的导入和导出规则 1. 接口基础 1.1 接口定义 // 基本接口定义 type Shape interface {Area() float64Perimeter()

      福昕PDF编辑软件高级版下载与详细图文安装教程!!

      软件下载 【软件名称】&#xff1a; 福昕PDF编辑器高级版 【软件大小】&#xff1a;668.9MBa a【系统要求】&#xff1a;awin10/win11或更高 福昕&#xff0c;软件下载&#xff08;夸克网盘需手机打开&#xff09;&#xff1a;&#xff1a;福昕丨夸克网盘-资源免费下载 软件介…

      利用无事务方式插入数据库解决并发插入问题(最小主键id思路)

      一、背景 由于某业务需要回退某产品数据缓存列表Asset资源&#xff0c;主任务执行后&#xff0c;通过并行执行批量子任务进行数据回退&#xff0c;子任务中会记录缓存列表Asset和缓存列表行AssetLine数据&#xff0c;并行执行过程会出现缓存列表行AssetLine重复插入问题&#…

      如何制作免费的比特币冷钱包

      本文主要从技术上讨论冷钱包的操作机制和原理&#xff0c;并不作为投资建议。对于国外的比特币玩家&#xff0c;或者打算长期囤币来对抗通货膨胀的&#xff0c;或者是想短期持有的&#xff0c;那么将比特币存储在哪里是一个Common的问题。一般是两类选择。第一种选择是存储在交…

      新手向:Python制作简易音乐播放器

      使用Python构建简易音乐播放器音乐播放器是现代数字生活中不可或缺的工具&#xff0c;从智能手机到电脑系统&#xff0c;几乎每个设备都内置了音乐播放功能。对于Python初学者来说&#xff0c;开发一个简易的音乐播放器是一个很好的实践项目&#xff0c;既能学习编程基础&#…

      【StarRocks】TabletChecker逻辑

      TabletChecker是StarRocks FE里的一个组件,它的主要工作是检查出所有的处于不健康状态的tablets。 注意,它的职责就是check(检查)。 至于tablet修复、均衡等调度工作不是TabletChecker的职责。 相关配置项 // 20秒执行一次check,代码里是执行runAfterCatalogReady()publi…

      低空经济展 | 优翼仿真携eVTOL全动飞行模拟器亮相2025深圳eVTOL展

      2025深圳eVTOL展将于2025年9月23-25日在深圳坪山燕子湖国际会展中心举行。展会以“低空经济・eVTOL・航空应急救援・商载大型无人运输机”为主题&#xff0c;以 “2天大会3天展览项目考察飞行表演颁奖盛典项目路演”的多元模式&#xff0c;打造覆盖 eVTOL全产业链的专业化合作平…

      AI驱动商业革新:开源大模型与零售精准营销引领产业升级

      在当今数字化浪潮中&#xff0c;AI 正以迅猛之势渗透至商业的每一处脉络&#xff0c;掀起一场影响深远的变革风暴&#xff0c;从根本上改写着商业运转的底层逻辑&#xff0c;创造出无数崭新的商业契机。基础模型领域&#xff0c;新的突破正在重塑行业格局。Meta 旗下的 LLaMA 3…

      【表的操作】

      文章目录 一、查看所有表 1、语法 二、创建表 1、语法 2、⽰例 3、表在磁盘上对应的⽂件 4、创建数据加时使⽤校验语句[if not exists] 三、查看表结构 1、语法 2、⽰例 四、修改表 1、语法 2、⽰例 (1)向表中添加⼀列 (2)修改某列的⻓度 (3)重命名某列 (4)删除某个字段…

      【Java后端】Spring Boot 全局异常处理最佳实践

      Spring Boot 全局异常处理最佳实践 在日常开发中&#xff0c;异常处理几乎是绕不过去的一个话题。尤其在 后端 API 项目 中&#xff0c;如果没有统一的异常处理机制&#xff0c;很容易出现以下问题&#xff1a; Controller 层代码里充斥着 try-catch&#xff0c;显得冗余。前端…

      K8S-Configmap资源

      目录 一、核心概念​ ​定义​ ​核心价值​ ​与Secret的区别​ ​二、核心特性​ ​数据存储​ ​生命周期​ ​作用域​ 什么是 Configmap&#xff1f; Configmap 能解决哪些问题&#xff1f; ConfigMap 的主要作用 三、命令行直接创建 四、通过文件创建&#xf…

      MySQL InnoDB事务acid特性的原理和隔离级别的实现原理

      InnoDB存储引擎 InnoDB存储结构表空间 则每张表都会有一个表空间&#xff08;xxx.ibd&#xff09;&#xff0c;一个mysql实例可以对应多个表空间 系统表空间 存储数据字典&#xff08;表结构定义、索引信息等&#xff09;、Change Buffer、Doublewrite Bufferundo log&#xff…

      Linux系统之部署nullboard任务管理工具

      Linux系统之部署nullboard任务管理工具一、nullboard介绍1.1 nullboard简介1.2 任务看板工具介绍1.3 nullboard使用场景二、本次实践介绍2.1 本地环境规划2.2 本次实践介绍三、安装httpd软件3.1 检查yum仓库3.2 安装httpd软件3.3 启动httpd服务3.4 查看httpd服务状态3.5 防火墙…

      Qt设置软件使用期限【新版防修改系统时间】

      在工业软件或其他领域中&#xff0c;经常会对软件进行授权&#xff0c;软件需要付费进行有期限的使用。以下是我用Qt设计的设置软件使用期限的两种方案。 主体思想&#xff1a; 1.软件需要绑定机器&#xff0c;让用户无法通过复制在另一台机器上运行。 2.由厂家提供激活码供用户…

      【JavaEE】多线程(线程安全问题)

      有些代码在单个线程环境下执行正确&#xff0c;如果同样的代码在多个线程下同时执行可能就会出现问题&#xff0c;这个就是线程安全问题&#xff08;或者称线程不安全问题&#xff09;&#xff0c;简而言之就是&#xff1a;线程安全问题是由于多线程出现的问题&#xff0c;原因…