尼恩说在前面
redis 分布式锁,如何实现高并发? redis 锁你用过吗?你遇到过哪些问题? redis 锁的5个大坑,如何规避? redis 分布式锁,如何解决锁失效?
最新《尼恩 架构笔记》《尼恩高并发三部曲》《尼恩Java面试宝典》的PDF,请关注本公众号【技术自由圈】获取,回复:领电子书
本文目录
-先看本地锁:
-再看分布式锁
分布式锁 使用场景和主要类型
分布式锁 的使用场景
秒杀抢购或优惠券领取:在电商平台中,当进行秒杀或领取优惠券时,需要确保同一时间只有一个用户能够成功操作,以避免超卖或重复领取的问题。 订单处理:在分布式部署的电商系统中,用户下单前需要获取分布式锁,检查库存,确保库存足够后才允许下单,然后释放锁。 实时统计:在需要统计在线用户数、PV、UV等实时数据时,可以使用分布式锁来避免并发冲突,确保数据的一致性。 任务调度:在分布式系统中,如果需要执行任务调度,并且任务之间需要互斥执行,可以使用分布式锁来保证同一时间只有一个任务在运行。 分布式爬虫:对于需要对同一网站进行抓取的分布式爬虫系统,可以使用分布式锁来避免多个爬虫同时抓取同一资源,导致IP被封或资源过载。 消息队列幂等性:在使用 消息队列时,分布式锁可以用于确保消息不会被重复处理, 实现 幂等性。
先看本地锁:
再看分布式锁
什么是分布式锁?
当在分布式模型下,数据只有一份(或有限制),此时需要利用锁的技术控制某一时刻修改数据的进程数。 用一个状态值表示锁,对锁的占用和释放通过状态值来标识。
互斥性。在任意时刻,只有一个客户端能持有锁。 不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。 具有容错性。只要大部分的 Redis 节点正常运行,客户端就可以加锁和解锁。 解铃还须系铃人。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。
数据库悲观锁、 数据库乐观锁; 基于Redis的分布式锁; 基于ZooKeeper的分布式锁。
常见分布式锁方案对比
分类 | 方案 | 实现原理 | 优点 | 缺点 |
redis 分布式锁 5大深坑
redis
分布式锁加以控制。之一: 原子性 之深坑
setNx
命令。SETNX
(SET if Not eXists)命令用于设置一个键值对,但只有在键不存在的情况下。SETNX
和Jedis实现一个非原子性Redis分布式锁的基本步骤:尝试获取锁: 使用 SETNX
命令尝试设置锁。如果返回值是1,表示获取锁成功;如果返回值是0,表示锁已经被其他进程或线程获取。设置过期时间: 为了确保锁最终会被释放,即使获取锁的进程或线程崩溃,也需要为锁设置一个过期时间。 执行业务逻辑: 在获取锁之后,执行需要同步的业务逻辑。 释放锁: 完成业务逻辑后,使用 DEL
命令删除锁。
setNx
分布式锁的示例代码:import redis.clients.jedis.Jedis;
public class RedisDistributedLock {
private static final String LOCK_SUCCESS = "OK";
private Jedis jedis = null;
private String lockKey;
private String lockValue = UUID.randomUUID().toString();
public RedisDistributedLock(Jedis jedis, String lockKey) {
this.jedis = jedis;
this.lockKey = lockKey;
}
public boolean tryLock(int expireTime) {
// 尝试获取锁
String result = jedis.setnx(lockKey, lockValue);
if (LOCK_SUCCESS.equals(result)) {
// 设置过期时间
jedis.expire(lockKey, expireTime);
return true;
}
return false;
}
/**
* 释放锁
*/
public void unlock() {
// 检查锁是否由当前进程或线程持有
if (lockValue.equals(jedis.get(lockKey))) {
// 删除锁
jedis.del(lockKey);
}
}
}
上述代码中的 lock
方法并不是原子性的,因为它涉及到两个步骤: 因为它涉及到SETNX
和EXPIRE
两个命令。上述代码中的 unlock
方法并不是原子性的,因为它涉及到两个步骤:获取锁的值和删除锁。
可以使用带 NX
和EX
选项版本的 SET`命令或者使用lua脚本。
使用lua脚本。
package com.crazymaker.springcloud.standard.lock;
@Slf4j
@Data
@AllArgsConstructor
public class JedisLock {
private RedisTemplate redisTemplate;
private static final String LOCK_SUCCESS = "OK";
private static final String SET_IF_NOT_EXIST = "NX";
private static final String SET_WITH_EXPIRE_TIME = "PX";
/**
* 尝试获取分布式锁
* @param jedis Redis客户端
* @param lockKey 锁
* @param requestId 请求标识
* @param expireTime 超期时间
* @return 是否获取成功
*/
public static boolean tryLock(Jedis jedis, String lockKey, String requestId, int expireTime) {
String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
if (LOCK_SUCCESS.equals(result)) {
return true;
}
return false;
}
public void unlock(Jedis jedis, String lockKey, String requestId) {
// 释放锁的逻辑需要确保原子性,这里只是一个简单示例
// 实际上,这里应该使用Lua脚本来确保删除操作的原子性
String script =
"if redis.call('get', KEYS[1]) == ARGV[1] then " +
"return redis.call('del', KEYS[1]) " +
"else " +
"return 0 " +
"end";
jedis.eval(script, 1, lockKey, requestId);
}
}
NX
和EX
选项版本的 SET 命令解决。Jedis.set
方法是 Jedis 客户端用来设置 Redis 中的键值对的方法,基本对应到 redis 的set命令, 版本有很多。Jedis.set
方法的一些常见使用方式:String set(String key, String value)
:将键值对设置到 Redis 中,如果键已经存在,则覆盖之前的值。返回 "OK" 表示操作成功。String set(String key, String value, String nxxx)
:这个版本允许 指定一个条件, nxxx
可以是NX
(Not Exist)或XX
(eXist),分别表示只有当键不存在或已经存在时才设置值。String set(String key, String value, String nxxx, String expx, int time)
:这个版本允许 设置键的过期时间, expx
可以是EX
(秒)或PX
(毫秒),time
是过期时间的值。
Jedis.set
方法的实现会将相应的命令发送到 Redis 服务器。例如,不带过期时间的 set
操作会发送 SET
命令,而带有过期时间的 set
操作会发送 SET
命令加上 EX
或 PX
选项。set
命令,是一个原子性的版本, 该命令可以指定多个参数。String result = jedis.set(lockKey, requestId, "NX", "PX", expireTime);
if ("OK".equals(result))
{
return true;
}
return false;
lockKey
:锁的标识requestId
:请求idNX
:只在键不存在时,才对键进行设置操作。PX
:设置键的过期时间为 millisecond 毫秒。expireTime
:过期时间
set
命令是原子操作,加锁和设置超时时间,一个命令就能轻松搞定。Jedis.set
方法的不同参数,可以对应到 Redis 的SET命令的命令选项。SET
命令用于将给定的键(key)与值(value)关联。Redis 的SET
命令的基本格式和命令选项如下:SET key value
SET
命令还支持一些选项,以提供更多的控制功能,例如设置键的过期时间、仅在键不存在时设置值等。以下是 SET
命令的一些常用选项:NX:仅当键不存在时,才对键进行设置操作。相当于 "SET if Not eXists"。 SET key value NX XX:仅当键已经存在时,才对键进行设置操作。相当于 "SET if eXists"。 SET key value XX EX:设置键的过期时间,单位为秒。 SET key value EX seconds PX:设置键的过期时间,单位为毫秒。 SET key value PX milliseconds EXAT:设置键的过期时间,精确到某个时间点(Unix时间戳)。 SET key value EXAT timestamp PXAT:设置键的过期时间,精确到某个时间点(Unix时间戳),单位为毫秒。 SET key value PXAT milliseconds-timestamp KEEPTTL:设置键的值,但保持键的原始过期时间不变。 SET key value KEEPTTL GET:设置键的值,并返回旧的值。 SET key value GET
SET key value NX EX 10
SET
命令返回的结果通常是 "OK",表示操作成功。如果使用了 GET
选项,SET
命令将返回旧的值。SET
命令非常灵活,可以用于实现各种复杂的逻辑,如分布式锁、缓存控制等场景。之二:连接耗尽 之深坑
public class LockDemo {
public static void main(String[] args) {
JedisPool jedisPool = new JedisPool("localhost", 6379);
JedisLock lock = new JedisLock(jedisPool, "myLock");
if (lock.tryLock(10)) { // 尝试获取锁,锁的过期时间为10秒
// 执行业务逻辑
System.out.println("Lock acquired, executing business logic");
lock.unlock(); // 执行完业务,释放锁
System.out.println("Lock released");
} else {
System.out.println("Failed to acquire lock");
}
}
}
Jedis
客户端会报如下的错误信息redis.clients.jedis.exceptions.JedisConnectionException: Could not get a resource from the pool
public class LockDemo {
public static void main(String[] args) {
JedisPool jedisPool = new JedisPool("localhost", 6379);
JedisLock lock = new JedisLock(jedisPool, "myLock");
try {
if (lock.tryLock(10)) { // 尝试获取锁,锁的过期时间为10秒
try {
// 执行业务逻辑
System.out.println("Lock acquired, executing business logic");
Thread.sleep(5000); // 模拟业务逻辑执行时间
} finally {
lock.unlock(); // 释放锁
System.out.println("Lock released");
}
} else {
System.out.println("Failed to acquire lock");
}
} finally {
jedisPool.close();
}
}
}
注意事项
锁的过期时间:在 tryLock
方法中设置了锁的过期时间,这是为了防止持有锁的进程崩溃,从而导致锁永远无法释放。但是也不能太长,设置在 30s内为适宜。锁的安全性:在 unlock
方法中,我们使用 Lua 脚本来确保检查和删除操作的原子性,这是为了防止错误地释放其他进程持有的锁。连接池资源管理:在示例中,关闭了连接,确保 Jedis 连接在使用后被正确关闭,避免资源泄露。
手动加锁 业务操作 手动释放锁 如果手动释放锁失败了,则达到超时时间,redis会自动释放锁。
之三:锁过期 的深坑
//写数据到文件
function writeData(filename, data) {
boolean locked = lock.tryLock(10, TimeUnit.SECONDS);
if (!locked) {
throw 'Failed to acquire lock';
}
try {
//将数据写到文件
var file = storage.readFile(filename);
var updated = updateContents(file, data);
storage.writeFile(filename, updated);
} finally {
lock.unlock();
}
}
如果在写文件过程中,发生了 fullGC,并且其时间跨度较长, 超过了10秒, 那么,由于锁的有效期就是 10s,这时候任务没有执行完成,分布式锁就自动过期了。
锁过期问题 的解决方案
方式一:模拟CAS乐观锁的方式,增加版本号
client1 的token 是33 client2 的token 是34
方式二:watch dog自动延期机制
redission,采用的就是这种方案, 此方案不会入侵业务代码。
redisson
的解决锁过期的源码,大致如下。@Slf4j
@Service
public class RedisDistributionLockPlus {
/**
* 加锁超时时间,单位毫秒, 即:加锁时间内执行完操作,如果未完成会有并发现象
*/
private static final long DEFAULT_LOCK_TIMEOUT = 30;
private static final long TIME_SECONDS_FIVE = 5 ;
/**
* 每个key的过期时间 {@link LockContent}
*/
private Map<String, LockContent> lockContentMap = new ConcurrentHashMap<>(512);
/**
* redis执行成功的返回
*/
private static final Long EXEC_SUCCESS = 1L;
/**
* 获取锁lua脚本, k1:获锁key, k2:续约耗时key, arg1:requestId,arg2:超时时间
*/
private static final String LOCK_SCRIPT = "if redis.call('exists', KEYS[2]) == 1 then ARGV[2] = math.floor(redis.call('get', KEYS[2]) + 10) end " +
"if redis.call('exists', KEYS[1]) == 0 then " +
"local t = redis.call('set', KEYS[1], ARGV[1], 'EX', ARGV[2]) " +
"for k, v in pairs(t) do " +
"if v == 'OK' then return tonumber(ARGV[2]) end " +
"end " +
"return 0 end";
/**
* 释放锁lua脚本, k1:获锁key, k2:续约耗时key, arg1:requestId,arg2:业务耗时 arg3: 业务开始设置的timeout
*/
private static final String UNLOCK_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then " +
"local ctime = tonumber(ARGV[2]) " +
"local biz_timeout = tonumber(ARGV[3]) " +
"if ctime > 0 then " +
"if redis.call('exists', KEYS[2]) == 1 then " +
"local avg_time = redis.call('get', KEYS[2]) " +
"avg_time = (tonumber(avg_time) * 8 + ctime * 2)/10 " +
"if avg_time >= biz_timeout - 5 then redis.call('set', KEYS[2], avg_time, 'EX', 24*60*60) " +
"else redis.call('del', KEYS[2]) end " +
"elseif ctime > biz_timeout -5 then redis.call('set', KEYS[2], ARGV[2], 'EX', 24*60*60) end " +
"end " +
"return redis.call('del', KEYS[1]) " +
"else return 0 end";
/**
* 续约lua脚本
*/
private static final String RENEW_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('expire', KEYS[1], ARGV[2]) else return 0 end";
private final StringRedisTemplate redisTemplate;
public RedisDistributionLockPlus(StringRedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
ScheduleTask task = new ScheduleTask(this, lockContentMap);
// 启动定时任务
ScheduleExecutor.schedule(task, 1, 1, TimeUnit.SECONDS);
}
/**
* 加锁
* 取到锁加锁,取不到锁一直等待知道获得锁
*
* @param lockKey
* @param requestId 全局唯一
* @param expire 锁过期时间, 单位秒
* @return
*/
public boolean lock(String lockKey, String requestId, long expire) {
log.info("开始执行加锁, lockKey ={}, requestId={}", lockKey, requestId);
for (; ; ) {
// 判断是否已经有线程持有锁,减少redis的压力
LockContent lockContentOld = lockContentMap.get(lockKey);
boolean unLocked = null == lockContentOld;
// 如果没有被锁,就获取锁
if (unLocked) {
long startTime = System.currentTimeMillis();
// 计算超时时间
long bizExpire = expire == 0L ? DEFAULT_LOCK_TIMEOUT : expire;
String lockKeyRenew = lockKey + "_renew";
RedisScript<Long> script = RedisScript.of(LOCK_SCRIPT, Long.class);
List<String> keys = new ArrayList<>();
keys.add(lockKey);
keys.add(lockKeyRenew);
Long lockExpire = redisTemplate.execute(script, keys, requestId, Long.toString(bizExpire));
if (null != lockExpire && lockExpire > 0) {
// 将锁放入map
LockContent lockContent = new LockContent();
lockContent.setStartTime(startTime);
lockContent.setLockExpire(lockExpire);
lockContent.setExpireTime(startTime + lockExpire * 1000);
lockContent.setRequestId(requestId);
lockContent.setThread(Thread.currentThread());
lockContent.setBizExpire(bizExpire);
lockContent.setLockCount(1);
lockContentMap.put(lockKey, lockContent);
log.info("加锁成功, lockKey ={}, requestId={}", lockKey, requestId);
return true;
}
}
// 重复获取锁,在线程池中由于线程复用,线程相等并不能确定是该线程的锁
if (Thread.currentThread() == lockContentOld.getThread()
&& requestId.equals(lockContentOld.getRequestId())){
// 计数 +1
lockContentOld.setLockCount(lockContentOld.getLockCount()+1);
return true;
}
// 如果被锁或获取锁失败,则等待100毫秒
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
// 这里用lombok 有问题
log.error("获取redis 锁失败, lockKey ={}, requestId={}", lockKey, requestId, e);
return false;
}
}
}
/**
* 解锁
*
* @param lockKey
* @param lockValue
*/
public boolean unlock(String lockKey, String lockValue) {
String lockKeyRenew = lockKey + "_renew";
LockContent lockContent = lockContentMap.get(lockKey);
long consumeTime;
if (null == lockContent) {
consumeTime = 0L;
} else if (lockValue.equals(lockContent.getRequestId())) {
int lockCount = lockContent.getLockCount();
// 每次释放锁, 计数 -1,减到0时删除redis上的key
if (--lockCount > 0) {
lockContent.setLockCount(lockCount);
return false;
}
consumeTime = (System.currentTimeMillis() - lockContent.getStartTime()) / 1000;
} else {
log.info("释放锁失败,不是自己的锁。");
return false;
}
// 删除已完成key,先删除本地缓存,减少redis压力, 分布式锁,只有一个,所以这里不加锁
lockContentMap.remove(lockKey);
RedisScript<Long> script = RedisScript.of(UNLOCK_SCRIPT, Long.class);
List<String> keys = new ArrayList<>();
keys.add(lockKey);
keys.add(lockKeyRenew);
Long result = redisTemplate.execute(script, keys, lockValue, Long.toString(consumeTime),
Long.toString(lockContent.getBizExpire()));
return EXEC_SUCCESS.equals(result);
}
/**
* 续约
*
* @param lockKey
* @param lockContent
* @return true:续约成功,false:续约失败(1、续约期间执行完成,锁被释放 2、不是自己的锁,3、续约期间锁过期了(未解决))
*/
public boolean renew(String lockKey, LockContent lockContent) {
// 检测执行业务线程的状态
Thread.State state = lockContent.getThread().getState();
if (Thread.State.TERMINATED == state) {
log.info("执行业务的线程已终止,不再续约 lockKey ={}, lockContent={}", lockKey, lockContent);
return false;
}
String requestId = lockContent.getRequestId();
long timeOut = (lockContent.getExpireTime() - lockContent.getStartTime()) / 1000;
RedisScript<Long> script = RedisScript.of(RENEW_SCRIPT, Long.class);
List<String> keys = new ArrayList<>();
keys.add(lockKey);
Long result = redisTemplate.execute(script, keys, requestId, Long.toString(timeOut));
log.info("续约结果,True成功,False失败 lockKey ={}, result={}", lockKey, EXEC_SUCCESS.equals(result));
return EXEC_SUCCESS.equals(result);
}
static class ScheduleExecutor {
public static void schedule(ScheduleTask task, long initialDelay, long period, TimeUnit unit) {
long delay = unit.toMillis(initialDelay);
long period_ = unit.toMillis(period);
// 定时执行
new Timer("Lock-Renew-Task").schedule(task, delay, period_);
}
}
static class ScheduleTask extends TimerTask {
private final RedisDistributionLockPlus redisDistributionLock;
private final Map<String, LockContent> lockContentMap;
public ScheduleTask(RedisDistributionLockPlus redisDistributionLock, Map<String, LockContent> lockContentMap) {
this.redisDistributionLock = redisDistributionLock;
this.lockContentMap = lockContentMap;
}
@Override
public void run() {
if (lockContentMap.isEmpty()) {
return;
}
Set<Map.Entry<String, LockContent>> entries = lockContentMap.entrySet();
for (Map.Entry<String, LockContent> entry : entries) {
String lockKey = entry.getKey();
LockContent lockContent = entry.getValue();
long expireTime = lockContent.getExpireTime();
// 减少线程池中任务数量
if ((expireTime - System.currentTimeMillis())/ 1000 < TIME_SECONDS_FIVE) {
//线程池异步续约
ThreadPool.submit(() -> {
boolean renew = redisDistributionLock.renew(lockKey, lockContent);
if (renew) {
long expireTimeNew = lockContent.getStartTime() + (expireTime - lockContent.getStartTime()) * 2 - TIME_SECONDS_FIVE * 1000;
lockContent.setExpireTime(expireTimeNew);
} else {
// 续约失败,说明已经执行完 OR redis 出现问题
lockContentMap.remove(lockKey);
}
});
}
}
}
}
}
加锁
、解锁
、续约
都是客户端把一些复杂的业务逻辑,通过封装在Lua
脚本中发送给redis
,保证这段复杂业务逻辑执行的原子性
之四:锁失效 的深坑
min-slaves-to-write 1
min-slaves-max-lag 10
RedissonClient redisson = // 初始化 Redisson 客户端
RLock lock1 = redisson.getLock("lock1");
RLock lock2 = redisson.getLock("lock2");
RLock lock3 = redisson.getLock("lock3");
RedissonMultiLock multiLock = new RedissonMultiLock(lock1, lock2, lock3);
try {
if (multiLock.tryLock()) {
// 成功获取锁,执行业务逻辑
} else {
// 获取锁失败
}
} finally {
multiLock.unlock(); // 释放锁
}
互斥性:在任何时间,只有一个客户端可以获得锁,确保了资源的互斥访问。 避免死锁:通过为锁设置一个较短的过期时间,即使客户端在获得锁后由于网络故障等原因未能按时释放锁,锁也会因为过期而自动释放,避免了死锁的发生。 容错性:即使一部分 Redis 节点宕机,只要大多数节点(即过半数以上的节点)仍在线,RedLock 算法就能继续提供服务,并确保锁的正确性。
之五:锁分段 的深坑
可能 其他slot槽位 还有 库存, 但是 用户请求 路由到 对应的分片槽位 没有库存, 导致 扣减库存失败
解决办法是:在使用hash取模基础上,可以使用动态库存迁移,减少库存消耗不均和无效重试
库存动态迁移
架构总结
说在最后:有问题找老架构取经
被裁之后, 空窗1年/空窗2年, 如何 起死回生 ?
案例1:42岁被裁2年,天快塌了,急救1个月,拿到开发经理offer,起死回生
案例2:35岁被裁6个月, 职业绝望,转架构急救上岸,DDD和3高项目太重要了
案例3:失业15个月,学习40天拿offer, 绝境翻盘,如何实现?
被裁之后,100W 年薪 到手, 如何 人生逆袭?
100W案例,100W年薪的底层逻辑是什么? 如何实现年薪百万? 如何远离 中年危机?
如何 逆天改命,包含AI、大数据、golang、Java 等
实现职业转型,极速上岸
关注职业救助站公众号,获取每天职业干货
助您实现职业转型、职业升级、极速上岸
---------------------------------
实现架构转型,再无中年危机
关注技术自由圈公众号,获取每天技术千货
一起成为牛逼的未来超级架构师
几十篇架构笔记、5000页面试宝典、20个技术圣经
请加尼恩个人微信 免费拿走
暗号,请在 公众号后台 发送消息:领电子书
如有收获,请点击底部的"在看"和"赞",谢谢