延迟队列原理

核心思想:ZSet的score存储执行时间戳

ZADD delay_queue <timestamp> <task_id>

轮询:
1. ZRANGEBYSCORE delay_queue 0 <now> LIMIT 0 100
2. 取出到期任务
3. 执行任务
4. ZREM删除已执行任务

基础实现

@Component
public class DelayQueue {
    @Autowired
    private RedisTemplate<String, String> redis;

    private static final String QUEUE_KEY = "delay:queue";

    // 添加延迟任务
    public void addTask(String taskId, long delaySeconds) {
        long executeTime = System.currentTimeMillis() + delaySeconds * 1000;
        redis.opsForZSet().add(QUEUE_KEY, taskId, executeTime);
    }

    // 拉取到期任务
    public Set<String> pollTasks() {
        long now = System.currentTimeMillis();
        Set<String> tasks = redis.opsForZSet()
            .rangeByScore(QUEUE_KEY, 0, now, 0, 100);

        if (tasks != null && !tasks.isEmpty()) {
            // 删除已拉取的任务
            redis.opsForZSet().removeRangeByScore(QUEUE_KEY, 0, now);
        }
        return tasks;
    }

    // 定时拉取
    @Scheduled(fixedRate = 1000)  // 每秒执行
    public void consumeTasks() {
        Set<String> tasks = pollTasks();
        if (tasks != null) {
            tasks.forEach(this::executeTask);
        }
    }

    private void executeTask(String taskId) {
        log.info("执行延迟任务: {}", taskId);
        // 任务执行逻辑
    }
}

实战案例

案例1:订单超时自动取消

@Service
public class OrderTimeoutService {
    @Autowired
    private RedisTemplate<String, String> redis;
    @Autowired
    private OrderService orderService;

    private static final String QUEUE_KEY = "delay:order:timeout";
    private static final int TIMEOUT_MINUTES = 30;  // 30分钟超时

    // 创建订单时,添加超时任务
    public void createOrder(String orderNo) {
        orderService.create(orderNo);

        // 添加到延迟队列
        long executeTime = System.currentTimeMillis() + TIMEOUT_MINUTES * 60 * 1000;
        redis.opsForZSet().add(QUEUE_KEY, orderNo, executeTime);
    }

    // 支付成功时,取消超时任务
    public void payOrder(String orderNo) {
        orderService.pay(orderNo);

        // 从延迟队列移除
        redis.opsForZSet().remove(QUEUE_KEY, orderNo);
    }

    // 定时检查超时订单
    @Scheduled(fixedRate = 10000)  // 每10秒
    public void checkTimeout() {
        long now = System.currentTimeMillis();
        Set<String> timeoutOrders = redis.opsForZSet()
            .rangeByScore(QUEUE_KEY, 0, now, 0, 100);

        if (timeoutOrders != null) {
            for (String orderNo : timeoutOrders) {
                try {
                    // 取消订单
                    orderService.cancel(orderNo, "超时未支付");

                    // 从队列移除
                    redis.opsForZSet().remove(QUEUE_KEY, orderNo);

                    log.info("订单超时已取消: {}", orderNo);
                } catch (Exception e) {
                    log.error("取消订单失败: {}", orderNo, e);
                }
            }
        }
    }
}

案例2:消息定时发送

@Service
public class ScheduledMessageService {
    @Autowired
    private RedisTemplate<String, String> redis;

    private static final String QUEUE_KEY = "delay:message";

    // 添加定时消息
    public void scheduleMessage(String messageId, String content, LocalDateTime sendTime) {
        // 存储消息内容
        redis.opsForValue().set("message:" + messageId, content, 7, TimeUnit.DAYS);

        // 添加到延迟队列
        long timestamp = sendTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
        redis.opsForZSet().add(QUEUE_KEY, messageId, timestamp);
    }

    @Scheduled(fixedRate = 5000)  // 每5秒
    public void sendScheduledMessages() {
        long now = System.currentTimeMillis();
        Set<String> messageIds = redis.opsForZSet()
            .rangeByScore(QUEUE_KEY, 0, now, 0, 50);

        if (messageIds != null) {
            for (String messageId : messageIds) {
                String content = redis.opsForValue().get("message:" + messageId);
                if (content != null) {
                    sendMessage(content);
                    redis.delete("message:" + messageId);
                }
                redis.opsForZSet().remove(QUEUE_KEY, messageId);
            }
        }
    }

    private void sendMessage(String content) {
        log.info("发送消息: {}", content);
        // 实际发送逻辑
    }
}

案例3:优惠券过期提醒

@Service
public class CouponReminderService {
    @Autowired
    private RedisTemplate<String, String> redis;

    private static final String QUEUE_KEY = "delay:coupon:reminder";

    // 发放优惠券时,设置提醒
    public void issueCoupon(String couponId, LocalDateTime expireTime) {
        // 过期前3天提醒
        long reminderTime = expireTime.minusDays(3)
            .atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();

        redis.opsForZSet().add(QUEUE_KEY, couponId, reminderTime);
    }

    @Scheduled(fixedRate = 3600000)  // 每小时
    public void sendReminders() {
        long now = System.currentTimeMillis();
        Set<String> couponIds = redis.opsForZSet()
            .rangeByScore(QUEUE_KEY, 0, now);

        if (couponIds != null) {
            couponIds.forEach(couponId -> {
                sendReminder(couponId);
                redis.opsForZSet().remove(QUEUE_KEY, couponId);
            });
        }
    }

    private void sendReminder(String couponId) {
        log.info("发送优惠券过期提醒: {}", couponId);
        // 发送短信/推送通知
    }
}

可靠性保证

1. 防止重复消费(Lua脚本)

private static final String POP_SCRIPT =
    "local tasks = redis.call('ZRANGEBYSCORE', KEYS[1], 0, ARGV[1], 'LIMIT', 0, ARGV[2]) " +
    "if #tasks > 0 then " +
    "  redis.call('ZREM', KEYS[1], unpack(tasks)) " +
    "  return tasks " +
    "else " +
    "  return {} " +
    "end";

public List<String> popTasks(int limit) {
    long now = System.currentTimeMillis();
    return redis.execute(
        RedisScript.of(POP_SCRIPT, List.class),
        Collections.singletonList(QUEUE_KEY),
        String.valueOf(now),
        String.valueOf(limit)
    );
}

2. 任务执行失败重试

public void consumeTasks() {
    List<String> tasks = popTasks(100);

    for (String taskId : tasks) {
        try {
            executeTask(taskId);
        } catch (Exception e) {
            log.error("任务执行失败: {}", taskId, e);

            // 重新入队,延迟60秒后重试
            long retryTime = System.currentTimeMillis() + 60000;
            redis.opsForZSet().add(QUEUE_KEY, taskId, retryTime);
        }
    }
}

3. 限制重试次数

public void executeWithRetry(String taskId) {
    String retryKey = "retry:" + taskId;
    Integer retryCount = (Integer) redis.opsForValue().get(retryKey);

    if (retryCount == null) {
        retryCount = 0;
    }

    if (retryCount >= 3) {
        log.error("任务重试次数超限: {}", taskId);
        // 转移到死信队列
        redis.opsForList().rightPush("dead_letter_queue", taskId);
        redis.delete(retryKey);
        return;
    }

    try {
        executeTask(taskId);
        redis.delete(retryKey);  // 执行成功,清除重试计数
    } catch (Exception e) {
        // 重试计数+1
        redis.opsForValue().increment(retryKey);
        redis.expire(retryKey, 1, TimeUnit.DAYS);

        // 重新入队
        long retryTime = System.currentTimeMillis() + (retryCount + 1) * 60000;
        redis.opsForZSet().add(QUEUE_KEY, taskId, retryTime);
    }
}

性能优化

1. 批量处理

@Scheduled(fixedRate = 1000)
public void consumeTasks() {
    List<String> tasks = popTasks(100);  // 每次最多100个

    // 使用线程池并行处理
    tasks.forEach(taskId ->
        executor.submit(() -> executeTask(taskId))
    );
}

2. 多实例消费(分片)

// 实例1消费shard0
// 实例2消费shard1
@Value("${app.shard.id}")
private int shardId;

public void addTask(String taskId, long delaySeconds) {
    int shard = Math.abs(taskId.hashCode()) % SHARD_COUNT;
    String queueKey = "delay:queue:shard:" + shard;

    long executeTime = System.currentTimeMillis() + delaySeconds * 1000;
    redis.opsForZSet().add(queueKey, taskId, executeTime);
}

@Scheduled(fixedRate = 1000)
public void consumeTasks() {
    String queueKey = "delay:queue:shard:" + shardId;
    // 消费分片队列
}

与其他方案对比

方案精度可靠性复杂度适用场景
Redis ZSet秒级轻量级延迟任务
RabbitMQ延迟插件毫秒级消息队列场景
时间轮毫秒级高性能定时任务
ScheduledExecutorService毫秒级单机定时任务

总结

核心优势

  • 实现简单(ZSet + 定时拉取)
  • 无需额外依赖
  • 适合中小规模场景

典型场景

  • 订单超时取消
  • 消息定时发送
  • 优惠券过期提醒
  • 定时任务调度

注意事项

  • 精度秒级(不适合毫秒级)
  • 单点故障(Redis宕机)
  • 大规模场景考虑分片
  • 失败重试和死信队列