为什么需要分布式锁?

单机锁的局限

// ❌ 单机环境有效
synchronized (this) {
    // 扣减库存
}

// ❌ 分布式环境失效
// 服务器1和服务器2的锁互不影响

分布式锁特性

  • 互斥性:同一时刻只有一个客户端持有锁
  • 安全性:只有持锁者能释放锁
  • 可用性:高可用,避免死锁
  • 性能:加锁解锁快速

方案演进

V1:基础版(SETNX + EXPIRE)

public boolean lock(String key, String value, long expireSeconds) {
    Boolean success = redis.opsForValue().setIfAbsent(key, value);
    if (Boolean.TRUE.equals(success)) {
        redis.expire(key, expireSeconds, TimeUnit.SECONDS);
        return true;
    }
    return false;
}

// ❌ 问题:SETNX和EXPIRE不是原子操作
// 如果SETNX成功后宕机,锁永不过期

V2:原子操作版(SET NX EX)

public boolean lock(String key, String value, long expireSeconds) {
    Boolean success = redis.opsForValue()
        .setIfAbsent(key, value, expireSeconds, TimeUnit.SECONDS);
    return Boolean.TRUE.equals(success);
}

public void unlock(String key) {
    redis.delete(key);
}

// ❌ 问题:可能释放别人的锁
// 线程A持锁超时,锁自动释放
// 线程B获得锁
// 线程A执行完,删除了线程B的锁

V3:安全释放版(Lua脚本)

public class DistributedLock {
    @Autowired
    private RedisTemplate<String, String> redis;

    private static final String UNLOCK_SCRIPT =
        "if redis.call('GET', KEYS[1]) == ARGV[1] then " +
        "  return redis.call('DEL', KEYS[1]) " +
        "else " +
        "  return 0 " +
        "end";

    // 加锁
    public boolean lock(String key, String requestId, long expireSeconds) {
        Boolean success = redis.opsForValue()
            .setIfAbsent(key, requestId, expireSeconds, TimeUnit.SECONDS);
        return Boolean.TRUE.equals(success);
    }

    // 安全解锁
    public boolean unlock(String key, String requestId) {
        Long result = redis.execute(
            RedisScript.of(UNLOCK_SCRIPT, Long.class),
            Collections.singletonList(key),
            requestId
        );
        return result != null && result == 1;
    }
}

// ✅ 优势:原子性校验和删除
// ❌ 问题:业务执行时间超过锁过期时间

V4:自动续期版(看门狗)

@Component
public class RedisLockWithWatchdog {
    @Autowired
    private RedisTemplate<String, String> redis;

    private static final long DEFAULT_EXPIRE_SECONDS = 30;
    private static final long RENEW_INTERVAL_SECONDS = 10;

    private final Map<String, ScheduledFuture<?>> renewTasks = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

    public boolean lock(String key, String requestId) {
        Boolean success = redis.opsForValue()
            .setIfAbsent(key, requestId, DEFAULT_EXPIRE_SECONDS, TimeUnit.SECONDS);

        if (Boolean.TRUE.equals(success)) {
            // 启动续期任务
            startRenewTask(key, requestId);
            return true;
        }
        return false;
    }

    private void startRenewTask(String key, String requestId) {
        ScheduledFuture<?> task = scheduler.scheduleAtFixedRate(() -> {
            try {
                String value = redis.opsForValue().get(key);
                if (requestId.equals(value)) {
                    redis.expire(key, DEFAULT_EXPIRE_SECONDS, TimeUnit.SECONDS);
                    log.debug("锁续期成功: {}", key);
                } else {
                    // 锁已被他人持有,停止续期
                    stopRenewTask(key);
                }
            } catch (Exception e) {
                log.error("锁续期失败", e);
            }
        }, RENEW_INTERVAL_SECONDS, RENEW_INTERVAL_SECONDS, TimeUnit.SECONDS);

        renewTasks.put(key, task);
    }

    public boolean unlock(String key, String requestId) {
        stopRenewTask(key);

        String script =
            "if redis.call('GET', KEYS[1]) == ARGV[1] then " +
            "  return redis.call('DEL', KEYS[1]) " +
            "else " +
            "  return 0 " +
            "end";

        Long result = redis.execute(
            RedisScript.of(script, Long.class),
            Collections.singletonList(key),
            requestId
        );
        return result != null && result == 1;
    }

    private void stopRenewTask(String key) {
        ScheduledFuture<?> task = renewTasks.remove(key);
        if (task != null) {
            task.cancel(false);
        }
    }
}

实战案例

案例1:库存扣减

@Service
public class StockService {
    @Autowired
    private RedisLockWithWatchdog redisLock;
    @Autowired
    private RedisTemplate<String, Object> redis;

    public boolean deductStock(String productId, int quantity) {
        String lockKey = "lock:stock:" + productId;
        String requestId = UUID.randomUUID().toString();

        try {
            // 尝试获取锁(超时3秒)
            if (!tryLock(lockKey, requestId, 3000)) {
                return false;  // 获取锁失败
            }

            // 检查库存
            Integer stock = (Integer) redis.opsForValue().get("stock:" + productId);
            if (stock == null || stock < quantity) {
                return false;  // 库存不足
            }

            // 扣减库存
            redis.opsForValue().decrement("stock:" + productId, quantity);
            return true;

        } finally {
            // 释放锁
            redisLock.unlock(lockKey, requestId);
        }
    }

    private boolean tryLock(String key, String requestId, long timeoutMs) {
        long startTime = System.currentTimeMillis();
        while (System.currentTimeMillis() - startTime < timeoutMs) {
            if (redisLock.lock(key, requestId)) {
                return true;
            }
            try {
                Thread.sleep(50);  // 等待50ms后重试
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false;
    }
}

案例2:订单防重

@Service
public class OrderService {
    @Autowired
    private RedisLockWithWatchdog redisLock;

    public String createOrder(OrderRequest request) {
        String lockKey = "lock:order:" + request.getUserId() + ":" + request.getOrderNo();
        String requestId = UUID.randomUUID().toString();

        try {
            if (!redisLock.lock(lockKey, requestId)) {
                throw new BusinessException("订单正在处理中,请勿重复提交");
            }

            // 创建订单逻辑
            Order order = new Order();
            order.setOrderNo(request.getOrderNo());
            // ... 保存订单

            return order.getOrderNo();

        } finally {
            redisLock.unlock(lockKey, requestId);
        }
    }
}

案例3:定时任务防重

@Component
public class ScheduledTaskWithLock {
    @Autowired
    private RedisLockWithWatchdog redisLock;

    @Scheduled(cron = "0 */5 * * * ?")  // 每5分钟执行
    public void syncData() {
        String lockKey = "lock:task:sync_data";
        String requestId = UUID.randomUUID().toString();

        if (!redisLock.lock(lockKey, requestId)) {
            log.info("其他节点正在执行同步任务,跳过");
            return;
        }

        try {
            log.info("开始执行同步任务");
            // 执行同步逻辑
            doSync();
        } finally {
            redisLock.unlock(lockKey, requestId);
        }
    }

    private void doSync() {
        // 同步逻辑
    }
}

Redlock算法(多Redis实例)

问题:单Redis实例有单点故障风险

Redlock方案

部署5个独立的Redis实例(不是主从)

加锁流程:
1. 获取当前时间戳
2. 依次向5个实例加锁
3. 如果>=3个实例加锁成功,且总耗时<锁过期时间,则成功
4. 否则,向所有实例释放锁

解锁流程:
向所有5个实例释放锁

Java实现(Redisson):

@Configuration
public class RedissonConfig {
    @Bean
    public RedissonClient redissonClient() {
        Config config = new Config();
        config.useSingleServer()
            .setAddress("redis://127.0.0.1:6379");
        return Redisson.create(config);
    }
}

@Service
public class RedissonLockService {
    @Autowired
    private RedissonClient redisson;

    public void executeWithLock(String lockKey, Runnable task) {
        RLock lock = redisson.getLock(lockKey);
        try {
            // 尝试加锁,最多等待100秒,锁自动释放时间30秒
            if (lock.tryLock(100, 30, TimeUnit.SECONDS)) {
                task.run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
    }
}

最佳实践

1. 锁的粒度要细

// ❌ 不好:粒度太粗
String lockKey = "lock:order";  // 所有订单共用一把锁

// ✅ 好:细粒度锁
String lockKey = "lock:order:" + userId;  // 每个用户一把锁

2. 设置合理的超时时间

// 根据业务执行时间设置
// 快速操作:5-10秒
// 慢操作:30-60秒
// 加上看门狗自动续期

3. 降级方案

public boolean deductStock(String productId, int quantity) {
    String lockKey = "lock:stock:" + productId;
    String requestId = UUID.randomUUID().toString();

    try {
        if (!tryLock(lockKey, requestId, 3000)) {
            // 降级方案:使用Lua脚本原子操作
            return deductStockWithLua(productId, quantity);
        }

        // 正常流程
        return doDeductStock(productId, quantity);

    } finally {
        unlock(lockKey, requestId);
    }
}

4. 监控告警

@Aspect
@Component
public class LockMonitorAspect {
    @Around("@annotation(DistributedLock)")
    public Object around(ProceedingJoinPoint pjp, DistributedLock lock) throws Throwable {
        long startTime = System.currentTimeMillis();
        String lockKey = lock.key();

        try {
            Object result = pjp.proceed();
            long duration = System.currentTimeMillis() - startTime;

            // 监控:锁持有时间过长告警
            if (duration > 10000) {
                log.warn("锁持有时间过长: key={}, duration={}ms", lockKey, duration);
            }
            return result;
        } catch (Exception e) {
            log.error("加锁执行异常: key={}", lockKey, e);
            throw e;
        }
    }
}

常见问题

1. 锁超时释放

问题:业务还没执行完,锁就过期了

解决

  • 使用看门狗自动续期
  • 设置足够长的超时时间
  • 优化业务逻辑,减少执行时间

2. 死锁

问题:持锁进程崩溃,锁永不释放

解决

  • 设置锁过期时间(必须)
  • 避免过长的业务逻辑

3. 锁重入

问题:同一线程多次获取同一把锁

解决:使用Redisson的可重入锁

RLock lock = redisson.getLock("mylock");
lock.lock();
try {
    // 业务逻辑
    doSomething();  // 内部可能再次获取同一把锁
} finally {
    lock.unlock();
}

总结

方案选择

  • 简单场景:SETNX + Lua脚本释放
  • 复杂场景:自动续期(看门狗)
  • 高可用要求:Redlock(多实例)
  • 生产推荐:Redisson(功能完善)

核心要点

  • 原子性加锁(SET NX EX)
  • 安全释放(Lua脚本校验)
  • 自动续期(避免超时)
  • 细粒度锁(提高并发)
  • 降级方案(Redis不可用)

典型场景

  • 库存扣减、秒杀
  • 订单防重提交
  • 定时任务防重
  • 资源独占访问