引言
恭喜你!这是第一阶段的最后一篇文章。
前面我们学习了Redis的所有基础知识:
- ✅ 5大数据类型
- ✅ 过期和淘汰
- ✅ 持久化
- ✅ 事务
今天我们学习一个性能优化神器:Pipeline(管道)。
一个真实的案例:
需求:批量获取10000个用户信息
方式1:循环GET,耗时10秒
方式2:MGET,耗时0.5秒
方式3:Pipeline,耗时0.3秒
性能提升:30倍!
一、RTT延迟问题
1.1 什么是RTT?
RTT(Round-Trip Time):往返时间,从客户端发送请求到收到响应的时间。
客户端 Redis服务器
| |
| ----发送命令----> | 1ms
| |
| <----返回结果---- | 1ms
| |
RTT = 2ms
实际测量:
# 本地Redis
$ redis-cli --latency
min: 0.05, max: 2, avg: 0.12 (ms)
# 同机房Redis
RTT ≈ 0.5-1ms
# 跨机房Redis
RTT ≈ 5-10ms
# 跨地域Redis
RTT ≈ 50-100ms
1.2 RTT的影响
单条命令的性能:
// 1次GET操作
redis.opsForValue().get("key");
// 耗时:RTT + Redis执行时间
// = 1ms + 0.01ms ≈ 1ms
批量操作的性能问题:
// 1000次GET操作
for (int i = 0; i < 1000; i++) {
redis.opsForValue().get("key" + i);
}
// 耗时:1000 × (1ms + 0.01ms) ≈ 1000ms = 1秒
// 瓶颈不是Redis慢,而是网络RTT!
性能对比:
| 操作 | RTT | Redis执行 | 总耗时 |
|---|---|---|---|
| 1次GET | 1ms | 0.01ms | 1.01ms |
| 1000次GET | 1000ms | 10ms | 1010ms |
| MGET 1000个 | 1ms | 1ms | 2ms |
结论:批量操作的瓶颈是网络往返,而不是Redis本身!
二、Pipeline的原理
2.1 什么是Pipeline?
Pipeline是批量发送命令的机制:
普通方式(3次网络往返):
客户端 → SET key1 → Redis
客户端 ← OK ← Redis
客户端 → SET key2 → Redis
客户端 ← OK ← Redis
客户端 → SET key3 → Redis
客户端 ← OK ← Redis
Pipeline方式(1次网络往返):
客户端 → SET key1 + SET key2 + SET key3 → Redis
客户端 ← OK + OK + OK ← Redis
工作流程:
1. 客户端:将多条命令打包
2. 一次性发送到Redis服务器
3. Redis:依次执行所有命令
4. 一次性返回所有结果
2.2 Pipeline vs 普通方式
普通方式:
for (int i = 0; i < 1000; i++) {
redis.opsForValue().set("key" + i, "value" + i);
}
// 1000次网络往返
// 耗时:1000 × 1ms = 1000ms
Pipeline方式:
Pipeline pipeline = redis.pipelined();
for (int i = 0; i < 1000; i++) {
pipeline.set("key" + i, "value" + i);
}
pipeline.sync(); // 一次性发送
// 1次网络往返
// 耗时:1ms + 10ms ≈ 11ms
// 性能提升:1000ms / 11ms ≈ 90倍!
三、Pipeline使用方法
3.1 Jedis示例
// 创建Pipeline
Jedis jedis = new Jedis("localhost", 6379);
Pipeline pipeline = jedis.pipelined();
// 添加命令
for (int i = 0; i < 1000; i++) {
pipeline.set("key" + i, "value" + i);
pipeline.incr("counter");
}
// 执行并获取结果
List<Object> results = pipeline.syncAndReturnAll();
// 关闭
jedis.close();
3.2 Lettuce示例
// Lettuce自动使用Pipeline(异步模式)
RedisAsyncCommands<String, String> async = connection.async();
// 批量操作
List<RedisFuture<?>> futures = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
futures.add(async.set("key" + i, "value" + i));
}
// 等待所有操作完成
async.flushCommands(); // 立即发送
LettuceFutures.awaitAll(5, TimeUnit.SECONDS, futures.toArray(new RedisFuture[0]));
3.3 Spring RedisTemplate示例
@Autowired
private RedisTemplate<String, String> redisTemplate;
public void batchSet(Map<String, String> data) {
redisTemplate.executePipelined(new RedisCallback<Object>() {
@Override
public Object doInRedis(RedisConnection connection) {
for (Map.Entry<String, String> entry : data.entrySet()) {
connection.set(
entry.getKey().getBytes(),
entry.getValue().getBytes()
);
}
return null; // Pipeline不需要返回值
}
});
}
四、实战场景
场景1:批量查询用户信息
@Service
public class UserBatchService {
// ❌ 慢方式:循环查询
public List<User> getUsersSlow(List<Long> userIds) {
List<User> users = new ArrayList<>();
for (Long userId : userIds) {
String key = "user:" + userId;
User user = (User) redis.opsForValue().get(key);
if (user != null) {
users.add(user);
}
}
return users;
}
// ✅ 快方式:Pipeline批量查询
public List<User> getUsersFast(List<Long> userIds) {
List<Object> results = redisTemplate.executePipelined(
new RedisCallback<Object>() {
@Override
public Object doInRedis(RedisConnection connection) {
for (Long userId : userIds) {
String key = "user:" + userId;
connection.get(key.getBytes());
}
return null;
}
}
);
List<User> users = new ArrayList<>();
for (Object result : results) {
if (result != null) {
User user = JSON.parseObject((String) result, User.class);
users.add(user);
}
}
return users;
}
}
性能对比:
查询1000个用户:
慢方式:1000次网络往返 = 1000ms
快方式:1次网络往返 = 2ms
提升:500倍
场景2:批量统计数据
@Service
public class StatisticsService {
// 批量获取多个文章的统计数据
public Map<Long, ArticleStats> batchGetArticleStats(List<Long> articleIds) {
List<Object> results = redisTemplate.executePipelined(
new RedisCallback<Object>() {
@Override
public Object doInRedis(RedisConnection connection) {
for (Long articleId : articleIds) {
String pvKey = "article:pv:" + articleId;
String likeKey = "article:like:" + articleId;
String commentKey = "article:comment:" + articleId;
connection.get(pvKey.getBytes());
connection.get(likeKey.getBytes());
connection.get(commentKey.getBytes());
}
return null;
}
}
);
Map<Long, ArticleStats> statsMap = new HashMap<>();
int idx = 0;
for (Long articleId : articleIds) {
ArticleStats stats = new ArticleStats();
stats.setPv(parseInt(results.get(idx++)));
stats.setLikes(parseInt(results.get(idx++)));
stats.setComments(parseInt(results.get(idx++)));
statsMap.put(articleId, stats);
}
return statsMap;
}
private int parseInt(Object value) {
return value != null ? Integer.parseInt((String) value) : 0;
}
}
场景3:批量删除缓存
@Service
public class CacheInvalidationService {
// 批量删除用户相关的所有缓存
public void invalidateUserCache(List<Long> userIds) {
redisTemplate.executePipelined(new RedisCallback<Object>() {
@Override
public Object doInRedis(RedisConnection connection) {
for (Long userId : userIds) {
// 删除用户信息缓存
connection.del(("user:" + userId).getBytes());
// 删除用户权限缓存
connection.del(("user:permissions:" + userId).getBytes());
// 删除用户会话
connection.del(("session:user:" + userId).getBytes());
}
return null;
}
});
}
}
场景4:批量设置过期时间
// 批量刷新Session过期时间
public void refreshSessions(List<String> sessionIds) {
redisTemplate.executePipelined(new RedisCallback<Object>() {
@Override
public Object doInRedis(RedisConnection connection) {
for (String sessionId : sessionIds) {
String key = "session:" + sessionId;
connection.expire(key.getBytes(), 1800); // 30分钟
}
return null;
}
});
}
五、Pipeline vs 其他批量操作
5.1 Pipeline vs 事务
| 维度 | Pipeline | 事务(MULTI/EXEC) |
|---|---|---|
| 原子性 | ❌ 无 | ⚠️ 有限 |
| 顺序保证 | ✅ 有 | ✅ 有 |
| 失败回滚 | ❌ 无 | ❌ 无 |
| 性能 | ✅ 高 | ✅ 高 |
| 使用场景 | 批量操作 | 需要原子性 |
可以结合使用:
Pipeline pipeline = jedis.pipelined();
// 在Pipeline中使用事务
pipeline.multi();
pipeline.set("key1", "value1");
pipeline.set("key2", "value2");
pipeline.exec();
pipeline.sync();
5.2 Pipeline vs 原生批量命令
| 场景 | Pipeline | 原生命令 |
|---|---|---|
| 批量SET | Pipeline | MSET(更快) |
| 批量GET | Pipeline | MGET(更快) |
| 混合操作 | Pipeline(唯一选择) | - |
| 不同数据类型 | Pipeline(唯一选择) | - |
推荐:
- 同类型操作:优先用原生批量命令(MSET/MGET/HMSET/ZADD…)
- 混合操作:用Pipeline
5.3 Pipeline vs Lua脚本
| 维度 | Pipeline | Lua脚本 |
|---|---|---|
| 原子性 | ❌ 无 | ✅ 有 |
| 条件判断 | ❌ 无 | ✅ 有 |
| 网络往返 | 1次 | 1次 |
| 灵活性 | 低 | 高 |
| 复杂度 | 低 | 高 |
选择指南:
- 简单批量操作:Pipeline
- 需要条件判断:Lua脚本
- 需要原子性:Lua脚本
六、最佳实践
6.1 控制批量大小
// ❌ 不推荐:一次发送10万条命令
Pipeline pipeline = jedis.pipelined();
for (int i = 0; i < 100000; i++) {
pipeline.set("key" + i, "value" + i);
}
pipeline.sync(); // 可能超时、内存溢出
// ✅ 推荐:分批次发送
int batchSize = 1000;
for (int i = 0; i < 100000; i += batchSize) {
Pipeline pipeline = jedis.pipelined();
int end = Math.min(i + batchSize, 100000);
for (int j = i; j < end; j++) {
pipeline.set("key" + j, "value" + j);
}
pipeline.sync();
pipeline.close();
}
建议:
- 单次Pipeline命令数:100-1000
- 避免超时
- 避免内存溢出
6.2 异常处理
public void batchSetWithErrorHandling(Map<String, String> data) {
try {
redisTemplate.executePipelined(new RedisCallback<Object>() {
@Override
public Object doInRedis(RedisConnection connection) {
for (Map.Entry<String, String> entry : data.entrySet()) {
try {
connection.set(
entry.getKey().getBytes(),
entry.getValue().getBytes()
);
} catch (Exception e) {
log.error("Pipeline命令失败: key={}", entry.getKey(), e);
// Pipeline中的异常不会中断其他命令
}
}
return null;
}
});
} catch (Exception e) {
log.error("Pipeline执行失败", e);
// 处理整体失败
}
}
6.3 性能监控
@Component
public class PipelineMonitor {
public <T> T executePipelineWithMetrics(String operation,
Supplier<T> pipelineOperation) {
long start = System.currentTimeMillis();
try {
T result = pipelineOperation.get();
long duration = System.currentTimeMillis() - start;
log.info("Pipeline操作[{}]完成, 耗时{}ms", operation, duration);
// 记录指标
metricsService.record("pipeline." + operation, duration);
return result;
} catch (Exception e) {
log.error("Pipeline操作[{}]失败", operation, e);
throw e;
}
}
}
七、常见问题
Q1: Pipeline保证原子性吗?
不保证:
Pipeline pipeline = jedis.pipelined();
pipeline.set("key1", "value1"); // 可能成功
pipeline.set("key2", "value2"); // 可能失败
pipeline.sync();
// 部分命令可能成功,部分失败
// 不是原子操作!
Q2: Pipeline中的命令会立即执行吗?
不会,会缓存到本地:
Pipeline pipeline = jedis.pipelined();
pipeline.set("key", "value");
// 此时还没发送到Redis
pipeline.sync(); // 这里才真正发送
Q3: Pipeline有大小限制吗?
有隐式限制:
- 客户端内存限制:缓存所有命令
- 服务器输出缓冲区:默认512MB
- 网络包大小:通常无限制
建议:单次Pipeline不超过1000条命令
Q4: Pipeline vs 连接池?
两者互补:
// Pipeline减少网络往返
// 连接池减少连接建立开销
// 正确用法:从连接池获取连接,再用Pipeline
try (Jedis jedis = jedisPool.getResource()) {
Pipeline pipeline = jedis.pipelined();
// ... 批量操作
pipeline.sync();
}
八、总结
核心要点
- RTT是瓶颈:批量操作的性能瓶颈是网络往返,不是Redis
- Pipeline原理:批量发送命令,减少网络往返
- 性能提升:10-100倍,取决于RTT和批量大小
- 无原子性:Pipeline不保证原子性,需要用Lua脚本
- 分批发送:单次100-1000条,避免超时和内存溢出
- 适用场景:批量读写、混合操作、不需要原子性
性能对比总结
| 方式 | 1000次操作耗时 | 性能提升 |
|---|---|---|
| 循环单条 | 1000ms | 1× |
| 原生批量命令(MGET) | 2ms | 500× |
| Pipeline | 11ms | 90× |
| Lua脚本 | 2-5ms | 200-500× |
批量操作选择指南
同类型、同命令?
└─ Yes → 原生批量命令(MSET/MGET/HMSET)
需要原子性或条件判断?
└─ Yes → Lua脚本
混合操作、不需要原子性?
└─ Yes → Pipeline
🎉 第一阶段完成!
恭喜你完成了Redis从入门到精通的第一阶段!
你现在已经掌握:
- ✅ Redis的5大数据类型及应用
- ✅ 键空间管理(过期、淘汰)
- ✅ 持久化机制(RDB、AOF)
- ✅ 事务与Pipeline
下一阶段预告:
- 架构原理篇:底层数据结构、单线程模型、主从复制、集群架构
最后的思考题:
- 为什么Pipeline比循环单条命令快90倍?
- Pipeline、事务、Lua脚本,应该如何选择?
- 在什么场景下,Pipeline反而会降低性能?
感谢你的学习,祝你在Redis的道路上越走越远! 🚀