Disruptor—1.原理和使用简介

大纲

1.Disruptor简介

2.Disruptor和BlockingQueue的压测对比

3.Disruptor的编程模型

4.Disruptor的数据结构与生产消费模型

5.RingBuffer + Disruptor + Sequence相关类

6.Disruptor的WaitStrategy消费者等待策略

7.EventProcessor + EventHandler等类

8.Disruptor的运行原理图

9.复杂业务需求下的编码方案和框架

10.Disruptor的串行操作

11.Disruptor的并行操作

12.Disruptor的多边形操作

13.Disruptor的多生产者和多消费者

1.Disruptor简介

(1)Disruptor是什么

(2)Disruptor的特点

(3)Disruptor的核心

(1)Disruptor是什么

Martin Fowler在自己网站上写了一篇LMAX架构的文章,在文章中他介绍了LMAX是一种新型零售金融交易平台,能够以很低的延迟产生大量的交易。LMAX是建立在JVM平台上,其核心是一个业务逻辑处理器,能够在一个线程里每秒处理6百万订单。LMAX业务逻辑处理器完全是运行在内存中,使用事件驱动方式,其核心是Disruptor。

(2)Disruptor的特点

大大简化了并发程序开发的难度,性能上比Java提供的一些并发包还好。

Disruptor是一个高性能异步处理框架,实现了观察者模式。Disruptor是无锁的、是CPU友好的。Disruptor不会清除缓存中的数据,只会覆盖缓存中的数据,不需要进行垃圾回收。Disruptor业务逻辑是纯内存操作,使用事件驱动方式。

(3)Disruptor的核心

Disruptor核心是一个RingBuffer,RingBuffer是一个数组,没有首尾指针。RingBuffer是一个首尾相接的环,用于在不同线程之间传递数据。

如果RingBuffer满了,是继续覆盖还是等待消费,由生产者和消费者决定。假设RingBuffer满了,生产者有两个选择:选择一是等待RingBuffer有空位再填充,选择二是直接覆盖。同时消费者也有两种选择:选择一是等待RingBuffer满了再消费,选择二是RingBuffer填充一个就消费一个。

RingBuffer有一个序号Sequence,这个序号指向数组中下一个可用元素。随着数据不断地填充这个数组,这个序号会一直增长,直到绕过这个环。序号指向的元素,可以通过mod计算:序号 % 长度 = 索引。建议将长度设为2的n次方,有利于二进制计算:序号 & (长度 - 1) = 索引。

Sequence通过顺序递增的序号来进行编号,以此管理正在进行交换的数据(事件)。对数据处理的过程总是沿着需要逐个递增处理,从而实现线程安全。一个Sequence用于跟踪标识某个特定的事件处理者的处理进度。

2.Disruptor和BlockingQueue的压测对比

Disruptor的性能是ArrayBlockingQueue的3倍+,这里的测试代码都是基于单线程的单生产者单消费者模式运行的。但是Disruptor本身就支持多生产者多消费者模型,测试中使用单线程明显降低了其性能。而ArrayBlockingQueue在多生产者多消费者场景下,其性能又会比单生产者单消费者场景下更低。因此,在实际应用中,Disruptor的性能会是ArrayBlockingQueue的3倍+。

public interface Constants {int EVENT_NUM_OHM = 100000000;int EVENT_NUM_FM = 50000000;int EVENT_NUM_OM = 10000000;
}public class ArrayBlockingQueue4Test {public static void main(String[] args) {//初始化一个大小为100000000的有界队列ArrayBlockingQueue,为了避免在测试时由于扩容影响性能,所以一开始就初始化大小为1亿final ArrayBlockingQueue<Data> queue = new ArrayBlockingQueue<Data>(100000000);//开始时间final long startTime = System.currentTimeMillis();//向容器中添加元素new Thread(new Runnable() {public void run() {long i = 0;//首先把数据投递到有界队列ArrayBlockingQueue,单线程的生产者while (i < Constants.EVENT_NUM_OHM) {Data data = new Data(i, "c" + i);try {queue.put(data);} catch (InterruptedException e) {e.printStackTrace();}i++;}}}).start();//从容器中取出元素new Thread(new Runnable() {public void run() {int k = 0;//然后才开始消费有界队列中的数据,单线程的消费者while (k < Constants.EVENT_NUM_OHM) {try {queue.take();} catch (InterruptedException e) {e.printStackTrace();}k++;}//结束时间long endTime = System.currentTimeMillis();//整个main函数就是单线程运行,处理1千万数据,大概耗时3.6秒System.out.println("ArrayBlockingQueue costTime = " + (endTime - startTime) + "ms");}}).start();}
}public class DisruptorSingle4Test {public static void main(String[] args) {int ringBufferSize = 65536;final Disruptor<Data> disruptor = new Disruptor<Data>(new EventFactory<Data>() {public Data newInstance() {return new Data();}},ringBufferSize,//设置为单线程运行Executors.newSingleThreadExecutor(),//单生产者模式ProducerType.SINGLE,//new BlockingWaitStrategy()new YieldingWaitStrategy());//创建一个消费者事件处理器DataConsumer consumer = new DataConsumer();//消费数据disruptor.handleEventsWith(consumer);disruptor.start();//单线程的消费者new Thread(new Runnable() {public void run() {RingBuffer<Data> ringBuffer = disruptor.getRingBuffer();for (long i = 0; i < Constants.EVENT_NUM_OHM; i++) {long seq = ringBuffer.next();Data data = ringBuffer.get(seq);data.setId(i);data.setName("c" + i);//发布一个数据被消费的事件ringBuffer.publish(seq);}}}).start();}
}public class DataConsumer implements EventHandler<Data> {private long startTime;private int i;public DataConsumer() {this.startTime = System.currentTimeMillis();}public void onEvent(Data data, long seq, boolean bool) throws Exception {i++;if (i == Constants.EVENT_NUM_OHM) {long endTime = System.currentTimeMillis();//处理1千万的数据,大概耗时1.1秒System.out.println("Disruptor costTime = " + (endTime - startTime) + "ms");//可见Disruptor的性能是ArrayBlockingQueue的3倍+}}
}

3.Disruptor的编程模型

(1)Disruptor的使用步骤

(2)Disruptor的使用演示

(1)Disruptor的使用步骤

步骤一:建立一个Event工厂类,用于创建数据(Event类实例对象)
步骤二:建立一个监听事件类(Event处理器),用于处理数据(Event类实例对象)
步骤三:创建Disruptor实例,配置一系列参数
步骤四:编写生产者组件,向Disruptor容器投递数据

(2)Disruptor的使用演示

一.引入pom依赖

<dependency><groupId>com.lmax</groupId><artifactId>disruptor</artifactId><version>3.3.2</version>
</dependency>

二.建立Event工厂类用于创建数据

Event工厂类创建的数据就是Event类实例对象。

public class OrderEvent {//订单的价格private long value;public long getValue() {return value;}public void setValue(long value) {this.value = value;}
}public class OrderEventFactory implements EventFactory<OrderEvent> {//返回一个空的数据对象(OrderEvent对象实例)public OrderEvent newInstance() {return new OrderEvent();}
}

三.建立监听事件类用于处理数据

监听事件类就是Event处理器,处理的数据就是Event类实例对象

public class OrderEventHandler implements EventHandler<OrderEvent> {public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {Thread.sleep(1000);System.err.println("消费者: " + event.getValue());}
}

四.创建Disruptor对象实例

public class Main {public static void main(String[] args) {//参数准备OrderEventFactory orderEventFactory = new OrderEventFactory();int ringBufferSize = 4;ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());//参数一:eventFactory,消息(Event)工厂对象//参数二:ringBufferSize,容器的长度//参数三:executor,线程池(建议使用自定义线程池),RejectedExecutionHandler//参数四:ProducerType,单生产者还是多生产者//参数五:waitStrategy,等待策略//1.实例化Disruptor对象Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(orderEventFactory,ringBufferSize,executor,ProducerType.SINGLE,//单生产者new BlockingWaitStrategy());//2.添加Event处理器,用于处理事件//也就是构建Disruptor与消费者的一个关联关系disruptor.handleEventsWith(new OrderEventHandler());//3.启动disruptordisruptor.start();...}
}

五.编写生产者组件向Disruptor容器投递数据

public class Main {public static void main(String[] args) {//参数准备OrderEventFactory orderEventFactory = new OrderEventFactory();int ringBufferSize = 4;ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());//参数一:eventFactory,消息(Event)工厂对象//参数二:ringBufferSize,容器的长度//参数三:executor,线程池(建议使用自定义线程池),RejectedExecutionHandler//参数四:ProducerType,单生产者还是多生产者//参数五:waitStrategy,等待策略//1.实例化Disruptor对象Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(orderEventFactory,ringBufferSize,executor,ProducerType.SINGLE,new BlockingWaitStrategy());//2.添加Event处理器,用于处理事件//也就是构建Disruptor与消费者的一个关联关系disruptor.handleEventsWith(new OrderEventHandler());//3.启动disruptordisruptor.start();//4.获取实际存储数据的容器: RingBufferRingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();OrderEventProducer producer = new OrderEventProducer(ringBuffer);ByteBuffer bb = ByteBuffer.allocate(8);for (long i = 0; i < 5; i++) {bb.putLong(0, i);//向容器中投递数据producer.sendData(bb);}disruptor.shutdown();executor.shutdown();}
}public class OrderEventProducer {private RingBuffer<OrderEvent> ringBuffer;public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {this.ringBuffer = ringBuffer;}public void sendData(ByteBuffer data) {//1.在生产者发送消息时, 首先需要从ringBuffer里获取一个可用的序号long sequence = ringBuffer.next();try {//2.根据这个序号, 找到具体的"OrderEvent"元素//注意:此时获取的OrderEvent对象是一个没有被赋值的"空对象"OrderEvent event = ringBuffer.get(sequence);//3.进行实际的赋值处理event.setValue(data.getLong(0));} finally {//4.提交发布操作ringBuffer.publish(sequence);}}
}

4.Disruptor的数据结构与生产消费模型

(1)Disruptor的核心与原理

(2)Disruptor的RingBuffer数据结构

(3)Disruptor的生产消费模型

(1)Disruptor的核心与原理

Disruptor的核心是RingBuffer,生产者向RingBuffer中写入元素,消费者从RingBuffer中消费元素。

(2)Disruptor的RingBuffer数据结构

RingBuffer是一个首尾相接的环(数组),用于在不同上下文(线程)之间传递数据。

RingBuffer拥有一个序号,这个序号指向数组中下一个可用的元素。随着生产者不停地往RingBuffer写入元素,这个序号也会一直增长,直到这个序号绕过这个环。

要找到RingBuffer数组中当前序号指向的元素,可以通过mod操作:序号 % 数组长度 = 数组索引。建议将长度设为2的n次方,有利于二进制计算:序号 & (长度 - 1) = 索引。

(3)Disruptor的生产消费模型

一.消费快生产慢

如果消费者从RingBuffer消费元素的速度大于生产者写入元素的速度,那么当消费者发现RingBuffer没有元素时,就要停下等待生产者写入元素。

二.生产快消费慢

如果生产者向RingBuffer写入元素的速度大于消费者消费元素的速度,那么当生产者发现RingBuffer已经满了,就要停下等待消费者消费元素。

因为RingBuffer数组的长度是有限的,生产者写入到RingBuffer的末尾时,会从RingBuffer的开始位置继续写入,这时候生产者就可能会追上消费者。

5.RingBuffer + Disruptor + Sequence相关类

(1)RingBuffer类

(2)Disruptor类

(3)Sequence类

(4)Sequencer接口

(5)SequenceBarrier类

(1)RingBuffer类

RingBuffer不仅是基于数组的缓存,也是创建Sequencer与定义WaitStrategy的入口。

(2)Disruptor类

Disruptor类可认为是一个持有RingBuffer、消费者线程池、消费者集合等引用的辅助类。

(3)Sequence类

通过顺序递增的序号来编号,管理正在进行交换的数据(事件)。对数据(事件)的处理总是沿着序号逐个递增,所以能够实现多线程下的并发安全与原子性。

一个Sequence用于跟踪标识某个特定的事件处理者的处理进度,也就是事件处理者在RingBuffer中的处理进度。每一个Producer和Consumer都有一个自己的Sequence。

Sequence可以看成是一个AtomicLong类型字段,用于标识进度。Sequence还可以防止不同Sequence之间CPU缓存的伪共享问题。

Sequence的两个作用:

作用一:用于递增标识进度

作用二:用于消除伪共享

(4)Sequencer接口

一.Sequencer包含Sequence

二.Sequencer接口有两个实现类

第一个实现类是SingleProducerSequencer

第二个实现类是MultiProducerSequencer

(5)SequenceBarrier类

作用一:用于保持对RingBuffer的生产者和消费者之间的平衡关系,比如让生产者或消费者进行等待、唤醒生产者或消费者

作用二:决定消费者是否还有可处理的事件

6.Disruptor的WaitStrategy消费者等待策略

(1)WaitStrategy接口的作用

(2)消费者等待策略的种类

(3)BlockingWaitStrategy

(4)SleepingWaitStrategy

(5)YieldingWaitStrategy

(1)WaitStrategy接口的作用

决定一个消费者将会如何等待生产者将Event投递到Disruptor。

(2)消费者等待策略的种类

BlockingWaitStrategy,通过阻塞的方式进行等待
SleepingWaitStrategy,通过休眠的方式进行等待
YieldingWaitStrategy,通过线程间的切换的方式进行等待

(3)BlockingWaitStrategy

BlockingWaitStrategy是最低效的等待策略,但是对CPU的消耗最小,并且在各种不同部署环境中能提供一致的性能表现。该策略需要使用到Java中的锁,也就是会通过ReentrantLock来阻塞消费者线程。而Disruptor本身是一个无锁并发框架,所以如果追求高性能,就不要选择这种策略。

(4)SleepingWaitStrategy

SleepingWaitStrategy是性能一般的等待策略,其性能表现和BlockingWaitStrategy差不多。但由于SleepingWaitStrategy是无锁的,所以对生产者线程的影响最小。该策略对CPU的消耗一般,通过在单个线程循环 + yield切换线程实现,所以这种策略特别适合于异步日志类似的场景。

(5)YieldingWaitStrategy

YieldingWaitStrategy的性能是最好的,适合于低延迟的系统。不过该策略对CPU的消耗最高,因为完全基于yield切换线程来实现。推荐用于要求高性能且事件处理线程数小于CPU逻辑核心数的场景中,尤其是当CPU开启了超线程特性的时候。

7.EventProcessor + EventHandler等类

(1)Event对象

(2)EventProcessor接口

(3)EventHandler接口

(4)WorkProcessor类

(1)Event对象

Disruptor中的Event指的是从生产者到消费者过程中所处理的数据对象。Disruptor中没有代码表示Event,它用泛型表示,完全由用户定义。比如创建一个RingBuffer对象时,其中的泛型就表示着这个Event对象。

(2)EventProcessor接口

EventProcessor用于处理Disruptor中的Event,拥有消费者的Sequence,它有一个实现类叫BatchEventProcessor。

由于EventProcessor接口继承自Runnable接口,所以BatchEventProcessor类会实现Runnable接口的run()方法。

其实BatchEventProcessor类是Disruptor框架中最核心的类,因为它的run()方法会不断轮询并获取数据对象,然后把数据对象(Event)交给消费者去处理,也就是即回调EventHandler接口的实现类对象的onEvent()方法。

(3)EventHandler接口

EventHandler是由用户实现的并且代表了Disruptor中的一个消费者接口,也就是消费者逻辑需要在EventHandler接口的onEvent()方法实现。

(4)WorkProcessor类

WorkProcessor类可确保每个Sequence只被一个Processor消费。注意:在单消费者模式下,使用的是EventHandler,对应于EventProcessor。在多消费者模式下,使用的是WorkHandler,对应于WorkProcessor。

8.Disruptor的运行原理图

9.复杂业务需求下的编码方案和框架

(1)方案选择

(2)框架选择

(1)方案选择

方案一:完全解耦的模式,比如一个子业务线也开一个项目,此时重复代码会比较多。

方案二:模版方法模式,如果业务快速迭代,可能也会需要经常重构底层的模版方法。

(2)框架选择

一.使用有限状态机框架

二.使用Disruptor框架

10.Disruptor的串行操作

Disruptor的串行操作,可以通过链式调用handleEventsWith()方法来实现。

如果使用RingBuffer对象来发布事件,那么需要先从RingBuffer对象中获取一个可用的序号,然后根据序号获取Event对象并对Event对象赋值,最后调用RingBuffer的publish()方法发布事件。

如果使用Disruptor对象来发布事件,那么直接调用Disruptor的publishEvent()方法发布事件即可。

此外,实际应用中不建议通过Executors来创建线程池,而应通过ThreadPoolExecutor构造函数具体指定线程池的每一个参数。因为Executors创建的线程池还是可能有安全隐患,比如Executors的newFixedThreadPool()方法使用的是无界队列,其使用的LinkedBlockingQueue是一个可选是否有界的阻塞队列。

//Disruptor中的Event
public class Trade {private String id;private String name;private double price;private AtomicInteger count = new AtomicInteger(0);public Trade() {}public String getId() {return id;}public void setId(String id) {this.id = id;}public String getName() {return name;}public void setName(String name) {this.name = name;}public double getPrice() {return price;}public void setPrice(double price) {this.price = price;}public AtomicInteger getCount() {return count;}public void setCount(AtomicInteger count) {this.count = count;}
}public class Main {@SuppressWarnings("unchecked")public static void main(String[] args) throws Exception {//实际应用中不建议这样创建线程池,而应通过ThreadPoolExecutor构造函数具体指定每个参数//因为这种创建的线程池还是有安全隐患,比如newFixedThreadPool()使用的是无界队列//LinkedBlockingQueue是一个可选是否有界的阻塞队列ExecutorService es1 = Executors.newFixedThreadPool(8);//构建一个线程池用于提交任务ExecutorService es2 = Executors.newFixedThreadPool(1);//1.构建DisruptorDisruptor<Trade> disruptor = new Disruptor<Trade>(new EventFactory<Trade>() {public Trade newInstance() {return new Trade();}},1024 * 1024,es1,ProducerType.SINGLE,new BusySpinWaitStrategy());//2.把消费者设置到Disruptor中,也就是使用Disruptor.handleEventsWith()方法//串行操作,通过链式编程实现disruptor.handleEventsWith(new Handler1()).handleEventsWith(new Handler2()).handleEventsWith(new Handler3());//3.启动disruptor并获取RingBufferRingBuffer<Trade> ringBuffer = disruptor.start();CountDownLatch latch = new CountDownLatch(1);long begin = System.currentTimeMillis();//通过线程池向Disruptor发布事件(生产数据)es2.submit(new TradePublisher(latch, disruptor));latch.await();disruptor.shutdown();es1.shutdown();es2.shutdown();System.err.println("总耗时: " + (System.currentTimeMillis() - begin));}
}public class TradePublisher implements Runnable {private static int PUBLISH_COUNT = 10;private Disruptor<Trade> disruptor;private CountDownLatch latch;public TradePublisher(CountDownLatch latch, Disruptor<Trade> disruptor) {this.disruptor = disruptor;this.latch = latch;}public void run() {TradeEventTranslator eventTranslator = new TradeEventTranslator();for (int i = 0; i < PUBLISH_COUNT; i++) {//新的发布事件的方式,另一种方式就是通过传入的RingBuffer的publish()方法发布事件disruptor.publishEvent(eventTranslator);}latch.countDown();}
}class TradeEventTranslator implements EventTranslator<Trade> {private Random random = new Random();public void translateTo(Trade event, long sequence) {this.generateTrade(event);}private void generateTrade(Trade event) {event.setPrice(random.nextDouble() * 9999);}
}public class Handler1 implements EventHandler<Trade>, WorkHandler<Trade> {//实现EventHandler的onEvent()方法,可以监听生产者发布的事件public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {this.onEvent(event);}//实现WorkHandler的onEvent()方法,也可以监听生产者发布的事件public void onEvent(Trade event) throws Exception {System.err.println("handler 1 : SET NAME");Thread.sleep(1000);event.setName("H1");}
}public class Handler2 implements EventHandler<Trade> {public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {System.err.println("handler 2 : SET ID");Thread.sleep(2000);event.setId(UUID.randomUUID().toString());}
}public class Handler3 implements EventHandler<Trade> {public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {System.err.println("handler 3 : NAME: " + event.getName() + ", ID: " + event.getId() + ", PRICE: " + event.getPrice() + " INSTANCE : " + event.toString());}
}

11.Disruptor的并行操作

Disruptor的并行操作可以有两种方式实现:方式一是调用handleEventsWith()方法时传入多个handler对象,方式二是分别多次调用handleEventsWith()方法。

public class Main {@SuppressWarnings("unchecked")public static void main(String[] args) throws Exception {//实际应用中不建议这样创建线程池,而应通过ThreadPoolExecutor构造函数具体指定每个参数//因为这种创建的线程池还是有安全隐患,比如newFixedThreadPool()使用的是无界队列//LinkedBlockingQueue是一个可选是否有界的阻塞队列ExecutorService es1 = Executors.newFixedThreadPool(8);//构建一个线程池用于提交任务ExecutorService es2 = Executors.newFixedThreadPool(1);//1.构建DisruptorDisruptor<Trade> disruptor = new Disruptor<Trade>(new EventFactory<Trade>() {public Trade newInstance() {return new Trade();}},1024 * 1024,es1,ProducerType.SINGLE,new BusySpinWaitStrategy());//2.把消费者设置到Disruptor中,也就是使用Disruptor.handleEventsWith()方法//Disruptor的并行操作可以有两种方式实现//方式一:调用handleEventsWith方法时传入多个handler对象disruptor.handleEventsWith(new Handler1(), new Handler2(), new Handler3());//方式二:分别多次调用handleEventsWith()方法//disruptor.handleEventsWith(new Handler1());//disruptor.handleEventsWith(new Handler2());//disruptor.handleEventsWith(new Handler3());//3.启动disruptor并获取RingBufferRingBuffer<Trade> ringBuffer = disruptor.start();CountDownLatch latch = new CountDownLatch(1);long begin = System.currentTimeMillis();//通过线程池向Disruptor发布事件(生产数据)es2.submit(new TradePublisher(latch, disruptor));latch.await();disruptor.shutdown();es1.shutdown();es2.shutdown();System.err.println("总耗时: " + (System.currentTimeMillis() - begin));}
}public class TradePublisher implements Runnable {private static int PUBLISH_COUNT = 10;private Disruptor<Trade> disruptor;private CountDownLatch latch;public TradePublisher(CountDownLatch latch, Disruptor<Trade> disruptor) {this.disruptor = disruptor;this.latch = latch;}public void run() {TradeEventTranslator eventTranslator = new TradeEventTranslator();for (int i = 0; i < PUBLISH_COUNT; i++) {//新的发布事件的方式,另一种方式就是通过传入的RingBuffer的publish()方法发布事件disruptor.publishEvent(eventTranslator);}latch.countDown();}
}class TradeEventTranslator implements EventTranslator<Trade> {private Random random = new Random();public void translateTo(Trade event, long sequence) {this.generateTrade(event);}private void generateTrade(Trade event) {event.setPrice(random.nextDouble() * 9999);}
}public class Handler1 implements EventHandler<Trade>, WorkHandler<Trade> {//实现EventHandler的onEvent()方法,可以监听生产者发布的事件public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {this.onEvent(event);}//实现WorkHandler的onEvent()方法,也可以监听生产者发布的事件public void onEvent(Trade event) throws Exception {System.err.println("handler 1 : SET NAME");Thread.sleep(1000);event.setName("H1");}
}public class Handler2 implements EventHandler<Trade> {public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {System.err.println("handler 2 : SET ID");Thread.sleep(2000);event.setId(UUID.randomUUID().toString());}
}public class Handler3 implements EventHandler<Trade> {public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {System.err.println("handler 3 : NAME: " + event.getName() + ", ID: " + event.getId() + ", PRICE: " + event.getPrice() + " INSTANCE : " + event.toString());}
}

12.Disruptor的多边形操作

(1)Disruptor的菱形操作

(2)Disruptor的六边形操作

Disruptor可以实现串并行同时编码。

(1)Disruptor的菱形操作

可以理解为先并行执行,然后再串行执行,类似于CyclicBarrier。

菱形操作方式一:调用handleEventsWith()方法时传入多个参数 + 链式调用。

菱形操作方式二:调用handleEventsWith()方法时传入多个参数 + 使用then()方法。

public class Main {@SuppressWarnings("unchecked")public static void main(String[] args) throws Exception {//实际应用中不建议这样创建线程池,而应通过ThreadPoolExecutor构造函数具体指定每个参数//因为这种创建的线程池还是有安全隐患,比如newFixedThreadPool()使用的是无界队列//LinkedBlockingQueue是一个可选是否有界的阻塞队列ExecutorService es1 = Executors.newFixedThreadPool(8);//构建一个线程池用于提交任务ExecutorService es2 = Executors.newFixedThreadPool(1);//1.构建DisruptorDisruptor<Trade> disruptor = new Disruptor<Trade>(new EventFactory<Trade>() {public Trade newInstance() {return new Trade();}},1024 * 1024,es1,ProducerType.SINGLE,new BusySpinWaitStrategy());//2.把消费者设置到Disruptor中,也就是使用Disruptor.handleEventsWith()方法//菱形操作一disruptor.handleEventsWith(new Handler1(), new Handler2()).handleEventsWith(new Handler3());//菱形操作二//EventHandlerGroup<Trade> ehGroup = disruptor.handleEventsWith(new Handler1(), new Handler2());//ehGroup.then(new Handler3());//3.启动disruptor并获取RingBufferRingBuffer<Trade> ringBuffer = disruptor.start();CountDownLatch latch = new CountDownLatch(1);long begin = System.currentTimeMillis();//通过线程池向Disruptor发布事件(生产数据)es2.submit(new TradePublisher(latch, disruptor));latch.await();disruptor.shutdown();es1.shutdown();es2.shutdown();System.err.println("总耗时: " + (System.currentTimeMillis() - begin));}
}public class TradePublisher implements Runnable {private static int PUBLISH_COUNT = 10;private Disruptor<Trade> disruptor;private CountDownLatch latch;public TradePublisher(CountDownLatch latch, Disruptor<Trade> disruptor) {this.disruptor = disruptor;this.latch = latch;}public void run() {TradeEventTranslator eventTranslator = new TradeEventTranslator();for (int i = 0; i < PUBLISH_COUNT; i++) {//新的发布事件的方式,另一种方式就是通过传入的RingBuffer的publish()方法发布事件disruptor.publishEvent(eventTranslator);}latch.countDown();}
}class TradeEventTranslator implements EventTranslator<Trade> {private Random random = new Random();public void translateTo(Trade event, long sequence) {this.generateTrade(event);}private void generateTrade(Trade event) {event.setPrice(random.nextDouble() * 9999);}
}public class Handler1 implements EventHandler<Trade>, WorkHandler<Trade> {//实现EventHandler的onEvent()方法,可以监听生产者发布的事件public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {this.onEvent(event);}//实现WorkHandler的onEvent()方法,也可以监听生产者发布的事件public void onEvent(Trade event) throws Exception {System.err.println("handler 1 : SET NAME");Thread.sleep(1000);event.setName("H1");}
}public class Handler2 implements EventHandler<Trade> {public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {System.err.println("handler 2 : SET ID");Thread.sleep(2000);event.setId(UUID.randomUUID().toString());}
}public class Handler3 implements EventHandler<Trade> {public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {System.err.println("handler 3 : NAME: " + event.getName() + ", ID: " + event.getId() + ", PRICE: " + event.getPrice() + " INSTANCE : " + event.toString());}
}

(2)Disruptor的六边形操作

通过Disruptor的after()方法 + 菱形操作,可实现六边形操作。

注意在单消费者模式下:一个EventHandler会对应一个BatchEventProcessor,所以如果有n个EventHandler监听Disruptor,那么初始化Disruptor时的线程池就要有n个线程,否则可能导致多边形操作失效。

在单消费者模式下,如果有非常多EventHandler,就需要非常多线程。此时是不合理的,所以如果有很多EventHandler,可采用多消费者模式。

public class Main {@SuppressWarnings("unchecked")public static void main(String[] args) throws Exception {//实际应用中不建议这样创建线程池,而应通过ThreadPoolExecutor构造函数具体指定每个参数//因为这种创建的线程池还是有安全隐患,比如newFixedThreadPool()使用的是无界队列//LinkedBlockingQueue是一个可选是否有界的阻塞队列ExecutorService es1 = Executors.newFixedThreadPool(8);//构建一个线程池用于提交任务ExecutorService es2 = Executors.newFixedThreadPool(1);//1.构建DisruptorDisruptor<Trade> disruptor = new Disruptor<Trade>(new EventFactory<Trade>() {public Trade newInstance() {return new Trade();}},1024 * 1024,es1,ProducerType.SINGLE,new BusySpinWaitStrategy());//2.把消费者设置到Disruptor中,也就是使用Disruptor.handleEventsWith()方法//六边形操作Handler1 h1 = new Handler1();Handler2 h2 = new Handler2();Handler3 h3 = new Handler3();Handler4 h4 = new Handler4();Handler5 h5 = new Handler5();disruptor.handleEventsWith(h1, h4);disruptor.after(h1).handleEventsWith(h2);disruptor.after(h4).handleEventsWith(h5);disruptor.after(h2, h5).handleEventsWith(h3);//3.启动disruptor并获取RingBufferRingBuffer<Trade> ringBuffer = disruptor.start();CountDownLatch latch = new CountDownLatch(1);long begin = System.currentTimeMillis();//通过线程池向Disruptor发布事件(生产数据)es2.submit(new TradePublisher(latch, disruptor));latch.await();disruptor.shutdown();es1.shutdown();es2.shutdown();System.err.println("总耗时: " + (System.currentTimeMillis() - begin));}
}public class TradePublisher implements Runnable {private static int PUBLISH_COUNT = 10;private Disruptor<Trade> disruptor;private CountDownLatch latch;public TradePublisher(CountDownLatch latch, Disruptor<Trade> disruptor) {this.disruptor = disruptor;this.latch = latch;}public void run() {TradeEventTranslator eventTranslator = new TradeEventTranslator();for (int i = 0; i < PUBLISH_COUNT; i++) {//新的发布事件的方式,另一种方式就是通过传入的RingBuffer的publish()方法发布事件disruptor.publishEvent(eventTranslator);}latch.countDown();}
}class TradeEventTranslator implements EventTranslator<Trade> {private Random random = new Random();public void translateTo(Trade event, long sequence) {this.generateTrade(event);}private void generateTrade(Trade event) {event.setPrice(random.nextDouble() * 9999);}
}public class Handler1 implements EventHandler<Trade>, WorkHandler<Trade> {//实现EventHandler的onEvent()方法,可以监听生产者发布的事件public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {this.onEvent(event);}//实现WorkHandler的onEvent()方法,也可以监听生产者发布的事件public void onEvent(Trade event) throws Exception {System.err.println("handler 1 : SET NAME");Thread.sleep(1000);event.setName("H1");}
}public class Handler2 implements EventHandler<Trade> {public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {System.err.println("handler 2 : SET ID");Thread.sleep(2000);event.setId(UUID.randomUUID().toString());}
}public class Handler3 implements EventHandler<Trade> {public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {System.err.println("handler 3 : NAME: " + event.getName() + ", ID: " + event.getId() + ", PRICE: " + event.getPrice() + " INSTANCE : " + event.toString());}
}public class Handler4 implements EventHandler<Trade> {public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {System.err.println("handler 4 : SET PRICE");Thread.sleep(1000);event.setPrice(17.0);}
}public class Handler5 implements EventHandler<Trade> {public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {System.err.println("handler 5 : GET PRICE: " + event.getPrice());Thread.sleep(1000);event.setPrice(event.getPrice() + 3.0);}
}

13.Disruptor的多生产者和多消费者

注意一:使用多消费者模式时,每个消费者都需要实现WorkHandler接口,而不是EventHandler接口。单消费者模式,使用的是EventHandler,对应于EventProcessor。多消费者模式,使用的是WorkHandler,对应于WorkProcessor。

注意二:使用多消费者模式时,需要构建消费者工作池WorkerPool。

注意三:使用多消费者模式时,每个消费者需要一个Sequence来标记当前消费的最小序号。这样生产者投递消息时才能遍历消费者的Sequence找出最小的序号,然后写到最小的序号位置进行阻塞等待。

比如下图中,在某一时刻:消费者1消费了序号0和2,但序号1还没有消费完毕。消费者2消费了序号3和4,消费者3消费了序号5。此时,在RingBuffer中,虽然序号0、2、3、4、5都可以覆盖了,但由于序号1还没被消费,所以生产者最多只能覆盖到序号0的位置。然后等待序号1被消费者1消费完毕后,才能继续往RingBuffer投递消息。

//Disruptor中的 Event
public class Order {private String id;private String name;private double price;public Order() {}public String getId() {return id;}public void setId(String id) {this.id = id;}public String getName() {return name;}public void setName(String name) {this.name = name;}public double getPrice() {return price;}public void setPrice(double price) {this.price = price;}
}public class Main {public static void main(String[] args) throws InterruptedException {//1.创建RingBufferRingBuffer<Order> ringBuffer = RingBuffer.create(ProducerType.MULTI,//多生产者new EventFactory<Order>() {public Order newInstance() {return new Order();}},1024 * 1024,new YieldingWaitStrategy());//2.通过ringBuffer创建一个屏障SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();//3.创建消费者数组,每个消费者Consumer都需要实现WorkHandler接口Consumer[] consumers = new Consumer[10];for (int i = 0; i < consumers.length; i++) {consumers[i] = new Consumer("C" + i);}//4.构建多消费者工作池WorkerPool,因为多消费者模式下需要使用WorkerPoolWorkerPool<Order> workerPool = new WorkerPool<Order>(ringBuffer,sequenceBarrier,new EventExceptionHandler(),consumers);//5.设置多个消费者的sequence序号,用于单独统计每个消费者的消费进度, 并且设置到RingBuffer中ringBuffer.addGatingSequences(workerPool.getWorkerSequences());//6.启动workerPoolworkerPool.start(Executors.newFixedThreadPool(5));final CountDownLatch latch = new CountDownLatch(1);for (int i = 0; i < 100; i++) {final Producer producer = new Producer(ringBuffer);new Thread(new Runnable() {public void run() {try {latch.await();} catch (Exception e) {e.printStackTrace();}for (int j = 0; j < 100; j++) {producer.sendData(UUID.randomUUID().toString());}}}).start();}Thread.sleep(2000);System.err.println("----------等待线程创建完毕,才开始生产数据----------");latch.countDown();Thread.sleep(10000);System.err.println("任务总数:" + consumers[2].getCount());}static class EventExceptionHandler implements ExceptionHandler<Order> {public void handleEventException(Throwable ex, long sequence, Order event) {}public void handleOnStartException(Throwable ex) {}public void handleOnShutdownException(Throwable ex) {}}
}public class Consumer implements WorkHandler<Order> {private static AtomicInteger count = new AtomicInteger(0);private String consumerId;private Random random = new Random();public Consumer(String consumerId) {this.consumerId = consumerId;}public void onEvent(Order event) throws Exception {Thread.sleep(1 * random.nextInt(5));System.err.println("当前消费者: " + this.consumerId + ", 消费信息ID: " + event.getId());count.incrementAndGet();}public int getCount() {return count.get();}
}public class Producer {private RingBuffer<Order> ringBuffer;public Producer(RingBuffer<Order> ringBuffer) {this.ringBuffer = ringBuffer;}public void sendData(String uuid) {long sequence = ringBuffer.next();try {Order order = ringBuffer.get(sequence);order.setId(uuid);} finally {ringBuffer.publish(sequence);}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.pswp.cn/pingmian/81911.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

谷歌浏览器调试python pygui程序

谷歌浏览器地址:chrome://inspect/#devices 前端vue为8080, 配置如下 pychat 需要配置环境变量 PYTHONUNBUFFERED1;QTWEBENGINE_REMOTE_DEBUGGING9012

手写Tomcat(二)—Tomcat简化模型架构

一、 Tomcat架构 Tomcat的主要角色是 servlet容器&#xff0c;提供一个解释器&#xff0c;能够解析并执行JavaScript Object Notation (JON)脚本&#xff08;后更改为Servlet&#xff09;&#xff0c;并将请求传送到指定的服务器&#xff08;如JavaBean&#xff09;。因此&…

Android 网络全栈攻略(五)—— 从 OkHttp 拦截器来看 HTTP 协议二

上一篇我们介绍了 OkHttp 的责任链以及第一个内置拦截器 —— 重试与重定向拦截器。本篇我们将剩余四个拦截器的解析做完。 1、桥接拦截器 BridgeInterceptor 作为请求准备和实际发送之间的桥梁&#xff0c;自动处理 HTTP 请求头等繁琐工作。比如设置请求内容长度&#xff0c…

JDBC-java操作数据库

1.基本结构&#xff1a; package com.atguigu.servlets;import java.sql.Connection; import java.sql.DriverManager; import java.sql.Statement;public class JDBCemo {public static void main(String[] args) throws Exception{String url "jdbc:mysql:///mysql&qu…

七彩喜适老化改造:让每个空间成为长者尊严的守护者

随着我国老龄化进程的加速&#xff0c;居家养老逐渐成为老年人首选的生活方式。 为了让老年人能够在熟悉的环境中安享晚年&#xff0c;适老化改造应运而生。 七彩喜作为居家养老服务的创新者&#xff0c;致力于通过科学设计和人性化改造&#xff0c;为老年人提供安全、舒适、…

【动态规划】5 从一次函数出发推导斜率优化dp

背景 基于例题《任务安排》逐步推导进行斜率优化。 引入 例题&#xff1a;P2365 任务安排 考虑动态规划。使用 d p i , j dp_{i,j} dpi,j​ 表示前 i i i 个任务分了 j j j 段的最小费用。 显然&#xff0c;有 d p i , j min ⁡ k 1 i − 1 ( d p i , j , d p k , j…

MySQL中实现大数据量的快速插入

一、SQL语句优化​ 1. ​批量插入代替单条插入​ ​单条插入会频繁触发事务提交和日志写入&#xff0c;效率极低。​批量插入通过合并多条数据为一条SQL语句&#xff0c;减少网络传输和SQL解析开销。 -- 低效写法&#xff1a;逐条插入 INSERT INTO table (col1, col2) VALUE…

C++23中std::span和std::basic_string_view可平凡复制提案解析

文章目录 一、引言二、相关概念解释2.1 平凡复制&#xff08;Trivially Copyable&#xff09;2.2 std::span2.3 std::basic_string_view 三、std::span和std::basic_string_view的应用场景3.1 std::span的应用场景3.2 std::basic_string_view的应用场景 四、P2251R1提案对std::…

广东省省考备考(第十八天5.23)—言语:语句填空题(听课后强化训练)

错题 解析 横线出现在文段中间&#xff0c;需结合上下文内容进行分析。文段开篇指出逃离北上广深的话题时而出现&#xff0c;一些人离开大城市回到小城市。随后通过转折词“但”引出横线内容&#xff0c;且结合横线后人才倾向于向更发达的地方流动的内容&#xff0c;横线处应体…

持续更新 ,GPT-4o 风格提示词案例大全!附使用方式

本文汇集了各类4o风格提示词的精选案例&#xff0c;从基础指令到复杂任务&#xff0c;从创意写作到专业领域&#xff0c;为您提供全方位的参考和灵感。我们将持续更新这份案例集&#xff0c;确保您始终能够获取最新、最有效的提示词技巧。 让我们一起探索如何通过精心设计的提…

创建型:建造者模式

目录 1、核心思想 2、实现方式 2.1 模式结构 2.2 工作流程 2.3 实现案例 2.4 变体&#xff1a;链式建造者&#xff08;常见于多参数对象&#xff0c;无需指挥者&#xff09; 3、优缺点分析 4、适用场景 1、核心思想 目的&#xff1a;将复杂对象的构建过程与其表示分离…

力扣-长度最小的子数组

1.题目描述 2.题目链接 LCR 008. 长度最小的子数组 - 力扣&#xff08;LeetCode&#xff09; 3.题目分析 这道题目我们使用的也是双指针。我们可以定义两个指针都指向数组第一个元素&#xff0c;然后使用right指针遍历原数组&#xff0c;计算left指针到right指针之间的所有元…

JAVA开发工具延长方案

亲测稳定的延长方案与避坑指南 真的搞不懂了&#xff0c;说点专业的术语竟然成了 QINQUAN。那就直接点&#xff0c;把这个方案带给需要的开发者。 延长工具直通车 保姆级教程 延长方案https://mp.weixin.qq.com/s/uajM2Y9Vz6TnolzcLur_bw还是让大家看看&#xff0c;发什么会被…

SpringAI开发SSE传输协议的MCP Server

SpringAI 访问地址&#xff1a;Spring AI ‌ Spring AI‌是一个面向人工智能工程的应用框架&#xff0c;由Spring团队推出&#xff0c;旨在将AI能力集成到Java应用中。Spring AI的核心是解决AI集成的根本挑战&#xff0c;即将企业数据和API与AI模型连接起来‌。 MCP…

JAVA动态生成类

在java的加载过程一般都是要预先定义java类,然后通过经过加载->连接->初始化三步。连接过程又可分为三步:验证->准备->解析。初始化的类是不允许修改。但是在日常的工作中有时候需要动态生成类,那第这种情况怎么办呢? 可以这么处理: 1、先定义一个空的类,仅…

深入解析Java微服务架构:Spring Boot与Spring Cloud的整合实践

深入解析Java微服务架构&#xff1a;Spring Boot与Spring Cloud的整合实践 引言 随着云计算和分布式系统的快速发展&#xff0c;微服务架构已成为现代软件开发的主流模式。Java作为企业级应用开发的核心语言&#xff0c;结合Spring Boot和Spring Cloud&#xff0c;为开发者提…

03_基础篇-NumPy(下):深度学习中的常用操作

03_基础篇-NumPy&#xff08;下&#xff09;&#xff1a;深度学习中的常用操作 通过上节课的学习&#xff0c;我们已经对NumPy数组有了一定的了解&#xff0c;正所谓实践出真知&#xff0c;今天我们就以一个图像分类的项目为例&#xff0c;看看NumPy的在实际项目中都有哪些重要…

时钟识别项目报告(深度学习、计算机视觉)

深度学习方式 一、模型架构 本模型采用双任务学习框架&#xff0c;基于经典残差网络实现时钟图像的小时和分钟同步识别。 主干网络 使用预训练的ResNet18作为特征提取器&#xff0c;移除原分类层&#xff08;fc层&#xff09;&#xff0c;保留全局平均池化后的512维特征向量。…

openai-whisper-asr-webservice接入dify

openai-whisper-asr-webservice提供的asr的api其实并不兼容openai的api&#xff0c;所以在dify中是不能直接添加到语音转文字的模型中&#xff0c;对比了下两个api的传参情况&#xff0c;其实只要改动一处&#xff0c;就能支持&#xff1a; openai兼容的asr调用中formdata中音频…

解锁MySQL性能调优:高级SQL技巧实战指南

高级SQL技巧&#xff1a;解锁MySQL性能调优的终极指南 开篇 当前&#xff0c;随着业务系统的复杂化和数据量的爆炸式增长&#xff0c;数据库性能调优成为了技术人员面临的核心挑战之一。尤其是在高并发、大数据量的场景下&#xff0c;SQL 查询的性能直接影响到整个系统的响应…