RocketMQ入门04:快速上手 - Docker一键部署与第一条消息

引言:5分钟上手 RocketMQ 不需要复杂的环境配置,不需要编译源码,通过 Docker,我们可以在 5 分钟内搭建一个完整的 RocketMQ 环境,并成功发送第一条消息。 让我们开始这段激动人心的旅程! 一、环境准备 1.1 前置要求 # 检查 Docker 版本(需要 20.10+) docker --version # Docker version 24.0.5, build ced0996 # 检查 Docker Compose 版本(需要 2.0+) docker-compose --version # Docker Compose version v2.20.2 # 检查系统资源 # 建议:4GB+ 内存,10GB+ 磁盘空间 docker system info | grep -E "Memory|Storage" 1.2 创建项目目录 # 创建项目目录 mkdir -p ~/rocketmq-demo cd ~/rocketmq-demo # 创建必要的子目录 mkdir -p data/namesrv/logs data/broker/logs data/broker/store mkdir -p conf 二、Docker Compose 部署 2.1 编写 docker-compose.yml # docker-compose.yml version: '3.8' services: # NameServer 服务 namesrv: image: apache/rocketmq:5.1.3 container_name: rocketmq-namesrv ports: - "9876:9876" environment: JAVA_OPT: "-Duser.home=/opt" JAVA_OPT_EXT: "-Xms512M -Xmx512M -Xmn128m" volumes: - ./data/namesrv/logs:/opt/logs command: sh mqnamesrv networks: - rocketmq-net # Broker 服务 broker: image: apache/rocketmq:5.1.3 container_name: rocketmq-broker ports: - "10909:10909" - "10911:10911" - "10912:10912" environment: JAVA_OPT_EXT: "-Xms1G -Xmx1G -Xmn512m" volumes: - ./data/broker/logs:/opt/logs - ./data/broker/store:/opt/store - ./conf/broker.conf:/opt/conf/broker.conf command: sh mqbroker -n namesrv:9876 -c /opt/conf/broker.conf depends_on: - namesrv networks: - rocketmq-net # 控制台服务 dashboard: image: apacherocketmq/rocketmq-dashboard:latest container_name: rocketmq-dashboard ports: - "8080:8080" environment: JAVA_OPTS: "-Drocketmq.namesrv.addr=namesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" depends_on: - namesrv - broker networks: - rocketmq-net networks: rocketmq-net: driver: bridge 2.2 配置 Broker # conf/broker.conf cat > conf/broker.conf << 'EOF' # 集群名称 brokerClusterName = DefaultCluster # broker 名称 brokerName = broker-a # 0 表示 Master,>0 表示 Slave brokerId = 0 # 删除文件时间点,默认凌晨 4 点 deleteWhen = 04 # 文件保留时间,默认 48 小时 fileReservedTime = 48 # Broker 的角色 brokerRole = ASYNC_MASTER # 刷盘方式 flushDiskType = ASYNC_FLUSH # 存储路径 storePathRootDir = 
/opt/store # CommitLog 存储路径 storePathCommitLog = /opt/store/commitlog # 自动创建 Topic autoCreateTopicEnable = true # 自动创建订阅组 autoCreateSubscriptionGroup = true # Broker 监听端口 listenPort = 10911 # Broker 对外通告的地址,客户端通过该地址连接 Broker(在宿主机运行客户端时需保证主机名 broker 可解析,例如在 hosts 中添加 127.0.0.1 broker,或改为宿主机 IP) brokerIP1 = broker EOF 2.3 启动服务 # 启动所有服务 docker-compose up -d # 查看服务状态 docker-compose ps # 输出示例: # NAME IMAGE STATUS # rocketmq-namesrv apache/rocketmq:5.1.3 Up 30 seconds # rocketmq-broker apache/rocketmq:5.1.3 Up 20 seconds # rocketmq-dashboard apacherocketmq/rocketmq-dashboard:latest Up 10 seconds # 查看日志 docker-compose logs -f broker 三、发送第一条消息 3.1 创建 Java 项目 <!-- pom.xml --> <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.example</groupId> <artifactId>rocketmq-demo</artifactId> <version>1.0.0</version> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <!-- RocketMQ 客户端(Remoting 协议客户端 rocketmq-client,提供 DefaultMQProducer / DefaultMQPushConsumer;注意不是 gRPC 版的 rocketmq-client-java) --> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.9.7</version> </dependency> <!-- 日志 --> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.2.11</version> </dependency> </dependencies> </project> 3.2 编写生产者代码 // QuickProducer.java package com.example.rocketmq; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; public class QuickProducer { public static void main(String[] args) throws Exception { // 1. 创建生产者,指定生产者组名 DefaultMQProducer producer = new DefaultMQProducer("quick_producer_group"); // 2. 
指定 NameServer 地址 producer.setNamesrvAddr("localhost:9876"); // 3. 启动生产者 producer.start(); System.out.println("Producer Started."); // 4. 发送消息 for (int i = 0; i < 10; i++) { // 创建消息对象 Message msg = new Message( "QuickStartTopic", // Topic "TagA", // Tag ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) ); // 发送消息 SendResult sendResult = producer.send(msg); // 打印发送结果 System.out.printf("%s%n", sendResult); Thread.sleep(1000); } // 5. 关闭生产者 producer.shutdown(); System.out.println("Producer Shutdown."); } } 3.3 编写消费者代码 // QuickConsumer.java package com.example.rocketmq; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import java.util.List; public class QuickConsumer { public static void main(String[] args) throws Exception { // 1. 创建消费者,指定消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("quick_consumer_group"); // 2. 指定 NameServer 地址 consumer.setNamesrvAddr("localhost:9876"); // 3. 订阅 Topic 和 Tag consumer.subscribe("QuickStartTopic", "*"); // 4. 设置消费模式(默认集群模式) consumer.setMessageModel(MessageModel.CLUSTERING); // 5. 注册消息监听器 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage( List<MessageExt> messages, ConsumeConcurrentlyContext context) { for (MessageExt message : messages) { System.out.printf("Consume message: %s%n", new String(message.getBody())); } // 返回消费状态:成功 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 6. 启动消费者 consumer.start(); System.out.println("Consumer Started."); // 保持运行 Thread.sleep(Long.MAX_VALUE); } } 四、运行测试 4.1 运行步骤 # 1. 
先启动消费者(新终端) mvn compile exec:java -Dexec.mainClass="com.example.rocketmq.QuickConsumer" # 输出: # Consumer Started. # 等待消息... # 2. 启动生产者(新终端) mvn compile exec:java -Dexec.mainClass="com.example.rocketmq.QuickProducer" # 输出: # Producer Started. # SendResult [sendStatus=SEND_OK, msgId=7F00000100002A9F0000000000000000, ... # SendResult [sendStatus=SEND_OK, msgId=7F00000100002A9F00000000000000B8, ... # ... # Producer Shutdown. # 3. 查看消费者输出 # Consume message: Hello RocketMQ 0 # Consume message: Hello RocketMQ 1 # ... 4.2 使用控制台查看 打开浏览器访问:http://localhost:8080 ...

2025-11-13 · maneng

如约数科科技工作室

浙ICP备2025203501号

👀 本站总访问量 ...| 👤 访客数 ...| 📅 今日访问 ...