EventPoller 是什么?
EventPoller
是 Disruptor 框架中一种 基于轮询(poll-based) 的事件消费机制。它与我们更常见的 BatchEventProcessor
(基于独立的消费者线程)形成了对比。核心区别在于:
BatchEventProcessor
(推模式): Disruptor 会为你创建一个专门的线程。一旦有事件发布,BatchEventProcessor
会在一个无限循环中自动、持续地处理事件,并推送给你的事件处理器(EventHandler
)。你只需要提供处理逻辑,不用关心线程管理。EventPoller
(拉模式):EventPoller
不会自己创建线程。它提供一个poll()
方法,让你可以在任何你选择的线程中,主动地去“拉取”和处理事件。控制权完全在你手中。
这种设计使得 EventPoller
非常适合与那些生命周期不受 Disruptor 控制的现有线程进行集成。
我们来看一下 EventPoller.java
中的关键部分:
内部组件(核心字段)
EventPoller
的实现依赖于四个核心的 final 字段,它们在构造时被注入,定义了 Poller 的行为。
// ... existing code ...
public class EventPoller<T>
{private final DataProvider<T> dataProvider;private final Sequencer sequencer;private final Sequence sequence;private final Sequence gatingSequence;
// ... existing code ...
dataProvider
: 事件的提供者,通常就是RingBuffer
。EventPoller
通过它来获取指定序号(sequence)的事件对象。sequencer
: 序列号管理器。这是 Disruptor 的核心组件,负责协调生产者和消费者之间的进度。EventPoller
用它来查询当前可消费的事件序列范围。sequence
:EventPoller
自身的消费进度序列。它记录了当前 Poller 已经成功处理到的事件的 sequence。每次poll
调用成功后,这个sequence
会被更新。gatingSequence
: 门控序列。这是一个非常重要的概念。EventPoller
在消费事件前,必须确保其依赖的前置消费者已经处理完这些事件。gatingSequence
就代表了这些前置依赖的进度。EventPoller
能消费到的最大序列号,不能超过gatingSequence
的当前值。如果没有其他消费者依赖,它通常会直接依赖生产者的游标(cursor)。
Handler<T>
接口
这是 EventPoller
的核心回调接口,你需要实现它来定义事件处理逻辑。
// ... existing code ...public interface Handler<T>{/*** Called for each event to consume it** @param event the event* @param sequence the sequence of the event* @param endOfBatch whether this event is the last in the batch* @return whether to continue consuming events. If {@code false}, the poller will not feed any more events* to the handler until {@link EventPoller#poll(Handler)} is called again* @throws Exception any exceptions thrown by the handler will be propagated to the caller of {@code poll}*/boolean onEvent(T event, long sequence, boolean endOfBatch) throws Exception;}
// ... existing code ...
onEvent(T event, long sequence, boolean endOfBatch)
:event
: 当前需要处理的事件对象。sequence
: 事件在 Ring Buffer 中的序号。endOfBatch
: 标志这是否是当前poll()
调用所能获取到的一批事件中的最后一个。- 返回值 (boolean): 这是关键!
- 返回
true
:EventPoller
会继续尝试处理下一个可用的事件(如果存在的话)。 - 返回
false
:poll()
方法会立即停止处理并返回,即使后面还有可用的事件。
- 返回
在 PullWithPoller.java
示例中,handler 总是返回 false
,实现了每次 poll
只处理一个事件的效果。
// ... existing code ...private static Object getNextValue(final EventPoller<DataEvent<Object>> poller) throws Exception{final Object[] out = new Object[1];poller.poll((event, sequence, endOfBatch) ->{out[0] = event.copyOfData();// Return false so that only one event is processed at a time.return false;});return out[0];}
// ... existing code ...
poll(Handler<T> eventHandler)
方法
这是 EventPoller
的“引擎”,你会在你的线程循环中反复调用它。
// ... existing code ...public PollState poll(final Handler<T> eventHandler) throws Exception{final long currentSequence = sequence.get();long nextSequence = currentSequence + 1;final long availableSequence = sequencer.getHighestPublishedSequence(nextSequence, gatingSequence.get());if (nextSequence <= availableSequence){
// ... existing code ...try{do{final T event = dataProvider.get(nextSequence);processNextEvent = eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);processedSequence = nextSequence;nextSequence++;}while (nextSequence <= availableSequence && processNextEvent);}
// ... existing code ...return PollState.PROCESSING;}else if (sequencer.getCursor() >= nextSequence){return PollState.GATING;}else{return PollState.IDLE;}}
// ... existing code ...
它返回一个 PollState
枚举,告诉你轮询的结果:
PollState.PROCESSING
: 本次poll()
调用成功处理了一个或多个事件。PollState.GATING
: 有新的事件已经被发布了,但是被前置的消费者(gatingSequence
)阻塞了。你需要等待前置消费者处理完,才能继续。PollState.IDLE
: Ring Buffer 中没有任何新的、可供处理的事件。
PollState
枚举
poll
方法的返回值,用于告诉调用者当前轮询的结果。
// ... existing code ...public enum PollState{/*** The poller processed one or more events*/PROCESSING,/*** The poller is waiting for gated sequences to advance before events become available*/GATING,/*** No events need to be processed*/IDLE}
// ... existing code ...
PROCESSING
: 本次poll
成功处理了至少一个事件。GATING
: 当前没有可处理的事件,因为被gatingSequence
阻塞了。意思是生产者已经发布了新的事件,但是前置依赖的消费者还没跟上。调用者看到这个状态,通常会选择Thread.yield()
或短暂休眠,等待依赖方前进。IDLE
: 当前没有可处理的_任何_事件。意思是连生产者都还没有发布新的事件。调用者看到这个状态,可以认为工作队列是空的。
EventPoller
的用法
想象一个场景:你正在开发一个游戏,你有一个主游戏循环(Game Loop)线程。你希望在这个主循环中处理来自网络模块的事件(例如玩家移动、聊天消息等),而这些事件是通过 Disruptor 传递的。你不能为了处理事件而阻塞游戏循环,也不想再创建一个新线程。这时 EventPoller
就是完美的解决方案。
步骤 1: 创建 EventPoller
通常,我们不直接调用 new EventPoller(...)
,而是使用 RingBuffer
的工厂方法 newPoller()
。
// 假设你已经设置好了 Disruptor 和 RingBuffer
RingBuffer<MyEvent> ringBuffer = disruptor.getRingBuffer();// 如果你的 Poller 消费需要依赖其他消费者,你需要提供它们的 Sequence
// 如果没有依赖,可以不传参数
// 例如,依赖 consumer1 和 consumer2
Sequence[] gatingSequences = new Sequence[]{consumer1.getSequence(), consumer2.getSequence()};
EventPoller<MyEvent> poller = ringBuffer.newPoller(gatingSequences);// 重要: 创建 Poller 后,需要将它的 Sequence 添加回 RingBuffer 的 Gating 列表中
// 这样,生产者(Publisher)才会等待你的 Poller,避免覆盖还未处理的事件
ringBuffer.addGatingSequences(poller.getSequence());
步骤 2: 实现 Handler
定义你的事件处理逻辑。
public class MyGameEventHandler implements EventPoller.Handler<MyEvent> {@Overridepublic boolean onEvent(MyEvent event, long sequence, boolean endOfBatch) {// 在这里处理游戏事件,例如更新玩家位置System.out.printf("处理事件: %s, 序号: %d, 是否是批次末尾: %b%n",event.toString(), sequence, endOfBatch);// 通常返回 true,让 poll() 处理完所有可用事件return true;}
}
步骤 3: 在你的线程中轮询
在你的主线程(例如游戏循环)中,调用 poll()
。
// 在你的游戏循环线程中
MyGameEventHandler handler = new MyGameEventHandler();
boolean running = true;while (running) {// --- 游戏逻辑的其他部分:渲染、物理计算等 ---updateGamePhysics();renderGraphics();// --- 从 Disruptor 中拉取并处理事件 ---try {// 调用 poll(),它会处理当前所有可用的事件,然后立即返回EventPoller.PollState state = poller.poll(handler);// 如果没有事件,可以做一些其他工作或短暂休眠,避免CPU空转if (state == EventPoller.PollState.IDLE || state == EventPoller.PollState.GATING) {// yield or sleepThread.yield();}} catch (Exception e) {// 处理异常e.printStackTrace();}
}
poll详解
这是 EventPoller
最核心的方法,我们来逐行分析它的逻辑。
// ... existing code ...public PollState poll(final Handler<T> eventHandler) throws Exception{final long currentSequence = sequence.get();long nextSequence = currentSequence + 1;final long availableSequence = sequencer.getHighestPublishedSequence(nextSequence, gatingSequence.get());if (nextSequence <= availableSequence){boolean processNextEvent;long processedSequence = currentSequence;try{do{final T event = dataProvider.get(nextSequence);processNextEvent = eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);processedSequence = nextSequence;nextSequence++;}while (nextSequence <= availableSequence && processNextEvent);}finally{sequence.set(processedSequence);}return PollState.PROCESSING;}else if (sequencer.getCursor() >= nextSequence){return PollState.GATING;}else{return PollState.IDLE;}}
// ... existing code ...
获取序列:
currentSequence = sequence.get()
: 获取当前 Poller 的消费进度。nextSequence = currentSequence + 1
: 确定我们想要消费的下一个事件的序列号。availableSequence = sequencer.getHighestPublishedSequence(...)
: 这是关键一步。它向sequencer
查询,在nextSequence
和gatingSequence.get()
之间,生产者已经发布的最大可用序列号是多少。这个返回值availableSequence
就是本次poll
调用可以处理的事件序列号的上限。
处理可用事件 (
if
分支):if (nextSequence <= availableSequence)
: 如果为true
,说明至少有一个事件是可用的。do-while
循环:- 循环处理从
nextSequence
到availableSequence
的所有事件。 dataProvider.get(nextSequence)
: 从 RingBuffer 中获取事件。eventHandler.onEvent(...)
: 调用用户提供的Handler
来处理事件。processNextEvent = ...
:Handler
的返回值决定是否继续循环。processedSequence = nextSequence
: 在事件被成功传递给handler
后,更新processedSequence
变量。
- 循环处理从
finally { sequence.set(processedSequence); }
: 至关重要。无论do-while
循环是正常结束还是因为handler
抛出异常而中断,finally
块都会执行。它将 Poller 自身的sequence
更新为最后一个已成功处理的事件的序列号。这保证了消费进度的正确性和持久化,下次调用poll
时能从正确的位置开始。return PollState.PROCESSING
: 返回“处理中”状态。
等待 (
else if
和else
分支):else if (sequencer.getCursor() >= nextSequence)
: 如果没有可用事件(即nextSequence > availableSequence
),但生产者的游标sequencer.getCursor()
已经超过了我们想消费的nextSequence
,这说明我们是被gatingSequence
卡住了。返回GATING
。else
: 如果连生产者的游标都还没到nextSequence
,说明根本没有新事件。返回IDLE
。
总结
EventPoller
是 Disruptor 提供的一个强大而灵活的工具。它牺牲了 BatchEventProcessor
的易用性和自动化的线程管理,换来了对消费流程的完全控制。
适用场景:
- 当你的消费逻辑需要在某个现有线程(例如,游戏主循环、网络IO线程)中执行时。
- 当你需要实现比 Disruptor 内置等待策略更复杂的消费调度逻辑时。
- 当你需要以非阻塞的方式检查是否有新事件,并根据结果执行不同逻辑分支时。
注意事项:
- 你需要自己管理轮询循环。
- 你需要自己处理当没有事件时(
IDLE
或GATING
状态)的等待策略,以防止 CPU 100% 忙等。可以使用Thread.yield()
、Thread.sleep()
或更高级的等待策略。 - 别忘了将
poller.getSequence()
添加回ringBuffer
的gatingSequences
中。
使用模式: 典型的使用方式是在你自己的循环中调用 poll()
,并根据返回的 PollState
决定下一步行动:
// 伪代码
while (isRunning) {PollState state = poller.poll(myHandler);switch (state) {case IDLE:case GATING:// 没有事件或被阻塞,可以出让CPU或做点别的事Thread.yield(); break;case PROCESSING:// 事件被处理了,可能还有更多,可以立即再次尝试break;}
}