linux怎么查看本机内存大小
269
2023-01-18
Java多线程之Disruptor入门
一、Disruptor简介
Disruptor目前是世界上最快的单机消息队列,由英国外汇交易公司LMAX开发,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。基于Disruptor开发的系统单线程能支撑每秒600万订单,2010年在QCon演讲后,获得了业界关注。2011年,企业应用软件专家Martin Fowler专门撰写长文介绍。同年它还获得了Oracle官方的Duke大奖。目前,包括Apache Storm、Camel、Log4j 2在内的很多知名项目都应用了Disruptor以获取高性能。
二、浅聊Disruptor的核心
Disruptor维护了一个环形队列RingBuffer,这个队列本质上是一个首位相连的数组。相比于LinkedBlockdingQueue,RingBuffer的数组结构在查找方面效率更高。此外,LinkedBlockingQueue需要维护一个头节点指针head和一个尾节点指针tail,而RingBuffer只需要维护一个sequence指向下一个可用的位置即可。所以从这两点来说,RingBuffer比LinkedBlockingQueue要快。
三、Disruptor使用
3.1 pom.xml
3.2 事件Event
Disruptor是基于事件的生产者消费者模型。其RingBuffer中存放的其实是将消息封装成的事件。这里定义了一个LongEvent,表示消息队列中存放的是long类型的数据。
public class LongEvent {
private long value;
public void set(long value) {
this.value = value;
}
@Override
public String toString() {
return "LongEvent{" +
"value=" + value +
'}';
}
}
3.3 EventFactory
实现EventFactory接口,定义Event工厂,用于填充队列。Event工厂其实是为了提高Disruptor的效率,初始化的时候,会调用Event工厂,对RingBuffer进行内存的提前分配,GC的频率会降低。
import com.lmax.disruptor.EventFactory;
public class LongEventFactory implements EventFactory
public LongEvent newInstance() {
return new LongEvent();
}
}
3.4 EventHandler
实现EventHandler接口,定义EventHandler(消费者),处理容器中的元素。
import com.lmax.disruptor.EventHandler;
public class LongEventHandler implements EventHandler
public void onEvent(LongEvent event, long sequence, boolean endOfBatch) {
System.out.println("Event: " + event + ", sequence: " + sequence);
}
}
3.5 使用Disruptor原始API发布消息
import cn.flying.space.disruptor.demo.Longhttp://Event;
import com.lmax.disruptor.RingBuffer;
import java.nio.ByteBuffer;
/**
* 定义一个生产者,往Disruptor中投递消息
http://*/
public class LongEventProducer {
private RingBuffer
public LongEventProducer(RingBuffer
this.ringBuffer = ringBuffer;
}
public void onData(ByteBuffer byteBuffer) {
// 定位到下一个可存放的位置
long sequence = ringBuffer.next();
try {
// 拿到该位置的event
LongEvent event = ringBuffer.get(sequence);
// 设置event的值
event.set(byteBuffer.getLong(0));
} finally {
// 发布
ringBuffer.publish(sequence);
}
}
}
import cn.flying.space.disruptor.demo.LongEvent;
import cn.flying.space.disruptor.demo.LongEventFactory;
import cn.flying.space.disruptor.demo.LongEventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import java.nio.ByteBuffer;
import java.util.concurrent.Executors;
public class TestMain {
public static void main(String[] args) throws InterruptedException {
// 定义event工厂
LongEventFactory factory = new LongEventFactory();
// ringBuffer长度
int bufferSize = 1024;
// 构造一个Disruptor
Disruptor
// 绑定handler
disruptor.handleEventsWith(new LongEventHandler());
// 启动Disruptor
disruptor.start();
RingBuffer
LongEventProducer producer = new LongEventProducer(ringBuffer);
ByteBuffer byteBuffer = ByteBuffer.allocate(8);
for (long i = 0; true; i++) {
byteBuffer.clear();
byteBuffer.putLong(i);
// 投递消息
producer.onData(byteBuffer);
Thread.sleep(1000);
}
}
}
3.6 使用Translators发布消息
import cn.flying.space.disruptor.demo.LongEvent;
import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.RingBuffer;
import java.nio.ByteBuffer;
public class LongEventProducerUsingTranslator {
private RingBuffer
public LongEventProducerUsingTranslator(RingBuffer
this.ringBuffer = ringBuffer;
}
private static final EventTranslatorOneArg
@Override
public void translateTo(LongEvent longEvent, long l, ByteBuffer byteBuffer) {
longEvent.set(byteBuffer.getLong(0));
}
};
public void onData(ByteBuffer byteBuffer) {
ringBuffer.publishEvent(TRANSLATOR, byteBuffer);
}
}
import cn.flying.space.disruptor.demo.LongEvent;
import cn.flying.space.disruptor.demo.LongEventFactory;
import cn.flying.space.disruptor.demo.LongEventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;
import java.nio.ByteBuffer;
/**
* @author ZhangSheng
* @date 2021-4-26 14:23
*/
public class TestMain {
public static void main(String[] args) throws InterruptedException {
LongEventFactory factory = new LongEventFactory();
int bufferSize = 1024;
Disruptor
disruptor.handleEventsWith(new LongEventHandler());
disruptor.start();
RingBuffer
LongEventProducerUsingTranslator producer = new LongEventProducerUsingTranslator(ringBuffer);
ByteBuffer byteBuffer = ByteBuffer.allocate(8);
for (long i = 0L; true; i++) {
byteBuffer.putLong(0, i);
// 发布
producer.onData(byteBuffer);
Thread.sleep(1000);
}
}
}
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~