Java实现Redis延时消息队列

网友投稿 295 2022-12-19

Java实现Redis延时消息队列

目录什么是延时任务延时任务的特点实现思路:代码实现1.消息模型2.RedisMq 消息队列实现类3.消息生产者4.消息消费者5. 消息执接口6. 任务类型的实现类:可以根据自己的情况去实现对应的队列需求

什么是延时任务

延时任务的特点

时间有序性

时间具体性

任务中携带详细的信息 ,通常包括 任务ID, 任务的类型 ,时间点。

实现思路:

将整个Redis当做消息池,以kv形式存储消息,key为id,value为具体的消息body

使用ZSET做优先队列,按照score维持优先级(用当前时间+需要延时的时间作为score)

轮询ZSET,拿出score比当前时间戳大的数据(已过期的)

根据id拿到消息池的具体消息进行消费

消费成功,删除改队列和消息

消费失败,让该消息重新回到队列

代码实现

1.消息模型

import lombok.Data;

import lombok.experimental.Accessors;

import javax.validation.constraints.NotNull;

import java.io.Serializable;

/**

* Redis 消息队列中的消息体

* @author shikanatsu

*/

@Data

@Accessors(chain = true)

public class RedisMessage implements Serializable {

/** 消息队列组 **/

private String group;

/**

* 消息id

*/

private String id;

/**

* 消息延迟/ 秒

*/

@NotNull(message = "消息延时时间不能为空")

private long delay;

/**

* 消息存活时间 单位:秒

*/

@NotNull(message = "消息存活时间不能为空")

private int ttl;

/**

* 消息体,对应业务内容

*/

private Object body;

/**

* 创建时间,如果只有优先级没有延迟,可以设置创建时间为0

* 用来消除时间的影响

*/

private long createTime;

}

2.RedisMq 消息队列实现类

package com.shixun.base.redisMq;

import com.shixun.base.jedis.service.RedisService;

import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**

* Redis消息队列

*

* @author shikanatsu

*/

@Component

public class RedisMq {

/**

* 消息池前缀,以此前缀加上传递的消息id作为key,以消息{@link MSG_POOL}

* 的消息体body作为值存储

*/

public static final String MSG_POOL = "Message:Pool:";

/**

* zset队列 名称 queue

*/

public static final String QUEUE_NAME = "Message:Queue:";

// private static final int SEMIH = 30 * 60;

@Resource

private RedisService redisService;

/**

* 存入消息池

*

* @param message

* @return

*/

public boolean addMsgPool(RedisMessage message) {

if (null != message) {

redisService.set(MSG_POOL + message.getGroup() + message.getId(), message, message.getTtl());

return true;

}

return false;

}

/**

* 从消息池中删除消息

*

* @param id

* @return

*/

public void deMsgPool(String group, String id) {

redisService.remove(MSG_POOL + group + id);

}

/**

* 向队列中添加消息

*

* @param key

* @param score 优先级

* @param val

* @return 返回消息id

*/

public void enMessage(String key, long score, String val) {

redisService.zsset(key, val, score);

}

/**

* 从队列删除消息

*

* @param id

* @return

*/

public boolean deMessage(String key, String id) {

return redisService.zdel(key, id);

}

}

3.消息生产者

import cn.hutool.core.convert.Convert;

import cn.hutool.core.lang.Assert;

import cn.hutool.core.util.IdUtil;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.stereotype.Component;

import org.springframework.validation.annotation.Validated;

import javax.annotation.Resource;

import java.text.SimpleDateFormat;

import java.util.Date;

import java.util.concurrent.TimeUnit;

/**

* 消息生产者

*

* @author shikanatsu

*/

@Component

public class MessageProvider {

static Logger logger = LoggerFactory.getLogger(MessageProvider.class);

@Resource

private RedisMq redisMq;

SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

public boolean sendMessage(@Validated RedisMessage message) {

Assert.notNull(message);

//The priority is if there is no creation time

// message.setCreateTime(System.currentTimeMillis());

message.setId(IdUtil.fastUUID());

Long delayTime = message.getCreateTime() + Convert.convertTime(message.getDelay(), TimeUnit.SECONDS, TimeUnit.MILLISECONDS);

try {

redisMq.addMsgPool(message);

redisMq.enMessage(RedisMq.QUEUE_NAME+message.getGroup(), delayTime, message.getId());

logger.info("RedisMq发送消费信息{},当前时间:{},消费时间预计{}",message.toString(),new Date(),sdf.format(delayTime));

}catch (Exception e){

e.printStackTrace();

logger.error("RedisMq 消息发送失败,当前时间:{}",new Date());

return false;

}

return true;

}

}

4.消息消费者

/**

* Redis消息消费者

* @author shikanatsu

*/

@Component

public class RedisMqConsumer {

private static final Logger log = LoggerFactory.getLogger(RedisMqConsumer.class);

@Resource

private RedisMq redisMq;

@Resource

private RedisService redisService;

@Resource

private MessageProvider provider;

SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

//@Scheduled(cron = "*/1 * * * * ? ")

/**

Instead of a thread loop, you can use Cron expressions to perform periodic tasks

*/

public void baseMonitor(RedisMqExecute mqExecute){

String queueName = RedisMq.QUEUE_NAME+mqExecute.getQueueName();

//The query is currently expired

Set set = redisService.rangeByScore(queueName, 0, System.currentTimeMillis());

if (null != set) {

long current = System.currentTimeMillis();

for (Object id : set) {

long score = redisService.getScore(queueName, id.toString()).longValue();

//Once again the guarantee has expired , And then perform the consumption

if (current >= score) {

String str = "";

RedisMessage message = null;

String msgPool = RedisMq.MSG_POOL+mqExecute.getQueueName();

try {

message = (RedisMessage)redisService.get(msgPool + id.toString());

log.debug("RedisMq:{},get RedisMessage success now Time:{}",str,sdf.format(System.currentTimeMillis()));

if(null==message){

return;

}

//Do something ; You can add a judgment here and if it fails you can add it to the queue again

mqExecute.execute(message);

} catch (Exception e) {

e.printStackTrace();

//If an exception occurs, it is put back into the queue

// todo: If repeated, this can lead to repeated cycles

log.error("RedisMq: RedisMqMessage exception ,It message rollback , If repeated, this can lead to repeated cycles{}",new Date());

provider.sendMessage(message);

} finally {

redisMq.deMessage(queueName, id.toString());

redisMq.deMsgPool(message.getGroup(),id.toString());

}

}

}

}

}

}

5. 消息执接口

/**

* @author shikanatsu

*/

public interface RedisMqExecute {

/**

* 获取队列名称

* @return

*/

public String getQueueName();YqPDWtRr

/**

* 统一的通过执行期执行

* @param message

* @return

*/

public boolean execute(RedisMessage message);

/**

* Perform thread polling

*/

public void threadPolling();

}

6. 任务类型的实现类:可以根据自己的情况去实现对应的队列需求

/**

* 订单执行

*

* @author shikanatsu

*/

@Service

public class OrderMqExecuteImpl implements RedisMqExecute {

private static Logger logger = LoggerFactory.getLogger(OrderMqExecuteImpl.class);

public final static String name = "orderPoll:";

@Resource

private RedisMqConsumer redisMqConsumer;

private RedisMqExecute mqExecute = this;

@Resource

private OrderService orderService;

@Override

public String getQueueName() {

return name;

}

@Override

/**

* For the time being, only all orders will be processed. You can change to make orders

*/

public boolean execute(RedisMessage message) {

logger.info("Do orderMqPoll ; Time:{}",new Date());

//Do

return true;

}

@Override

/** 通过线程去执行轮询的过程,时间上可以自由控制 **/

public void threadPolling() {

ThreadUtil.execute(() -> {

while (true) {

redisMqConsumer.baseMonitor(mqExecute);

ThreadUtil.sleep(5, TimeUnit.MICROSECONDS);

}

});

}

}

使用事例

 1. 实现RedisMqExecute 接口 创建对应的轮询或者采取定时器的方式执行 和实现具体的任务。

 2.  通过MessageProvider 实现相对应的消息服务和绑定队列组,通过队列组的方式执行。

 3. 提示: 采取线程的方式需要在项目启动过程中执行,采取定时器或者调度的方式可以更加动态的调整。

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Java面向对象关键字extends继承的深入讲解
下一篇:springboot整合mybatis实现多表查询的实战记录
相关文章

 发表评论

暂时没有评论,来抢沙发吧~