Spring Boot集成Redisson实现延迟队列

项目场景:

   在电商、支付等领域,往往会有这样的场景,用户下单后放弃支付了,那这笔订单会在指定的时间段后进行关闭操作,细心的你一定发现了像某宝、某东都有这样的逻辑,而且时间很准确,误差在1s内;那他们是怎么实现的呢?

   一般实现的方法有几种:使用 redisson、rocketmq、rabbitmq等消息队列的延时投递功能。


解决方案:

   一般项目集成redis的比较多,所以我这篇文章就说下redisson延迟队列,如果使用rocketmq或rabbitmq需要额外集成中间件,比较麻烦一点。

1.集成redisson

maven依赖

<dependency>
	<groupId>org.redisson</groupId>
	<artifactId>redisson-spring-boot-starter</artifactId>
	<version>3.21.1</version>
</dependency>

yml配置,单节点配置可以兼容redis的配置方式

# redis配置
spring:
  redis:
    database: 0
    host: 127.0.0.1
    password: redis@pass
    port: 6001

 更详细的配置参考:Spring Boot整合Redisson的两种方式-CSDN博客

2.配置多线程

因为延迟队列可能会多个任务同时执行,所以需要多线程处理。

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

@Configuration
@EnableAsync
public class ExecutorConfig {
    /**
     * 异步任务自定义线程池
     */
    @Bean(name = "taskExecutor")
    public ThreadPoolTaskExecutor asyncServiceExecutor() {
    	ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //配置核心线程数
        executor.setCorePoolSize(50);
        //配置最大线程数
        executor.setMaxPoolSize(500);
        //配置队列大小
        executor.setQueueCapacity(300);
        //允许线程空闲时间
        executor.setKeepAliveSeconds(60);
        //配置线程池中的线程的名称前缀
        executor.setThreadNamePrefix("taskExecutor-");
        // rejection-policy:当pool已经达到max size的时候,如何处理新任务
        // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //调用shutdown()方法时等待所有的任务完成后再关闭
        executor.setWaitForTasksToCompleteOnShutdown(true);
        //等待所有任务完成后的最大等待时间
		executor.setAwaitTerminationSeconds(60);
        return executor;
    }
}

3.具体业务

比如消息通知、关闭订单等 ,这里加上了@Async注解,可以异步执行

import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import java.text.SimpleDateFormat;
import java.util.Date;

@Service
public class AsyncService {

	@Async
	public void executeQueue(Object value) {
		System.out.println();
		System.out.println("当前线程:"+Thread.currentThread().getName());
		System.out.println("执行任务:"+value);

		//打印时间方便查看
		SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
		System.out.println("执行任务的时间:"+sdf.format(new Date()));
		//自己的业务逻辑,可以根据id发送通知消息等
		//......
	}
}

4.延迟队列(关键代码)

这里包括添加延迟队列,和消费延迟队列,@PostConstruct注解的意思是服务启动加载一次,参考Spring Boot中多个PostConstruct注解执行顺序控制_多个postconstruct执行顺序-CSDN博客

import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeUnit;

@Service
public class TestService {

	@Resource
	private AsyncService asyncService;
	@Resource
	private ThreadPoolTaskExecutor executor;
	@Autowired
	private RedissonClient redissonClient;

	/**
	 * 添加延迟任务
	 */
	public void addQueue() {
		//获取延迟队列
		RBlockingQueue<Object> blockingQueue = redissonClient.getBlockingQueue("delayedQueue");
		RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);
		for (int i = 1; i <= 10; i++) {
			long delayTime = 5+i; //延迟时间(秒)
//			long delayTime = 5; //这里时间统一,可以测试并发执行
			delayedQueue.offer("延迟任务"+i, delayTime, TimeUnit.SECONDS);
		}
		//打印时间方便查看
		SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
		System.out.println("添加任务的时间:"+sdf.format(new Date()));
	}

	/**
	 *	服务启动时加载,开始消费延迟队列
	 */
	@PostConstruct
	public void consumer() {
		System.out.println("服务启动时加载>>>>>>");
		//获取延迟队列
		RBlockingQueue<Object> delayedQueue = redissonClient.getBlockingQueue("delayedQueue");

		//启用一个线程来消费这个延迟队列
		executor.execute(() ->{
			while (true){
				try {
//					System.out.println("while中的线程:"+Thread.currentThread().getName());
					//获取延迟队列中的任务
					Object value = delayedQueue.poll();
					if(value == null){
						//如果没有任务就休眠1秒,休眠时间根据业务自己定义
						Thread.sleep(1000);	//这里休眠时间越短,误差就越小
						continue;
					}
					//异步处理延迟队列中的消息
					asyncService.executeQueue(value);
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
		});
	}
}

5.测试接口 

import com.test.service.TestService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/test")
public class TestController {

	@Autowired
	private TestService testService;

	/*
	 * 添加延迟任务
	 */
	@GetMapping(value = "/addQueue")
	public String addQueue() {
		testService.addQueue();
		return "success";
	}

}

6.测试结果


 总结:

  1. Redisson的的RDelayedQueue是基于Redis实现的,而Redis本身并不保证数据的持久性。如果Redis服务器宕机,那么所有在RDelayedQueue中的数据都会丢失。因此,我们需要在应用层面进行持久化设计,例如定期将RDelayedQueue中的数据持久化到数据库。
  2. 在设计延迟任务时,我们应该根据实际需求来合理设置延迟时间,避免设置过长的延迟时间导致内存占用过高。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/574668.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

OceanBase单机版安装体验

前情提要 上周OceanBase开发者大会过后&#xff0c;作为观察员也来体验一下OB的安装。业内有某个国产安装用了两周&#xff0c;这种其实有点劝退了。话说就是10年前&#xff0c;没搞过Oracle的人也不用两周安装一个数据库啊。今天看看OB的&#xff08;一体化&#xff09;安装。…

【震撼揭秘】Sentinel:一文读懂,那些让开发者“拍案叫绝”的核心特性!

关注微信公众号 “程序员小胖” 每日技术干货&#xff0c;第一时间送达&#xff01; 引言 在微服务架构中&#xff0c;流量治理是确保系统稳定性和高可用性的关键。Sentinel作为一项强大的流量控制组件&#xff0c;为我们提供了完善的解决方案。本文将带您走进Sentinel的世界…

第11章 Android特色开发——基于位置的服务

第11章 Android特色开发——基于位置的服务 本章中&#xff0c;将要学习一些全新的Android技术&#xff0c;这些技术有别于传统的PC或Web领域的应用技术&#xff0c;是只有在移动设备上才能实现的。 基于位置的服务&#xff08;Location Based Service&#xff09;。由于移动…

vue2实现字节流byte[]数组的图片预览

项目使用vantui框架&#xff0c;后端返回图片的字节流byte[]数组&#xff0c;在移动端实现预览&#xff0c;实现代码如下&#xff1a; <template><!-- 附件预览 --><div class"file-preview-wrap"><van-overlay :show"show"><…

IOS恢复

1、实验目的 通过本实验可以掌握&#xff1a; copy方式恢复IOS的步骤。TFTPDNLD方式恢复IOS的步骤。Xmodem方式恢复IOS的步骤。 2、实验拓扑 路由器IOS恢复的实验拓扑如下图所示。 3、实验步骤 如果工作中不慎误删除路由器IOS&#xff0c;或者升级了错误版本的IOS&#xff…

flutter笔记-webrtc使用1:依赖本地包socket.io-client

文章目录 1. 示例工程2. yaml 修改3. 使用4. socketio 关于自定义服务器自定义签名的问题封装成async和await方式 本文开始介绍webrtc的使用&#xff0c;阅读本文的前提是假设你已经使用过webrtc&#xff0c;了解webrtc的交互机制&#xff0c;不了解的可以看之前的文章&#xf…

【Java数据结构】初步认识ArrayList与顺序表

前言~&#x1f973;&#x1f389;&#x1f389;&#x1f389; hellohello~&#xff0c;大家好&#x1f495;&#x1f495;&#xff0c;这里是E绵绵呀✋✋ &#xff0c;如果觉得这篇文章还不错的话还请点赞❤️❤️收藏&#x1f49e; &#x1f49e; 关注&#x1f4a5;&#x…

Flutter开发好用插件url_launcher详解-启动 URL

文章目录 url_launcher介绍安装用法错误处理自定义行为其他功能 url_launcher介绍 url_launcher 是一个 Flutter 插件&#xff0c;用于启动 URL。它支持网络、电话、短信和电子邮件方案。您可以使用它从您的 Flutter 应用程序中打开网站、拨打号码、发送短信或撰写电子邮件。 …

jvm知识点总结(二)

Java8默认使用的垃圾收集器是什么? Java8版本的Hotspot JVM,默认情况下使用的是并行垃圾收集器&#xff08;Parallel GC&#xff09; 如果CPU使用率飙升&#xff0c;如何排查? 1.先通过top定位到消耗最高的进程id 2.执行top -h pid单独监控该进程 3.在2中输入H&#xff…

【线性代数 C++】求逆矩阵

对于 n n n阶矩阵 A A A&#xff0c;如果有 n n n阶矩阵 B B B&#xff0c;使 A B B A E ABBAE ABBAE&#xff0c;则说 A A A是可逆的&#xff0c;并把 B B B称为 A A A的逆矩阵. A A A的逆矩阵记作 A − 1 A^{-1} A−1&#xff0c;则 B A − 1 BA^{-1} BA−1.若 ∣ A ∣ ≠…

二、OSPF协议基础

基于SPF算法&#xff08;Dijkstra算法&#xff09;的链路状态路由协议OSPF&#xff08;Open Shortest Path First&#xff0c;开放式最短路径优先&#xff09; 目录 1.RIP在大型网络中部署所面临的问题 2.Router ID 3.OSPF的报文 4.OSPF邻居建立过程 5.OSPF报文的确认机制…

59、回溯-括号生成

思路&#xff1a; 括号是成对出现&#xff0c;首先左括号可以放n个&#xff0c;右括号也可以放n个。如果当前左括号放了3了&#xff0c;右括号放了4个&#xff0c;错了&#xff0c;如果左括号放了5个&#xff0c;右括号放了4个。可以&#xff0c;继续放右括号即可。所以可以设…

linux系统安全及应用【上】

目录 1.账号安全控制 1系统账号清理 2密码安全控制 1 对已经存在的用户账号进行控制 2 对新建的用户密码默认设置 3 历史命令和终端自动注销的安全管理 1 历史命令的限制 2. 用户切换管理 1 su命令的使用 2 ssh 3.授权用户管理 1 sudo命令 2 sudo用户别名 3 查看su…

Vuforia AR篇(三)— AR模型出场效果

目录 前言一、AR模型出场二、AR出场特效三、添加过渡效果四、效果 前言 例如&#xff1a;随着人工智能的不断发展&#xff0c;机器学习这门技术也越来越重要&#xff0c;很多人都开启了学习机器学习&#xff0c;本文就介绍了机器学习的基础内容。 一、AR模型出场 创建ARCamer…

【Go语言快速上手(四)】面向对象的三大特性引入

&#x1f493;博主CSDN主页:杭电码农-NEO&#x1f493;   ⏩专栏分类:Go语言专栏⏪   &#x1f69a;代码仓库:NEO的学习日记&#x1f69a;   &#x1f339;关注我&#x1faf5;带你学习更多Go语言知识   &#x1f51d;&#x1f51d; GO快速上手 1. 前言2. 初识GO中的结构…

深度学习中的子空间、线性变换和矩阵概念应用

1.表示子空间 在深度学习中&#xff0c;“不同的表示子空间”通常是指模型通过不同的参数&#xff08;例如权重矩阵&#xff09;将输入数据映射到不同的高维空间&#xff0c;这些空间被称为表示子空间。每个子空间都能够捕获输入数据中不同的特征或模式。以下是一些详细解释&am…

软考-论文写作-论架构风格论文

题目 素材 框架 一、 摘要 2020年12月,我参加了某省政协委员履职系统的开发。该系统为政协机关人员线上开展各项工作以及委员完成各项履职提供了全方位的软件支撑。我在该项目重担任系统架构师一职,负责履职系统的架构设计。本文结合实践,以委员履职系统为例,主要讨论软件…

使用FunASR处理语音识别

FunASR是阿里的一个语音识别工具&#xff0c;比SpeechRecognition功能多安装也很简单&#xff1b; 官方介绍&#xff1a;FunASR是一个基础语音识别工具包&#xff0c;提供多种功能&#xff0c;包括语音识别&#xff08;ASR&#xff09;、语音端点检测&#xff08;VAD&#xff…

verilog中比较器的代码用法

在 verilog 中以大于“>”&#xff0c;等于””&#xff0c;小于”<”&#xff0c;大于等于”>”&#xff0c;小于等于”<”&#xff0c;不等于”!”表示&#xff0c;以大于举例&#xff0c;如 c a > b ;表示如果 a 大于 b&#xff0c;那么 c 的值就为 1&#x…

网盘——文件重命名

文件重命名具体步骤如下&#xff1a; 目录 1、具体步骤 2、代码实现 2.1、添加重命名文件的槽函数 2.2、关联重命名文件夹信号槽 2.3、添加重命名文件的协议 2.4、添加槽函数定义 2.5、服务器 2.6、添加重命名文件的case 2.7、客户端接收回复 3、测试 3.1、点击重命…