引言

恭喜你!这是第一阶段的最后一篇文章。

前面我们学习了Redis的所有基础知识:

  • ✅ 5大数据类型
  • ✅ 过期和淘汰
  • ✅ 持久化
  • ✅ 事务

今天我们学习一个性能优化神器Pipeline(管道)

一个真实的案例

需求:批量获取10000个用户信息
方式1:循环GET,耗时10秒
方式2:MGET,耗时0.5秒
方式3:Pipeline,耗时0.3秒

性能提升:30倍!

一、RTT延迟问题

1.1 什么是RTT?

RTT(Round-Trip Time):往返时间,从客户端发送请求到收到响应的时间。

客户端                  Redis服务器
  |                          |
  |  ----发送命令---->        |  1ms
  |                          |
  |  <----返回结果----        |  1ms
  |                          |
RTT = 2ms

实际测量

# 本地Redis
$ redis-cli --latency
min: 0.05, max: 2, avg: 0.12 (ms)

# 同机房Redis
RTT ≈ 0.5-1ms

# 跨机房Redis
RTT ≈ 5-10ms

# 跨地域Redis
RTT ≈ 50-100ms

1.2 RTT的影响

单条命令的性能

// 1次GET操作
redis.opsForValue().get("key");
// 耗时:RTT + Redis执行时间
// = 1ms + 0.01ms ≈ 1ms

批量操作的性能问题

// 1000次GET操作
for (int i = 0; i < 1000; i++) {
    redis.opsForValue().get("key" + i);
}
// 耗时:1000 × (1ms + 0.01ms) ≈ 1000ms = 1秒

// 瓶颈不是Redis慢,而是网络RTT!

性能对比

操作RTTRedis执行总耗时
1次GET1ms0.01ms1.01ms
1000次GET1000ms10ms1010ms
MGET 1000个1ms1ms2ms

结论:批量操作的瓶颈是网络往返,而不是Redis本身!

二、Pipeline的原理

2.1 什么是Pipeline?

Pipeline是批量发送命令的机制:

普通方式(3次网络往返):
客户端 → SET key1   → Redis
客户端 ← OK        ← Redis
客户端 → SET key2   → Redis
客户端 ← OK        ← Redis
客户端 → SET key3   → Redis
客户端 ← OK        ← Redis

Pipeline方式(1次网络往返):
客户端 → SET key1 + SET key2 + SET key3 → Redis
客户端 ← OK + OK + OK ← Redis

工作流程

1. 客户端:将多条命令打包
2. 一次性发送到Redis服务器
3. Redis:依次执行所有命令
4. 一次性返回所有结果

2.2 Pipeline vs 普通方式

普通方式

for (int i = 0; i < 1000; i++) {
    redis.opsForValue().set("key" + i, "value" + i);
}
// 1000次网络往返
// 耗时:1000 × 1ms = 1000ms

Pipeline方式

Pipeline pipeline = redis.pipelined();
for (int i = 0; i < 1000; i++) {
    pipeline.set("key" + i, "value" + i);
}
pipeline.sync();  // 一次性发送
// 1次网络往返
// 耗时:1ms + 10ms ≈ 11ms

// 性能提升:1000ms / 11ms ≈ 90倍!

三、Pipeline使用方法

3.1 Jedis示例

// 创建Pipeline
Jedis jedis = new Jedis("localhost", 6379);
Pipeline pipeline = jedis.pipelined();

// 添加命令
for (int i = 0; i < 1000; i++) {
    pipeline.set("key" + i, "value" + i);
    pipeline.incr("counter");
}

// 执行并获取结果
List<Object> results = pipeline.syncAndReturnAll();

// 关闭
jedis.close();

3.2 Lettuce示例

// Lettuce自动使用Pipeline(异步模式)
RedisAsyncCommands<String, String> async = connection.async();

// 批量操作
List<RedisFuture<?>> futures = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
    futures.add(async.set("key" + i, "value" + i));
}

// 等待所有操作完成
async.flushCommands();  // 立即发送
LettuceFutures.awaitAll(5, TimeUnit.SECONDS, futures.toArray(new RedisFuture[0]));

3.3 Spring RedisTemplate示例

@Autowired
private RedisTemplate<String, String> redisTemplate;

public void batchSet(Map<String, String> data) {
    redisTemplate.executePipelined(new RedisCallback<Object>() {
        @Override
        public Object doInRedis(RedisConnection connection) {
            for (Map.Entry<String, String> entry : data.entrySet()) {
                connection.set(
                    entry.getKey().getBytes(),
                    entry.getValue().getBytes()
                );
            }
            return null;  // Pipeline不需要返回值
        }
    });
}

四、实战场景

场景1:批量查询用户信息

@Service
public class UserBatchService {

    // ❌ 慢方式:循环查询
    public List<User> getUsersSlow(List<Long> userIds) {
        List<User> users = new ArrayList<>();
        for (Long userId : userIds) {
            String key = "user:" + userId;
            User user = (User) redis.opsForValue().get(key);
            if (user != null) {
                users.add(user);
            }
        }
        return users;
    }

    // ✅ 快方式:Pipeline批量查询
    public List<User> getUsersFast(List<Long> userIds) {
        List<Object> results = redisTemplate.executePipelined(
            new RedisCallback<Object>() {
                @Override
                public Object doInRedis(RedisConnection connection) {
                    for (Long userId : userIds) {
                        String key = "user:" + userId;
                        connection.get(key.getBytes());
                    }
                    return null;
                }
            }
        );

        List<User> users = new ArrayList<>();
        for (Object result : results) {
            if (result != null) {
                User user = JSON.parseObject((String) result, User.class);
                users.add(user);
            }
        }
        return users;
    }
}

性能对比

查询1000个用户:
慢方式:1000次网络往返 = 1000ms
快方式:1次网络往返 = 2ms
提升:500倍

场景2:批量统计数据

@Service
public class StatisticsService {

    // 批量获取多个文章的统计数据
    public Map<Long, ArticleStats> batchGetArticleStats(List<Long> articleIds) {
        List<Object> results = redisTemplate.executePipelined(
            new RedisCallback<Object>() {
                @Override
                public Object doInRedis(RedisConnection connection) {
                    for (Long articleId : articleIds) {
                        String pvKey = "article:pv:" + articleId;
                        String likeKey = "article:like:" + articleId;
                        String commentKey = "article:comment:" + articleId;

                        connection.get(pvKey.getBytes());
                        connection.get(likeKey.getBytes());
                        connection.get(commentKey.getBytes());
                    }
                    return null;
                }
            }
        );

        Map<Long, ArticleStats> statsMap = new HashMap<>();
        int idx = 0;
        for (Long articleId : articleIds) {
            ArticleStats stats = new ArticleStats();
            stats.setPv(parseInt(results.get(idx++)));
            stats.setLikes(parseInt(results.get(idx++)));
            stats.setComments(parseInt(results.get(idx++)));
            statsMap.put(articleId, stats);
        }

        return statsMap;
    }

    private int parseInt(Object value) {
        return value != null ? Integer.parseInt((String) value) : 0;
    }
}

场景3:批量删除缓存

@Service
public class CacheInvalidationService {

    // 批量删除用户相关的所有缓存
    public void invalidateUserCache(List<Long> userIds) {
        redisTemplate.executePipelined(new RedisCallback<Object>() {
            @Override
            public Object doInRedis(RedisConnection connection) {
                for (Long userId : userIds) {
                    // 删除用户信息缓存
                    connection.del(("user:" + userId).getBytes());

                    // 删除用户权限缓存
                    connection.del(("user:permissions:" + userId).getBytes());

                    // 删除用户会话
                    connection.del(("session:user:" + userId).getBytes());
                }
                return null;
            }
        });
    }
}

场景4:批量设置过期时间

// 批量刷新Session过期时间
public void refreshSessions(List<String> sessionIds) {
    redisTemplate.executePipelined(new RedisCallback<Object>() {
        @Override
        public Object doInRedis(RedisConnection connection) {
            for (String sessionId : sessionIds) {
                String key = "session:" + sessionId;
                connection.expire(key.getBytes(), 1800);  // 30分钟
            }
            return null;
        }
    });
}

五、Pipeline vs 其他批量操作

5.1 Pipeline vs 事务

维度Pipeline事务(MULTI/EXEC)
原子性❌ 无⚠️ 有限
顺序保证✅ 有✅ 有
失败回滚❌ 无❌ 无
性能✅ 高✅ 高
使用场景批量操作需要原子性

可以结合使用

Pipeline pipeline = jedis.pipelined();

// 在Pipeline中使用事务
pipeline.multi();
pipeline.set("key1", "value1");
pipeline.set("key2", "value2");
pipeline.exec();

pipeline.sync();

5.2 Pipeline vs 原生批量命令

场景Pipeline原生命令
批量SETPipelineMSET(更快)
批量GETPipelineMGET(更快)
混合操作Pipeline(唯一选择)-
不同数据类型Pipeline(唯一选择)-

推荐

  • 同类型操作:优先用原生批量命令(MSET/MGET/HMSET/ZADD…)
  • 混合操作:用Pipeline

5.3 Pipeline vs Lua脚本

维度PipelineLua脚本
原子性❌ 无✅ 有
条件判断❌ 无✅ 有
网络往返1次1次
灵活性
复杂度

选择指南

  • 简单批量操作:Pipeline
  • 需要条件判断:Lua脚本
  • 需要原子性:Lua脚本

六、最佳实践

6.1 控制批量大小

// ❌ 不推荐:一次发送10万条命令
Pipeline pipeline = jedis.pipelined();
for (int i = 0; i < 100000; i++) {
    pipeline.set("key" + i, "value" + i);
}
pipeline.sync();  // 可能超时、内存溢出

// ✅ 推荐:分批次发送
int batchSize = 1000;
for (int i = 0; i < 100000; i += batchSize) {
    Pipeline pipeline = jedis.pipelined();

    int end = Math.min(i + batchSize, 100000);
    for (int j = i; j < end; j++) {
        pipeline.set("key" + j, "value" + j);
    }

    pipeline.sync();
    pipeline.close();
}

建议

  • 单次Pipeline命令数:100-1000
  • 避免超时
  • 避免内存溢出

6.2 异常处理

public void batchSetWithErrorHandling(Map<String, String> data) {
    try {
        redisTemplate.executePipelined(new RedisCallback<Object>() {
            @Override
            public Object doInRedis(RedisConnection connection) {
                for (Map.Entry<String, String> entry : data.entrySet()) {
                    try {
                        connection.set(
                            entry.getKey().getBytes(),
                            entry.getValue().getBytes()
                        );
                    } catch (Exception e) {
                        log.error("Pipeline命令失败: key={}", entry.getKey(), e);
                        // Pipeline中的异常不会中断其他命令
                    }
                }
                return null;
            }
        });
    } catch (Exception e) {
        log.error("Pipeline执行失败", e);
        // 处理整体失败
    }
}

6.3 性能监控

@Component
public class PipelineMonitor {

    public <T> T executePipelineWithMetrics(String operation,
                                             Supplier<T> pipelineOperation) {
        long start = System.currentTimeMillis();

        try {
            T result = pipelineOperation.get();

            long duration = System.currentTimeMillis() - start;
            log.info("Pipeline操作[{}]完成, 耗时{}ms", operation, duration);

            // 记录指标
            metricsService.record("pipeline." + operation, duration);

            return result;

        } catch (Exception e) {
            log.error("Pipeline操作[{}]失败", operation, e);
            throw e;
        }
    }
}

七、常见问题

Q1: Pipeline保证原子性吗?

不保证

Pipeline pipeline = jedis.pipelined();
pipeline.set("key1", "value1");  // 可能成功
pipeline.set("key2", "value2");  // 可能失败
pipeline.sync();

// 部分命令可能成功,部分失败
// 不是原子操作!

Q2: Pipeline中的命令会立即执行吗?

不会,会缓存到本地:

Pipeline pipeline = jedis.pipelined();
pipeline.set("key", "value");
// 此时还没发送到Redis

pipeline.sync();  // 这里才真正发送

Q3: Pipeline有大小限制吗?

有隐式限制

- 客户端内存限制:缓存所有命令
- 服务器输出缓冲区:默认512MB
- 网络包大小:通常无限制

建议:单次Pipeline不超过1000条命令

Q4: Pipeline vs 连接池?

两者互补

// Pipeline减少网络往返
// 连接池减少连接建立开销

// 正确用法:从连接池获取连接,再用Pipeline
try (Jedis jedis = jedisPool.getResource()) {
    Pipeline pipeline = jedis.pipelined();
    // ... 批量操作
    pipeline.sync();
}

八、总结

核心要点

  1. RTT是瓶颈:批量操作的性能瓶颈是网络往返,不是Redis
  2. Pipeline原理:批量发送命令,减少网络往返
  3. 性能提升:10-100倍,取决于RTT和批量大小
  4. 无原子性:Pipeline不保证原子性,需要用Lua脚本
  5. 分批发送:单次100-1000条,避免超时和内存溢出
  6. 适用场景:批量读写、混合操作、不需要原子性

性能对比总结

方式1000次操作耗时性能提升
循环单条1000ms
原生批量命令(MGET)2ms500×
Pipeline11ms90×
Lua脚本2-5ms200-500×

批量操作选择指南

同类型、同命令?
└─ Yes → 原生批量命令(MSET/MGET/HMSET)

需要原子性或条件判断?
└─ Yes → Lua脚本

混合操作、不需要原子性?
└─ Yes → Pipeline

🎉 第一阶段完成!

恭喜你完成了Redis从入门到精通的第一阶段!

你现在已经掌握:

  • ✅ Redis的5大数据类型及应用
  • ✅ 键空间管理(过期、淘汰)
  • ✅ 持久化机制(RDB、AOF)
  • ✅ 事务与Pipeline

下一阶段预告

  • 架构原理篇:底层数据结构、单线程模型、主从复制、集群架构

最后的思考题

  1. 为什么Pipeline比循环单条命令快90倍?
  2. Pipeline、事务、Lua脚本,应该如何选择?
  3. 在什么场景下,Pipeline反而会降低性能?

感谢你的学习,祝你在Redis的道路上越走越远! 🚀