引言:5分钟上手 RocketMQ
不需要复杂的环境配置,不需要编译源码,通过 Docker,我们可以在 5 分钟内搭建一个完整的 RocketMQ 环境,并成功发送第一条消息。
让我们开始这段激动人心的旅程!
一、环境准备
1.1 前置要求
# 检查 Docker 版本(需要 20.10+)
docker --version
# Docker version 24.0.5, build ced0996
# 检查 Docker Compose 版本(需要 2.0+)
docker-compose --version
# Docker Compose version v2.20.2
# 检查系统资源
# 建议:4GB+ 内存,10GB+ 磁盘空间
docker system info | grep -E "Memory|Storage"
1.2 创建项目目录
# 创建项目目录
mkdir -p ~/rocketmq-demo
cd ~/rocketmq-demo
# 创建必要的子目录
mkdir -p data/namesrv/logs data/broker/logs data/broker/store
mkdir -p conf
二、Docker Compose 部署
2.1 编写 docker-compose.yml
# docker-compose.yml
version: '3.8'
services:
# NameServer 服务
namesrv:
image: apache/rocketmq:5.1.3
container_name: rocketmq-namesrv
ports:
- "9876:9876"
environment:
JAVA_OPT: "-Duser.home=/opt"
JAVA_OPT_EXT: "-Xms512M -Xmx512M -Xmn128m"
volumes:
- ./data/namesrv/logs:/opt/logs
command: sh mqnamesrv
networks:
- rocketmq-net
# Broker 服务
broker:
image: apache/rocketmq:5.1.3
container_name: rocketmq-broker
ports:
- "10909:10909"
- "10911:10911"
- "10912:10912"
environment:
JAVA_OPT_EXT: "-Xms1G -Xmx1G -Xmn512m"
volumes:
- ./data/broker/logs:/opt/logs
- ./data/broker/store:/opt/store
- ./conf/broker.conf:/opt/conf/broker.conf
command: sh mqbroker -n namesrv:9876 -c /opt/conf/broker.conf
depends_on:
- namesrv
networks:
- rocketmq-net
# 控制台服务
dashboard:
image: apacherocketmq/rocketmq-dashboard:latest
container_name: rocketmq-dashboard
ports:
- "8080:8080"
environment:
JAVA_OPTS: "-Drocketmq.namesrv.addr=namesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false"
depends_on:
- namesrv
- broker
networks:
- rocketmq-net
networks:
rocketmq-net:
driver: bridge
2.2 配置 Broker
# conf/broker.conf
cat > conf/broker.conf << 'EOF'
# 集群名称
brokerClusterName = DefaultCluster
# broker 名称
brokerName = broker-a
# 0 表示 Master,>0 表示 Slave
brokerId = 0
# 删除文件时间点,默认凌晨 4 点
deleteWhen = 04
# 文件保留时间,默认 48 小时
fileReservedTime = 48
# Broker 的角色
brokerRole = ASYNC_MASTER
# 刷盘方式
flushDiskType = ASYNC_FLUSH
# 存储路径
storePathRootDir = /opt/store
# 队列存储路径
storePathCommitLog = /opt/store/commitlog
# 自动创建 Topic
autoCreateTopicEnable = true
# 自动创建订阅组
autoCreateSubscriptionGroup = true
# Broker 监听端口
listenPort = 10911
# 是否允许 Broker 自动创建 Topic
brokerIP1 = broker
EOF
2.3 启动服务
# 启动所有服务
docker-compose up -d
# 查看服务状态
docker-compose ps
# 输出示例:
# NAME IMAGE STATUS
# rocketmq-namesrv apache/rocketmq:5.1.3 Up 30 seconds
# rocketmq-broker apache/rocketmq:5.1.3 Up 20 seconds
# rocketmq-dashboard apacherocketmq/rocketmq-dashboard:latest Up 10 seconds
# 查看日志
docker-compose logs -f broker
三、发送第一条消息
3.1 创建 Java 项目
<!-- pom.xml -->
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>rocketmq-demo</artifactId>
<version>1.0.0</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!-- RocketMQ 客户端 -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client-java</artifactId>
<version>5.0.5</version>
</dependency>
<!-- 日志 -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.11</version>
</dependency>
</dependencies>
</project>
3.2 编写生产者代码
// QuickProducer.java
package com.example.rocketmq;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class QuickProducer {
public static void main(String[] args) throws Exception {
// 1. 创建生产者,指定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("quick_producer_group");
// 2. 指定 NameServer 地址
producer.setNamesrvAddr("localhost:9876");
// 3. 启动生产者
producer.start();
System.out.println("Producer Started.");
// 4. 发送消息
for (int i = 0; i < 10; i++) {
// 创建消息对象
Message msg = new Message(
"QuickStartTopic", // Topic
"TagA", // Tag
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// 发送消息
SendResult sendResult = producer.send(msg);
// 打印发送结果
System.out.printf("%s%n", sendResult);
Thread.sleep(1000);
}
// 5. 关闭生产者
producer.shutdown();
System.out.println("Producer Shutdown.");
}
}
3.3 编写消费者代码
// QuickConsumer.java
package com.example.rocketmq;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import java.util.List;
public class QuickConsumer {
public static void main(String[] args) throws Exception {
// 1. 创建消费者,指定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("quick_consumer_group");
// 2. 指定 NameServer 地址
consumer.setNamesrvAddr("localhost:9876");
// 3. 订阅 Topic 和 Tag
consumer.subscribe("QuickStartTopic", "*");
// 4. 设置消费模式(默认集群模式)
consumer.setMessageModel(MessageModel.CLUSTERING);
// 5. 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> messages,
ConsumeConcurrentlyContext context) {
for (MessageExt message : messages) {
System.out.printf("Consume message: %s%n",
new String(message.getBody()));
}
// 返回消费状态:成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 6. 启动消费者
consumer.start();
System.out.println("Consumer Started.");
// 保持运行
Thread.sleep(Long.MAX_VALUE);
}
}
四、运行测试
4.1 运行步骤
# 1. 先启动消费者(新终端)
mvn compile exec:java -Dexec.mainClass="com.example.rocketmq.QuickConsumer"
# 输出:
# Consumer Started.
# 等待消息...
# 2. 启动生产者(新终端)
mvn compile exec:java -Dexec.mainClass="com.example.rocketmq.QuickProducer"
# 输出:
# Producer Started.
# SendResult [sendStatus=SEND_OK, msgId=7F00000100002A9F0000000000000000, ...
# SendResult [sendStatus=SEND_OK, msgId=7F00000100002A9F00000000000000B8, ...
# ...
# Producer Shutdown.
# 3. 查看消费者输出
# Consume message: Hello RocketMQ 0
# Consume message: Hello RocketMQ 1
# ...
4.2 使用控制台查看
打开浏览器访问:http://localhost:8080
控制台功能:
├── 集群状态
│ ├── Broker 列表
│ └── NameServer 状态
├── Topic 管理
│ ├── Topic 列表
│ ├── 消息查询
│ └── 消息轨迹
├── 消费者管理
│ ├── 消费组列表
│ ├── 消费进度
│ └── 消费者详情
└── 生产者管理
└── 生产者列表
五、进阶实践
5.1 发送不同类型的消息
public class AdvancedProducer {
// 1. 同步发送
public void syncSend() throws Exception {
Message msg = new Message("TopicTest", "TagA", "OrderID001",
"Hello World".getBytes());
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
// 2. 异步发送
public void asyncSend() throws Exception {
Message msg = new Message("TopicTest", "TagA", "OrderID002",
"Hello World".getBytes());
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("异步发送成功:%s%n", sendResult);
}
@Override
public void onException(Throwable e) {
System.out.printf("异步发送失败:%s%n", e);
}
});
}
// 3. 单向发送(不关心发送结果)
public void onewaySend() throws Exception {
Message msg = new Message("TopicTest", "TagA", "OrderID003",
"Hello World".getBytes());
producer.sendOneway(msg);
}
// 4. 批量发送
public void batchSend() throws Exception {
List<Message> messages = new ArrayList<>();
for (int i = 0; i < 10; i++) {
messages.add(new Message("BatchTopic", "TagA",
("Hello " + i).getBytes()));
}
SendResult sendResult = producer.send(messages);
System.out.printf("批量发送结果:%s%n", sendResult);
}
}
5.2 延迟消息
public void sendDelayMessage() throws Exception {
Message msg = new Message("DelayTopic", "TagA",
"Delay Message".getBytes());
// 设置延迟级别
// 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
msg.setDelayTimeLevel(3); // 10 秒后投递
SendResult sendResult = producer.send(msg);
System.out.printf("延迟消息发送:%s%n", sendResult);
}
5.3 顺序消息
// 顺序消息生产者
public void sendOrderlyMessage() throws Exception {
for (int orderId = 0; orderId < 10; orderId++) {
for (String status : Arrays.asList("创建", "付款", "发货", "收货")) {
Message msg = new Message("OrderTopic", "Order",
(orderId + ":" + status).getBytes());
// 同一订单的消息发送到同一个队列
SendResult sendResult = producer.send(msg,
new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs,
Message msg, Object arg) {
Integer orderId = (Integer) arg;
int index = orderId % mqs.size();
return mqs.get(index);
}
}, orderId);
System.out.printf("OrderId: %d, Status: %s, SendResult: %s%n",
orderId, status, sendResult);
}
}
}
六、常见问题排查
6.1 连接失败
# 问题:connect to <localhost:9876> failed
# 解决:检查 NameServer 是否启动
docker ps | grep namesrv
# 检查端口
netstat -an | grep 9876
# 查看防火墙
sudo iptables -L -n | grep 9876
6.2 消息发送失败
// 问题:No route info of this topic
// 解决:设置自动创建 Topic
producer.setCreateTopicKey("AUTO_CREATE_TOPIC_KEY");
// 或手动创建 Topic
sh mqadmin updateTopic -n localhost:9876 -t QuickStartTopic -c DefaultCluster
6.3 消费不到消息
// 检查点:
// 1. Topic 是否正确
consumer.subscribe("QuickStartTopic", "*");
// 2. 消费组是否正确
consumer.setConsumerGroup("quick_consumer_group");
// 3. 消费位点
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 4. 检查消费进度
sh mqadmin consumerProgress -n localhost:9876 -g quick_consumer_group
6.4 性能优化
# 优化 docker-compose.yml
broker:
environment:
# 增加内存
JAVA_OPT_EXT: "-Xms2G -Xmx2G -Xmn1G"
# 增加 ulimits
ulimits:
nofile:
soft: 65535
hard: 65535
七、清理环境
# 停止所有服务
docker-compose down
# 清理数据(慎用)
docker-compose down -v
rm -rf data/
# 清理镜像(可选)
docker rmi apache/rocketmq:5.1.3
docker rmi apacherocketmq/rocketmq-dashboard:latest
八、总结
通过本篇教程,我们完成了:
✅ 使用 Docker Compose 一键部署 RocketMQ ✅ 编写生产者发送消息 ✅ 编写消费者接收消息 ✅ 使用控制台管理和监控 ✅ 尝试不同类型的消息发送 ✅ 学习常见问题排查方法
这只是 RocketMQ 旅程的开始,接下来我们将深入学习 RocketMQ 的核心概念和高级特性。
下一篇预告
成功发送了第一条消息后,下一篇我们将深入理解 RocketMQ 的核心概念:Producer、Consumer、Broker 的工作原理,为后续的深入学习打下基础。
动手练习:
- 尝试修改消息内容,发送 JSON 格式的消息
- 尝试创建多个消费者,观察负载均衡效果
- 尝试停止 Broker 后重启,观察消息是否丢失
记录你的实验结果,我们下篇见!