SpringBoot使用Redisson操作Redis及使用场景实战

avatar
作者
猴君
阅读量:0

前言

SpringBoot使用RedisTemplate、StringRedisTemplate操作Redis中,我们介绍了RedisTemplate以及如何SpringBoot如何通过RedisTemplate、StringRedisTemplate操作Redis。
RedisTemplate的好处就是基于SpringBoot自动装配的原理,使得整合redis时比较简单。

那既然SrpingBoot可以通过RedisTemplate操作Redis,为何又出现了Redisson呢?Rddisson 中文文档
Reddissin也是一个redis客户端,其在提供了redis基本操作的同时,还具备其他客户端一些不具备的高精功能,例如:分布式锁+看门狗、分布式限流、远程调用等等。Reddissin的缺点是api抽象,学习成本高。

一、概述

从 spring-boot 2.x 版本开始,spring-boot-data-redis 默认使用 Lettuce 客户端操作数据。

1.1 Lettuce

SpringBoot2之后,默认就采用了lettuce。
是高级Redis客户端,基于Netty框架的事件驱动的通信层,用于线程安全同步,异步和响应使用,支持集群,Sentinel,管道和编码器。
Lettuce的API是线程安全的,可以操作单个Lettuce连接来完成各种操作,连接实例(StatefulRedisConnection)可在多个线程间并发访问。

1.2 Reddisson

基于Netty框架的事件驱动的通信层,方法是异步的,API线程安全,可操作单个Redisson连接来完成各种操作。
实现了分布式和可扩展的Java数据结构,不支持字符串操作,不支持排序、事务、管道、分区等Redis特性。
提供很多分布式相关操作服务,如,分布式锁,分布式集合,可通过 Redis支持延迟队列。

总结:优先使用Lettuce,需要分布式锁,分布式集合等分布式的高级特性,添加Redisson结合使用。

二、Spring-Boot整合Redisson

2.1 引入依赖

<dependency>     <groupId>org.redisson</groupId>     <artifactId>redisson-spring-boot-starter</artifactId>     <version>3.13.6</version> </dependency> 

注意:引入此依赖后,无需再引入spring-boot-starter-data-redis,其redisson-spring-boot-starter内部已经进行了引入,且排除了 Redis 的 Luttuce 以及 Jedis 客户端。因此,在 application.yaml 中 Luttuce 和 Jedis 的配置是不会生效的。
在这里插入图片描述

在项目使用 Redisson 时,我们一般会使用 RedissonClient 进行数据操作,但有朋友或许觉得 RedissonClient 操作不方便,或者更喜欢使用 RedisTemplate 进行操作,其实这两者是可以共存的,我们只需要再定义RedisTemplate的配置类即可。参考SpringBoot使用RedisTemplate、StringRedisTemplate操作Redis

发现项目引入 Redisson 后,RedisTemplate底层所用的连接工厂也是 Redisson。
在这里插入图片描述

2.2 配置文件

在application.yaml中添加redis的配置信息。

spring:   data:     redis:       mode: master       # 地址       host: 30.46.34.190       # 端口,默认为6379       port: 6379       # 密码,没有不填       password: ''       # 几号库       database: 1       sentinel:         master: master         nodes: 30.46.34.190       cluster:         nodes: 30.46.34.190       lettuce:         pool:           # 连接池的最大数据库连接数           max-active: 200           # 连接池最大阻塞等待时间(使用负值表示没有限制)           max-wait: -1ms           # 连接池中的最大空闲连接           max-idle: 50           # 连接池中的最小空闲连接           min-idle: 8 

2.3 配置类

@Configuration @EnableConfigurationProperties({RedisProperties.class}) public class RedissonConfig {      private static final String REDIS_PROTOCOL_PREFIX = "redis://";      @Value("${spring.data.redis.mode}")     private String redisMode;      private final RedisProperties redisProperties;      public RedissonConfig(RedisProperties redisProperties) {         this.redisProperties = redisProperties;     }      /**      * 逻辑参考 RedissonAutoConfiguration#redisson()      */     @Bean(destroyMethod = "shutdown")     public RedissonClient redisson(@Autowired(required = false) List<RedissonAutoConfigurationCustomizer> redissonAutoConfigurationCustomizers) throws IOException {         Config config = new Config();         config.setCheckLockSyncedSlaves(false);          int max = redisProperties.getLettuce().getPool().getMaxActive();         int min = redisProperties.getLettuce().getPool().getMinIdle();          switch (redisMode) {             case "master": {                 SingleServerConfig singleConfig = config.useSingleServer()                         .setAddress(REDIS_PROTOCOL_PREFIX + redisProperties.getHost() + ":" + redisProperties.getPort())                         .setDatabase(redisProperties.getDatabase())                         .setPassword(redisProperties.getPassword());                  if (redisProperties.getConnectTimeout() != null) {                     singleConfig.setConnectTimeout((int) redisProperties.getConnectTimeout().toMillis());                 }                  singleConfig.setConnectionPoolSize(max);                 singleConfig.setConnectionMinimumIdleSize(min);             }             break;             case "sentinel": {                 String[] nodes = convert(redisProperties.getSentinel().getNodes());                  SentinelServersConfig sentinelConfig = config.useSentinelServers()                         .setMasterName(redisProperties.getSentinel().getMaster())                         .addSentinelAddress(nodes)                         .setDatabase(redisProperties.getDatabase())                         .setPassword(redisProperties.getPassword());                  if (redisProperties.getConnectTimeout() != null) {                     sentinelConfig.setConnectTimeout((int) redisProperties.getConnectTimeout().toMillis());                 }                  sentinelConfig.setMasterConnectionPoolSize(max);                 sentinelConfig.setMasterConnectionMinimumIdleSize(min);                 sentinelConfig.setSlaveConnectionPoolSize(max);                 sentinelConfig.setSlaveConnectionMinimumIdleSize(min);             }             break;             case "cluster": {                 String[] nodes = convert(redisProperties.getCluster().getNodes());                  ClusterServersConfig clusterConfig = config.useClusterServers()                         .addNodeAddress(nodes)                         .setPassword(redisProperties.getPassword());                  if (redisProperties.getConnectTimeout() != null) {                     clusterConfig.setConnectTimeout((int) redisProperties.getConnectTimeout().toMillis());                 }                  clusterConfig.setMasterConnectionMinimumIdleSize(min);                 clusterConfig.setMasterConnectionPoolSize(max);                 clusterConfig.setSlaveConnectionMinimumIdleSize(min);                 clusterConfig.setSlaveConnectionPoolSize(max);             }             break;             default:                 throw new IllegalArgumentException("无效的redis mode配置");         }          if (redissonAutoConfigurationCustomizers != null) {             for (RedissonAutoConfigurationCustomizer customizer : redissonAutoConfigurationCustomizers) {                 customizer.customize(config);             }         }          return Redisson.create(config);      }      private String[] convert(List<String> nodesObject) {         List<String> nodes = new ArrayList<String>(nodesObject.size());         for (String node : nodesObject) {             if (!node.startsWith(REDIS_PROTOCOL_PREFIX)) {                 nodes.add(REDIS_PROTOCOL_PREFIX + node);             } else {                 nodes.add(node);             }         }         return nodes.toArray(new String[0]);     } } 

2.4 使用方式

@Component public class RedissonService { 	@Resource     protected RedissonClient redissonClient;  	public void redissonExists(String key){ 		RBucket<String> rBucketValue = redissonClient.getBucket(key, StringCodec.INSTANCE); 		if (rBucketValue.isExists()){             String value = rBucketValue.get();             // doSomething         } else {            // doElseSomething         } 	}  } 

2.5 实用场景

2.5.1 分布式锁

有点经验的同学一提到使用分布式锁便联想到了redis,那redis如何实现分布式锁呢?

分布式锁本质上要实现的目标就是在Redis中占一个坑(简单的说,就是萝卜占坑的道理),当别的进程也要来占坑时,发现那个坑里已经有一个颗大萝卜时,就只好放弃或者稍后重试。

分布式锁常用手段

1.使用setNx命令
这个命令的详细描述是(set if not exists),如果指定key不存在则设置(成功占坑),在业务执行完成后,调用del命令删该key(释放坑)。比如:

# set 锁名 值 setnx distribution-lock  locked  // dosoming  del  distribution-lock 

但这个命令存在一个问题,如果执行逻辑中出现问题,可能导致del指令无法执行,那么该锁就会成为死锁了。
可能有小伙伴贴心的想到了,我们可以给这个key再设置一个过期时间呀。比如:

setnx distribution-lock  locked  expire distribution-lock  10  // dosoming  del  distribution-batch 

即使这样操作后,该逻辑仍有问题,由于 setnx 与 expire 是两条命令,如果在 setnx 与 expire 之间,redis 服务器挂了,就会导致 expire 不会执行,从而过期时间设置失败,该锁仍会成为死锁。

根源是 setnx 与 expire 两条命令并不是原子命令

且redis的事物也无法解决 setnx 与 expire 的问题,因为 expire 是依赖于 setnx 的执行结果的,如果 setnx 没有成功,expire则不应该执行。事物又无法进行if else判断,故 setnx+expire 方式实现分布式锁,并不是优解。

2.使用setNx Ex 命令
上方已经说了 setNx+expire 的问题,Redis官方为了解决这个问题,在2.8版本时引入了 set指令的扩展参数,使得 setnx 与 expire命令可以一起执行。比如:

# set 锁名 值 ex 过期时间(单位:秒) nx set distribution-lock locked ex 5 nx  // doSomthing  del distribution-lock 

从逻辑上来讲,setNx Ex 已是优解了,不会使该分布式锁成为死锁。

但在我们开发中,或许仍会出现问题,为什么呢?
由于我们一开始为此锁设置了一个过期时间,那假如我们的业务逻辑执行耗时超过了设置的过期时间呢?就会出现一个线程未执行完毕,第二个线程可能持有了这个分布式锁的情况。
所以呢,如果使用 setNx Ex 组合,必须要确保自己的锁的超时时间大于占锁后的业务执行时间

3.使用lua脚本+watch dog自动延期机制
这个方案在网上一找一大堆,在此就不做详细赘述。

Redisson实现分布式锁

上方介绍的 setNx 与 setNx Ex 命令,都是Redis 服务器为我们提供的原生命令,也或多或少的存在着一部分问题,为解决setNx Ex命令存在着业务逻辑大于锁超时时间的问题,Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。默认情况下,看门狗的检查锁的超时时间是30秒钟(就是续期30s),也可以通过修改Config.lockWatchdogTimeout来另行指定,锁的初始过期时间默认也是30s。

tryLock(long waitTime, long leaseTime, TimeUnit unit)的三个参数

  • waitTime:等待时间,即在尝试获取锁时最多的等待时间。如果超过这个时间仍未获取到锁,则会放弃获取锁。
  • leaseTime:租约时间,即获取到锁后持有的时间。如果在这段时间内没有手动释放锁,则系统会自动释放锁。默认为-1,即如果不手动释放,则锁永久有效。
  • unit:时间单位,用于指定等待时间和租约时间的单位。
// 加锁以后10秒钟自动解锁 // 无需调用unlock方法手动解锁 lock.lock(10, TimeUnit.SECONDS);  // 尝试加锁,最多等待100秒,上锁以后10秒自动解锁 boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS); if (res) {    try {      ...    } finally {        lock.unlock();    } } 
@Resource RedissonClient redissonClient;  @GetMapping("/testDistributionLock") public BaseResponse<String> testDistributionLock(){     RLock lock = redissonClient.getLock("redis:distributionLock");     boolean lockAcquired = false;     try {         boolean locked = lock.tryLock(10, 3, TimeUnit.SECONDS);         if(locked){             log.info("获取锁成功");             lockAcquired = true;             // 业务逻辑             return ResultUtils.success("ok" );         } else {             log.error("获取锁失败");             return ResultUtils.error(ErrorCode.SYSTEM_ERROR);         }     } catch (InterruptedException e) {             log.error("runWithLock error: {}, {}", e, key);             Thread.currentThread().interrupt();             throw e;     } catch (Exception e) {             log.error("runWithLock error: {}, {}", e, key);             throw e;     } finally {         if (lockAcquired) {             if (lock.isHeldByCurrentThread()) {                 try {                       lock.unlock();                     } catch (Exception e) {                         log.error("runWithLock unlock error: {}, {}", e, key);                     }             } else if (lock.isLocked()) {                  // 被其他线程锁定                  log.warn("runWithLock lock is locked by other thread: {}, {}", lock.getName(), key);             }        }    } } 

2.5.2 限流

我们是有面临高并发下需要对接口或者业务逻辑限流的问题,我们可以采用Guaua依赖下的RateLimiter 实现,实际上,Redisssion也有类似的限流功能。

在这里插入图片描述
RateLimiter 被称为令牌桶限流,此类限流是首先定义好一个令牌桶,指明在一定时间内生成多少个令牌,每次访问时从令牌桶获取指定数量令牌,如果获取成功,则设为有效访问。

Redisson实现限流器
// 限流器管理 @Component public class RateLimiterManager {      @Resource     private RedissonClient redissonClient;  	// 线程安全 ConcurrentHashMap     private final ConcurrentHashMap<String, RRateLimiter> rateLimiters = new ConcurrentHashMap<>();      public RRateLimiter getRateLimiter(String name, RateType rateType, long rate, Duration interval) {         return getRateLimiter(name, rateType, rate, interval, false);      }      public RRateLimiter getRateLimiter(String name, RateType rateType, long rate, Duration interval, boolean reset) {          return rateLimiters.computeIfAbsent(name, __ -> {         	// 声明一个限流器             final RRateLimiter rateLimiter = redissonClient.getRateLimiter(name);             final RateLimiterConfig config = rateLimiter.getConfig();             var finalReset = reset || !(config.getRate() == rate                     && config.getRateInterval() == interval.toMillis()                     && config.getRateType() == rateType);             if (finalReset) {             	// 设置桶的大小。setRate委托给了setRateAsync,这里使用hset来写入rate、interval、type三个值,如果存在则覆盖                 rateLimiter.setRate(rateType, rate, interval.toMillis(), RateIntervalUnit.MILLISECONDS);             } else {             	// 设置桶的大小。trySetRate委托给了trySetRateAsync,这里使用hsetnx来设置rate、interval、type三个值                 rateLimiter.trySetRate(rateType, rate, interval.toMillis(), RateIntervalUnit.MILLISECONDS);             }             return rateLimiter;         });     } } 
@Service public class BusinessLogic {  	@Resource     protected RateLimiterManager rateLimiter;      	public void doBusinessLogicWithLimiter(){ 		//每1分钟产生10个令牌 		var limiter = rateLimiter.getRateLimiter("limit:order:100001", RateType.OVERALL, 10, Duration.ofMinutes(1));         if (!limiter.tryAcquire()) {             // 被限流             return;         }         // 执行业务逻辑 	} } 

    广告一刻

    为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!