The Hot Key Problem

Definition: a key that is accessed at an extremely high frequency, concentrating load on a single Redis node.

Impact

  • CPU on the owning node pegged at 100%
  • Network bandwidth saturated
  • Access to other keys on the same node degrades
  • Data and traffic skew in a Redis Cluster

Detecting Hot Keys

Method 1: redis-cli --hotkeys

redis-cli --hotkeys
# reports the keys with the highest access frequency
# note: requires maxmemory-policy to be one of the LFU policies (allkeys-lfu / volatile-lfu)

Method 2: the MONITOR command

# sample 100k commands; field 5 of each MONITOR line is the key (field 4 is the command)
# caution: MONITOR itself adds significant load, so keep the sample short on production
redis-cli monitor | head -n 100000 | awk '{print $5}' | sort | uniq -c | sort -rn | head -n 10

Method 3: counting in application code

@Slf4j
@Aspect
@Component
public class RedisMonitorAspect {
    private final ConcurrentHashMap<String, AtomicLong> accessCounter = new ConcurrentHashMap<>();

    // Point the pointcut at your own cache-access methods whose first argument is the cache key
    // (the package/class below is a placeholder). Intercepting RedisTemplate.opsFor*() itself
    // would not work: those factory methods take no arguments, so there is no key to extract.
    @Around("execution(* com.example.cache.CacheService.*(..))")
    public Object around(ProceedingJoinPoint pjp) throws Throwable {
        Object result = pjp.proceed();

        // count the access (hits and misses alike), using the first String argument as the key
        Object[] args = pjp.getArgs();
        if (args.length > 0 && args[0] instanceof String) {
            String key = (String) args[0];
            accessCounter.computeIfAbsent(key, k -> new AtomicLong()).incrementAndGet();
        }

        return result;
    }

    @Scheduled(fixedRate = 60000)  // every minute
    public void reportHotKeys() {
        // AtomicLong is not Comparable, so sort by the underlying long value
        List<Map.Entry<String, AtomicLong>> hotKeys = accessCounter.entrySet().stream()
            .sorted((a, b) -> Long.compare(b.getValue().get(), a.getValue().get()))
            .limit(10)
            .collect(Collectors.toList());

        log.info("Top 10 hot keys: {}", hotKeys);

        // reset the counters for the next window
        accessCounter.clear();
    }
}

Solutions

Solution 1: local (in-process) cache

@Service
public class ProductService {
    @Autowired
    private RedisTemplate<String, Object> redis;

    @Autowired
    private ProductMapper productMapper;

    // local cache for hot data (Caffeine), size-capped with a short TTL
    private final Cache<String, Product> localCache = Caffeine.newBuilder()
        .maximumSize(1000)
        .expireAfterWrite(5, TimeUnit.MINUTES)
        .build();

    public Product getProduct(Long productId) {
        String key = "product:" + productId;

        // 1. local cache first
        Product product = localCache.getIfPresent(key);
        if (product != null) {
            return product;
        }

        // 2. Redis
        product = (Product) redis.opsForValue().get(key);
        if (product != null) {
            localCache.put(key, product);
            return product;
        }

        // 3. DB
        product = productMapper.selectById(productId);
        if (product != null) {
            redis.opsForValue().set(key, product, 3600, TimeUnit.SECONDS);
            localCache.put(key, product);
        }

        return product;
    }
}
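When the underlying product changes, both cache layers have to be invalidated, otherwise readers keep serving the old local copy. A minimal write-path sketch (the `updateProduct` method and the mapper call are illustrative assumptions):

public void updateProduct(Product product) {
    // 1. write the database first (hypothetical mapper method)
    productMapper.updateById(product);

    String key = "product:" + product.getId();
    // 2. evict Redis so the next read repopulates it from the DB
    redis.delete(key);
    // 3. evict this node's Caffeine cache
    localCache.invalidate(key);
}

Other application nodes keep their local copy until the 5-minute expireAfterWrite elapses; if that staleness window is too long, broadcast an invalidation message (for example over Redis pub/sub).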

Solution 2: hot key replicas (backup copies)

// replicate the hot key into several copies so reads spread across nodes
public Object getHotKey(String key) {
    // pick one of the 10 replica keys at random
    int index = ThreadLocalRandom.current().nextInt(10);
    String backupKey = key + ":backup:" + index;

    Object value = redis.opsForValue().get(backupKey);
    if (value != null) {
        return value;
    }

    // replica missing: fall back to the original key
    value = redis.opsForValue().get(key);
    if (value != null) {
        // lazily populate this replica
        redis.opsForValue().set(backupKey, value, 3600, TimeUnit.SECONDS);
    }

    return value;
}
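The read path above only repopulates one replica at a time; whenever the value changes, the original key and all replica keys should be rewritten together, otherwise some readers keep getting a stale copy. A minimal write-side sketch, assuming the same 10 replicas and 3600-second TTL:

public void setHotKey(String key, Object value) {
    // update the original key
    redis.opsForValue().set(key, value, 3600, TimeUnit.SECONDS);

    // refresh every replica so all random readers observe the new value
    for (int i = 0; i < 10; i++) {
        redis.opsForValue().set(key + ":backup:" + i, value, 3600, TimeUnit.SECONDS);
    }
}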

Solution 3: read/write splitting

// reads go to the replica, writes go to the master; each factory backs its own RedisTemplate
@Configuration
public class RedisConfig {
    @Bean
    @Primary
    public LettuceConnectionFactory writeConnectionFactory() {
        return new LettuceConnectionFactory(new RedisStandaloneConfiguration("master", 6379));
    }

    @Bean
    public LettuceConnectionFactory readConnectionFactory() {
        return new LettuceConnectionFactory(new RedisStandaloneConfiguration("slave", 6379));
    }
}
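With Lettuce there is also a single-factory alternative: declare the master/replica topology once and let the client route reads to replicas via `ReadFrom`. A sketch under the same hostnames as above (`ReadFrom` is `io.lettuce.core.ReadFrom`; `RedisStaticMasterReplicaConfiguration` comes from Spring Data Redis):

@Bean
public LettuceConnectionFactory masterReplicaConnectionFactory() {
    // prefer replicas for reads, fall back to the master when no replica is available
    LettuceClientConfiguration clientConfig = LettuceClientConfiguration.builder()
        .readFrom(ReadFrom.REPLICA_PREFERRED)
        .build();

    RedisStaticMasterReplicaConfiguration topology =
        new RedisStaticMasterReplicaConfiguration("master", 6379);
    topology.addNode("slave", 6379);

    return new LettuceConnectionFactory(topology, clientConfig);
}

This keeps a single RedisTemplate while still steering read traffic away from the master.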

The BigKey Problem

Definition: a key whose value takes up too much memory or contains too many elements. Common rules of thumb (a sketch for checking a single suspect key follows the list):

  • String value > 10KB
  • List/Set/ZSet > 10,000 elements
  • Hash > 10,000 fields
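For one specific suspect key, a quick check against these thresholds can also be done from application code with the type-specific size commands (STRLEN/LLEN/HLEN/SCARD/ZCARD). A small sketch, assuming a `RedisTemplate<String, Object>` named `redis`:

// returns element count for collections and byte length for strings; 0 if the key is missing
public long sizeOf(String key) {
    DataType type = redis.type(key);
    if (type == null) {
        return 0;
    }
    switch (type) {
        case STRING: return redis.opsForValue().size(key);   // STRLEN
        case LIST:   return redis.opsForList().size(key);    // LLEN
        case HASH:   return redis.opsForHash().size(key);    // HLEN
        case SET:    return redis.opsForSet().size(key);     // SCARD
        case ZSET:   return redis.opsForZSet().size(key);    // ZCARD
        default:     return 0;
    }
}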

Impact

  • High memory usage on the owning node
  • Network congestion when the large value is transferred
  • Slow deletion (a blocking DEL)
  • Slow master-replica synchronization

Detecting BigKeys

Method 1: redis-cli --bigkeys

redis-cli --bigkeys

# sample output (--bigkeys scans incrementally with SCAN and reports the biggest key per type)
[00.00%] Biggest string found so far 'user:1001' with 15360 bytes
[00.00%] Biggest list   found so far 'list:tasks' with 50000 items
[00.00%] Biggest hash   found so far 'hash:user:info' with 10000 fields

Method 2: the MEMORY USAGE command

MEMORY USAGE key

# example (Redis 4.0+)
redis> MEMORY USAGE mykey
(integer) 50000  # bytes

Method 3: scanning the keyspace

@Service
public class BigKeyScanner {
    private static final long BIG_KEY_THRESHOLD = 10 * 1024;  // 10KB

    @Autowired
    private RedisTemplate<String, String> redis;

    public List<BigKeyInfo> scan() {
        List<BigKeyInfo> bigKeys = new ArrayList<>();
        ScanOptions options = ScanOptions.scanOptions().count(100).build();

        redis.execute((RedisCallback<Object>) connection -> {
            // SCAN walks the keyspace incrementally, so it does not block the server like KEYS *
            try (Cursor<byte[]> cursor = connection.scan(options)) {
                while (cursor.hasNext()) {
                    String key = new String(cursor.next());
                    long size = getKeySize(connection, key);

                    if (size > BIG_KEY_THRESHOLD) {
                        BigKeyInfo info = new BigKeyInfo();
                        info.setKey(key);
                        info.setSize(size);
                        bigKeys.add(info);
                    }
                }
            }
            return null;
        });

        return bigKeys.stream()
            .sorted(Comparator.comparingLong(BigKeyInfo::getSize).reversed())
            .limit(100)
            .collect(Collectors.toList());
    }

    // MEMORY USAGE needs Redis 4.0+; reuse the connection already opened by the SCAN callback
    private long getKeySize(RedisConnection connection, String key) {
        Long size = (Long) connection.execute("MEMORY", "USAGE".getBytes(), key.getBytes());
        return size == null ? 0L : size;
    }
}

Solutions

Solution 1: split the BigKey

// ❌ Bad: one huge Hash
HSET user:1001 field1 value1 field2 value2 ... field10000 value10000

// ✅ Good: split into several smaller Hashes
@Service
public class UserService {
    private static final int SHARD_COUNT = 10;

    @Autowired
    private RedisTemplate<String, Object> redis;

    public void setUserField(Long userId, String field, String value) {
        int shard = Math.abs(field.hashCode()) % SHARD_COUNT;
        String key = "user:" + userId + ":shard:" + shard;
        redis.opsForHash().put(key, field, value);
    }

    public String getUserField(Long userId, String field) {
        int shard = Math.abs(field.hashCode()) % SHARD_COUNT;
        String key = "user:" + userId + ":shard:" + shard;
        return (String) redis.opsForHash().get(key, field);
    }
}
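One trade-off of sharding: a single HGETALL no longer returns the whole object, so a full read has to merge all shards. A sketch under the same SHARD_COUNT assumption (each shard stays small, so the individual HGETALL calls remain cheap):

public Map<Object, Object> getAllUserFields(Long userId) {
    Map<Object, Object> merged = new HashMap<>();
    for (int shard = 0; shard < SHARD_COUNT; shard++) {
        // HGETALL on one small shard
        merged.putAll(redis.opsForHash().entries("user:" + userId + ":shard:" + shard));
    }
    return merged;
}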

Solution 2: compress the value

@Slf4j
@Service
public class CompressService {
    // the value serializer for this template must be a pass-through byte[] serializer
    @Autowired
    private RedisTemplate<String, byte[]> redis;

    public void setCompressed(String key, Object value) throws IOException {
        // serialize
        byte[] data = SerializationUtils.serialize(value);

        // gzip-compress
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(bos)) {
            gzip.write(data);
        }

        byte[] compressed = bos.toByteArray();
        redis.opsForValue().set(key, compressed);

        log.info("space saved: {}%", (1 - compressed.length * 1.0 / data.length) * 100);
    }

    public Object getCompressed(String key) throws IOException {
        byte[] compressed = redis.opsForValue().get(key);
        if (compressed == null) {
            return null;
        }

        // gunzip
        ByteArrayInputStream bis = new ByteArrayInputStream(compressed);
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPInputStream gzip = new GZIPInputStream(bis)) {
            byte[] buffer = new byte[1024];
            int len;
            while ((len = gzip.read(buffer)) > 0) {
                bos.write(buffer, 0, len);
            }
        }

        return SerializationUtils.deserialize(bos.toByteArray());
    }
}

Solution 3: delete BigKeys asynchronously

# Redis 4.0+
UNLINK key  # asynchronous delete, does not block the main thread

# or enable lazy freeing in redis.conf
lazyfree-lazy-eviction yes
lazyfree-lazy-expire yes
lazyfree-lazy-server-del yes

// from Java code
public void deleteBigKey(String key) {
    redis.unlink(key);  // asynchronous delete
}
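UNLINK only exists on Redis 4.0+; when the server version is unknown, a defensive variant can check INFO server first and fall back to a normal delete. A sketch, with the version parsing kept deliberately simple:

public void safeDelete(String key) {
    Properties info = redis.execute((RedisCallback<Properties>) connection ->
        connection.info("server")
    );
    String version = info != null ? info.getProperty("redis_version", "0.0.0") : "0.0.0";
    int major = Integer.parseInt(version.split("\\.")[0]);

    if (major >= 4) {
        redis.unlink(key);  // non-blocking: memory is reclaimed by a background thread
    } else {
        redis.delete(key);  // older servers only have DEL, which blocks on big keys
    }
}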

Solution 4: progressive (batched) deletion

// For large List/Set/ZSet values, shrink them in batches instead of deleting in one shot
public void deleteBigList(String key) throws InterruptedException {
    long batchSize = 100;
    Long size = redis.opsForList().size(key);

    while (size != null && size > batchSize) {
        // LTRIM keeps indices [batchSize, -1], i.e. drops the first 100 elements each round
        redis.opsForList().trim(key, batchSize, -1);
        size = redis.opsForList().size(key);
        Thread.sleep(10);  // small pause so Redis can serve other clients between batches
    }

    // the remainder (at most batchSize elements) is small enough for a plain delete
    redis.delete(key);
}
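The same idea applies to big Hashes: iterate the fields with HSCAN and remove them in small batches with HDEL before deleting the key. A sketch assuming the same RedisTemplate (`opsForHash().scan` returns a cursor over the entries):

public void deleteBigHash(String key) throws InterruptedException {
    ScanOptions options = ScanOptions.scanOptions().count(100).build();

    // HSCAN the hash and HDEL the fields in batches of 100
    try (Cursor<Map.Entry<Object, Object>> cursor = redis.opsForHash().scan(key, options)) {
        List<Object> batch = new ArrayList<>();
        while (cursor.hasNext()) {
            batch.add(cursor.next().getKey());
            if (batch.size() == 100) {
                redis.opsForHash().delete(key, batch.toArray());
                batch.clear();
                Thread.sleep(10);  // pause between batches
            }
        }
        if (!batch.isEmpty()) {
            redis.opsForHash().delete(key, batch.toArray());
        }
    }

    // finally remove the key itself (it is small or already empty by now)
    redis.delete(key);
}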

Monitoring and Alerting

@Slf4j
@Component
public class RedisHealthMonitor {
    @Autowired
    private RedisTemplate<String, String> redis;

    @Scheduled(fixedRate = 60000)  // every minute
    public void monitor() {
        // 1. memory usage
        Properties info = redis.execute((RedisCallback<Properties>) connection ->
            connection.info("memory")
        );

        long usedMemory = Long.parseLong(info.getProperty("used_memory", "0"));
        long maxMemory = Long.parseLong(info.getProperty("maxmemory", "0"));

        // maxmemory = 0 means "no limit", so only alert when a limit is configured
        if (maxMemory > 0 && usedMemory > maxMemory * 0.9) {
            log.warn("Redis memory usage above 90%: {}MB / {}MB",
                usedMemory / 1024 / 1024, maxMemory / 1024 / 1024);
            // send an alert
        }

        // 2. BigKey alert
        List<BigKeyInfo> bigKeys = scanBigKeys();
        if (!bigKeys.isEmpty()) {
            log.warn("BigKeys found: {}", bigKeys);
            // send an alert
        }

        // 3. slow log, fetched via the generic command API (SLOWLOG GET 10)
        Object slowlogs = redis.execute((RedisCallback<Object>) connection ->
            connection.execute("SLOWLOG", "GET".getBytes(), "10".getBytes())
        );

        if (slowlogs instanceof List && !((List<?>) slowlogs).isEmpty()) {
            log.warn("Slow queries: {}", slowlogs);
        }
    }

    private List<BigKeyInfo> scanBigKeys() {
        // delegate to a keyspace scan like the BigKeyScanner shown earlier; stubbed out here
        return new ArrayList<>();
    }
}

Best Practices

Avoid creating BigKeys in the first place

  1. Design data structures sensibly
// ❌ Bad
HSET user:info field1 value1 ... field10000 value10000

// ✅ Good: split by category
HSET user:1001:basic name "Alice" age "25"
HSET user:1001:address city "Beijing" street "..."
  2. Cap collection sizes
public void addToList(String key, String value) {
    redis.opsForList().rightPush(key, value);

    // cap the List length
    Long size = redis.opsForList().size(key);
    if (size != null && size > 1000) {
        redis.opsForList().trim(key, -1000, -1);  // keep only the newest 1000 entries
    }
}
  3. Clean up stale data periodically
@Scheduled(cron = "0 0 2 * * ?")  // every day at 2 AM
public void cleanExpiredData() {
    // remove entries older than 30 days
    long threshold = System.currentTimeMillis() - 30L * 24 * 3600 * 1000;
    redis.opsForZSet().removeRangeByScore("timeline", 0, threshold);
}

Summary

Hot keys

  • Detect: MONITOR, --hotkeys, counting in application code
  • Mitigate: local cache, hot key replicas, read/write splitting

BigKeys

  • Detect: --bigkeys, MEMORY USAGE, keyspace scans
  • Mitigate: splitting, compression, asynchronous deletion

Prevention

  • Design data structures sensibly
  • Cap collection sizes
  • Clean up stale data periodically
  • Monitor and alert