分布式锁
约 3993 字大约 13 分钟
分布式锁
2025-02-27
简介
什么是分布式锁
在分布式系统中,为了保证数据一致性,需要对共享资源进行访问时,需要进行加锁操作。分布式锁就是用来控制分布式系统中多个节点对共享资源的访问,以保证共享资源在同一时刻只被一个节点访问。
为什么需要分布式锁
在单机部署的系统中,使用线程锁来解决高并发的问题,多线程访问共享变量的问题达到数据一致性,如使用synchornized、ReentrantLock等;
但是在后端集群部署的系统中,程序在不同的JVM虚拟机中运行,且因为synchronized或ReentrantLock都只能保证同一个JVM进程中保证有效,所以这时就需要使用分布式锁了
分布式锁特点
- 互斥性:在任意时刻,只有一个节点可以持有分布式锁;
- 不会死锁:即使有一个节点在持有锁的期间崩溃,其他节点也能在有限的时间内自动获取锁;
- 容错性:只要大多数节点正常运行,分布式锁就能正常工作;
- 高性能:相比于基于Zookeeper的协调服务,基于分布式锁的实现更加简单、高效。
JVM锁
在JVM中,synchronized和ReentrantLock都是基于monitor对象实现的,monitor对象是JVM实现同步的一种方式,每个对象都有一把锁,当一个线程试图获取对象的锁时,如果锁已经被其他线程持有,则该线程进入阻塞状态,直到锁被释放。
案例:商品超卖问题
背景:在电商项目中,用户购买商品和数量后后,系统会对商品的库存进行相应数量的扣减,但是由于并发访问导致库存超卖的问题。
- 准备数据表
create table mall_stock (
id int primary key auto_increment COMMENT '库存ID',
product_id varchar(20) not null COMMENT '商品编号',
sock_id int not null default 1 COMMENT '仓库ID',
count int not null default 0 COMMENT '数量'
)
- 接着,我们创建一个SpringBoot的项目,在接口中实现简单的扣减库存的逻辑,示例如下: 控制器:
@RestController
@RequestMapping("/stock")
@Tag(name = "库存管理", description = "库存管理相关接口")
@RequiredArgsConstructor
public class StockController {
private final StockService stockService;
@GetMapping("/reduce")
@Operation(summary = "减少库存")
public Result reduceStock(
@RequestParam("productId") String productId,
@RequestParam("count") Integer count){
boolean falg = stockService.reduceStock(productId, count);
return Result.judge(falg);
}
}
业务接口实现:
@Override
public boolean reduceStock(String productId, Integer count) {
//1,根据productId查询库存
LambdaQueryWrapper<Stock> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(productId!= null&&productId.isEmpty(),Stock::getProductId,productId);
Stock stock = stockMapper.selectOne(queryWrapper);
//2,判断库存是否充足
int i = 0;
if(stock!= null && stock.getCount() >= count){
//3,减少库存
stock.setCount(stock.getCount() - count); //减少库存
i = stockMapper.updateById(stock);
}
//4,返回结果
return i>0;
}
- 商品超卖测试 为了模拟商品超卖问题,我们可以使用JMeter对接口进行并发访问,并设置线程数为1000,模拟1000个用户同时访问接口。
再添加Http取样器
运行测试,查看数据库发现库存量还剩余900多个,说明库存并未被正确扣减。出现了商品超卖现象。 带换到线上场景,这个时候后续还有用户继续请求购买,最终实际卖出的肯定会远远超过库存,这就是经典的超卖问题。
问题原因:
- 并发访问导致库存超卖
解决:添加JVM锁
- 方案一:使用synchronized关键字
@GetMapping("/reduceStock")
public synchronized String reduceStock(
@RequestParam("pid") Integer pid,
@RequestParam("num") Integer num) {
// TODO: 2025/2/26 减少库存
boolean result = stockService.reduceStock(pid, num);
if(result){
System.out.println("减少库存成功!"+(++count));
return "购买成功!";
}
// System.out.println("减少库存失败!");
return "购买失败!";
}
- 方案二:使用ReentrantLock对象
ReentrantLock lock = new ReentrantLock();
@GetMapping("/reduceStock2")
public String reduceStock2(
@RequestParam("pid") Integer pid,
@RequestParam("num") Integer num) {
lock.lock();//加锁
// TODO: 2025/2/26 减少库存
boolean result = stockService.reduceStock(pid, num);
if(result){
// System.out.println("减少库存成功!");
return "购买成功!";
}
// System.out.println("减少库存失败!");
lock.unlock();//释放锁
return "购买失败!";
}
再次使用JMeter进行测试,库存量已正常扣减。
问题:JVM锁的失效
- 多例模式
业务对象或锁对象是多例的情况下
原因:业务中一般使用的lock对象锁,lock锁的范围是针对同一个对象里面不同的线程,也就是说,jvm锁是对象锁,对象之间锁不共用,所以多例模式下,jvm锁失效。
测试:将控制器设置为多例模式,启动测试,发现库存量仍然未被正确扣减。
@RestController
@RequestMapping("/stock")
@Tag(name = "库存管理", description = "库存管理相关接口")
@RequiredArgsConstructor
@Scope(value = "prototype") //多例模式,每个请求都创建一个新的实例,singleton默认单例模式
public class StockController {
...
}
- 添加了事务注解
在业务方法上添加了事务注解,导致jvm锁失效。
原因:事务注解会在方法执行前后自动提交或回滚事务,jvm锁是对象锁,对象之间锁不共用,所以jvm锁失效。
测试:将业务方法上添加事务注解,启动测试,发现库存量仍然未被正确扣减。
@GetMapping("/reduceStock")
@Transactional
public synchronized String reduceStock(@RequestParam("pid") Integer pid,
@RequestParam("num") Integer num){
...
}
- 分布式架构下
原因:服务都不一样了,锁和对象自然也不一样(就和第一个情况下的环境一样),jvm锁失效。
测试:将服务部署到不同的机器上,启动测试,发现库存量仍然未被正确扣减。
分布式锁的解决方案
基于MySQL的分布式锁
- 方案一:基于一条SQL语句的悲观锁
update mall_stock set count = count - #{count}
where product_id = #{productId} and count >= #{count}
原因:update ,insert,delete 写操作本身带排他锁,所以不会出现并发问题。
问题:增删改操作是表级锁,并发量大时,会出现性能问题。
mysql悲观锁中使用行级锁
- 锁的查询或者跟新条件必须是索引字段
- 查询或者更新条件必须是具体值,不能是范围值
- 方案二:基于SELECT FOR UPDATE的悲观锁
@Transactional
public Boolean reduceStock(String productId, int count) {
// 查询商品库存
Stock stock = stockMapper.getStockByProductId(productId);
if (stock!= null && stock.getCount() >= count) {
// 减少库存
stock.setCount(stock.getCount() - count);
this.updateById(stock); // 更新库存
return true; // 减库存成功
}
return false;
}
查询的SQL语句:
select * from mall_stock where product_id = #{productId} for update
原因:SELECT FOR UPDATE 语句会对查询到的行加排他锁,其他事务无法对该行进行任何操作。
问题:性能问题,死锁问题。
- 方案三:基于数据库表的乐观锁
乐观锁认为数据的变动不会太频繁。 乐观锁通常是通过在表中增加一个版本(version)或时间戳(timestamp)来实现,其中,版本最为常用。 事务在从数据库中取数据时,会将该数据的版本也取出来(v1),当事务对数据变动完毕想要将其更新到表中时,会将之前取出的版本v1与数据中最新的版本v2相对比,如果v1=v2,那么说明在数据变动期间,没有其他事务对数据进行修改,此时,就允许事务对表中的数据进行修改,并且修改时version会加1,以此来表明数据已被变动。 如果,v1不等于v2,那么说明数据变动期间,数据被其他事务改动了,此时不允许数据更新到表中,一般的处理办法是通知用户让其重新操作。不同于悲观锁,乐观锁通常是由开发者实现的。(CAS机制:Compare And Swap 比较并交换)
给库存表添加version字段:
alter table mall_stock add version int default 0;
业务方法实现:
@Override
public boolean reduceStock(String productId, Integer count) {
// 1,根据productId查询库存
Stock stock = stockMapper.selectOne(new LambdaQueryWrapper<Stock>().eq(Stock::getProductId, productId));
Integer version = stock.getVersion();// 原始版本号
if(stock!= null && stock.getCount() >= count){
stock.setCount(stock.getCount() - count);// 跟新库存
stock.setVersion(version+1);//更新版本号
int i = stockMapper.update(stock, new LambdaQueryWrapper<Stock>().eq(Stock::getId, stock.getId())
.eq(Stock::getVersion, version));
if(i<1){
//修改失败,重试
try {
Thread.sleep(50);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
reduceStock(productId, count);
}
return true;
}
return false;
}
问题:
1,高并发情况下,性能极低
2,读写分离情况下导致乐观锁不可靠
- 方案四:创建分布式锁表
不管是jvm锁还是mysql锁,为了保证线程的并发安全,都提供了悲观独占排他锁。所以独占排他也是 分布式锁的基本要求。 可以利用唯一键索引不能重复插入的特点实现。设计表如下:
CREATE TABLE `db_lock` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`lock_name` varchar(50) NOT NULL COMMENT '锁名',
`class_name` varchar(100) DEFAULT NULL COMMENT '类名',
`method_name` varchar(50) DEFAULT NULL COMMENT '方法名',
`server_name` varchar(50) DEFAULT NULL COMMENT '服务器ip',
`thread_name` varchar(50) DEFAULT NULL COMMENT '线程名',
`create_time` timestamp NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP
COMMENT '获取锁时间',
`desc` varchar(100) DEFAULT NULL COMMENT '描述',
PRIMARY KEY (`id`),
UNIQUE KEY `idx_unique` (`lock_name`)
) ENGINE=InnoDB AUTO_INCREMENT=1332899824461455363 DEFAULT CHARSET=utf8;
思路:
- 线程同时获取锁(insert)
- 获取成功,执行业务逻辑,执行完成释放锁(delete)
- 其他线程等待重试
代码实现:
@Service
public class StockService {
@Autowired
private StockMapper stockMapper;
@Autowired
private LockMapper lockMapper;
/**
* 数据库分布式锁
*/
public void checkAndLock() {
// 加锁
Lock lock = new Lock(null, "lock", this.getClass().getName(), new
Date(), null);
try {
this.lockMapper.insert(lock);
} catch (Exception ex) {
// 获取锁失败,则重试
try {
Thread.sleep(50);
this.checkAndLock();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 先查询库存是否充足
Stock stock = this.stockMapper.selectById(1L);
// 再减库存
if (stock != null && stock.getCount() > 0){
stock.setCount(stock.getCount() - 1);
this.stockMapper.updateById(stock);
}
// 释放锁
this.lockMapper.deleteById(lock.getId());
}
}
问题:
- 这把锁强依赖数据库的可用性,数据库是一个单点,一旦数据库挂掉,会导致业务系统不可用。 解决方案:给锁数据库 搭建主备
- 这把锁没有失效时间,一旦解锁操作失败,就会导致锁记录一直在数据库中,其他线程无法再获得到锁。 解决方案:只要做一个定时任务,每隔一定时间把数据库中的超时数据清理一遍。
- 这把锁是非重入的,同一个线程在没有释放锁之前无法再次获得该锁。因为数据中数据已经存在了。 解决方案:记录获取锁的主机信息和线程信息,如果相同线程要获取锁,直接重入。
- 受制于数据库性能,并发能力有限。 解决方案:无法解决。
基于Redis的分布式锁
方案一:基于setnx命令的分布式锁
借助于redis中的命令setnx(key, value),key不存在就新增,存在就什么都不做。同时有多个客户端发 送setnx命令,只有一个客户端可以成功,返回1(true);其他的客户端返回0(false)。
- 多个客户端同时获取锁(setnx)
- 获取成功,执行业务逻辑,执行完成释放锁(del)
- 其他客户端等待重试
@Service
public class StockService {
@Autowired
private StockMapper stockMapper;
@Autowired
private LockMapper lockMapper;
@Autowired
private StringRedisTemplate redisTemplate;
public void checkAndLock() {
// 加锁,获取锁失败重试
while (!this.redisTemplate.opsForValue().setIfAbsent("lock","xxx")){
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 先查询库存是否充足
Stock stock = this.stockMapper.selectById(1L);
// 再减库存
if (stock != null && stock.getCount() > 0){
stock.setCount(stock.getCount() - 1);
this.stockMapper.updateById(stock);
}
// 释放锁
this.redisTemplate.delete("lock");
}
}
原因:如果客户端在获取锁的的时候,业务出现异常或服务器宕机,导致锁一直没有释放,其他客户端就一直获取不到锁,造成死锁。
解决:给锁设置过期时间,自动释放锁。 设置过期时间两种方式:
- 通过expire设置过期时间(缺乏原子性:如果在setnx和expire之间出现异常,锁也无法释放)
- 使用set指令设置过期时间:set key value ex 3 nx(既达到setnx的效果,又设置了过期时间)
while (!stringRedisTemplate.opsForValue().setIfAbsent("lock",uuid,20,
TimeUnit.SECONDS)){
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
原因:如果业务逻辑的执行时间是7s。执行流程如下
- index1业务逻辑没执行完,3秒后锁被自动释放。
- index2获取到锁,执行业务逻辑,3秒后锁被自动释放。
- index3获取到锁,执行业务逻辑
- index1业务逻辑执行完成,开始调用del释放锁,这时释放的是index3的锁,导致index3的业务只 执行1s就被别人释放。 最终等于没锁的情况。
解决:setnx获取锁时,设置一个指定的唯一值(例如:uuid);释放前获取这个值,判断是否自己的锁
String uuid = UUID.randomUUID().toString();
// 加锁,获取锁失败重试
while (!stringRedisTemplate.opsForValue().setIfAbsent("lock",uuid,20,
TimeUnit.SECONDS)){
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
...
if(uuid.equals(stringRedisTemplate.opsForValue().get("lock"))){
stringRedisTemplate.delete("lock");// 删除锁
}
问题:删除操作缺乏原子性。 场景:
- index1执行删除时,查询到的lock值确实和uuid相等
- index1执行删除前,lock刚好过期时间已到,被redis自动释放
- index2获取了lock
- index1执行删除,此时会把index2的lock删除
解决方案:没有一个命令可以同时做到判断 + 删除,所有只能通过其他方式实现(LUA脚本)
Lua脚本的基本使用
Redis提供了Lua脚本功能,可以实现原子性的操作。
eval命令可以执行lua脚本,lua脚本可以实现复杂的操作。 eval命令的语法如下:
EVAL script numkeys key [key...] arg [arg...]
语法说明:
- script:lua脚本内容
- numkeys:脚本需要操作的key的数量
- key [key...]: 脚本操作的key列表
- arg [arg...]: 脚本需要操作的其他参数
Redis客户端执行Lua脚本示例:
redis-cli> EVAL "local a = 1 return a" 0
(integer) 1
redis-cli> EVAL "local a = 1 local b = 2 return a+b" 0
(integer) 1
redis-cli> EVAL "local a = KEYS[1] local b = ARGV[1] return a+b" 1 2 3
(integer) 5
redis-cli> EVAL "local a = KEYS[1] local b = ARGV[1] if(a>b) then return a else return b end" 1 2 3
(integer) 3
redis-cli> EVAL "return redis.call('setnx',KEYS[1],ARGV[1])" 1 lock "lock value"
(integer) 1
redis-cli> EVAL "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end" 1 lock "lock value"
(integer) 1
Lua脚本实现删除锁的原子性:
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
java代码实现:
// lua脚本
String script = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";
// 执行脚本
stringRedisTemplate.execute(new DefaultRedisScript<String>(script, String.class)
, Arrays.asList("lock"),uuid);
方案二:基于Redisson的分布式锁
Redisson是一个基于Redis的Java客户端,提供了一系列分布式锁的实现。
- 引入依赖
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.24.3</version>
</dependency>
- 配置Redisson客户端
package com.syh.stock.config;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.codec.JsonJacksonCodec;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author shan
* @create 2025/2/28 10:41
*/
@Configuration
public class RedissonConfig {
@Value("${spring.data.redis.host}")
private String redisHost;
@Value("${spring.data.redis.port}")
private int port ;
@Value("${spring.data.redis.password}")
private String password;
@Bean
public RedissonClient getRedisson() {
System.out.println(redisHost+":"+port);
Config config = new Config();
config.useSingleServer().
setAddress("redis://" + redisHost + ":" + port).
setPassword(password);
config.setCodec(new JsonJacksonCodec());
return Redisson.create(config);
}
}
- 使用Redisson实现分布式锁
private final RedissonClient redissonClient; //注入Redisson客户端
@Override
public boolean reduceStock4(Integer pid, Integer num) {
boolean result = false;
RLock lock = redissonClient.getLock("lock"); //获取锁
try {
boolean b = lock.tryLock(10, TimeUnit.SECONDS); //尝试获取锁,最多等待10秒
if(b){ //获取锁成功
//执行业务逻辑
//根据商品id查询库存
LambdaQueryWrapper<Stock> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(pid!=null,Stock::getProductId, pid);
Stock stock = this.baseMapper.selectOne(queryWrapper);
//判断库存是否充足
if(stock!=null && stock.getCount()>=num){
//减少库存
stock.setCount(stock.getCount()-num);
this.baseMapper.updateById(stock);
//释放锁
result = true;
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
//释放锁
if (rLock.isHeldByCurrentThread()) { //判断当前线程是否持有锁
rLock.unlock();
}
}
return result;
}