一、引子:为什么Redis需要五大数据结构?

很多人的疑问:Memcached只有String一种数据结构,Redis为什么需要五种?

核心答案不同的业务场景需要不同的数据结构

1.1 如果只有String会怎样?

假设我们要实现一个排行榜功能,只有String的话:

// ❌ 方案1:用String存储整个排行榜(JSON序列化)
// 问题:每次更新一个用户分数,需要序列化/反序列化整个排行榜
public void updateScore(Long userId, int score) {
    // 1. 读取整个排行榜(反序列化)
    String json = redisTemplate.opsForValue().get("rank:list");
    List<User> rankList = JSON.parseArray(json, User.class);  // 10000个用户

    // 2. 更新一个用户的分数
    for (User user : rankList) {
        if (user.getId().equals(userId)) {
            user.setScore(score);
            break;
        }
    }

    // 3. 重新排序
    rankList.sort((a, b) -> b.getScore() - a.getScore());

    // 4. 写入Redis(序列化)
    String newJson = JSON.toJSONString(rankList);
    redisTemplate.opsForValue().set("rank:list", newJson);
}

// 性能问题:
// - 读取:反序列化10000个用户,耗时100ms
// - 排序:O(NlogN) = 10000*log(10000) ≈ 130000次比较
// - 写入:序列化10000个用户,耗时100ms
// 总耗时:200ms+(单次更新)
// ✅ 方案2:使用Redis ZSet(有序集合)
// 优势:Redis内部维护排序,O(logN)复杂度
public void updateScore(Long userId, int score) {
    redisTemplate.opsForZSet().add("rank:zset", userId.toString(), score);
}

// 性能提升:
// - 写入:O(logN) = log(10000) ≈ 13次比较
// - 总耗时:1ms
// 性能提升:200倍

核心洞察

  • String:适合简单KV存储
  • ZSet:适合排序场景,性能提升200倍

1.2 五大数据结构的业务场景

数据结构核心特性典型场景时间复杂度
String简单KV存储计数器、Session、缓存O(1)
List双向链表消息队列、最新列表、时间线O(1)
Hash字段映射对象存储、购物车O(1)
Set无序集合标签、去重、共同好友O(1)
ZSet有序集合排行榜、延时队列、范围查询O(logN)

二、String:最基础的KV存储

2.1 String的应用场景

场景1:分布式Session

/**
 * 用户登录,生成Session
 */
@PostMapping("/api/login")
public LoginResponse login(@RequestBody LoginRequest request) {
    // 1. 验证用户名密码
    User user = userService.authenticate(request.getUsername(), request.getPassword());
    if (user == null) {
        throw new UnauthorizedException("用户名或密码错误");
    }

    // 2. 生成token
    String token = UUID.randomUUID().toString();

    // 3. 将用户信息存入Redis(Session)
    String sessionKey = "session:" + token;
    UserVO userVO = convertToVO(user);
    redisTemplate.opsForValue().set(sessionKey, userVO, 30, TimeUnit.MINUTES);

    return new LoginResponse(token);
}

/**
 * 验证Session
 */
@GetMapping("/api/user/info")
public UserVO getUserInfo(@RequestHeader("token") String token) {
    String sessionKey = "session:" + token;

    // 从Redis获取Session
    UserVO user = redisTemplate.opsForValue().get(sessionKey);
    if (user == null) {
        throw new UnauthorizedException("未登录或Session已过期");
    }

    return user;
}

性能对比

方案响应时间说明
数据库存储Session50ms每次查询数据库
Redis存储Session1ms内存访问
性能提升50倍

场景2:分布式计数器

/**
 * 文章浏览量计数器
 */
@GetMapping("/api/article/{id}")
public ArticleVO getArticle(@PathVariable Long id) {
    // 1. 查询文章内容
    Article article = articleRepository.findById(id);

    // 2. 增加浏览量(原子操作)
    String viewCountKey = "article:view:" + id;
    Long viewCount = redisTemplate.opsForValue().increment(viewCountKey);

    ArticleVO vo = convertToVO(article);
    vo.setViewCount(viewCount);

    return vo;
}

/**
 * 定时任务:每分钟将浏览量同步到数据库
 */
@Scheduled(fixedDelay = 60000)
public void syncViewCountToDB() {
    // 1. 获取所有文章的浏览量key
    Set<String> keys = redisTemplate.keys("article:view:*");

    // 2. 批量获取浏览量
    List<Long> viewCounts = redisTemplate.opsForValue().multiGet(keys);

    // 3. 批量更新数据库
    for (int i = 0; i < keys.size(); i++) {
        String key = keys.get(i);
        Long articleId = Long.parseLong(key.replace("article:view:", ""));
        Long viewCount = viewCounts.get(i);

        articleRepository.updateViewCount(articleId, viewCount);
    }
}

为什么用Redis而不是直接更新数据库?

假设:文章浏览量QPS = 10000

方案1:直接更新数据库
  UPDATE article SET view_count = view_count + 1 WHERE id = 123;

  问题:
  - 每次更新都写磁盘(10ms)
  - QPS上限:1000/10ms = 100
  - 需要100台数据库才能支撑10000 QPS
  - 成本:100台 × 2000元/月 = 20万元/月

方案2:Redis计数器 + 定时同步
  INCR article:view:123  # 内存操作(0.1ms)

  优势:
  - QPS上限:10万+
  - 1台Redis即可支撑
  - 成本:200元/月

  权衡:
  - 数据库数据有1分钟延迟(可接受)
  - Redis重启会丢失未同步数据(可容忍)

成本对比:20万 vs 200元(1000倍差异)

场景3:分布式锁

/**
 * 秒杀场景:防止超卖
 */
@PostMapping("/api/seckill/{productId}")
public SeckillResult seckill(@PathVariable Long productId, @RequestHeader("token") String token) {
    String lockKey = "lock:seckill:" + productId;

    try {
        // 1. 尝试获取分布式锁(SETNX)
        Boolean locked = redisTemplate.opsForValue().setIfAbsent(
            lockKey,
            token,  // 锁的value设置为token,用于后续释放锁时校验
            10,     // 锁的过期时间(防止死锁)
            TimeUnit.SECONDS
        );

        if (!Boolean.TRUE.equals(locked)) {
            return new SeckillResult(false, "系统繁忙,请稍后再试");
        }

        // 2. 执行秒杀逻辑
        // 2.1 检查库存
        Integer stock = productRepository.getStock(productId);
        if (stock <= 0) {
            return new SeckillResult(false, "商品已售罄");
        }

        // 2.2 扣减库存
        productRepository.decrementStock(productId);

        // 2.3 创建订单
        Order order = orderService.createOrder(productId, getUserId(token));

        return new SeckillResult(true, "秒杀成功", order.getId());

    } finally {
        // 3. 释放锁(使用Lua脚本保证原子性)
        String script =
            "if redis.call('get', KEYS[1]) == ARGV[1] then " +
            "    return redis.call('del', KEYS[1]) " +
            "else " +
            "    return 0 " +
            "end";

        redisTemplate.execute(
            new DefaultRedisScript<>(script, Long.class),
            Collections.singletonList(lockKey),
            token
        );
    }
}

为什么释放锁要用Lua脚本?

// ❌ 错误:非原子操作,可能删除别人的锁
String value = redisTemplate.opsForValue().get(lockKey);
if (token.equals(value)) {
    // 假设这里发生了长时间GC(5秒)
    // 锁已经过期,被其他线程获取
    redisTemplate.delete(lockKey);  // 删除了别人的锁!
}

// ✅ 正确:Lua脚本保证原子性
String script =
    "if redis.call('get', KEYS[1]) == ARGV[1] then " +
    "    return redis.call('del', KEYS[1]) " +
    "else " +
    "    return 0 " +
    "end";

// Lua脚本在Redis中原子执行,不会被打断

2.2 String的底层实现

Redis String的底层实现不是C语言的字符串(char*),而是SDS(Simple Dynamic String)

SDS vs C字符串

// C字符串:以'\0'结尾
char* str = "hello";  // 实际存储:'h','e','l','l','o','\0'

// 问题:
// 1. 获取长度需要遍历:O(N)
// 2. 无法存储二进制数据('\0'是结束符)
// 3. 容易缓冲区溢出(strcat不检查长度)

// Redis SDS结构(简化版)
struct sdshdr {
    int len;        // 字符串长度(已使用)
    int free;       // 剩余空间
    char buf[];     // 字符数组
};

// 优势:
// 1. 获取长度:O(1)
// 2. 可以存储二进制数据
// 3. 杜绝缓冲区溢出(自动扩容)
// 4. 减少内存重分配次数

SDS的三种编码

Redis会根据字符串的内容和长度,选择不同的编码方式:

编码条件说明
int字符串是整数,且范围在long内直接存储整数,节省内存
embstr字符串长度 ≤ 44字节一次内存分配,只读
raw字符串长度 > 44字节两次内存分配,可修改
# 查看String的编码
127.0.0.1:6379> SET count 123
OK
127.0.0.1:6379> OBJECT ENCODING count
"int"  # 整数编码

127.0.0.1:6379> SET short "hello"
OK
127.0.0.1:6379> OBJECT ENCODING short
"embstr"  # embstr编码(长度≤44字节)

127.0.0.1:6379> SET long "this is a very long string ........(超过44字节)"
OK
127.0.0.1:6379> OBJECT ENCODING long
"raw"  # raw编码(长度>44字节)

为什么embstr是44字节?

Redis对象头:16字节
SDS头:3字节
字符串内容:44字节
'\0'结束符:1字节
总计:16 + 3 + 44 + 1 = 64字节

Redis内存分配器(jemalloc)的最小分配单位是64字节
所以44字节是最优选择(最大化利用64字节)

2.3 String常用命令

# 基本操作
SET key value                    # 设置值
GET key                          # 获取值
DEL key                          # 删除key
EXISTS key                       # 判断key是否存在
STRLEN key                       # 获取字符串长度

# 过期时间
SET key value EX 30              # 设置值,30秒过期
SETEX key 30 value               # 同上
TTL key                          # 查看剩余过期时间
EXPIRE key 30                    # 设置过期时间

# 原子操作
INCR key                         # 自增1
INCRBY key 10                    # 自增10
DECR key                         # 自减1
DECRBY key 10                    # 自减10

# 批量操作
MSET key1 value1 key2 value2     # 批量设置
MGET key1 key2                   # 批量获取

# 分布式锁
SET lock_key token NX EX 10      # SETNX + 过期时间(原子操作)

三、List:双向链表

3.1 List的应用场景

场景1:消息队列

/**
 * 生产者:发送消息到队列
 */
public void sendMessage(String message) {
    String queueKey = "queue:task";

    // LPUSH:从左侧插入(头部插入)
    redisTemplate.opsForList().leftPush(queueKey, message);
}

/**
 * 消费者:从队列获取消息
 */
public String consumeMessage() {
    String queueKey = "queue:task";

    // BRPOP:从右侧弹出(尾部弹出),阻塞等待
    // 如果队列为空,会阻塞直到有消息或超时
    List<String> result = redisTemplate.opsForList().rightPop(
        queueKey,
        10,  // 超时时间10秒
        TimeUnit.SECONDS
    );

    if (result == null || result.isEmpty()) {
        return null;
    }

    return result.get(1);  // result[0]是key,result[1]是value
}

/**
 * 多消费者并发消费
 */
@Component
public class MessageConsumer {

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    @Scheduled(fixedDelay = 1000)
    public void consumeTask() {
        while (true) {
            String message = consumeMessage();
            if (message == null) {
                break;  // 队列为空,退出
            }

            // 处理消息
            processMessage(message);
        }
    }

    private void processMessage(String message) {
        log.info("处理消息:{}", message);
        // 业务逻辑
    }
}

List消息队列 vs RabbitMQ/Kafka

维度Redis ListRabbitMQKafka
性能极高(10万QPS)高(1万QPS)极高(100万QPS)
可靠性低(无持久化保证)高(持久化)高(分区+副本)
消息顺序FIFOFIFO分区内FIFO
消费模式单消费者多消费者多消费者组
适用场景轻量级队列企业级消息队列大数据流处理

结论:Redis List适合轻量级、高性能的消息队列;RabbitMQ/Kafka适合企业级、高可靠性的消息队列。

场景2:最新列表(时间线)

/**
 * 用户发布动态
 */
public void publishPost(Long userId, Post post) {
    // 1. 保存动态到数据库
    postRepository.save(post);

    // 2. 将动态ID推送到用户的时间线(List)
    String timelineKey = "timeline:user:" + userId;
    redisTemplate.opsForList().leftPush(timelineKey, post.getId().toString());

    // 3. 只保留最新100条动态(节省内存)
    redisTemplate.opsForList().trim(timelineKey, 0, 99);
}

/**
 * 获取用户时间线(最新动态列表)
 */
public List<PostVO> getTimeline(Long userId, int page, int pageSize) {
    String timelineKey = "timeline:user:" + userId;

    // 1. 从Redis获取动态ID列表(分页)
    int start = page * pageSize;
    int end = start + pageSize - 1;
    List<String> postIds = redisTemplate.opsForList().range(timelineKey, start, end);

    if (postIds == null || postIds.isEmpty()) {
        return Collections.emptyList();
    }

    // 2. 根据ID批量查询动态详情(可以从缓存或数据库)
    List<PostVO> posts = postIds.stream()
        .map(id -> getPostDetail(Long.parseLong(id)))
        .collect(Collectors.toList());

    return posts;
}

为什么用List而不是直接查数据库?

-- 数据库查询:每次都查数据库
SELECT * FROM post
WHERE user_id = 123
ORDER BY created_at DESC
LIMIT 0, 20;

-- 问题:
-- 1. 每次查询都走磁盘IO(10ms)
-- 2. ORDER BY需要排序(慢)
-- 3. 如果有100万条动态,查询会很慢

-- Redis List:
-- 1. 内存操作(1ms)
-- 2. 已经排序(LPUSH保证新动态在头部)
-- 3. 只存储最新100条(节省内存)

-- 性能提升:10倍

场景3:限流(滑动窗口)

/**
 * 限流:每分钟最多100次请求
 */
public boolean isAllowed(String userId) {
    String limitKey = "limit:user:" + userId;
    long now = System.currentTimeMillis();

    // 1. 记录当前请求时间
    redisTemplate.opsForList().leftPush(limitKey, String.valueOf(now));

    // 2. 删除1分钟之前的记录(滑动窗口)
    long oneMinuteAgo = now - 60000;
    redisTemplate.opsForList().trim(limitKey, 0, -1);  // 先保留所有

    // 使用Lua脚本删除过期数据
    String script =
        "local key = KEYS[1] " +
        "local now = tonumber(ARGV[1]) " +
        "local limit = tonumber(ARGV[2]) " +
        "local window = tonumber(ARGV[3]) " +
        "redis.call('LPUSH', key, now) " +
        "redis.call('LTRIM', key, 0, limit - 1) " +
        "local count = redis.call('LLEN', key) " +
        "if count <= limit then " +
        "    redis.call('EXPIRE', key, window) " +
        "    return 1 " +
        "else " +
        "    return 0 " +
        "end";

    Long result = redisTemplate.execute(
        new DefaultRedisScript<>(script, Long.class),
        Collections.singletonList(limitKey),
        String.valueOf(now),
        "100",  // 限制100次
        "60"    // 60秒
    );

    return result != null && result == 1;
}

3.2 List的底层实现

Redis List的底层实现经历了多次演进:

Redis 3.2之前:ziplist(压缩列表) + linkedlist(双向链表)

条件:
- 元素数量 < 512 → ziplist
- 元素数量 ≥ 512 → linkedlist

Redis 3.2之后:quicklist(快速列表)

quicklist = ziplist + linkedlist 的结合

结构:
  head → [ziplist1] ⇄ [ziplist2] ⇄ [ziplist3] ← tail
         (3个元素)    (5个元素)    (2个元素)

优势:
- 节省内存(ziplist压缩)
- 性能好(linkedlist快速插入删除)

ziplist(压缩列表)

ziplist:连续内存块,节省内存

结构:
[zlbytes][zltail][zllen][entry1][entry2][..][zlend]

示例:
ziplist: [15][10][3][hello][world][redis][255]
         ↑   ↑   ↑   ↑
         总字节数 尾偏移 元素数 数据

优势:内存连续,缓存友好
劣势:插入删除需要移动数据(O(N))

linkedlist(双向链表)

typedef struct listNode {
    struct listNode *prev;  // 前驱节点
    struct listNode *next;  // 后继节点
    void *value;            // 节点值
} listNode;

typedef struct list {
    listNode *head;  // 头节点
    listNode *tail;  // 尾节点
    unsigned long len;  // 节点数量
} list;

优势:插入删除O(1)
劣势:内存不连续,指针开销大

3.3 List常用命令

# 插入
LPUSH key value1 value2        # 从左侧插入
RPUSH key value1 value2        # 从右侧插入
LINSERT key BEFORE pivot value # 在pivot前插入

# 删除
LPOP key                       # 从左侧弹出
RPOP key                       # 从右侧弹出
LREM key count value           # 删除count个value
LTRIM key start stop           # 保留[start, stop]范围

# 查询
LRANGE key start stop          # 获取范围内的元素
LINDEX key index               # 获取索引位置的元素
LLEN key                       # 获取列表长度

# 阻塞操作(消息队列)
BLPOP key timeout              # 阻塞弹出(左侧)
BRPOP key timeout              # 阻塞弹出(右侧)

四、Hash:字段映射

4.1 Hash的应用场景

场景1:对象存储

/**
 * 存储用户对象(Hash)
 * 优势:可以单独更新某个字段,无需序列化整个对象
 */
public void saveUser(User user) {
    String userKey = "user:hash:" + user.getId();

    Map<String, String> userMap = new HashMap<>();
    userMap.put("id", user.getId().toString());
    userMap.put("name", user.getName());
    userMap.put("age", user.getAge().toString());
    userMap.put("email", user.getEmail());

    redisTemplate.opsForHash().putAll(userKey, userMap);
    redisTemplate.expire(userKey, 30, TimeUnit.MINUTES);
}

/**
 * 获取用户对象
 */
public User getUser(Long userId) {
    String userKey = "user:hash:" + userId;

    Map<Object, Object> userMap = redisTemplate.opsForHash().entries(userKey);
    if (userMap.isEmpty()) {
        return null;
    }

    User user = new User();
    user.setId(Long.parseLong((String) userMap.get("id")));
    user.setName((String) userMap.get("name"));
    user.setAge(Integer.parseInt((String) userMap.get("age")));
    user.setEmail((String) userMap.get("email"));

    return user;
}

/**
 * 只更新用户的年龄字段
 */
public void updateUserAge(Long userId, int age) {
    String userKey = "user:hash:" + userId;

    // 只更新age字段,无需序列化整个对象
    redisTemplate.opsForHash().put(userKey, "age", String.valueOf(age));
}

Hash vs String对比

// 方案1:String存储对象(JSON序列化)
User user = new User(1L, "Alice", 25, "alice@example.com");
String json = JSON.toJSONString(user);  // {"id":1,"name":"Alice","age":25,"email":"alice@example.com"}
redisTemplate.opsForValue().set("user:1", json);

// 更新age字段:需要反序列化→修改→序列化
String json = redisTemplate.opsForValue().get("user:1");
User user = JSON.parseObject(json, User.class);
user.setAge(26);
String newJson = JSON.toJSONString(user);
redisTemplate.opsForValue().set("user:1", newJson);

// 方案2:Hash存储对象
Map<String, String> userMap = new HashMap<>();
userMap.put("id", "1");
userMap.put("name", "Alice");
userMap.put("age", "25");
userMap.put("email", "alice@example.com");
redisTemplate.opsForHash().putAll("user:1", userMap);

// 更新age字段:直接更新,无需序列化
redisTemplate.opsForHash().put("user:1", "age", "26");

// 性能对比:
// String:反序列化(1ms) + 序列化(1ms) = 2ms
// Hash:直接更新 = 0.1ms
// 性能提升:20倍

场景2:购物车

/**
 * 添加商品到购物车
 */
public void addToCart(Long userId, Long productId, int quantity) {
    String cartKey = "cart:user:" + userId;

    // HINCRBY:如果商品已存在,增加数量;否则设置数量
    redisTemplate.opsForHash().increment(cartKey, productId.toString(), quantity);
}

/**
 * 获取购物车
 */
public Map<Long, Integer> getCart(Long userId) {
    String cartKey = "cart:user:" + userId;

    Map<Object, Object> cart = redisTemplate.opsForHash().entries(cartKey);

    Map<Long, Integer> result = new HashMap<>();
    for (Map.Entry<Object, Object> entry : cart.entrySet()) {
        Long productId = Long.parseLong((String) entry.getKey());
        Integer quantity = Integer.parseInt((String) entry.getValue());
        result.put(productId, quantity);
    }

    return result;
}

/**
 * 删除购物车中的商品
 */
public void removeFromCart(Long userId, Long productId) {
    String cartKey = "cart:user:" + userId;
    redisTemplate.opsForHash().delete(cartKey, productId.toString());
}

/**
 * 清空购物车
 */
public void clearCart(Long userId) {
    String cartKey = "cart:user:" + userId;
    redisTemplate.delete(cartKey);
}

购物车的数据结构对比

方案1:String存储购物车(JSON)
{
  "cart:user:123": "{\"456\":2,\"789\":1}"
}
问题:
- 更新一个商品需要反序列化整个购物车
- 商品很多时,序列化/反序列化开销大

方案2:Hash存储购物车
{
  "cart:user:123": {
    "456": "2",  # 商品456,数量2
    "789": "1"   # 商品789,数量1
  }
}
优势:
- 可以单独更新某个商品的数量
- HINCRBY原子操作,并发安全
- 内存效率高

场景3:统计信息

/**
 * 统计网站访问量(按地区)
 */
public void recordVisit(String region) {
    String statsKey = "stats:visit:region";

    // HINCRBY:原子自增
    redisTemplate.opsForHash().increment(statsKey, region, 1);
}

/**
 * 获取各地区访问量
 */
public Map<String, Long> getVisitStats() {
    String statsKey = "stats:visit:region";

    Map<Object, Object> stats = redisTemplate.opsForHash().entries(statsKey);

    Map<String, Long> result = new HashMap<>();
    for (Map.Entry<Object, Object> entry : stats.entrySet()) {
        String region = (String) entry.getKey();
        Long count = Long.parseLong((String) entry.getValue());
        result.put(region, count);
    }

    return result;
}

4.2 Hash的底层实现

Redis Hash有两种底层实现:

编码1:ziplist(压缩列表)

条件:
- 所有键值对的键和值的字符串长度都 < 64字节
- 键值对数量 < 512

优势:内存占用小(连续内存)
劣势:查找是O(N)

编码2:hashtable(哈希表)

typedef struct dict {
    dictType *type;       // 类型特定函数
    void *privdata;       // 私有数据
    dictht ht[2];         // 两个哈希表(用于渐进式rehash)
    long rehashidx;       // rehash进度(-1表示未rehash)
} dict;

typedef struct dictht {
    dictEntry **table;    // 哈希表数组
    unsigned long size;   // 哈希表大小
    unsigned long sizemask;  // 哈希表大小掩码(size-1)
    unsigned long used;   // 已有节点数量
} dictht;

typedef struct dictEntry {
    void *key;           // 键
    void *val;           // 值
    struct dictEntry *next;  // 下一个节点(链表法解决冲突)
} dictEntry;

优势:查找O(1)
劣势:内存占用大(指针开销)

渐进式rehash

问题:当哈希表需要扩容时,一次性rehash会阻塞Redis

解决:渐进式rehash
1. 分配新哈希表(ht[1])
2. 将rehashidx设置为0
3. 每次对Hash的操作(增删改查),顺带将ht[0].table[rehashidx]的所有键值对rehash到ht[1]
4. rehashidx++
5. 当ht[0]为空时,释放ht[0],将ht[1]设置为ht[0],创建新的空白ht[1]

优势:
- 避免阻塞Redis
- 渐进式完成rehash

4.3 Hash常用命令

# 设置
HSET key field value             # 设置单个字段
HMSET key field1 value1 field2 value2  # 设置多个字段
HSETNX key field value           # 字段不存在时设置

# 获取
HGET key field                   # 获取单个字段
HMGET key field1 field2          # 获取多个字段
HGETALL key                      # 获取所有字段和值
HKEYS key                        # 获取所有字段名
HVALS key                        # 获取所有值

# 删除
HDEL key field1 field2           # 删除字段

# 判断
HEXISTS key field                # 判断字段是否存在
HLEN key                         # 获取字段数量

# 原子操作
HINCRBY key field increment      # 原子自增
HINCRBYFLOAT key field increment # 浮点数自增

五、Set:无序集合

5.1 Set的应用场景

场景1:标签系统

/**
 * 给文章添加标签
 */
public void addTags(Long articleId, String... tags) {
    String tagKey = "article:tags:" + articleId;
    redisTemplate.opsForSet().add(tagKey, tags);
}

/**
 * 获取文章标签
 */
public Set<String> getTags(Long articleId) {
    String tagKey = "article:tags:" + articleId;
    return redisTemplate.opsForSet().members(tagKey);
}

/**
 * 获取两篇文章的共同标签
 */
public Set<String> getCommonTags(Long articleId1, Long articleId2) {
    String tagKey1 = "article:tags:" + articleId1;
    String tagKey2 = "article:tags:" + articleId2;

    // SINTER:交集
    return redisTemplate.opsForSet().intersect(tagKey1, tagKey2);
}

/**
 * 推荐相似文章(基于标签相似度)
 */
public List<Long> recommendSimilarArticles(Long articleId, int limit) {
    String tagKey = "article:tags:" + articleId;
    Set<String> tags = redisTemplate.opsForSet().members(tagKey);

    if (tags == null || tags.isEmpty()) {
        return Collections.emptyList();
    }

    // 统计每篇文章与当前文章的共同标签数
    Map<Long, Integer> similarityMap = new HashMap<>();

    for (String tag : tags) {
        // 获取有这个标签的所有文章
        String articlesByTagKey = "tag:articles:" + tag;
        Set<String> articleIds = redisTemplate.opsForSet().members(articlesByTagKey);

        if (articleIds != null) {
            for (String id : articleIds) {
                Long otherArticleId = Long.parseLong(id);
                if (!otherArticleId.equals(articleId)) {
                    similarityMap.put(otherArticleId, similarityMap.getOrDefault(otherArticleId, 0) + 1);
                }
            }
        }
    }

    // 按相似度排序,返回Top N
    return similarityMap.entrySet().stream()
        .sorted((a, b) -> b.getValue() - a.getValue())
        .limit(limit)
        .map(Map.Entry::getKey)
        .collect(Collectors.toList());
}

场景2:共同好友

/**
 * 添加好友
 */
public void addFriend(Long userId, Long friendId) {
    String friendKey = "user:friends:" + userId;
    redisTemplate.opsForSet().add(friendKey, friendId.toString());
}

/**
 * 获取共同好友
 */
public Set<Long> getCommonFriends(Long userId1, Long userId2) {
    String friendKey1 = "user:friends:" + userId1;
    String friendKey2 = "user:friends:" + userId2;

    // SINTER:交集
    Set<String> commonFriends = redisTemplate.opsForSet().intersect(friendKey1, friendKey2);

    if (commonFriends == null) {
        return Collections.emptySet();
    }

    return commonFriends.stream()
        .map(Long::parseLong)
        .collect(Collectors.toSet());
}

/**
 * 可能认识的人(好友的好友,但不是自己的好友)
 */
public Set<Long> getPeopleYouMayKnow(Long userId, int limit) {
    String friendKey = "user:friends:" + userId;
    Set<String> friends = redisTemplate.opsForSet().members(friendKey);

    if (friends == null || friends.isEmpty()) {
        return Collections.emptySet();
    }

    Set<Long> candidates = new HashSet<>();

    // 遍历每个好友
    for (String friendId : friends) {
        String friendOfFriendKey = "user:friends:" + friendId;
        Set<String> friendsOfFriend = redisTemplate.opsForSet().members(friendOfFriendKey);

        if (friendsOfFriend != null) {
            for (String fofId : friendsOfFriend) {
                Long fof = Long.parseLong(fofId);
                // 不是自己,也不是已经是好友的人
                if (!fof.equals(userId) && !friends.contains(fofId)) {
                    candidates.add(fof);
                }
            }
        }
    }

    return candidates.stream().limit(limit).collect(Collectors.toSet());
}

场景3:抽奖系统

/**
 * 参与抽奖
 */
public void participateInLottery(String lotteryId, Long userId) {
    String participantsKey = "lottery:participants:" + lotteryId;
    redisTemplate.opsForSet().add(participantsKey, userId.toString());
}

/**
 * 随机抽取N个中奖者
 */
public List<Long> drawWinners(String lotteryId, int count) {
    String participantsKey = "lottery:participants:" + lotteryId;

    // SRANDMEMBER:随机获取N个成员(不删除)
    List<String> winners = redisTemplate.opsForSet().randomMembers(participantsKey, count);

    if (winners == null) {
        return Collections.emptyList();
    }

    return winners.stream()
        .map(Long::parseLong)
        .collect(Collectors.toList());
}

/**
 * 抽取中奖者并移除(不能重复中奖)
 */
public List<Long> drawAndRemoveWinners(String lotteryId, int count) {
    String participantsKey = "lottery:participants:" + lotteryId;

    List<Long> winners = new ArrayList<>();

    for (int i = 0; i < count; i++) {
        // SPOP:随机弹出一个成员(删除)
        String winner = redisTemplate.opsForSet().pop(participantsKey);
        if (winner == null) {
            break;  // 参与者不足
        }
        winners.add(Long.parseLong(winner));
    }

    return winners;
}

5.2 Set的底层实现

Redis Set有两种底层实现:

编码1:intset(整数集合)

条件:
- 所有元素都是整数
- 元素数量 < 512

结构:
typedef struct intset {
    uint32_t encoding;  // 编码方式(int16_t、int32_t、int64_t)
    uint32_t length;    // 元素数量
    int8_t contents[];  // 数组(有序)
} intset;

示例:
intset: [2][5][1,3,5,7,9]
        ↑ ↑  ↑
     编码 长度 数据(有序)

优势:
- 内存占用小(连续内存,无指针)
- 有序(二分查找O(logN))

劣势:
- 插入删除需要移动数据(O(N))
- 只能存储整数

编码2:hashtable(哈希表)

条件:
- 元素不全是整数,或数量 ≥ 512

结构:同Hash的hashtable

优势:
- 插入删除查找都是O(1)
- 可以存储任意类型

劣势:
- 内存占用大(指针开销)

5.3 Set常用命令

# 添加/删除
SADD key member1 member2         # 添加成员
SREM key member1 member2         # 删除成员
SPOP key [count]                 # 随机弹出成员
SMOVE source dest member         # 移动成员

# 查询
SMEMBERS key                     # 获取所有成员
SISMEMBER key member             # 判断成员是否存在
SCARD key                        # 获取成员数量
SRANDMEMBER key [count]          # 随机获取成员(不删除)

# 集合运算
SINTER key1 key2                 # 交集
SUNION key1 key2                 # 并集
SDIFF key1 key2                  # 差集(在key1但不在key2)
SINTERSTORE dest key1 key2       # 交集存储到dest

六、ZSet:有序集合

6.1 ZSet的应用场景

场景1:排行榜

/**
 * 更新用户分数
 */
public void updateScore(Long userId, int score) {
    String rankKey = "rank:game:score";
    redisTemplate.opsForZSet().add(rankKey, userId.toString(), score);
}

/**
 * 获取排行榜(Top 10)
 */
public List<RankVO> getTopRank(int topN) {
    String rankKey = "rank:game:score";

    // ZREVRANGE:按分数降序获取(分数从高到低)
    Set<ZSetOperations.TypedTuple<String>> tuples =
        redisTemplate.opsForZSet().reverseRangeWithScores(rankKey, 0, topN - 1);

    if (tuples == null) {
        return Collections.emptyList();
    }

    List<RankVO> result = new ArrayList<>();
    int rank = 1;
    for (ZSetOperations.TypedTuple<String> tuple : tuples) {
        Long userId = Long.parseLong(tuple.getValue());
        Integer score = tuple.getScore().intValue();

        RankVO vo = new RankVO();
        vo.setRank(rank++);
        vo.setUserId(userId);
        vo.setScore(score);

        result.add(vo);
    }

    return result;
}

/**
 * 获取用户排名
 */
public Integer getUserRank(Long userId) {
    String rankKey = "rank:game:score";

    // ZREVRANK:获取用户排名(从0开始,所以要+1)
    Long rank = redisTemplate.opsForZSet().reverseRank(rankKey, userId.toString());

    return rank == null ? null : rank.intValue() + 1;
}

/**
 * 获取用户分数
 */
public Integer getUserScore(Long userId) {
    String rankKey = "rank:game:score";

    // ZSCORE:获取用户分数
    Double score = redisTemplate.opsForZSet().score(rankKey, userId.toString());

    return score == null ? null : score.intValue();
}

场景2:延时队列

/**
 * 添加延时任务
 */
public void addDelayedTask(String taskId, String taskData, long delaySeconds) {
    String queueKey = "queue:delayed";

    // 执行时间 = 当前时间 + 延时时间
    long executeTime = System.currentTimeMillis() + delaySeconds * 1000;

    // 使用执行时间作为score
    redisTemplate.opsForZSet().add(queueKey, taskData, executeTime);
}

/**
 * 消费延时任务(定时任务)
 */
@Scheduled(fixedDelay = 1000)
public void consumeDelayedTasks() {
    String queueKey = "queue:delayed";
    long now = System.currentTimeMillis();

    // ZRANGEBYSCORE:获取score <= now的任务(已到期的任务)
    Set<String> tasks = redisTemplate.opsForZSet().rangeByScore(queueKey, 0, now);

    if (tasks == null || tasks.isEmpty()) {
        return;
    }

    for (String task : tasks) {
        // 处理任务
        processTask(task);

        // 删除任务
        redisTemplate.opsForZSet().remove(queueKey, task);
    }
}

延时队列的应用

  • 订单超时自动取消(30分钟未支付)
  • 定时提醒(提前1小时提醒会议)
  • 延时重试(失败后5分钟重试)

场景3:范围查询(时间范围、价格范围)

/**
 * 按时间范围查询文章
 */
public List<Article> getArticlesByTimeRange(long startTime, long endTime) {
    String timelineKey = "articles:timeline";

    // ZRANGEBYSCORE:按分数范围查询
    Set<String> articleIds = redisTemplate.opsForZSet().rangeByScore(
        timelineKey,
        startTime,
        endTime
    );

    if (articleIds == null || articleIds.isEmpty()) {
        return Collections.emptyList();
    }

    return articleIds.stream()
        .map(id -> articleRepository.findById(Long.parseLong(id)))
        .collect(Collectors.toList());
}

/**
 * 按价格范围查询商品
 */
public List<Product> getProductsByPriceRange(double minPrice, double maxPrice) {
    String priceIndexKey = "products:price:index";

    // ZRANGEBYSCORE:按价格范围查询
    Set<String> productIds = redisTemplate.opsForZSet().rangeByScore(
        priceIndexKey,
        minPrice,
        maxPrice
    );

    if (productIds == null || productIds.isEmpty()) {
        return Collections.emptyList();
    }

    return productIds.stream()
        .map(id -> productRepository.findById(Long.parseLong(id)))
        .collect(Collectors.toList());
}

6.2 ZSet的底层实现

Redis ZSet有两种底层实现:

编码1:ziplist(压缩列表)

条件:
- 所有元素的长度 < 64字节
- 元素数量 < 128

结构:
ziplist: [member1][score1][member2][score2][...]

示例:
ziplist: [Alice][95][Bob][87][Charlie][92]

优势:内存占用小
劣势:查找O(N),插入删除O(N)

编码2:skiplist(跳表) + dict(哈希表)

条件:
- 元素长度 ≥ 64字节,或元素数量 ≥ 128

为什么同时使用skiplist和dict?
- skiplist:实现范围查询(O(logN))
- dict:实现O(1)查询member对应的score

结构:
typedef struct zset {
    dict *dict;       // 哈希表(member → score)
    zskiplist *zsl;   // 跳表(按score排序)
} zset;

跳表(skiplist)原理

跳表:多层链表,每层都是有序的

示例:4层跳表
Level 4:  head ───────────────────────────────→ NULL
Level 3:  head ─────────→ 50 ─────────────────→ NULL
Level 2:  head ───→ 20 ──→ 50 ───→ 80 ────────→ NULL
Level 1:  head → 10 → 20 → 30 → 50 → 70 → 80 → 90 → NULL

查找80:
1. 从Level 4开始:head → NULL(跳过)
2. Level 3:head → 50(50 < 80,继续)→ NULL(下降)
3. Level 2:50 → 80(找到!)
4. 总比较次数:3次

如果是链表:需要比较7次

跳表特点:
- 平均查找/插入/删除:O(logN)
- 空间复杂度:O(N)
- 实现简单(相比红黑树)

6.3 ZSet常用命令

# 添加/删除
ZADD key score1 member1 score2 member2  # 添加成员
ZREM key member1 member2                # 删除成员
ZINCRBY key increment member            # 增加score

# 查询
ZRANGE key start stop [WITHSCORES]      # 按分数升序获取
ZREVRANGE key start stop [WITHSCORES]   # 按分数降序获取
ZRANGEBYSCORE key min max               # 按分数范围查询
ZRANK key member                        # 获取排名(升序)
ZREVRANK key member                     # 获取排名(降序)
ZSCORE key member                       # 获取分数
ZCARD key                               # 获取成员数量

# 集合运算
ZUNIONSTORE dest numkeys key1 key2      # 并集
ZINTERSTORE dest numkeys key1 key2      # 交集

七、性能对比与选型建议

7.1 五大数据结构性能对比

数据结构查找插入删除范围查询排序内存占用
StringO(1)O(1)O(1)
ListO(N)O(1)O(N)O(N)
HashO(1)O(1)O(1)
SetO(1)O(1)O(1)
ZSetO(logN)O(logN)O(logN)O(logN)

7.2 数据结构选型决策树

需要排序?
├─ 是 → ZSet(排行榜、延时队列)
└─ 否 → 需要去重?
        ├─ 是 → Set(标签、抽奖)
        └─ 否 → 需要保持顺序?
                ├─ 是 → List(消息队列、时间线)
                └─ 否 → 需要字段映射?
                        ├─ 是 → Hash(对象存储、购物车)
                        └─ 否 → String(简单KV)

7.3 常见场景推荐

场景推荐数据结构理由
Session共享String简单高效
计数器String(INCR)原子操作
分布式锁String(SETNX)原子操作
消息队列List(LPUSH+BRPOP)FIFO
最新列表List保持顺序
对象存储Hash字段独立更新
购物车HashHINCRBY原子操作
标签系统Set去重+集合运算
共同好友Set(SINTER)交集运算
抽奖Set(SPOP)随机弹出
排行榜ZSet自动排序
延时队列ZSet按时间排序
范围查询ZSet(ZRANGEBYSCORE)范围查询

八、最佳实践

8.1 内存优化

1. 使用合适的数据结构

// ❌ 错误:用String存储对象(内存浪费)
User user = new User(1L, "Alice", 25);
String json = JSON.toJSONString(user);  // {"id":1,"name":"Alice","age":25}
redisTemplate.opsForValue().set("user:1", json);  // 45字节

// ✅ 正确:用Hash存储对象(节省内存)
Map<String, String> userMap = new HashMap<>();
userMap.put("id", "1");        // 7字节
userMap.put("name", "Alice");  // 10字节
userMap.put("age", "25");      // 7字节
redisTemplate.opsForHash().putAll("user:1", userMap);  // 24字节

// 节省:45 - 24 = 21字节(47%)

2. 控制集合大小

// ❌ 错误:List无限增长
redisTemplate.opsForList().leftPush("timeline:user:123", postId);

// ✅ 正确:只保留最新100条
redisTemplate.opsForList().leftPush("timeline:user:123", postId);
redisTemplate.opsForList().trim("timeline:user:123", 0, 99);

3. 设置过期时间

// ❌ 错误:不设置过期时间(内存泄漏)
redisTemplate.opsForValue().set("session:" + token, user);

// ✅ 正确:设置过期时间
redisTemplate.opsForValue().set("session:" + token, user, 30, TimeUnit.MINUTES);

8.2 性能优化

1. 批量操作

// ❌ 错误:循环执行单个命令(N次网络往返)
for (int i = 0; i < 1000; i++) {
    redisTemplate.opsForValue().set("key" + i, "value" + i);
}

// ✅ 正确:使用Pipeline(1次网络往返)
redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
    for (int i = 0; i < 1000; i++) {
        connection.set(("key" + i).getBytes(), ("value" + i).getBytes());
    }
    return null;
});

// 性能提升:100倍

2. 避免大Key

// ❌ 错误:List存储100万个元素(大Key)
for (int i = 0; i < 1000000; i++) {
    redisTemplate.opsForList().rightPush("big:list", "item" + i);
}

// ✅ 正确:拆分成多个小List
for (int i = 0; i < 1000; i++) {
    String listKey = "list:part:" + i;
    for (int j = 0; j < 1000; j++) {
        redisTemplate.opsForList().rightPush(listKey, "item" + (i * 1000 + j));
    }
}

3. 使用合适的命令

// ❌ 错误:使用KEYS命令(阻塞Redis)
Set<String> keys = redisTemplate.keys("user:*");  // 扫描所有key

// ✅ 正确:使用SCAN命令(不阻塞)
ScanOptions options = ScanOptions.scanOptions().match("user:*").count(100).build();
Cursor<byte[]> cursor = redisTemplate.executeWithStickyConnection(
    connection -> connection.scan(options)
);

while (cursor.hasNext()) {
    String key = new String(cursor.next());
    // 处理key
}

8.3 安全建议

1. 避免Key冲突

// ❌ 错误:key太短,容易冲突
redisTemplate.opsForValue().set("123", user);

// ✅ 正确:带业务前缀
redisTemplate.opsForValue().set("user:info:123", user);

2. 设置maxmemory

# redis.conf
maxmemory 2gb
maxmemory-policy allkeys-lru

3. 监控慢查询

# 查看慢查询
127.0.0.1:6379> SLOWLOG GET 10

# 配置慢查询阈值(10ms)
slowlog-log-slower-than 10000
slowlog-max-len 128

九、总结

9.1 核心要点回顾

五大数据结构的本质

String:   KV存储,万能但不高效
List:     有序列表,适合队列和时间线
Hash:     字段映射,适合对象存储
Set:      无序集合,适合去重和集合运算
ZSet:     有序集合,适合排序和范围查询

底层实现总结

数据结构小对象编码大对象编码阈值
Stringint/embstrraw44字节
Listziplist → quicklist--
Hashziplisthashtable512个/64字节
Setintsethashtable512个/整数
ZSetziplistskiplist+dict128个/64字节

性能特点

时间复杂度:
String/Hash/Set:O(1)
List:O(N)
ZSet:O(logN)

空间复杂度:
String < Hash < Set < List < ZSet

9.2 下一步学习

本文是Redis系列的第3篇,后续文章将深入讲解:

  1. ✅ Redis第一性原理:为什么我们需要缓存?
  2. ✅ 从HashMap到Redis:分布式缓存的演进
  3. Redis五大数据结构:从场景到实现
  4. ⏳ Redis高可用架构:主从复制、哨兵、集群
  5. ⏳ Redis持久化:RDB与AOF的权衡
  6. ⏳ Redis实战:分布式锁、消息队列、缓存设计

参考资料

  • 《Redis设计与实现》- 黄健宏
  • 《Redis实战》- Josiah L. Carlson
  • Redis官方文档:https://redis.io/documentation
  • Redis命令参考:https://redis.io/commands

本文是"Redis第一性原理"系列的第3篇,共6篇。下一篇将深入讲解《Redis高可用架构:主从复制、哨兵、集群》,敬请期待。