c语言sscanf函数的用法是什么
295
2022-12-19
Java实现Redis延时消息队列
目录什么是延时任务延时任务的特点实现思路:代码实现1.消息模型2.RedisMq 消息队列实现类3.消息生产者4.消息消费者5. 消息执接口6. 任务类型的实现类:可以根据自己的情况去实现对应的队列需求
什么是延时任务
延时任务的特点
时间有序性
时间具体性
任务中携带详细的信息 ,通常包括 任务ID, 任务的类型 ,时间点。
实现思路:
将整个Redis当做消息池,以kv形式存储消息,key为id,value为具体的消息body
使用ZSET做优先队列,按照score维持优先级(用当前时间+需要延时的时间作为score)
轮询ZSET,拿出score比当前时间戳大的数据(已过期的)
根据id拿到消息池的具体消息进行消费
消费成功,删除改队列和消息
消费失败,让该消息重新回到队列
代码实现
1.消息模型
import lombok.Data;
import lombok.experimental.Accessors;
import javax.validation.constraints.NotNull;
import java.io.Serializable;
/**
* Redis 消息队列中的消息体
* @author shikanatsu
*/
@Data
@Accessors(chain = true)
public class RedisMessage implements Serializable {
/** 消息队列组 **/
private String group;
/**
* 消息id
*/
private String id;
/**
* 消息延迟/ 秒
*/
@NotNull(message = "消息延时时间不能为空")
private long delay;
/**
* 消息存活时间 单位:秒
*/
@NotNull(message = "消息存活时间不能为空")
private int ttl;
/**
* 消息体,对应业务内容
*/
private Object body;
/**
* 创建时间,如果只有优先级没有延迟,可以设置创建时间为0
* 用来消除时间的影响
*/
private long createTime;
}
2.RedisMq 消息队列实现类
package com.shixun.base.redisMq;
import com.shixun.base.jedis.service.RedisService;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* Redis消息队列
*
* @author shikanatsu
*/
@Component
public class RedisMq {
/**
* 消息池前缀,以此前缀加上传递的消息id作为key,以消息{@link MSG_POOL}
* 的消息体body作为值存储
*/
public static final String MSG_POOL = "Message:Pool:";
/**
* zset队列 名称 queue
*/
public static final String QUEUE_NAME = "Message:Queue:";
// private static final int SEMIH = 30 * 60;
@Resource
private RedisService redisService;
/**
* 存入消息池
*
* @param message
* @return
*/
public boolean addMsgPool(RedisMessage message) {
if (null != message) {
redisService.set(MSG_POOL + message.getGroup() + message.getId(), message, message.getTtl());
return true;
}
return false;
}
/**
* 从消息池中删除消息
*
* @param id
* @return
*/
public void deMsgPool(String group, String id) {
redisService.remove(MSG_POOL + group + id);
}
/**
* 向队列中添加消息
*
* @param key
* @param score 优先级
* @param val
* @return 返回消息id
*/
public void enMessage(String key, long score, String val) {
redisService.zsset(key, val, score);
}
/**
* 从队列删除消息
*
* @param id
* @return
*/
public boolean deMessage(String key, String id) {
return redisService.zdel(key, id);
}
}
3.消息生产者
import cn.hutool.core.convert.Convert;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.IdUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.validation.annotation.Validated;
import javax.annotation.Resource;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeUnit;
/**
* 消息生产者
*
* @author shikanatsu
*/
@Component
public class MessageProvider {
static Logger logger = LoggerFactory.getLogger(MessageProvider.class);
@Resource
private RedisMq redisMq;
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
public boolean sendMessage(@Validated RedisMessage message) {
Assert.notNull(message);
//The priority is if there is no creation time
// message.setCreateTime(System.currentTimeMillis());
message.setId(IdUtil.fastUUID());
Long delayTime = message.getCreateTime() + Convert.convertTime(message.getDelay(), TimeUnit.SECONDS, TimeUnit.MILLISECONDS);
try {
redisMq.addMsgPool(message);
redisMq.enMessage(RedisMq.QUEUE_NAME+message.getGroup(), delayTime, message.getId());
logger.info("RedisMq发送消费信息{},当前时间:{},消费时间预计{}",message.toString(),new Date(),sdf.format(delayTime));
}catch (Exception e){
e.printStackTrace();
logger.error("RedisMq 消息发送失败,当前时间:{}",new Date());
return false;
}
return true;
}
}
4.消息消费者
/**
* Redis消息消费者
* @author shikanatsu
*/
@Component
public class RedisMqConsumer {
private static final Logger log = LoggerFactory.getLogger(RedisMqConsumer.class);
@Resource
private RedisMq redisMq;
@Resource
private RedisService redisService;
@Resource
private MessageProvider provider;
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
//@Scheduled(cron = "*/1 * * * * ? ")
/**
Instead of a thread loop, you can use Cron expressions to perform periodic tasks
*/
public void baseMonitor(RedisMqExecute mqExecute){
String queueName = RedisMq.QUEUE_NAME+mqExecute.getQueueName();
//The query is currently expired
Set
if (null != set) {
long current = System.currentTimeMillis();
for (Object id : set) {
long score = redisService.getScore(queueName, id.toString()).longValue();
//Once again the guarantee has expired , And then perform the consumption
if (current >= score) {
String str = "";
RedisMessage message = null;
String msgPool = RedisMq.MSG_POOL+mqExecute.getQueueName();
try {
message = (RedisMessage)redisService.get(msgPool + id.toString());
log.debug("RedisMq:{},get RedisMessage success now Time:{}",str,sdf.format(System.currentTimeMillis()));
if(null==message){
return;
}
//Do something ; You can add a judgment here and if it fails you can add it to the queue again
mqExecute.execute(message);
} catch (Exception e) {
e.printStackTrace();
//If an exception occurs, it is put back into the queue
// todo: If repeated, this can lead to repeated cycles
log.error("RedisMq: RedisMqMessage exception ,It message rollback , If repeated, this can lead to repeated cycles{}",new Date());
provider.sendMessage(message);
} finally {
redisMq.deMessage(queueName, id.toString());
redisMq.deMsgPool(message.getGroup(),id.toString());
}
}
}
}
}
}
5. 消息执接口
/**
* @author shikanatsu
*/
public interface RedisMqExecute {
/**
* 获取队列名称
* @return
*/
public String getQueueName();YqPDWtRr
/**
* 统一的通过执行期执行
* @param message
* @return
*/
public boolean execute(RedisMessage message);
/**
* Perform thread polling
*/
public void threadPolling();
}
6. 任务类型的实现类:可以根据自己的情况去实现对应的队列需求
/**
* 订单执行
*
* @author shikanatsu
*/
@Service
public class OrderMqExecuteImpl implements RedisMqExecute {
private static Logger logger = LoggerFactory.getLogger(OrderMqExecuteImpl.class);
public final static String name = "orderPoll:";
@Resource
private RedisMqConsumer redisMqConsumer;
private RedisMqExecute mqExecute = this;
@Resource
private OrderService orderService;
@Override
public String getQueueName() {
return name;
}
@Override
/**
* For the time being, only all orders will be processed. You can change to make orders
*/
public boolean execute(RedisMessage message) {
logger.info("Do orderMqPoll ; Time:{}",new Date());
//Do
return true;
}
@Override
/** 通过线程去执行轮询的过程,时间上可以自由控制 **/
public void threadPolling() {
ThreadUtil.execute(() -> {
while (true) {
redisMqConsumer.baseMonitor(mqExecute);
ThreadUtil.sleep(5, TimeUnit.MICROSECONDS);
}
});
}
}
使用事例
1. 实现RedisMqExecute 接口 创建对应的轮询或者采取定时器的方式执行 和实现具体的任务。
2. 通过MessageProvider 实现相对应的消息服务和绑定队列组,通过队列组的方式执行。
3. 提示: 采取线程的方式需要在项目启动过程中执行,采取定时器或者调度的方式可以更加动态的调整。
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~