热Key与BigKey:发现、分析与解决方案

热Key问题 定义:访问频率极高的key,导致单个Redis节点负载过高 危害: 单节点CPU 100% 网络带宽打满 影响其他key访问 Cluster集群数据倾斜 发现热Key 方法1:redis-cli –hotkeys redis-cli --hotkeys # 统计访问频率最高的key 方法2:monitor命令 redis-cli monitor | head -n 100000 | awk '{print $4}' | sort | uniq -c | sort -rn | head -n 10 方法3:代码统计 @Aspect @Component public class RedisMonitorAspect { private ConcurrentHashMap<String, AtomicLong> accessCounter = new ConcurrentHashMap<>(); @Around("execution(* org.springframework.data.redis.core.RedisTemplate.opsFor*(..))") public Object around(ProceedingJoinPoint pjp) throws Throwable { Object result = pjp.proceed(); // 统计key访问次数 if (result != null) { String key = extractKey(pjp); if (key != null) { accessCounter.computeIfAbsent(key, k -> new AtomicLong()).incrementAndGet(); } } return result; } @Scheduled(fixedRate = 60000) // 每分钟 public void reportHotKeys() { List<Map.Entry<String, AtomicLong>> hotKeys = accessCounter.entrySet().stream() .sorted(Map.Entry.<String, AtomicLong>comparingByValue().reversed()) .limit(10) .collect(Collectors.toList()); log.info("热Key TOP 10: {}", hotKeys); // 清空统计 accessCounter.clear(); } } 解决方案 方案1:本地缓存 @Service public class ProductService { @Autowired private RedisTemplate<String, Object> redis; // 本地缓存热点数据 private Cache<String, Product> localCache = Caffeine.newBuilder() .maximumSize(1000) .expireAfterWrite(5, TimeUnit.MINUTES) .build(); public Product getProduct(Long productId) { String key = "product:" + productId; // 1. 本地缓存 Product product = localCache.getIfPresent(key); if (product != null) { return product; } // 2. Redis product = (Product) redis.opsForValue().get(key); if (product != null) { localCache.put(key, product); return product; } // 3. DB product = productMapper.selectById(productId); if (product != null) { redis.opsForValue().set(key, product, 3600, TimeUnit.SECONDS); localCache.put(key, product); } return product; } } 方案2:热Key备份 // 热Key复制多份,随机访问 public Object getHotKey(String key) { // 随机选择一个备份 int index = ThreadLocalRandom.current().nextInt(10); String backupKey = key + ":backup:" + index; Object value = redis.opsForValue().get(backupKey); if (value != null) { return value; } // 备份不存在,查询原key value = redis.opsForValue().get(key); if (value != null) { // 更新备份 redis.opsForValue().set(backupKey, value, 3600, TimeUnit.SECONDS); } return value; } 方案3:读写分离 // 读从节点,写主节点 @Configuration public class RedisConfig { @Bean public LettuceConnectionFactory writeConnectionFactory() { return new LettuceConnectionFactory(new RedisStandaloneConfiguration("master", 6379)); } @Bean public LettuceConnectionFactory readConnectionFactory() { return new LettuceConnectionFactory(new RedisStandaloneConfiguration("slave", 6379)); } } BigKey问题 定义:占用内存过大的key ...

2025-01-21 · maneng

缓存三大问题:穿透、击穿、雪崩的解决方案

缓存穿透(Cache Penetration) 定义:查询不存在的数据,缓存和数据库都没有,每次请求都打到数据库 场景:恶意攻击,大量查询不存在的key 请求key="user:-1" → Redis没有 → DB没有 → 返回null 请求key="user:-2" → Redis没有 → DB没有 → 返回null ...大量请求直接打垮数据库 解决方案1:布隆过滤器 @Service public class UserService { @Autowired private RedissonClient redisson; @Autowired private UserMapper userMapper; private RBloomFilter<Long> userBloomFilter; @PostConstruct public void init() { userBloomFilter = redisson.getBloomFilter("user:bloom"); userBloomFilter.tryInit(10000000L, 0.01); // 加载所有userId List<Long> userIds = userMapper.selectAllUserIds(); userIds.forEach(userBloomFilter::add); } public User getUser(Long userId) { // 1. 布隆过滤器判断 if (!userBloomFilter.contains(userId)) { return null; // 一定不存在 } // 2. 查询Redis String key = "user:" + userId; User user = (User) redis.opsForValue().get(key); if (user != null) { return user; } // 3. 查询数据库 user = userMapper.selectById(userId); if (user != null) { redis.opsForValue().set(key, user, 3600, TimeUnit.SECONDS); } return user; } } 解决方案2:缓存空值 public User getUser(Long userId) { String key = "user:" + userId; // 1. 查询Redis User user = (User) redis.opsForValue().get(key); if (user != null) { if (user.getId() == null) { // 空对象标记 return null; } return user; } // 2. 查询数据库 user = userMapper.selectById(userId); if (user != null) { redis.opsForValue().set(key, user, 3600, TimeUnit.SECONDS); } else { // 缓存空对象,防止穿透 User emptyUser = new User(); redis.opsForValue().set(key, emptyUser, 60, TimeUnit.SECONDS); // 短期缓存 } return user; } 缓存击穿(Cache Breakdown) 定义:热点key过期瞬间,大量请求同时打到数据库 ...

2025-01-21 · maneng

GEO地理位置:附近的人与LBS服务

GEO核心命令 # 添加地理位置 GEOADD key longitude latitude member # 获取位置坐标 GEOPOS key member [member ...] # 计算距离 GEODIST key member1 member2 [unit] # 范围查询 GEORADIUS key longitude latitude radius unit [WITHCOORD] [WITHDIST] [COUNT count] # 以成员为中心查询 GEORADIUSBYMEMBER key member radius unit [WITHCOORD] [WITHDIST] [COUNT count] # 获取GeoHash GEOHASH key member [member ...] 单位:m(米)、km(千米)、mi(英里)、ft(英尺) 实战案例 案例1:附近的人 @Service public class NearbyPeopleService { @Autowired private RedisTemplate<String, String> redis; private static final String GEO_KEY = "user:location"; // 更新用户位置 public void updateLocation(Long userId, double longitude, double latitude) { redis.opsForGeo().add(GEO_KEY, new Point(longitude, latitude), String.valueOf(userId)); } // 查找附近的人(5km内) public List<NearbyUser> findNearby(Long userId, double radius) { // 获取用户位置 List<Point> positions = redis.opsForGeo().position(GEO_KEY, String.valueOf(userId)); if (positions == null || positions.isEmpty()) { return Collections.emptyList(); } Point userPos = positions.get(0); // 查询附近的人 GeoResults<GeoLocation<String>> results = redis.opsForGeo().radius( GEO_KEY, userPos, new Distance(radius, Metrics.KILOMETERS), GeoRadiusCommandArgs.newGeoRadiusArgs() .includeDistance() .sortAscending() .limit(50) ); if (results == null) { return Collections.emptyList(); } List<NearbyUser> nearbyUsers = new ArrayList<>(); for (GeoResult<GeoLocation<String>> result : results) { Long nearbyUserId = Long.valueOf(result.getContent().getName()); if (!nearbyUserId.equals(userId)) { // 排除自己 NearbyUser user = new NearbyUser(); user.setUserId(nearbyUserId); user.setDistance(result.getDistance().getValue()); nearbyUsers.add(user); } } return nearbyUsers; } // 计算两用户间距离 public Double getDistance(Long userId1, Long userId2) { Distance distance = redis.opsForGeo().distance( GEO_KEY, String.valueOf(userId1), String.valueOf(userId2), Metrics.KILOMETERS ); return distance != null ? distance.getValue() : null; } } 案例2:外卖配送范围判断 @Service public class DeliveryService { @Autowired private RedisTemplate<String, String> redis; private static final String STORE_GEO_KEY = "store:location"; private static final double MAX_DELIVERY_DISTANCE = 5.0; // 5km配送范围 // 添加店铺位置 public void addStore(Long storeId, double longitude, double latitude) { redis.opsForGeo().add(STORE_GEO_KEY, new Point(longitude, latitude), String.valueOf(storeId)); } // 查找可配送的店铺 public List<DeliveryStore> findAvailableStores(double longitude, double latitude) { GeoResults<GeoLocation<String>> results = redis.opsForGeo().radius( STORE_GEO_KEY, new Circle(new Point(longitude, latitude), new Distance(MAX_DELIVERY_DISTANCE, Metrics.KILOMETERS)), GeoRadiusCommandArgs.newGeoRadiusArgs() .includeDistance() .includeCoordinates() .sortAscending() ); if (results == null) { return Collections.emptyList(); } return results.getContent().stream() .map(result -> { DeliveryStore store = new DeliveryStore(); store.setStoreId(Long.valueOf(result.getContent().getName())); store.setDistance(result.getDistance().getValue()); store.setLongitude(result.getContent().getPoint().getX()); store.setLatitude(result.getContent().getPoint().getY()); return store; }) .collect(Collectors.toList()); } // 判断是否在配送范围内 public boolean isInDeliveryRange(Long storeId, double longitude, double latitude) { List<Point> positions = redis.opsForGeo().position(STORE_GEO_KEY, String.valueOf(storeId)); if (positions == null || positions.isEmpty()) { return false; } Point storePos = positions.get(0); Distance distance = calculateDistance(storePos, new Point(longitude, latitude)); return distance.getValue() <= MAX_DELIVERY_DISTANCE; } private Distance calculateDistance(Point p1, Point p2) { // Haversine公式计算距离 double lat1 = Math.toRadians(p1.getY()); double lat2 = Math.toRadians(p2.getY()); double lon1 = Math.toRadians(p1.getX()); double lon2 = Math.toRadians(p2.getX()); double dLat = lat2 - lat1; double dLon = lon2 - lon1; double a = Math.sin(dLat / 2) * Math.sin(dLat / 2) + Math.cos(lat1) * Math.cos(lat2) * Math.sin(dLon / 2) * Math.sin(dLon / 2); double c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a)); double distance = 6371 * c; // 地球半径6371km return new Distance(distance, Metrics.KILOMETERS); } } 案例3:打车服务(司机匹配) @Service public class TaxiService { @Autowired private RedisTemplate<String, String> redis; private static final String DRIVER_GEO_KEY = "driver:location"; // 司机上线,更新位置 public void driverOnline(Long driverId, double longitude, double latitude) { redis.opsForGeo().add(DRIVER_GEO_KEY, new Point(longitude, latitude), String.valueOf(driverId)); // 设置过期时间(10分钟,超时下线) redis.expire(DRIVER_GEO_KEY, 10, TimeUnit.MINUTES); } // 司机下线 public void driverOffline(Long driverId) { redis.opsForGeo().remove(DRIVER_GEO_KEY, String.valueOf(driverId)); } // 乘客叫车,匹配最近的司机 public List<Driver> matchNearbyDrivers(double longitude, double latitude, int count) { GeoResults<GeoLocation<String>> results = redis.opsForGeo().radius( DRIVER_GEO_KEY, new Circle(new Point(longitude, latitude), new Distance(3, Metrics.KILOMETERS)), // 3km范围内 GeoRadiusCommandArgs.newGeoRadiusArgs() .includeDistance() .includeCoordinates() .sortAscending() .limit(count) ); if (results == null) { return Collections.emptyList(); } return results.getContent().stream() .map(result -> { Driver driver = new Driver(); driver.setDriverId(Long.valueOf(result.getContent().getName())); driver.setDistance(result.getDistance().getValue()); driver.setLongitude(result.getContent().getPoint().getX()); driver.setLatitude(result.getContent().getPoint().getY()); return driver; }) .collect(Collectors.toList()); } // 定期更新司机位置(心跳) @Scheduled(fixedRate = 30000) // 每30秒 public void updateDriverLocations() { // 从GPS服务获取所有在线司机位置 List<DriverLocation> locations = fetchDriverLocations(); for (DriverLocation location : locations) { driverOnline(location.getDriverId(), location.getLongitude(), location.getLatitude()); } } private List<DriverLocation> fetchDriverLocations() { // 从GPS服务获取位置 return new ArrayList<>(); } } 案例4:充电桩查询 @Service public class ChargingStationService { @Autowired private RedisTemplate<String, String> redis; private static final String STATION_GEO_KEY = "charging:station"; // 添加充电桩 public void addStation(Long stationId, double longitude, double latitude) { redis.opsForGeo().add(STATION_GEO_KEY, new Point(longitude, latitude), String.valueOf(stationId)); } // 查找附近充电桩 public List<ChargingStation> findNearbyStations( double longitude, double latitude, double radiusKm) { GeoResults<GeoLocation<String>> results = redis.opsForGeo().radius( STATION_GEO_KEY, new Circle(new Point(longitude, latitude), new Distance(radiusKm, Metrics.KILOMETERS)), GeoRadiusCommandArgs.newGeoRadiusArgs() .includeDistance() .includeCoordinates() .sortAscending() ); if (results == null) { return Collections.emptyList(); } return results.getContent().stream() .map(result -> { Long stationId = Long.valueOf(result.getContent().getName()); // 从数据库获取充电桩详情 ChargingStation station = getStationDetails(stationId); station.setDistance(result.getDistance().getValue()); return station; }) .collect(Collectors.toList()); } private ChargingStation getStationDetails(Long stationId) { // 从数据库查询 return new ChargingStation(); } } 性能优化 1. 缓存用户位置 // 避免频繁GEO查询 public Point getUserLocation(Long userId) { String cacheKey = "user:pos:" + userId; // 先查缓存 String cached = redis.opsForValue().get(cacheKey); if (cached != null) { String[] parts = cached.split(","); return new Point(Double.parseDouble(parts[0]), Double.parseDouble(parts[1])); } // GEO查询 List<Point> positions = redis.opsForGeo().position(GEO_KEY, String.valueOf(userId)); if (positions != null && !positions.isEmpty()) { Point point = positions.get(0); // 缓存5分钟 redis.opsForValue().set(cacheKey, point.getX() + "," + point.getY(), 5, TimeUnit.MINUTES); return point; } return null; } 2. 批量查询优化 // 批量获取位置 public Map<Long, Point> batchGetLocations(List<Long> userIds) { String[] members = userIds.stream() .map(String::valueOf) .toArray(String[]::new); List<Point> positions = redis.opsForGeo().position(GEO_KEY, members); Map<Long, Point> result = new HashMap<>(); for (int i = 0; i < userIds.size(); i++) { if (positions != null && i < positions.size()) { result.put(userIds.get(i), positions.get(i)); } } return result; } 底层原理 GeoHash编码: ...

2025-01-21 · maneng

限流算法:固定窗口、滑动窗口与令牌桶

限流算法对比 算法 实现难度 精确度 内存占用 适用场景 固定窗口 简单 低 低 粗粒度限流 滑动窗口 中等 高 中 精确限流 漏桶 中等 高 中 流量整形 令牌桶 复杂 高 中 允许突发 1. 固定窗口算法 原理:时间窗口内计数,超过限制则拒绝 @Service public class FixedWindowRateLimiter { @Autowired private RedisTemplate<String, String> redis; // 限流检查 public boolean isAllowed(String key, int limit, int windowSeconds) { String counterKey = key + ":" + (System.currentTimeMillis() / 1000 / windowSeconds); Long current = redis.opsForValue().increment(counterKey); if (current == 1) { redis.expire(counterKey, windowSeconds, TimeUnit.SECONDS); } return current != null && current <= limit; } } // 使用示例 public boolean checkLimit(String userId) { return rateLimiter.isAllowed("api:user:" + userId, 100, 60); // 每分钟100次 } 问题:临界问题 ...

2025-01-21 · maneng

延迟队列实现:基于ZSet的定时任务

延迟队列原理 核心思想:ZSet的score存储执行时间戳 ZADD delay_queue <timestamp> <task_id> 轮询: 1. ZRANGEBYSCORE delay_queue 0 <now> LIMIT 0 100 2. 取出到期任务 3. 执行任务 4. ZREM删除已执行任务 基础实现 @Component public class DelayQueue { @Autowired private RedisTemplate<String, String> redis; private static final String QUEUE_KEY = "delay:queue"; // 添加延迟任务 public void addTask(String taskId, long delaySeconds) { long executeTime = System.currentTimeMillis() + delaySeconds * 1000; redis.opsForZSet().add(QUEUE_KEY, taskId, executeTime); } // 拉取到期任务 public Set<String> pollTasks() { long now = System.currentTimeMillis(); Set<String> tasks = redis.opsForZSet() .rangeByScore(QUEUE_KEY, 0, now, 0, 100); if (tasks != null && !tasks.isEmpty()) { // 删除已拉取的任务 redis.opsForZSet().removeRangeByScore(QUEUE_KEY, 0, now); } return tasks; } // 定时拉取 @Scheduled(fixedRate = 1000) // 每秒执行 public void consumeTasks() { Set<String> tasks = pollTasks(); if (tasks != null) { tasks.forEach(this::executeTask); } } private void executeTask(String taskId) { log.info("执行延迟任务: {}", taskId); // 任务执行逻辑 } } 实战案例 案例1:订单超时自动取消 @Service public class OrderTimeoutService { @Autowired private RedisTemplate<String, String> redis; @Autowired private OrderService orderService; private static final String QUEUE_KEY = "delay:order:timeout"; private static final int TIMEOUT_MINUTES = 30; // 30分钟超时 // 创建订单时,添加超时任务 public void createOrder(String orderNo) { orderService.create(orderNo); // 添加到延迟队列 long executeTime = System.currentTimeMillis() + TIMEOUT_MINUTES * 60 * 1000; redis.opsForZSet().add(QUEUE_KEY, orderNo, executeTime); } // 支付成功时,取消超时任务 public void payOrder(String orderNo) { orderService.pay(orderNo); // 从延迟队列移除 redis.opsForZSet().remove(QUEUE_KEY, orderNo); } // 定时检查超时订单 @Scheduled(fixedRate = 10000) // 每10秒 public void checkTimeout() { long now = System.currentTimeMillis(); Set<String> timeoutOrders = redis.opsForZSet() .rangeByScore(QUEUE_KEY, 0, now, 0, 100); if (timeoutOrders != null) { for (String orderNo : timeoutOrders) { try { // 取消订单 orderService.cancel(orderNo, "超时未支付"); // 从队列移除 redis.opsForZSet().remove(QUEUE_KEY, orderNo); log.info("订单超时已取消: {}", orderNo); } catch (Exception e) { log.error("取消订单失败: {}", orderNo, e); } } } } } 案例2:消息定时发送 @Service public class ScheduledMessageService { @Autowired private RedisTemplate<String, String> redis; private static final String QUEUE_KEY = "delay:message"; // 添加定时消息 public void scheduleMessage(String messageId, String content, LocalDateTime sendTime) { // 存储消息内容 redis.opsForValue().set("message:" + messageId, content, 7, TimeUnit.DAYS); // 添加到延迟队列 long timestamp = sendTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); redis.opsForZSet().add(QUEUE_KEY, messageId, timestamp); } @Scheduled(fixedRate = 5000) // 每5秒 public void sendScheduledMessages() { long now = System.currentTimeMillis(); Set<String> messageIds = redis.opsForZSet() .rangeByScore(QUEUE_KEY, 0, now, 0, 50); if (messageIds != null) { for (String messageId : messageIds) { String content = redis.opsForValue().get("message:" + messageId); if (content != null) { sendMessage(content); redis.delete("message:" + messageId); } redis.opsForZSet().remove(QUEUE_KEY, messageId); } } } private void sendMessage(String content) { log.info("发送消息: {}", content); // 实际发送逻辑 } } 案例3:优惠券过期提醒 @Service public class CouponReminderService { @Autowired private RedisTemplate<String, String> redis; private static final String QUEUE_KEY = "delay:coupon:reminder"; // 发放优惠券时,设置提醒 public void issueCoupon(String couponId, LocalDateTime expireTime) { // 过期前3天提醒 long reminderTime = expireTime.minusDays(3) .atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); redis.opsForZSet().add(QUEUE_KEY, couponId, reminderTime); } @Scheduled(fixedRate = 3600000) // 每小时 public void sendReminders() { long now = System.currentTimeMillis(); Set<String> couponIds = redis.opsForZSet() .rangeByScore(QUEUE_KEY, 0, now); if (couponIds != null) { couponIds.forEach(couponId -> { sendReminder(couponId); redis.opsForZSet().remove(QUEUE_KEY, couponId); }); } } private void sendReminder(String couponId) { log.info("发送优惠券过期提醒: {}", couponId); // 发送短信/推送通知 } } 可靠性保证 1. 防止重复消费(Lua脚本) private static final String POP_SCRIPT = "local tasks = redis.call('ZRANGEBYSCORE', KEYS[1], 0, ARGV[1], 'LIMIT', 0, ARGV[2]) " + "if #tasks > 0 then " + " redis.call('ZREM', KEYS[1], unpack(tasks)) " + " return tasks " + "else " + " return {} " + "end"; public List<String> popTasks(int limit) { long now = System.currentTimeMillis(); return redis.execute( RedisScript.of(POP_SCRIPT, List.class), Collections.singletonList(QUEUE_KEY), String.valueOf(now), String.valueOf(limit) ); } 2. 任务执行失败重试 public void consumeTasks() { List<String> tasks = popTasks(100); for (String taskId : tasks) { try { executeTask(taskId); } catch (Exception e) { log.error("任务执行失败: {}", taskId, e); // 重新入队,延迟60秒后重试 long retryTime = System.currentTimeMillis() + 60000; redis.opsForZSet().add(QUEUE_KEY, taskId, retryTime); } } } 3. 限制重试次数 public void executeWithRetry(String taskId) { String retryKey = "retry:" + taskId; Integer retryCount = (Integer) redis.opsForValue().get(retryKey); if (retryCount == null) { retryCount = 0; } if (retryCount >= 3) { log.error("任务重试次数超限: {}", taskId); // 转移到死信队列 redis.opsForList().rightPush("dead_letter_queue", taskId); redis.delete(retryKey); return; } try { executeTask(taskId); redis.delete(retryKey); // 执行成功,清除重试计数 } catch (Exception e) { // 重试计数+1 redis.opsForValue().increment(retryKey); redis.expire(retryKey, 1, TimeUnit.DAYS); // 重新入队 long retryTime = System.currentTimeMillis() + (retryCount + 1) * 60000; redis.opsForZSet().add(QUEUE_KEY, taskId, retryTime); } } 性能优化 1. 批量处理 @Scheduled(fixedRate = 1000) public void consumeTasks() { List<String> tasks = popTasks(100); // 每次最多100个 // 使用线程池并行处理 tasks.forEach(taskId -> executor.submit(() -> executeTask(taskId)) ); } 2. 多实例消费(分片) // 实例1消费shard0 // 实例2消费shard1 @Value("${app.shard.id}") private int shardId; public void addTask(String taskId, long delaySeconds) { int shard = Math.abs(taskId.hashCode()) % SHARD_COUNT; String queueKey = "delay:queue:shard:" + shard; long executeTime = System.currentTimeMillis() + delaySeconds * 1000; redis.opsForZSet().add(queueKey, taskId, executeTime); } @Scheduled(fixedRate = 1000) public void consumeTasks() { String queueKey = "delay:queue:shard:" + shardId; // 消费分片队列 } 与其他方案对比 方案 精度 可靠性 复杂度 适用场景 Redis ZSet 秒级 中 低 轻量级延迟任务 RabbitMQ延迟插件 毫秒级 高 中 消息队列场景 时间轮 毫秒级 中 高 高性能定时任务 ScheduledExecutorService 毫秒级 低 低 单机定时任务 总结 核心优势: ...

2025-01-21 · maneng

布隆过滤器:缓存穿透的终极解决方案

什么是布隆过滤器? 核心功能:快速判断元素是否存在 特点: ✅ 空间效率极高(百万数据仅需1MB) ✅ 查询速度快(O(k),k为hash函数个数) ❌ 存在误判(可能把不存在判断为存在) ❌ 无法删除元素 判断逻辑: if (bloomFilter.contains(key)) { // 可能存在(需要进一步查询确认) } else { // 一定不存在(可以直接返回) } 原理 数据结构:位数组 + 多个hash函数 1. 添加元素"apple" hash1("apple") = 3 → bit[3] = 1 hash2("apple") = 7 → bit[7] = 1 hash3("apple") = 10 → bit[10] = 1 2. 判断元素是否存在 if (bit[3] == 1 && bit[7] == 1 && bit[10] == 1) { return "可能存在"; } else { return "一定不存在"; } 误判原因:多个元素的hash值可能碰撞 ...

2025-01-21 · maneng

分布式锁实战:从SETNX到Redlock

为什么需要分布式锁? 单机锁的局限: // ❌ 单机环境有效 synchronized (this) { // 扣减库存 } // ❌ 分布式环境失效 // 服务器1和服务器2的锁互不影响 分布式锁特性: 互斥性:同一时刻只有一个客户端持有锁 安全性:只有持锁者能释放锁 可用性:高可用,避免死锁 性能:加锁解锁快速 方案演进 V1:基础版(SETNX + EXPIRE) public boolean lock(String key, String value, long expireSeconds) { Boolean success = redis.opsForValue().setIfAbsent(key, value); if (Boolean.TRUE.equals(success)) { redis.expire(key, expireSeconds, TimeUnit.SECONDS); return true; } return false; } // ❌ 问题:SETNX和EXPIRE不是原子操作 // 如果SETNX成功后宕机,锁永不过期 V2:原子操作版(SET NX EX) public boolean lock(String key, String value, long expireSeconds) { Boolean success = redis.opsForValue() .setIfAbsent(key, value, expireSeconds, TimeUnit.SECONDS); return Boolean.TRUE.equals(success); } public void unlock(String key) { redis.delete(key); } // ❌ 问题:可能释放别人的锁 // 线程A持锁超时,锁自动释放 // 线程B获得锁 // 线程A执行完,删除了线程B的锁 V3:安全释放版(Lua脚本) public class DistributedLock { @Autowired private RedisTemplate<String, String> redis; private static final String UNLOCK_SCRIPT = "if redis.call('GET', KEYS[1]) == ARGV[1] then " + " return redis.call('DEL', KEYS[1]) " + "else " + " return 0 " + "end"; // 加锁 public boolean lock(String key, String requestId, long expireSeconds) { Boolean success = redis.opsForValue() .setIfAbsent(key, requestId, expireSeconds, TimeUnit.SECONDS); return Boolean.TRUE.equals(success); } // 安全解锁 public boolean unlock(String key, String requestId) { Long result = redis.execute( RedisScript.of(UNLOCK_SCRIPT, Long.class), Collections.singletonList(key), requestId ); return result != null && result == 1; } } // ✅ 优势:原子性校验和删除 // ❌ 问题:业务执行时间超过锁过期时间 V4:自动续期版(看门狗) @Component public class RedisLockWithWatchdog { @Autowired private RedisTemplate<String, String> redis; private static final long DEFAULT_EXPIRE_SECONDS = 30; private static final long RENEW_INTERVAL_SECONDS = 10; private final Map<String, ScheduledFuture<?>> renewTasks = new ConcurrentHashMap<>(); private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); public boolean lock(String key, String requestId) { Boolean success = redis.opsForValue() .setIfAbsent(key, requestId, DEFAULT_EXPIRE_SECONDS, TimeUnit.SECONDS); if (Boolean.TRUE.equals(success)) { // 启动续期任务 startRenewTask(key, requestId); return true; } return false; } private void startRenewTask(String key, String requestId) { ScheduledFuture<?> task = scheduler.scheduleAtFixedRate(() -> { try { String value = redis.opsForValue().get(key); if (requestId.equals(value)) { redis.expire(key, DEFAULT_EXPIRE_SECONDS, TimeUnit.SECONDS); log.debug("锁续期成功: {}", key); } else { // 锁已被他人持有,停止续期 stopRenewTask(key); } } catch (Exception e) { log.error("锁续期失败", e); } }, RENEW_INTERVAL_SECONDS, RENEW_INTERVAL_SECONDS, TimeUnit.SECONDS); renewTasks.put(key, task); } public boolean unlock(String key, String requestId) { stopRenewTask(key); String script = "if redis.call('GET', KEYS[1]) == ARGV[1] then " + " return redis.call('DEL', KEYS[1]) " + "else " + " return 0 " + "end"; Long result = redis.execute( RedisScript.of(script, Long.class), Collections.singletonList(key), requestId ); return result != null && result == 1; } private void stopRenewTask(String key) { ScheduledFuture<?> task = renewTasks.remove(key); if (task != null) { task.cancel(false); } } } 实战案例 案例1:库存扣减 @Service public class StockService { @Autowired private RedisLockWithWatchdog redisLock; @Autowired private RedisTemplate<String, Object> redis; public boolean deductStock(String productId, int quantity) { String lockKey = "lock:stock:" + productId; String requestId = UUID.randomUUID().toString(); try { // 尝试获取锁(超时3秒) if (!tryLock(lockKey, requestId, 3000)) { return false; // 获取锁失败 } // 检查库存 Integer stock = (Integer) redis.opsForValue().get("stock:" + productId); if (stock == null || stock < quantity) { return false; // 库存不足 } // 扣减库存 redis.opsForValue().decrement("stock:" + productId, quantity); return true; } finally { // 释放锁 redisLock.unlock(lockKey, requestId); } } private boolean tryLock(String key, String requestId, long timeoutMs) { long startTime = System.currentTimeMillis(); while (System.currentTimeMillis() - startTime < timeoutMs) { if (redisLock.lock(key, requestId)) { return true; } try { Thread.sleep(50); // 等待50ms后重试 } catch (InterruptedException e) { Thread.currentThread().interrupt(); return false; } } return false; } } 案例2:订单防重 @Service public class OrderService { @Autowired private RedisLockWithWatchdog redisLock; public String createOrder(OrderRequest request) { String lockKey = "lock:order:" + request.getUserId() + ":" + request.getOrderNo(); String requestId = UUID.randomUUID().toString(); try { if (!redisLock.lock(lockKey, requestId)) { throw new BusinessException("订单正在处理中,请勿重复提交"); } // 创建订单逻辑 Order order = new Order(); order.setOrderNo(request.getOrderNo()); // ... 保存订单 return order.getOrderNo(); } finally { redisLock.unlock(lockKey, requestId); } } } 案例3:定时任务防重 @Component public class ScheduledTaskWithLock { @Autowired private RedisLockWithWatchdog redisLock; @Scheduled(cron = "0 */5 * * * ?") // 每5分钟执行 public void syncData() { String lockKey = "lock:task:sync_data"; String requestId = UUID.randomUUID().toString(); if (!redisLock.lock(lockKey, requestId)) { log.info("其他节点正在执行同步任务,跳过"); return; } try { log.info("开始执行同步任务"); // 执行同步逻辑 doSync(); } finally { redisLock.unlock(lockKey, requestId); } } private void doSync() { // 同步逻辑 } } Redlock算法(多Redis实例) 问题:单Redis实例有单点故障风险 ...

2025-01-21 · maneng

HyperLogLog:基数统计的神器

什么是HyperLogLog? 基数统计:统计集合中不重复元素的个数(Cardinality) 问题:1亿用户UV统计需要多少内存? 方案1:Set存储所有userId 内存:1亿 * 8字节 = 763 MB 方案2:HyperLogLog 内存:12 KB(固定) 误差:0.81% 核心优势: ✅ 固定内存(12KB) ✅ 超高性能 ❌ 有误差(0.81%) ❌ 无法获取具体元素 核心命令 # 添加元素 PFADD key element [element ...] # 获取基数 PFCOUNT key [key ...] # 合并多个HyperLogLog PFMERGE destkey sourcekey [sourcekey ...] 实战案例 案例1:网站UV统计 @Service public class UVStatService { @Autowired private RedisTemplate<String, Object> redis; // 记录用户访问 public void recordVisit(String userId) { String key = "uv:" + LocalDate.now(); redis.opsForHyperLogLog().add(key, userId); redis.expire(key, 90, TimeUnit.DAYS); } // 获取今日UV public Long getTodayUV() { String key = "uv:" + LocalDate.now(); return redis.opsForHyperLogLog().size(key); } // 获取近7天UV public Long getWeekUV() { List<String> keys = new ArrayList<>(); for (int i = 0; i < 7; i++) { keys.add("uv:" + LocalDate.now().minusDays(i)); } return redis.opsForHyperLogLog().size(keys.toArray(new String[0])); } // 获取本月UV public Long getMonthUV() { LocalDate now = LocalDate.now(); List<String> keys = new ArrayList<>(); for (int i = 1; i <= now.getDayOfMonth(); i++) { keys.add("uv:" + now.withDayOfMonth(i)); } return redis.opsForHyperLogLog().size(keys.toArray(new String[0])); } } 案例2:页面独立访客统计 @Service public class PageUVService { @Autowired private RedisTemplate<String, Object> redis; // 记录页面访问 public void recordPageVisit(String pageId, String userId) { String key = "page:uv:" + pageId + ":" + LocalDate.now(); redis.opsForHyperLogLog().add(key, userId); redis.expire(key, 30, TimeUnit.DAYS); } // 获取页面UV public Long getPageUV(String pageId) { String key = "page:uv:" + pageId + ":" + LocalDate.now(); return redis.opsForHyperLogLog().size(key); } // 统计TOP 10热门页面 public Map<String, Long> getTopPages(List<String> pageIds) { Map<String, Long> result = new LinkedHashMap<>(); for (String pageId : pageIds) { Long uv = getPageUV(pageId); result.put(pageId, uv); } return result.entrySet().stream() .sorted(Map.Entry.<String, Long>comparingByValue().reversed()) .limit(10) .collect(Collectors.toMap( Map.Entry::getKey, Map.Entry::getValue, (e1, e2) -> e1, LinkedHashMap::new )); } } 案例3:独立IP统计 @Service public class IPStatService { @Autowired private RedisTemplate<String, Object> redis; // 记录IP访问 public void recordIP(String ip) { String hourKey = "ip:hour:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHH")); String dayKey = "ip:day:" + LocalDate.now(); redis.opsForHyperLogLog().add(hourKey, ip); redis.opsForHyperLogLog().add(dayKey, ip); redis.expire(hourKey, 2, TimeUnit.DAYS); redis.expire(dayKey, 90, TimeUnit.DAYS); } // 当前小时独立IP public Long getCurrentHourIP() { String key = "ip:hour:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHH")); return redis.opsForHyperLogLog().size(key); } // 今日独立IP public Long getTodayIP() { String key = "ip:day:" + LocalDate.now(); return redis.opsForHyperLogLog().size(key); } // 近24小时独立IP趋势 public List<Map<String, Object>> get24HourTrend() { List<Map<String, Object>> result = new ArrayList<>(); LocalDateTime now = LocalDateTime.now(); for (int i = 23; i >= 0; i--) { LocalDateTime time = now.minusHours(i); String key = "ip:hour:" + time.format(DateTimeFormatter.ofPattern("yyyyMMddHH")); Long count = redis.opsForHyperLogLog().size(key); Map<String, Object> item = new HashMap<>(); item.put("hour", time.format(DateTimeFormatter.ofPattern("HH:00"))); item.put("count", count); result.add(item); } return result; } } 案例4:APP活跃用户统计 @Service public class DAUService { @Autowired private RedisTemplate<String, Object> redis; // 记录用户活跃 public void recordActive(Long userId) { String key = "dau:" + LocalDate.now(); redis.opsForHyperLogLog().add(key, String.valueOf(userId)); redis.expire(key, 90, TimeUnit.DAYS); } // 今日DAU(Daily Active Users) public Long getTodayDAU() { String key = "dau:" + LocalDate.now(); return redis.opsForHyperLogLog().size(key); } // 近7天DAU public Long getWeekDAU() { List<String> keys = new ArrayList<>(); for (int i = 0; i < 7; i++) { keys.add("dau:" + LocalDate.now().minusDays(i)); } return redis.opsForHyperLogLog().size(keys.toArray(new String[0])); } // 近30天DAU(MAU - Monthly Active Users) public Long getMonthMAU() { List<String> keys = new ArrayList<>(); for (int i = 0; i < 30; i++) { keys.add("dau:" + LocalDate.now().minusDays(i)); } return redis.opsForHyperLogLog().size(keys.toArray(new String[0])); } // 计算留存率 public double getRetentionRate(LocalDate startDate, int days) { String startKey = "dau:" + startDate; String endKey = "dau:" + startDate.plusDays(days); Long startDAU = redis.opsForHyperLogLog().size(startKey); Long endDAU = redis.opsForHyperLogLog().size(endKey); if (startDAU == null || startDAU == 0) { return 0.0; } return (double) endDAU / startDAU; } } 合并统计 # 合并多个HyperLogLog PFMERGE result uv:20250101 uv:20250102 uv:20250103 # 查询合并后的基数 PFCOUNT result Java实现: ...

2025-01-21 · maneng

Bitmap位图:节省内存的统计利器

Bitmap原理 本质:一串二进制位(0和1) 位图:[0][1][1][0][1][0][0][1] 索引: 0 1 2 3 4 5 6 7 1亿个位 = 1亿 bits = 12.5 MB 优势: 极致节省内存(1个用户1bit) 快速统计(位运算) 核心命令 # 设置位 SETBIT key offset value # 获取位 GETBIT key offset # 统计1的个数 BITCOUNT key [start end] # 位运算 BITOP AND|OR|XOR|NOT destkey key [key ...] # 查找第一个0或1 BITPOS key bit [start] [end] 实战案例 案例1:用户签到 @Service public class SignInService { @Autowired private RedisTemplate<String, Object> redis; // 签到 public void signIn(Long userId, LocalDate date) { String key = "sign:" + userId + ":" + date.format(DateTimeFormatter.ofPattern("yyyyMM")); int offset = date.getDayOfMonth() - 1; redis.opsForValue().setBit(key, offset, true); } // 检查是否签到 public boolean isSignedIn(Long userId, LocalDate date) { String key = "sign:" + userId + ":" + date.format(DateTimeFormatter.ofPattern("yyyyMM")); int offset = date.getDayOfMonth() - 1; return Boolean.TRUE.equals(redis.opsForValue().getBit(key, offset)); } // 统计本月签到天数 public Long countSignIn(Long userId, String month) { String key = "sign:" + userId + ":" + month; return redis.execute((RedisCallback<Long>) connection -> connection.bitCount(key.getBytes()) ); } // 连续签到天数 public int getContinuousSignInDays(Long userId) { LocalDate today = LocalDate.now(); String key = "sign:" + userId + ":" + today.format(DateTimeFormatter.ofPattern("yyyyMM")); int days = 0; for (int i = today.getDayOfMonth() - 1; i >= 0; i--) { Boolean signed = redis.opsForValue().getBit(key, i); if (Boolean.TRUE.equals(signed)) { days++; } else { break; } } return days; } } 案例2:用户在线状态 @Service public class OnlineStatusService { @Autowired private RedisTemplate<String, Object> redis; // 设置用户在线 public void setOnline(Long userId) { String key = "online:" + LocalDate.now(); redis.opsForValue().setBit(key, userId, true); redis.expire(key, 2, TimeUnit.DAYS); } // 设置用户离线 public void setOffline(Long userId) { String key = "online:" + LocalDate.now(); redis.opsForValue().setBit(key, userId, false); } // 检查用户是否在线 public boolean isOnline(Long userId) { String key = "online:" + LocalDate.now(); return Boolean.TRUE.equals(redis.opsForValue().getBit(key, userId)); } // 统计今日在线人数 public Long countOnlineUsers() { String key = "online:" + LocalDate.now(); return redis.execute((RedisCallback<Long>) connection -> connection.bitCount(key.getBytes()) ); } } 案例3:用户标签系统 @Service public class UserTagService { @Autowired private RedisTemplate<String, Object> redis; // 标签定义 private static final int TAG_VIP = 0; private static final int TAG_ACTIVE = 1; private static final int TAG_VERIFIED = 2; // 添加标签 public void addTag(Long userId, int tagId) { String key = "user:tags:" + userId; redis.opsForValue().setBit(key, tagId, true); } // 移除标签 public void removeTag(Long userId, int tagId) { String key = "user:tags:" + userId; redis.opsForValue().setBit(key, tagId, false); } // 检查标签 public boolean hasTag(Long userId, int tagId) { String key = "user:tags:" + userId; return Boolean.TRUE.equals(redis.opsForValue().getBit(key, tagId)); } // 查找VIP且活跃的用户 public Set<Long> findVIPActiveUsers(Set<Long> userIds) { Set<Long> result = new HashSet<>(); for (Long userId : userIds) { if (hasTag(userId, TAG_VIP) && hasTag(userId, TAG_ACTIVE)) { result.add(userId); } } return result; } } 案例4:布隆过滤器(简化版) @Service public class SimpleBloomFilter { @Autowired private RedisTemplate<String, Object> redis; private static final String KEY = "bloom_filter"; private static final int BIT_SIZE = 10000000; // 1000万位 // 添加元素 public void add(String element) { int hash1 = Math.abs(element.hashCode()) % BIT_SIZE; int hash2 = Math.abs((element + "salt").hashCode()) % BIT_SIZE; int hash3 = Math.abs((element + "salt2").hashCode()) % BIT_SIZE; redis.opsForValue().setBit(KEY, hash1, true); redis.opsForValue().setBit(KEY, hash2, true); redis.opsForValue().setBit(KEY, hash3, true); } // 检查元素是否存在 public boolean mightExist(String element) { int hash1 = Math.abs(element.hashCode()) % BIT_SIZE; int hash2 = Math.abs((element + "salt").hashCode()) % BIT_SIZE; int hash3 = Math.abs((element + "salt2").hashCode()) % BIT_SIZE; return Boolean.TRUE.equals(redis.opsForValue().getBit(KEY, hash1)) && Boolean.TRUE.equals(redis.opsForValue().getBit(KEY, hash2)) && Boolean.TRUE.equals(redis.opsForValue().getBit(KEY, hash3)); } } 位运算操作 多日期统计 # 连续3天都签到的用户 BITOP AND result sign:202501 sign:202502 sign:202503 BITCOUNT result # 3天内至少签到1次的用户 BITOP OR result sign:202501 sign:202502 sign:202503 BITCOUNT result Java实现: ...

2025-01-21 · maneng

Lua脚本编程:Redis的原子操作利器

为什么需要Lua脚本? 问题:多个Redis命令无法保证原子性 // ❌ 非原子操作(有并发问题) Long stock = redis.get("stock"); if (stock > 0) { redis.decr("stock"); // 可能有多个线程同时执行 return "success"; } Lua脚本优势: ✅ 原子性:脚本执行期间不会插入其他命令 ✅ 减少网络往返:多个命令一次发送 ✅ 复用:脚本可以缓存在服务器 基础语法 执行脚本 # EVAL命令 redis> EVAL "return redis.call('SET', 'key', 'value')" 0 OK # 格式:EVAL script numkeys key [key ...] arg [arg ...] # numkeys: key的数量 # KEYS[1], KEYS[2]: 传入的key # ARGV[1], ARGV[2]: 传入的参数 调用Redis命令 -- redis.call():命令错误会报错 redis.call('SET', 'key', 'value') -- redis.pcall():命令错误会返回错误对象 redis.pcall('SET', 'key', 'value') 实战案例 案例1:库存扣减(秒杀场景) -- stock_deduct.lua local key = KEYS[1] local quantity = tonumber(ARGV[1]) local stock = tonumber(redis.call('GET', key) or '0') if stock >= quantity then redis.call('DECRBY', key, quantity) return 1 -- 成功 else return 0 -- 库存不足 end Java调用: ...

2025-01-21 · maneng

如约数科科技工作室

浙ICP备2025203501号

👀 本站总访问量 ...| 👤 访客数 ...| 📅 今日访问 ...