收藏文章 楼主
基于Springboot redis实现延时队列
版块:IT/互联网   类型:普通   作者:小羊羔links   查看:219   回复:0   获赞:0   时间:2022-01-24 22:46:36

什么是延迟队列?

首先,队列这种数据结构相信大家都不陌生,它是一种先进先出的数据结构。普通队列中的元素是有序的,先进入队列中的元素会被优先取出进行消费;

延时队列相比于普通队列最大的区别就体现在其延时的属性上,普通队列的元素是先进先出,按入队顺序进行处理,而延时队列中的元素在入队时会指定一个延迟时间,表示其希望能够在经过该指定时间后处理。从某种意义上来讲,延迟队列的结构并不像一个队列,而更像是一种以时间为权重的有序堆结构。


◆ 应用场景

  • 下单成功后,X分钟内没有支付,自动取消订单

  • 外卖场景,快要超时时给外卖小哥发送提醒通知

  • 预定的会议开始前X分钟提醒

  • 等等


◆ 方案

  • JDK中DelayQueue相关API

  • Quartz

  • Redis Zset(本文

  • MQ

  • 等等


◆ 实现

本文使用Redis Zset来实现延迟队列。

zset 是 Redis 提供的最具特色的数据类型之一,首先它是一个 set,这保证了内部 value 值的唯一性,其次它给每个 value 添加了一个 score(分值 属性,通过对分值的排序实现了有序化。比如用 zset 结构来存储学生的成绩,value 值代表学生的 ID,score 则是的考试成绩。我们可以对成绩按分数进行排序从而得到学生的的名次。


1、pom.xml

 <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

application.yml

spring:
redis:
host: localhost
port: 6379
password: xxx
database: 1


2、延迟任务对象定义

public interface RedisDelayTask {
/**
* 任务ID
* @return
*/

String getId();

/**
* 队列中的值
* @return
*/

String getValue();

/**
* 延迟时间(单位 s
* @return
*/

long getDelayTime();

/**
* 任务执行
*/

void execute();
}
/**
* 抽象任务
*/

public abstract class AbstractRedisDelayTask implements RedisDelayTask {

protected String id;
protected String value;
private long delayTime;

public AbstractRedisDelayTask(String id, String value, long delayTime) {
this.id = id;
this.value = value;
this.delayTime = delayTime;
}

@Override
public String getId()
{
return id;
}

public void setId(String id) {
this.id = id;
}

@Override
public String getValue()
{
return value;
}

public void setValue(String value) {
this.value = value;
}

@Override
public long getDelayTime()
{
return delayTime;
}

public void setDelayTime(long delayTime) {
this.delayTime = delayTime;
}

@Override
public String toString()
{
return "RedisDelayTask{" +
"id='" + id + '\'' +
", value='" + value + '\'' +
", delayTime=" + delayTime +
'}';
}
}

3、通知类任务定义

public class NoticeTask extends AbstractRedisDelayTask {

private final static Logger LOGGER = LoggerFactory.getLogger(NoticeTask.class);

public NoticeTask(String id, String value, long delayTime) {
super(id, value, delayTime);
}

@Override
public void execute() {
LOGGER.info("task execute, {}", this);
}
}

4、任务管理

@Component
public class RedisDelayQueueManager implements InitializingBean {

@Autowired
private StringRedisTemplate redisTemplate;
/**
* 任务列表
*/

private Map<String, RedisDelayTask> tasks = new ConcurrentHashMap<>();


/**
* 添加延迟任务到队列
* @param task
*/

public void addTask(RedisDelayTask task) {
long delayedTime = System.currentTimeMillis() + TimeUnit.MILLISECONDS.convert(task.getDelayTime(), TimeUnit.SECONDS);
boolean r = redisTemplate.opsForZSet().add(task.getId(), task.getValue(), delayedTime);
if (r) {
tasks.put(task.getId(), task);
}
}

/**
* 检查并执行任务
*/

private void checkAndExecuteTask() {
while (true) {
Set<String> taskIds = tasks.keySet();
for (String taskId : taskIds) {
// score就是任务要执行的时间点,如果<=当前时间,说明任务该执行了
Set<ZSetOperations.TypedTuple<String>> tuples = redisTemplate.opsForZSet().rangeByScoreWithScores(taskId, 0, System.currentTimeMillis());
if (!CollectionUtils.isEmpty(tuples)) {
for (ZSetOperations.TypedTuple<String> tuple : tuples) {
// 移除并执行任务
RedisDelayTask task = tasks.remove(taskId);
if (task != null) {
task.execute();
// 从队列中删除
redisTemplate.opsForZSet().remove(taskId, tuple.getValue());
}
}
}
}
}
}

@Override
public void afterPropertiesSet() throws Exception {
// 新起一个线程执行任务
new Thread(() -> {
checkAndExecuteTask();
}, "redis-delay-task").start();
}
}

5、测试

@RunWith(SpringRunner.class)
@SpringBootTest(classes
= RedisApplication.class)
public class RedisDelayTaskTest
{
@Autowired
private RedisDelayQueueManager redisDelayQueueManager;

@Test
public void addTask() throws IOException {
NoticeTask task = new NoticeTask("notice-task", "notice-task-value", 5);
redisDelayQueueManager.addTask(task);
NoticeTask task2 = new NoticeTask("notice-task2", "notice-task-value2", 10);
redisDelayQueueManager.addTask(task2);
System.in.read();
}
}

执行结果如下

2022-01-22 17:27:53.428 INFO 86506 --- [ main] io.lettuce.core.KqueueProvider : Starting without optional kqueue library

2022-01-22 17:27:58.140 INFO 86506 --- [edis-delay-task] c.springboot.demo.redis.task.NoticeTask : task execute, RedisDelayTask{id='notice-task', value='notice-task-value', delayTime=5}

2022-01-22 17:28:03.925 INFO 86506 --- [edis-delay-task] c.springboot.demo.redis.task.NoticeTask : task execute, RedisDelayTask{id='notice-task2', value='notice-task-value2', delayTime=10}



来源

https://www.toutiao.com/a7055604224931152423/?log_from=ab592af8b7514_1642901596778


欢迎广大技术人员 稿, aliang@itdks.com





来都来了,走啥走,留个言呗



   |  关于版权 

由“(ID ”原创的文章,转载时请注明作者、出处及 。 稿、约稿、转载 ITDKS10( 稿 ,茉莉小姐姐会及时与您联系!

感谢您对的热心支持!


  • 相关推荐


  • 推荐文章

  • Spring Boot 实现通用 Auth 认证的 4 种方式

  • 2021年终奖调查报告出炉,你的年终奖怎么样?

  • 日拱一卒 如何在Springboot中使用拦截器?

  • ClickHouse数据库和表引擎简介

  • 小型项目必备 SpringBoot Actuator—埋点和监控

  • JVM调优工具锦囊 JDK自带工具与Arthas线上分析工具对比

  • 2022年软件开发趋势的22个预测

  • 20个提高生产力的 Linux 命令与技巧,用完带你起飞

  • 程序员,你也该懂系统集成之服务集成交互技术——网络协议了吧?

  • Nginx之进程间的通信机制(共享内存、原子操作


小羊羔锚文本外链网站长https://seo-links.cn 
回复列表
默认   热门   正序   倒序

回复:基于Springboot redis实现延时队列

Powered by 免费发外链软文 7.12.4

©2015 - 2022 小羊羔外链网

免费发软文外链 鄂ICP备16014738号-6

您的IP:3.234.210.25,2022-10-04 12:03:37,Processed in 0.01642 second(s).

支持原创软件,抵制盗版,共创美好明天!
头像

用户名:

粉丝数:

签名:

资料 关注 好友 消息