RocketMQ生产07:安全机制配置 - ACL 权限控制实战
引言:安全的重要性 某天,开发环境的测试账号误连到生产环境,执行了 deleteTopic 命令,核心业务 Topic 被删除,服务全面瘫痪… 安全风险: ❌ 任何人都能创建/删除 Topic ❌ 未授权用户读取敏感消息 ❌ 恶意攻击导致集群瘫痪 ❌ 内部人员误操作 本文目标: ✅ 理解 RocketMQ ACL 机制 ✅ 配置细粒度权限控制 ✅ 实现用户/应用隔离 ✅ 建立安全最佳实践 一、ACL 基础概念 1.1 什么是 ACL? ACL(Access Control List):访问控制列表,用于定义"谁可以对哪些资源执行什么操作"。 ACL 三要素: 1. Subject(主体):谁在访问?(用户、应用) 2. Resource(资源):访问什么?(Topic、Group) 3. Permission(权限):做什么操作?(PUB、SUB、DENY) 1.2 RocketMQ ACL 权限类型 权限 含义 适用对象 PUB 发布权限(Producer) Topic SUB 订阅权限(Consumer) Topic + Group DENY 禁止访问 Topic + Group ANY 所有权限 Topic + Group 1.3 ACL 工作流程 ┌─────────┐ ┌─────────────────┐ ┌──────────┐ │ Client │ │ ACL Validator │ │ Broker │ └────┬────┘ └────────┬────────┘ └────┬─────┘ │ │ │ │ 1. 发送请求 │ │ │ (AccessKey) │ │ ├───────────────────>│ │ │ │ │ │ │ 2. 校验签名 │ │ │ 校验权限 │ │ │ │ │ │ 3. 权限通过 │ │ ├───────────────────>│ │ │ │ │ 4. 返回结果 │ │ │<──────────────────────────────────────────┤ │ │ │ 二、启用 ACL 2.1 Broker 配置 # broker.conf # 启用 ACL aclEnable=true # ACL 配置文件路径(相对于 ROCKETMQ_HOME) accessKey=conf/plain_acl.yml 2.2 创建 ACL 配置文件 # 创建配置文件 vim /opt/rocketmq/conf/plain_acl.yml # plain_acl.yml # 全局白名单(IP) globalWhiteRemoteAddresses: - 10.10.103.* # 内网IP段 - 192.168.0.* # 用户配置 accounts: # 管理员账号 - accessKey: RocketMQ_Admin secretKey: 12345678 whiteRemoteAddress: # 白名单IP(可选) admin: true # 管理员权限 defaultTopicPerm: DENY # 默认禁止所有Topic defaultGroupPerm: DENY # 默认禁止所有Group topicPerms: - "*=PUB|SUB" # 所有Topic的发布和订阅权限 groupPerms: - "*=PUB|SUB" # 订单服务账号 - accessKey: order_service secretKey: order_pwd_2024 whiteRemoteAddress: 192.168.1.* # 只允许订单服务器IP admin: false defaultTopicPerm: DENY defaultGroupPerm: DENY topicPerms: - "order_topic=PUB" # 只能发送订单消息 - "order_result_topic=SUB" # 只能消费结果消息 groupPerms: - "order_consumer_group=SUB" # 库存服务账号 - accessKey: inventory_service secretKey: inventory_pwd_2024 whiteRemoteAddress: admin: false defaultTopicPerm: DENY defaultGroupPerm: DENY topicPerms: - "order_topic=SUB" # 只能消费订单消息 - "inventory_topic=PUB" # 只能发送库存消息 groupPerms: - "inventory_consumer_group=SUB" # 只读账号(监控用) - accessKey: monitor_user secretKey: monitor_pwd_2024 admin: false defaultTopicPerm: SUB # 默认只读 defaultGroupPerm: SUB topicPerms: - "*=SUB" 2.3 重启 Broker # 停止 Broker sh /opt/rocketmq/bin/mqshutdown broker # 启动 Broker(应用 ACL 配置) sh /opt/rocketmq/bin/mqbroker -c /opt/rocketmq/conf/broker.conf & # 验证 ACL 是否生效 tail -f /opt/rocketmq/logs/rocketmqlogs/broker.log | grep ACL # 输出:ACL is enabled 三、客户端配置 3.1 Producer 配置 import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.producer.DefaultMQProducer; public class AclProducerExample { public static void main(String[] args) throws Exception { // 1. 创建 ACL 凭证 String accessKey = "order_service"; String secretKey = "order_pwd_2024"; SessionCredentials credentials = new SessionCredentials(accessKey, secretKey); AclClientRPCHook aclHook = new AclClientRPCHook(credentials); // 2. 创建 Producer(传入 ACL Hook) DefaultMQProducer producer = new DefaultMQProducer( "order_producer_group", aclHook, // ACL Hook true, // 启用消息轨迹 null // 自定义轨迹Topic(可选) ); producer.setNamesrvAddr("127.0.0.1:9876"); producer.start(); // 3. 发送消息 Message msg = new Message("order_topic", "Hello RocketMQ with ACL".getBytes()); SendResult result = producer.send(msg); System.out.println("发送结果:" + result.getSendStatus()); producer.shutdown(); } } 3.2 Consumer 配置 import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.*; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; public class AclConsumerExample { public static void main(String[] args) throws Exception { // 1. 创建 ACL 凭证 String accessKey = "inventory_service"; String secretKey = "inventory_pwd_2024"; SessionCredentials credentials = new SessionCredentials(accessKey, secretKey); AclClientRPCHook aclHook = new AclClientRPCHook(credentials); // 2. 创建 Consumer(传入 ACL Hook) DefaultMQPushConsumer consumer = new DefaultMQPushConsumer( "inventory_consumer_group", aclHook, // ACL Hook null // 消息轨迹相关配置 ); consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.subscribe("order_topic", "*"); // 3. 注册消息监听器 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage( List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println("收到消息:" + new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer 启动成功"); } } 3.3 Spring Boot 配置 # application.yml rocketmq: name-server: 127.0.0.1:9876 producer: group: order_producer_group access-key: order_service # ACL AccessKey secret-key: order_pwd_2024 # ACL SecretKey send-message-timeout: 3000 retry-times-when-send-failed: 2 consumer: group: inventory_consumer_group access-key: inventory_service secret-key: inventory_pwd_2024 consume-thread-min: 5 consume-thread-max: 10 Producer 代码: ...