Java多线程之Disruptor入门

网友投稿 269 2023-01-18

Java多线程之Disruptor入门

一、Disruptor简介

Disruptor目前是世界上最快的单机消息队列,由英国外汇交易公司LMAX开发,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。基于Disruptor开发的系统单线程能支撑每秒600万订单,2010年在QCon演讲后,获得了业界关注。2011年,企业应用软件专家Martin Fowler专门撰写长文介绍。同年它还获得了Oracle官方的Duke大奖。目前,包括Apache Storm、Camel、Log4j 2在内的很多知名项目都应用了Disruptor以获取高性能。

二、浅聊Disruptor的核心

Disruptor维护了一个环形队列RingBuffer,这个队列本质上是一个首位相连的数组。相比于LinkedBlockdingQueue,RingBuffer的数组结构在查找方面效率更高。此外,LinkedBlockingQueue需要维护一个头节点指针head和一个尾节点指针tail,而RingBuffer只需要维护一个sequence指向下一个可用的位置即可。所以从这两点来说,RingBuffer比LinkedBlockingQueue要快。

三、Disruptor使用

3.1 pom.xml

com.lmax

disruptor

3.4.3

3.2 事件Event

Disruptor是基于事件的生产者消费者模型。其RingBuffer中存放的其实是将消息封装成的事件。这里定义了一个LongEvent,表示消息队列中存放的是long类型的数据。

public class LongEvent {

private long value;

public void set(long value) {

this.value = value;

}

@Override

public String toString() {

return "LongEvent{" +

"value=" + value +

'}';

}

}

3.3 EventFactory

实现EventFactory接口,定义Event工厂,用于填充队列。Event工厂其实是为了提高Disruptor的效率,初始化的时候,会调用Event工厂,对RingBuffer进行内存的提前分配,GC的频率会降低。

import com.lmax.disruptor.EventFactory;

public class LongEventFactory implements EventFactory {

public LongEvent newInstance() {

return new LongEvent();

}

}

3.4 EventHandler

实现EventHandler接口,定义EventHandler(消费者),处理容器中的元素。

import com.lmax.disruptor.EventHandler;

public class LongEventHandler implements EventHandler {

public void onEvent(LongEvent event, long sequence, boolean endOfBatch) {

System.out.println("Event: " + event + ", sequence: " + sequence);

}

}

3.5 使用Disruptor原始API发布消息

import cn.flying.space.disruptor.demo.Longhttp://Event;

import com.lmax.disruptor.RingBuffer;

import java.nio.ByteBuffer;

/**

* 定义一个生产者,往Disruptor中投递消息

http://*/

public class LongEventProducer {

private RingBuffer ringBuffer;

public LongEventProducer(RingBuffer ringBuffer) {

this.ringBuffer = ringBuffer;

}

public void onData(ByteBuffer byteBuffer) {

// 定位到下一个可存放的位置

long sequence = ringBuffer.next();

try {

// 拿到该位置的event

LongEvent event = ringBuffer.get(sequence);

// 设置event的值

event.set(byteBuffer.getLong(0));

} finally {

// 发布

ringBuffer.publish(sequence);

}

}

}

import cn.flying.space.disruptor.demo.LongEvent;

import cn.flying.space.disruptor.demo.LongEventFactory;

import cn.flying.space.disruptor.demo.LongEventHandler;

import com.lmax.disruptor.RingBuffer;

import com.lmax.disruptor.dsl.Disruptor;

import java.nio.ByteBuffer;

import java.util.concurrent.Executors;

public class TestMain {

public static void main(String[] args) throws InterruptedException {

// 定义event工厂

LongEventFactory factory = new LongEventFactory();

// ringBuffer长度

int bufferSize = 1024;

// 构造一个Disruptor

Disruptor disruptor = new Disruptor<>(factory, bufferSize, Executors.defaultThreadFactory());

// 绑定handler

disruptor.handleEventsWith(new LongEventHandler());

// 启动Disruptor

disruptor.start();

RingBuffer ringBuffer = disruptor.getRingBuffer();

LongEventProducer producer = new LongEventProducer(ringBuffer);

ByteBuffer byteBuffer = ByteBuffer.allocate(8);

for (long i = 0; true; i++) {

byteBuffer.clear();

byteBuffer.putLong(i);

// 投递消息

producer.onData(byteBuffer);

Thread.sleep(1000);

}

}

}

3.6 使用Translators发布消息

import cn.flying.space.disruptor.demo.LongEvent;

import com.lmax.disruptor.EventTranslatorOneArg;

import com.lmax.disruptor.RingBuffer;

import java.nio.ByteBuffer;

public class LongEventProducerUsingTranslator {

private RingBuffer ringBuffer;

public LongEventProducerUsingTranslator(RingBuffer ringBuffer) {

this.ringBuffer = ringBuffer;

}

private static final EventTranslatorOneArg TRANSLATOR = new EventTranslatorOneArg() {

@Override

public void translateTo(LongEvent longEvent, long l, ByteBuffer byteBuffer) {

longEvent.set(byteBuffer.getLong(0));

}

};

public void onData(ByteBuffer byteBuffer) {

ringBuffer.publishEvent(TRANSLATOR, byteBuffer);

}

}

import cn.flying.space.disruptor.demo.LongEvent;

import cn.flying.space.disruptor.demo.LongEventFactory;

import cn.flying.space.disruptor.demo.LongEventHandler;

import com.lmax.disruptor.RingBuffer;

import com.lmax.disruptor.dsl.Disruptor;

import com.lmax.disruptor.util.DaemonThreadFactory;

import java.nio.ByteBuffer;

/**

* @author ZhangSheng

* @date 2021-4-26 14:23

*/

public class TestMain {

public static void main(String[] args) throws InterruptedException {

LongEventFactory factory = new LongEventFactory();

int bufferSize = 1024;

Disruptor disruptor = new Disruptor<>(factory, bufferSize, DaemonThreadFactory.INSTANCE);

disruptor.handleEventsWith(new LongEventHandler());

disruptor.start();

RingBuffer ringBuffer = disruptor.getRingBuffer();

LongEventProducerUsingTranslator producer = new LongEventProducerUsingTranslator(ringBuffer);

ByteBuffer byteBuffer = ByteBuffer.allocate(8);

for (long i = 0L; true; i++) {

byteBuffer.putLong(0, i);

// 发布

producer.onData(byteBuffer);

Thread.sleep(1000);

}

}

}

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:IDEA 自带的数据库工具真的很牛逼(收藏版)
下一篇:豆瓣开放api接口(豆瓣API)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~