【Redis】Redisson

image-20210913131720145

Redisson 简介

官网文档上详细说明了不推荐使用 setnx 来实现分布式锁,应该参考 the Redlock algorithm 的实现

image-20201101050725534

the Redlock algorithm:https://redis.io/topics/distlock

在 Java 语言环境下使用 Redisson,即 Redisson 是 Redlock 在 Java 中的实现

image-20201101050924914

官方文档:https://github.com/redisson/redisson/wiki/目录

Redisson是一个在 Redis 的基础上实现的 Java 驻内存数据网格(In-Memory Data Grid),提供了分布式和可扩展的 Java 数据结构。其特点:

  • 基于 Netty 实现,采用非阻塞 IO,性能高
  • 支持异步请求
  • 支持连接池、pipeline、LUA Scripting、Redis Sentinel、Redis Cluster 不支持事务,官方建议以 LUA Scripting 代替事务
  • 主从、哨兵、集群都支持。Spring 也可以配置和注入 RedissonClient。
  • 实现分布式锁:在 Redisson 里面提供了更加简单的分布式锁的实现。

Redisson 配置与使用

导入依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
<!-- 原生 redisson -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.12.0</version>
</dependency>

<!-- 与 spring boot 整合 -->
<!-- redisson-spring-boot-starter -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.12.0</version>
</dependency>

程序化配置

Redisson程序化的配置方法是通过构建Config对象实例来实现的,使用Config对象创建出RedissonClient对象,后续所有对Redisson的使用都借助于RedissonClient对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Configuration
public static class MyRedissonConfig {
// 集群模式
@Bean(destroyMethod="shutdown")
public RedissonClient redisson() throws IOException {
Config config = new Config();
config.useClusterServers()
.setPassword("zhaoyuyun")
.addNodeAddress("redis://127.0.0.1:7004", "redis://127.0.0.1:7001"); // 可以用"rediss://"来启用SSL连接
return Redisson.create(config);
}

// 单节点模式
@Bean(destroyMethod="shutdown")
public RedissonClient redisson() throws IOException {
Config config = new Config();
config.useSingleServer()
.setPassword("zhaoyuyun")
.setAddress("redis://myredisserver:6379");
return Redisson.create(config);
}
}

使用案例

该案例出自云商城项目的 Redis 分布式锁 部分。思路:

  • 先查看缓存中是否有该数据,如果有就返回
  • 如果没有,先加分布式锁,然后再去数据库里查数据。而不应该先加锁再查缓存

代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/**
* 先查询缓存是否存在,如果存在就直接返回,否则先加上分布式锁,然后再去数据库里查
*/
@Override
public Map<String, List<Catalog2Vo>> getCatalogJsonWithRedis() {
/**
* 1. 空结果缓存(或布隆过滤器):解决缓存穿透
* 2. 设置过期时间(加随机值):解决缓存雪崩
* 3. 加锁:解决缓存击穿
*/
// 序列化:先将Java对象转成JSON字符串,然后向缓存中存储JSON字符串,
// 反序列化:读取时也是读取出JSON字符串,再转成Java对象使用

// 1. 先查询是否有缓存
String cache = redisTemplate.opsForValue().get("catalogJSON");
if (StringUtils.isEmpty(cache)) {
// 2. 缓存如果没命中,先加上分布式锁,然后再去数据库里查数据
Map<String, List<Catalog2Vo>> catalogJson = getCatalogJsonFromDBWithRedissonLock();
// 将Java对象转换成JSON字符串
String s = JSON.toJSONString(catalogJson);
// 3. 将查询到的数据放入缓存
redisTemplate.opsForValue().set("catalogJSON", s);
return catalogJson;
}

// 将缓存中的JSON字符串转换成实际对象。其中,TypeReference 以匿名内部类的形式创建
Map<String, List<Catalog2Vo>> result = JSON.parseObject(cache, new TypeReference<Map<String, List<Catalog2Vo>>>() {
});
return result;
}

/**
* 如果Redis中缓存不存在,先加上分布式锁,然后再查数据库
* 使用Redisson操作分布式锁
* @return
*/
public Map<String, List<Catalog2Vo>> getCatalogJsonFromDBWithRedissonLock() {
// 1. 原子性加锁,其内会自动设置过期时间(看门狗+自动续期机制)。也可以手动指定过期时间
RLock lock = redissonClient.getLock("catalogJson-lock");
lock.lock();

Map<String, List<Catalog2Vo>> catalogJsonFromDB;
try {
// 2. 加锁后,去数据库里查数据
catalogJsonFromDB = getCatalogJsonFromDB();
} finally {
// 3. 最后原子性解锁
lock.unlock();
}
return catalogJsonFromDB;
}

可重入锁 Reentrant Lock

基于Redis的Redisson分布式可重入锁RLock 。其实现了java.util.concurrent.locks.Lock接口。同时还提供了异步(Async)反射式(Reactive)RxJava2标准的接口。

该类可实现分布式锁的效果,原理为:在Redis缓存创建了一把锁,其他线程再想获取锁时就得阻塞等待该锁从Redis缓存中删除(底层有一个while(true)循环不断尝试获取锁)。

方式一:手动设置过期时间

方式一:手动设置过期时间:

1
lock.lock(10, TimeUnit.SECONDS);

上述代码设置10s后自动解锁(不会自动续期),这个时间一定要大于业务的执行时间,否则锁过期后再unlock(),解锁的就是别的线程加的锁,此时就会报错uuid不匹配。

方式二:看门狗 + 自动续期

方式二:不手动指定过期时间,使用看门狗默认的过期时间 + 自动续期策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// redisson 是上文中注入到容器中的 RedissonClient 对象
RLock lock = redisson.getLock("myLock"); // 设置锁名

// 加锁,若其他线程发现该锁已经被锁上,则阻塞式等待其他线程解锁后才继续运行
lock.lock();

try {
// 业务...
} finally {
// 解锁
lock.unlock();

// 严谨的写法:
if (lock.isLocked() && lock.isHeldByCurrentThread()) {
lock.unlock();
}
}

流程:

  • 当前线程在执行 lock() 后,将在Redis缓存中创建一个锁(Hash结构),其key值为"myLock",其内存储了key = uuid:线程号, value = 1,代表当前线程对象拥有了该锁;
  • 此时其他线程再调用lock() 时将发现Redis缓存中已经存在了"myLock"锁,因此会阻塞等待while(true)循环判断锁是否还在);
  • 等待当前线程调用 unlock() 后,该"myLock"锁将从Redis缓存中移除,此时其他线程才可以结束阻塞,创建另一把锁,即再在Redis缓存中创建一个"myLock"

上述方式的细节:

  • 锁的自动续期:如果业务超长,运行期间自动会给锁续期到30s,不用担心业务时间长,锁自动过期被删掉。
  • 加锁的业务只要运行完成,就不会给当前锁续期,即使不手动解锁,锁也默认也会在30s后自动删除。

自动续期原理

  • 如果我们给锁指定了超时时间,Redisson就会给Redis发送LUA脚本进行占锁且设置指定的超时时间(保证占锁和设置超时时间的原子性)
  • 如果我们未指定锁的超时时间,Redisson就会设置超时时间为LockWatchdogTimeout(看门狗)的默认时间30s。如果占锁成功,就会返回一个 RFuture<Long> 对象,异步监听一个定时任务(用于给锁重新设置超时时间,新的超时时间就是看门狗的默认时间30s),之后每隔10s(三分之一的看门狗默认时间)就会将超时时间续期到30s

最佳实战:使用 lock.lock(30, TimeUnit.SECONDS) 方式并手动解锁。将超时时间设置的大一些,手动解锁。这样的好处是省掉了频繁续期的操作。


方式三:尝试加锁

方式三:尝试加锁,等待一定时间还未拿到锁就放弃加锁。

lock() 方法会阻塞等待直到获取到锁。若不想阻塞等待,可以使用tryLock() 方法,其会阻塞等待一定时间后停止等待,即放弃尝试加锁:

1
2
3
4
5
6
7
8
9
// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
try {
...
} finally {
lock.unlock();
}
}

读写锁 RReadWriteLock

读写锁 RReadWriteLock 用于保证一定能读取到最新数据,用法是:在改数据时加写锁,在读数据时加读锁。修改数据期间:

  • 写锁是一个排他锁(互斥锁/独占锁)
  • 读锁是一个共享锁

写锁如果没有释放,读锁就得一直阻塞等待。

1
RReadWriteLock lock = redisson.getReadWriteLock("rw-lock");

改数据时加写锁:

1
2
3
4
5
6
RLock wlock = lock.writeLock();
wLock.lock();

// ...

wlock.unlock();

读数据时加读锁:

1
2
3
4
5
6
RLock rlock = lock.readLock();
rLock.lock();

// ...

rlock.unlock();

某个线程开启写锁时,其他线程的读锁就会阻塞等待,写锁当然也会阻塞等待。

四种不同情况下的锁:

  • 读 + 读:相当于无锁,并发读。只会分别在Redis中记录当前的读锁,会同时添加锁成功,不会阻塞
  • 写 + 读:读锁阻塞等待写锁释放才能读
  • 写 + 写:后来的写锁阻塞等待前一个写锁释放才能写
  • 读 + 写:后来的写锁阻塞等待其他读锁释放才能写(防止前一个线程还没读到数据就被后来的线程修改了)

总结:只要有写锁的存在,无论是先写还是后写,都必须阻塞等待。

源码

在添加读写锁时,Redisson 使用 LUA 脚本发送指令,保证指令的原子性。并且在指令内设置了 mode,其指定了该锁是读还是写。从而决定当前请求是阻塞还是放行。

image-20220307160232215

信号量 RSemaphore

信号量机制可以用来做分布式限流

情景:先在Redis中创建一个key: "park", value=3,代表当前共有三个车位(三个信号)。之后客户端多次获取信号,直到Redis中"park"的信号量为0,代表没有多余的信号给客户端了。此后再想来获取信号的客户端就会阻塞等待其他客户端释放信号,才能继续获取到信号。

获取一个信号:获取一个值,占一个车位

1
2
3
4
5
6
7
8
9
10
RSemaphore park = redisson.getSemaphore("park");
park.acquire();

// 不阻塞等待,没有信号就放弃获取信号
boolean b = park.tryAcquire();
if (b) {
// 执行业务
} else {
return "error";
}

释放一个信号:释放一个车位

1
2
RSemaphore park = redisson.getSemaphore("park");
park.release();

闭锁 RCountDownLatch

基于Redisson的分布式闭锁(CountDownLatch)。Java对象RCountDownLatch采用了与java.util.concurrent.CountDownLatch相似的接口和用法。

1
2
3
4
5
6
7
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.trySetCount(5);
latch.await(); // 阻塞等待5个数都减掉

// 在其他线程或其他JVM里
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.countDown(); // 计数减一