引言:安全的重要性

某天,开发环境的测试账号误连到生产环境,执行了 deleteTopic 命令,核心业务 Topic 被删除,服务全面瘫痪…

安全风险

  • ❌ 任何人都能创建/删除 Topic
  • ❌ 未授权用户读取敏感消息
  • ❌ 恶意攻击导致集群瘫痪
  • ❌ 内部人员误操作

本文目标

  • ✅ 理解 RocketMQ ACL 机制
  • ✅ 配置细粒度权限控制
  • ✅ 实现用户/应用隔离
  • ✅ 建立安全最佳实践

一、ACL 基础概念

1.1 什么是 ACL?

ACL(Access Control List):访问控制列表,用于定义"谁可以对哪些资源执行什么操作"。

ACL 三要素:
1. Subject(主体):谁在访问?(用户、应用)
2. Resource(资源):访问什么?(Topic、Group)
3. Permission(权限):做什么操作?(PUB、SUB、DENY)

1.2 RocketMQ ACL 权限类型

权限含义适用对象
PUB发布权限(Producer)Topic
SUB订阅权限(Consumer)Topic + Group
DENY禁止访问Topic + Group
ANY所有权限Topic + Group

1.3 ACL 工作流程

┌─────────┐      ┌─────────────────┐      ┌──────────┐
│ Client  │      │   ACL Validator │      │  Broker  │
└────┬────┘      └────────┬────────┘      └────┬─────┘
     │                    │                    │
     │ 1. 发送请求         │                    │
     │  (AccessKey)       │                    │
     ├───────────────────>│                    │
     │                    │                    │
     │                    │ 2. 校验签名         │
     │                    │    校验权限         │
     │                    │                    │
     │                    │ 3. 权限通过         │
     │                    ├───────────────────>│
     │                    │                    │
     │ 4. 返回结果         │                    │
     │<──────────────────────────────────────────┤
     │                    │                    │

二、启用 ACL

2.1 Broker 配置

# broker.conf

# 启用 ACL
aclEnable=true

# ACL 配置文件路径(相对于 ROCKETMQ_HOME)
accessKey=conf/plain_acl.yml

2.2 创建 ACL 配置文件

# 创建配置文件
vim /opt/rocketmq/conf/plain_acl.yml
# plain_acl.yml

# 全局白名单(IP)
globalWhiteRemoteAddresses:
  - 10.10.103.*      # 内网IP段
  - 192.168.0.*

# 用户配置
accounts:
  # 管理员账号
  - accessKey: RocketMQ_Admin
    secretKey: 12345678
    whiteRemoteAddress:      # 白名单IP(可选)
    admin: true              # 管理员权限
    defaultTopicPerm: DENY   # 默认禁止所有Topic
    defaultGroupPerm: DENY   # 默认禁止所有Group
    topicPerms:
      - "*=PUB|SUB"          # 所有Topic的发布和订阅权限
    groupPerms:
      - "*=PUB|SUB"

  # 订单服务账号
  - accessKey: order_service
    secretKey: order_pwd_2024
    whiteRemoteAddress: 192.168.1.*  # 只允许订单服务器IP
    admin: false
    defaultTopicPerm: DENY
    defaultGroupPerm: DENY
    topicPerms:
      - "order_topic=PUB"    # 只能发送订单消息
      - "order_result_topic=SUB"  # 只能消费结果消息
    groupPerms:
      - "order_consumer_group=SUB"

  # 库存服务账号
  - accessKey: inventory_service
    secretKey: inventory_pwd_2024
    whiteRemoteAddress:
    admin: false
    defaultTopicPerm: DENY
    defaultGroupPerm: DENY
    topicPerms:
      - "order_topic=SUB"    # 只能消费订单消息
      - "inventory_topic=PUB"  # 只能发送库存消息
    groupPerms:
      - "inventory_consumer_group=SUB"

  # 只读账号(监控用)
  - accessKey: monitor_user
    secretKey: monitor_pwd_2024
    admin: false
    defaultTopicPerm: SUB    # 默认只读
    defaultGroupPerm: SUB
    topicPerms:
      - "*=SUB"

2.3 重启 Broker

# 停止 Broker
sh /opt/rocketmq/bin/mqshutdown broker

# 启动 Broker(应用 ACL 配置)
sh /opt/rocketmq/bin/mqbroker -c /opt/rocketmq/conf/broker.conf &

# 验证 ACL 是否生效
tail -f /opt/rocketmq/logs/rocketmqlogs/broker.log | grep ACL
# 输出:ACL is enabled

三、客户端配置

3.1 Producer 配置

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.producer.DefaultMQProducer;

public class AclProducerExample {

    public static void main(String[] args) throws Exception {
        // 1. 创建 ACL 凭证
        String accessKey = "order_service";
        String secretKey = "order_pwd_2024";
        SessionCredentials credentials = new SessionCredentials(accessKey, secretKey);
        AclClientRPCHook aclHook = new AclClientRPCHook(credentials);

        // 2. 创建 Producer(传入 ACL Hook)
        DefaultMQProducer producer = new DefaultMQProducer(
            "order_producer_group",
            aclHook,  // ACL Hook
            true,     // 启用消息轨迹
            null      // 自定义轨迹Topic(可选)
        );

        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();

        // 3. 发送消息
        Message msg = new Message("order_topic", "Hello RocketMQ with ACL".getBytes());
        SendResult result = producer.send(msg);
        System.out.println("发送结果:" + result.getSendStatus());

        producer.shutdown();
    }
}

3.2 Consumer 配置

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class AclConsumerExample {

    public static void main(String[] args) throws Exception {
        // 1. 创建 ACL 凭证
        String accessKey = "inventory_service";
        String secretKey = "inventory_pwd_2024";
        SessionCredentials credentials = new SessionCredentials(accessKey, secretKey);
        AclClientRPCHook aclHook = new AclClientRPCHook(credentials);

        // 2. 创建 Consumer(传入 ACL Hook)
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(
            "inventory_consumer_group",
            aclHook,  // ACL Hook
            null      // 消息轨迹相关配置
        );

        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("order_topic", "*");

        // 3. 注册消息监听器
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(
                List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {

                for (MessageExt msg : msgs) {
                    System.out.println("收到消息:" + new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("Consumer 启动成功");
    }
}

3.3 Spring Boot 配置

# application.yml
rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    group: order_producer_group
    access-key: order_service        # ACL AccessKey
    secret-key: order_pwd_2024       # ACL SecretKey
    send-message-timeout: 3000
    retry-times-when-send-failed: 2

  consumer:
    group: inventory_consumer_group
    access-key: inventory_service
    secret-key: inventory_pwd_2024
    consume-thread-min: 5
    consume-thread-max: 10

Producer 代码

@Service
public class OrderProducer {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public void sendOrder(OrderDTO order) {
        // Spring Boot 自动注入 ACL 凭证
        rocketMQTemplate.convertAndSend("order_topic", order);
    }
}

四、权限测试

4.1 测试场景 1:正常访问

// 使用 order_service 账号发送消息到 order_topic(有权限)
String accessKey = "order_service";
String secretKey = "order_pwd_2024";
SessionCredentials credentials = new SessionCredentials(accessKey, secretKey);
AclClientRPCHook aclHook = new AclClientRPCHook(credentials);

DefaultMQProducer producer = new DefaultMQProducer("test_group", aclHook, false, null);
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();

Message msg = new Message("order_topic", "test".getBytes());
SendResult result = producer.send(msg);

// 输出:SEND_OK
System.out.println(result.getSendStatus());

4.2 测试场景 2:权限拒绝

// 使用 order_service 账号发送消息到 inventory_topic(无权限)
Message msg = new Message("inventory_topic", "test".getBytes());

try {
    SendResult result = producer.send(msg);
} catch (Exception e) {
    // 输出:CODE: 207 DESC: No permission to send message to topic
    e.printStackTrace();
}

4.3 测试场景 3:错误凭证

// 使用错误的 SecretKey
String accessKey = "order_service";
String secretKey = "wrong_password";
SessionCredentials credentials = new SessionCredentials(accessKey, secretKey);
AclClientRPCHook aclHook = new AclClientRPCHook(credentials);

DefaultMQProducer producer = new DefaultMQProducer("test_group", aclHook, false, null);
producer.setNamesrvAddr("127.0.0.1:9876");

try {
    producer.start();
} catch (Exception e) {
    // 输出:CODE: 204 DESC: signature doesn't match, maybe accessKey or secretKey is wrong
    e.printStackTrace();
}

五、高级配置

5.1 IP 白名单

# plain_acl.yml

accounts:
  - accessKey: web_service
    secretKey: web_pwd_2024
    whiteRemoteAddress: 192.168.1.100,192.168.1.101  # 逗号分隔多个IP
    admin: false
    topicPerms:
      - "web_topic=PUB"

IP 格式支持

  • 单个 IP:192.168.1.100
  • IP 段:192.168.1.*
  • CIDR:192.168.1.0/24
  • 多个 IP:192.168.1.100,192.168.1.101

5.2 通配符权限

accounts:
  - accessKey: log_service
    secretKey: log_pwd_2024
    admin: false
    topicPerms:
      - "log_*=PUB"       # 所有以 log_ 开头的 Topic 有发送权限
      - "*_result=SUB"    # 所有以 _result 结尾的 Topic 有订阅权限

5.3 动态更新 ACL

# 修改 plain_acl.yml 后,无需重启 Broker

# 1. 修改配置文件
vim /opt/rocketmq/conf/plain_acl.yml

# 2. 通知 Broker 重新加载 ACL
sh mqadmin updateAcl -n 127.0.0.1:9876

# 3. 验证新配置
# 客户端重新连接即可使用新权限

六、安全最佳实践

6.1 权限设计原则

1. 最小权限原则:只授予必要的权限
2. 职责分离:不同服务使用不同账号
3. 定期审计:定期检查权限配置
4. 密钥轮换:定期更换 SecretKey
5. 白名单限制:限制访问IP范围

6.2 账号命名规范

{服务名}_{环境}_{角色}

示例:
- order_prod_producer     # 订单服务生产环境Producer
- order_prod_consumer     # 订单服务生产环境Consumer
- inventory_test_producer # 库存服务测试环境Producer
- monitor_prod_readonly   # 监控服务生产环境只读

6.3 密钥管理

不要硬编码

// ❌ 错误:硬编码密钥
String secretKey = "order_pwd_2024";

// ✅ 正确:从配置中心读取
String secretKey = ConfigClient.get("rocketmq.order.secretKey");

// ✅ 正确:从环境变量读取
String secretKey = System.getenv("ROCKETMQ_SECRET_KEY");

定期轮换密钥

# 1. 生成新密钥
NEW_SECRET_KEY=$(openssl rand -base64 32)

# 2. 更新配置文件
vim /opt/rocketmq/conf/plain_acl.yml
# 修改 secretKey

# 3. 通知 Broker 重新加载
sh mqadmin updateAcl -n 127.0.0.1:9876

# 4. 灰度更新客户端(新旧密钥并存)

# 5. 验证新密钥生效后,删除旧密钥

6.4 监控与审计

# prometheus/alert_rules.yml

groups:
  - name: rocketmq_security
    rules:
      # 1. ACL 认证失败告警
      - alert: AclAuthenticationFailed
        expr: rate(rocketmq_acl_auth_failed_total[5m]) > 10
        for: 1m
        labels:
          severity: warning
        annotations:
          summary: "ACL 认证失败频繁"
          description: "过去 5 分钟认证失败 {{ $value }} 次"

      # 2. 未授权访问告警
      - alert: UnauthorizedAccess
        expr: rate(rocketmq_acl_permission_denied_total[5m]) > 5
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "检测到未授权访问"
          description: "过去 5 分钟未授权访问 {{ $value }} 次"

七、故障排查

7.1 常见错误

错误 1:CODE: 204(签名不匹配)

org.apache.rocketmq.remoting.exception.RemotingCommandException:
CODE: 204 DESC: signature doesn't match

原因

  • AccessKey 或 SecretKey 错误
  • 时间不同步(时间差 > 5 分钟)

解决方法

# 1. 检查凭证是否正确
cat /opt/rocketmq/conf/plain_acl.yml | grep -A 5 "order_service"

# 2. 检查服务器时间同步
ntpdate -u pool.ntp.org
date

错误 2:CODE: 207(无权限)

org.apache.rocketmq.remoting.exception.RemotingCommandException:
CODE: 207 DESC: No permission to send message to topic

原因

  • 账号没有对应 Topic 的发送权限

解决方法

# 修改 plain_acl.yml,添加权限
accounts:
  - accessKey: order_service
    secretKey: order_pwd_2024
    topicPerms:
      - "order_topic=PUB"  # 添加发送权限

错误 3:IP 白名单拒绝

CODE: 205 DESC: The remoteAddress is not in whitelist

原因

  • 客户端 IP 不在白名单中

解决方法

# 添加 IP 到白名单
accounts:
  - accessKey: order_service
    whiteRemoteAddress: 192.168.1.100,192.168.1.101

7.2 调试技巧

# 1. 查看 Broker ACL 日志
tail -f /opt/rocketmq/logs/rocketmqlogs/broker.log | grep ACL

# 2. 查看认证失败详情
grep "CODE: 204\|CODE: 207" /opt/rocketmq/logs/rocketmqlogs/broker.log

# 3. 验证配置文件语法
cat /opt/rocketmq/conf/plain_acl.yml | python -m yaml

# 4. 测试连接(使用 mqadmin)
sh mqadmin sendMessage \
  -n 127.0.0.1:9876 \
  -t test_topic \
  -p "test message" \
  -ak order_service \
  -sk order_pwd_2024

八、总结

ACL 配置检查清单

  • 启用 ACL(aclEnable=true)
  • 创建 plain_acl.yml 配置文件
  • 为每个服务创建独立账号
  • 配置最小权限
  • 限制 IP 白名单
  • 密钥不要硬编码
  • 配置监控告警
  • 定期审计权限
  • 定期轮换密钥
  • 编写应急预案

权限设计示例

账号Topic 权限Group 权限适用场景
admin*=PUB|SUB*=PUB|SUB管理员
order_producerorder_topic=PUBorder_group=SUB订单生产者
inventory_consumerorder_topic=SUBinventory_group=SUB库存消费者
monitor_readonly*=SUB*=SUB监控只读

核心要点

1. 生产环境必须启用 ACL
2. 每个服务使用独立账号
3. 遵循最小权限原则
4. 定期审计和轮换密钥
5. 配置监控告警

下一篇预告:《版本升级策略 - 平滑迁移与回滚方案》,我们将讲解如何安全地升级 RocketMQ 版本,避免业务中断。

本文关键词ACL 权限控制 安全配置 访问控制