java简单手写版本实现时间轮算法

网友投稿 307 2023-01-23

java简单手写版本实现时间轮算法

时间轮

关于时间轮的介绍,网上有很多,这里就不重复了

核心思想

一个环形数组存储时间轮的所有槽(看你的手表),每个槽对应当前时间轮的最小精度

超过当前时间轮最大表示范围的会被丢到上层时间轮,上层时间轮的最小精度即为下层时间轮能表达的最大时间(时分秒概念)

每个槽对应一个环形链表存储该时间应该被执行的任务

需要一个线程去驱动指针运转,获取到期任务

以下给出java 简单手写版本实现

代码实现

时间轮主数据结构

/**

* @author apdoer

* @version 1.0

* @date 2021/3/22 19:31

*/

@Slf4j

public class TimeWheel {

/**

* 一个槽的时间间隔(时间轮最小刻度)

*/

private long tickMs;

/**

* 时间轮大小(槽的个数)

*/

private int wheelSize;

/**

* 一轮的时间跨度

*/

private long interval;

private long currentTime;

/**

* 槽

*/

private TimerTaskList[] buckets;

/**

* 上层时间轮

*/

private volatile TimeWheel overflowWheel;

/**

* 一个timer只有一个delayqueue

*/

private DelayQueue delayQueue;

public TimeWheel(long tickMs, int wheelSize, long currentTime, DelayQueue delayQueue) {

this.currentTime = currentTime;

this.tickMs = tickMs;

this.wheelSize = wheelSize;

this.interval = tickMs * wheelSize;

this.buckets = new TimerTaskList[wheelSize];

this.currentTime = currentTime - (currentTime % tickMs);

this.delayQueue = delayQueue;

for (int i = 0; i < wheelSize; i++) {

buckets[i] = new TimerTaskList();

}

}

public boolean add(TimerTaskEntry entry) {

long expiration = entry.getExpireMs();

if (expiration < tickMs + currentTime) {

//到期了

return false;

} else if (expiration < currentTime + interval) {

//扔进当前时间轮的某个槽里,只有时间大于某个槽,才会放进去

long virtualId = (expiration / tickMs);

int index = (int) (virtualId % wheelSize);

TimerTaskList bucket = buckets[index];

bucket.addTask(entry);

//设置bucket 过期时间

if (bucket.setExpiration(virtualId * tickMs)) {

//设好过期时间的bucket需要入队

delayQueue.offer(bucket);

return true;

}

} else {

//当前轮不能满足,需要扔到上一轮

TimeWheel timeWheel = getOverflowWheel();

return timeWheel.add(entry);

}

return false;

}

private TimeWheel getOverflowWheel() {

if (overflowWheel == null) {

synchronized (this) {

if (overflowWheel == null) {

overflowWheel = new TimeWheel(interval, wheelSize, currentTime, delayQueue);

}

}

}

return overflowWheel;

}

/**

* 推进指针

*

* @param timestamp

*/

public void advanceLock(long timestamp) {

if (timestamp > currentTime + tickMs) {

currentTime = timestamp - (timestamp % tickMs);

if (overflowWheel != null) {

this.getOverflowWheel().advanceLock(timestamp);

}

}

}

}

定时器接口

/**

* 定时器

* @author apdoer

* @version 1.0

* @date 2021/3/22 20:30

*/

public interface Timer {

/**

* 添加一个新任务

*

* @param timerTask

*/

void add(TimerTask timerTask);

/**

* 推动指针

*

* @param timeout

*/

void advanceClock(long timeout);

/**

* 等待执行的任务

*

* @return

*/

int size();

/**

* 关闭服务,剩下的无法被执行

*/

void shutdown();

}

定时器实现

/**

* @author apdoer

* @version 1.0

* @date 2021/3/22 20:33

*/

@Slf4j

public class SystemTimer implements Timer {

/**

* 底层时间轮

*/

private TimeWheel timeWheel;

/**

* 一个Timer只有一个延时队列

*/

private DelayQueue delayQueue = new DelayQueue<>();

/**

* 过期任务执行线程

*/

private ExecutorService workerThreadPool;

/**

* 轮询delayQueue获取过期任务线程

*/

private ExecutorService bossThreadPool;

public SystemTimer() {

this.timeWheel = new TimeWheel(1, 20, System.currentTimeMillis(), delayQueue);

this.workerThreadPool = Executors.newFixedThreadPool(100);

this.bossThreadPool = Executors.newFixedThreadPool(1);

//20ms推动一次时间轮运转

this.bossThreadPool.submit(() -> {

for (; ; ) {

this.advanceClock(20);

}

});

}

public void addTimerTaskEntry(TimerTaskEntry entry) {

if (!timeWheel.add(entry)) {

//已经过期了

TimerTask timerTask = entry.getTimerTask();

log.info("=====任务:{} 已到期,准备执行============",timerTask.getDesc());

workerThreadPool.submit(timerTask);

}

}

@Override

public void add(TimerTask timerTask) {

log.info("=======添加任务开始====task:{}", timerTask.getDesc());

TimerTaskEntry entry = new TimerTaskEntry(timerTask, timerTask.getDelayMs() + System.currentTimeMillis());

timerTask.setTimerTaskEntry(entry);

addTimerTaskEntry(entry);

}

/**

* 推动指针运转获取过期任务

*

* @param timeout 时间间隔

* @return

*/

@Override

public synchronized void advanceClock(long timeout) {

try {

TimerTaskList bucket = delayQueue.poll(timeout, TimeUnit.MILLISECONDS);

if (bucket != null) {

//推进时间

timeWheel.advanceLock(bucket.getExpiration());

//执行过期任务(包含降级)

bucket.clear(this::addTimerTaskEntry);

}

} catch (InterruptedException e) {

log.error("advanceClock error");

}

}

@Override

public int size() {

//todo

return 0;

}

@Override

public void shutdown() {

this.bossThreadPool.shutdown();

this.workerThreadPool.shutdown();

this.timeWheel = null;

}

}

存储任务的环形链表

/**

* @author apdoer

* @version 1.0

* @date 2021/3/22 19:26

*/

@Data

@Slf4j

class TimerTaskList implements Delayed {

/**

* TimerTaskList 环形链表使用一个虚拟根节点root

*/

private TimerTaskEntry root = new TimerTaskEntry(null, -1);

{

root.next = root;

root.prev = root;

}

/**

* bucket的过期时间

*/

private AtomicLong expiration = new AtomicLong(-1L);

public long getExpiration() {

return expiration.get();

}

/**

* 设置bucket的过期时间,设置成功返回true

*

* @param expirationMs

* @return

*/

boolean setExpiration(long expirationMs) {

return expiration.getAndSet(expirationMs) != expirationMs;

}

public boolean addTask(TimerTaskEntry entry) {

boolean done = false;

while (!done) {

//如果TimerTaskEntry已经在别的list中就先移除,同步代码块外面移除,避免死锁,一直到成功为止

entry.remove();

synchronized (this) {

if (entry.timedTaskList == null) {

//加到链表的末尾

entry.timedTaskList = this;

TimerTaskEntry tail = root.prev;

entry.prev = tail;

entry.next = root;

tail.next = entry;

root.prev = entry;

done = true;

}

}

}

return true;

}

/**

* 从 TimedTaskList 移除指定的 timerTaskEntry

*

* @param entry

*/

public void remove(TimerTaskEntry entry) {

synchronized (this) {

if (entry.getTimedTaskList().equals(this)) {

entry.next.prev = entry.prev;

entry.prev.next = entry.next;

entry.next = null;

entry.prev = null;

entry.timedTaskList = null;

}

}

}

/**

* 移除所有

*/

public synchronized void clear(Consumer entry) {

TimerTaskEntry head = root.next;

while (!head.equals(root)) {

remove(head);

entry.accept(head);

head = root.next;

}

expiration.set(-1L);

}

@Override

public long getDelay(TimeUnit unit) {

return Math.max(0, unit.convert(expiration.get() - System.currentTimeMillis(), TimeUnit.MILLISECONDS));

}

@Override

public int compareTo(Delayed o) {

if (o instanceof TimerTaskList) {

return Long.compare(expiration.get(), ((TimerTaskList) o).expiration.get());

}

return 0;

}

}

存储任务的容器entry

/**

* @author apdoer

* @version 1.0

* @date 2021/3/22 19:26

*/

@Data

class TimerTaskEntry implements Comparable {

private TimerTask timerTask;

private long expireMs;

volatile TimerTaskList timedTaskList;

TimerTaskEntry next;

TimerTaskEntry prev;

public TimerTaskEntry(TimerTask timedTask, long expireMs) {

this.timerTask = timedTask;

this.expireMs = expireMs;

this.next = null;

this.prev = null;

}

void remove() {

TimerTaskList currentList = timedTaskList;

while (currentList != null) {

currentList.remove(this);

currentList = timedTaskList;

}

}

@Override

public int compareTo(TimerTaskEntry o) {

return ((int) (this.expireMs - o.expireMs));

}

}

任务包装类(这里也可以将工作任务以线程变量的方式去传入)

@Data

@Slf4j

class TimerTask implements Runnable {

/**

* 延时时间

*/

private long delayMs;

/**

* 任务所在的entry

*/

private TimerTaskEntry timerTaskEntry;

private String desc;

public TimerTask(String desc, long delayMs) {

this.desc = desc;

this.delayMs = delayMs;

this.timerTaskEntry = null;

}

public synchronized void setTimerTaskEntry(TimerTaskEntry entry) {

// 如果这个timetask已经被一个已存在的TimerTaskEntry持有,先移除一个

if (timerTaskEntry != null && timerTaskEntry != entry) {

timerTaskEntry.remove();

}

timerTaskEntry = entry;

}

public TimerTaskEntry getTimerTaskEntry() {

return timerTaskEntry;

}

@Override

public void run() {

log.info("============={}任务执行", desc);

}

}

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:开放api接口登录模式(开放api接口登录模式是什么)
下一篇:免费api接口网址(API接口网站)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~