1. RocketMQ的架构概述

RocketMQ是阿里巴巴开源的分布式消息中间件,具有高性能、高可靠、高可用、低延迟的特点。RocketMQ采用分布式架构设计,包含NameServer、Broker、Producer、Consumer四个核心组件,支持消息顺序、事务消息、延时消息、批量消息等特性。系统具备消息存储、负载均衡、高可用、性能优化、监控告警等功能。本文将详细介绍RocketMQ架构的原理、实现方法、性能优化技巧以及在运维实战中的应用。

1.1 RocketMQ架构核心价值

  1. 高性能: 支持高并发、低延迟的消息处理
  2. 高可靠: 保证消息不丢失、不重复
  3. 高可用: 支持集群部署和故障转移
  4. 可扩展: 支持水平扩展和垂直扩展
  5. 易运维: 提供完善的监控和管理工具

1.2 RocketMQ架构场景

  • 异步解耦: 系统间异步通信和解耦
  • 削峰填谷: 流量削峰和系统保护
  • 数据同步: 数据同步和一致性保证
  • 事件驱动: 基于事件的系统架构
  • 日志收集: 分布式日志收集和处理

1.3 RocketMQ架构特性

  • NameServer: 服务注册与发现
  • Broker: 消息存储和转发
  • Producer: 消息生产者
  • Consumer: 消息消费者
  • 消息存储: 基于CommitLog的消息存储

2. RocketMQ架构基础实现

2.1 RocketMQ架构配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
/**
* RocketMQ架构配置类
* @author 运维实战
*/
@Configuration
@EnableConfigurationProperties(RocketMQArchitectureProperties.class)
public class RocketMQArchitectureConfig {

@Autowired
private RocketMQArchitectureProperties properties;

/**
* RocketMQ架构服务
* @return RocketMQ架构服务
*/
@Bean
public RocketMQArchitectureService rocketMQArchitectureService() {
return new RocketMQArchitectureService();
}

/**
* NameServer服务
* @return NameServer服务
*/
@Bean
public NameServerService nameServerService() {
return new NameServerService();
}

/**
* Broker服务
* @return Broker服务
*/
@Bean
public BrokerService brokerService() {
return new BrokerService();
}

/**
* Producer服务
* @return Producer服务
*/
@Bean
public ProducerService producerService() {
return new ProducerService();
}

/**
* Consumer服务
* @return Consumer服务
*/
@Bean
public ConsumerService consumerService() {
return new ConsumerService();
}

/**
* RocketMQ架构监控服务
* @return RocketMQ架构监控服务
*/
@Bean
public RocketMQArchitectureMonitorService rocketMQArchitectureMonitorService() {
return new RocketMQArchitectureMonitorService();
}

private static final Logger logger = LoggerFactory.getLogger(RocketMQArchitectureConfig.class);
}

2.2 RocketMQ架构属性配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
/**
* RocketMQ架构属性配置
* @author 运维实战
*/
@Data
@ConfigurationProperties(prefix = "rocketmq.architecture")
public class RocketMQArchitectureProperties {

/**
* 是否启用RocketMQ架构
*/
private boolean enableRocketMQArchitecture = true;

/**
* NameServer地址列表
*/
private List<String> nameServerAddresses = Arrays.asList("localhost:9876");

/**
* 生产者配置
*/
private ProducerConfig producer = new ProducerConfig();

/**
* 消费者配置
*/
private ConsumerConfig consumer = new ConsumerConfig();

/**
* Broker配置
*/
private BrokerConfig broker = new BrokerConfig();

/**
* 消息存储配置
*/
private MessageStoreConfig messageStore = new MessageStoreConfig();

/**
* 是否启用高可用
*/
private boolean enableHighAvailability = true;

/**
* 是否启用负载均衡
*/
private boolean enableLoadBalancing = true;

/**
* 是否启用消息顺序
*/
private boolean enableMessageOrdering = true;

/**
* 是否启用事务消息
*/
private boolean enableTransactionMessage = true;

/**
* 是否启用延时消息
*/
private boolean enableDelayMessage = true;

/**
* 是否启用批量消息
*/
private boolean enableBatchMessage = true;

/**
* 是否启用监控
*/
private boolean enableMonitoring = true;

/**
* 监控间隔(毫秒)
*/
private long monitorInterval = 30000;

/**
* 是否启用告警
*/
private boolean enableAlert = true;

/**
* 消息积压告警阈值
*/
private long messageBacklogAlertThreshold = 10000;

/**
* 消费延迟告警阈值(毫秒)
*/
private long consumptionDelayAlertThreshold = 10000;

/**
* 生产者配置类
*/
@Data
public static class ProducerConfig {
/**
* 生产者组名
*/
private String producerGroup = "default-producer-group";

/**
* 发送超时时间(毫秒)
*/
private long sendTimeout = 3000;

/**
* 重试次数
*/
private int retryTimes = 2;

/**
* 是否启用压缩
*/
private boolean enableCompression = true;

/**
* 压缩级别
*/
private int compressionLevel = 5;

/**
* 最大消息大小(字节)
*/
private int maxMessageSize = 4194304; // 4MB

/**
* 是否启用VIP通道
*/
private boolean enableVipChannel = false;
}

/**
* 消费者配置类
*/
@Data
public static class ConsumerConfig {
/**
* 消费者组名
*/
private String consumerGroup = "default-consumer-group";

/**
* 消费模式
*/
private String consumeMode = "CLUSTERING"; // CLUSTERING, BROADCASTING

/**
* 消费起始位置
*/
private String consumeFromWhere = "CONSUME_FROM_LAST_OFFSET"; // CONSUME_FROM_LAST_OFFSET, CONSUME_FROM_FIRST_OFFSET

/**
* 批量消费大小
*/
private int consumeMessageBatchMaxSize = 1;

/**
* 消费超时时间(毫秒)
*/
private long consumeTimeout = 15000;

/**
* 最大重试次数
*/
private int maxReconsumeTimes = 16;

/**
* 是否启用顺序消费
*/
private boolean enableOrderlyConsume = false;
}

/**
* Broker配置类
*/
@Data
public static class BrokerConfig {
/**
* Broker名称
*/
private String brokerName = "default-broker";

/**
* Broker ID
*/
private int brokerId = 0;

/**
* Broker地址
*/
private String brokerAddress = "localhost:10911";

/**
* 是否启用主从复制
*/
private boolean enableSlaveSync = true;

/**
* 主从同步超时时间(毫秒)
*/
private long slaveSyncTimeout = 5000;

/**
* 是否启用快速失败
*/
private boolean enableFastFail = true;
}

/**
* 消息存储配置类
*/
@Data
public static class MessageStoreConfig {
/**
* 消息存储路径
*/
private String storePathRootDir = "/tmp/rocketmq/store";

/**
* CommitLog存储路径
*/
private String storePathCommitLog = "/tmp/rocketmq/store/commitlog";

/**
* ConsumeQueue存储路径
*/
private String storePathConsumeQueue = "/tmp/rocketmq/store/consumequeue";

/**
* IndexFile存储路径
*/
private String storePathIndex = "/tmp/rocketmq/store/index";

/**
* 单个CommitLog文件大小(字节)
*/
private int mappedFileSizeCommitLog = 1073741824; // 1GB

/**
* 单个ConsumeQueue文件大小(字节)
*/
private int mappedFileSizeConsumeQueue = 6000000; // 6MB

/**
* 刷盘策略
*/
private String flushDiskType = "ASYNC_FLUSH"; // ASYNC_FLUSH, SYNC_FLUSH

/**
* 刷盘间隔(毫秒)
*/
private long flushIntervalCommitLog = 500;

/**
* 是否启用压缩
*/
private boolean enableCompaction = true;

/**
* 压缩阈值
*/
private int compactionThreshold = 0;
}
}

2.3 RocketMQ消息数据模型类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
/**
* RocketMQ消息数据模型类
* @author 运维实战
*/
@Data
public class RocketMQMessage {

/**
* 消息ID
*/
private String messageId;

/**
* 主题
*/
private String topic;

/**
* 标签
*/
private String tags;

/**
* 消息键
*/
private String keys;

/**
* 消息体
*/
private byte[] body;

/**
* 消息属性
*/
private Map<String, String> properties;

/**
* 延时级别
*/
private int delayTimeLevel;

/**
* 消息类型
*/
private String messageType;

/**
* 消息优先级
*/
private int priority;

/**
* 消息状态
*/
private String messageStatus;

/**
* 创建时间
*/
private Long createTime;

/**
* 存储时间
*/
private Long storeTime;

/**
* 消费时间
*/
private Long consumeTime;

/**
* 重试次数
*/
private Integer retryCount;

/**
* 扩展属性
*/
private Map<String, Object> extendedProperties;

public RocketMQMessage() {
this.messageId = UUID.randomUUID().toString();
this.messageStatus = "PENDING";
this.createTime = System.currentTimeMillis();
this.retryCount = 0;
this.priority = 0;
this.delayTimeLevel = 0;
this.properties = new HashMap<>();
this.extendedProperties = new HashMap<>();
}

public RocketMQMessage(String topic, String tags, byte[] body) {
this();
this.topic = topic;
this.tags = tags;
this.body = body;
}

/**
* 验证RocketMQ消息
* @return 是否有效
*/
public boolean validate() {
if (messageId == null || messageId.isEmpty()) {
return false;
}

if (topic == null || topic.isEmpty()) {
return false;
}

if (body == null || body.length == 0) {
return false;
}

if (createTime == null || createTime <= 0) {
return false;
}

return true;
}

/**
* 计算消息大小
* @return 消息大小(字节)
*/
public int calculateMessageSize() {
int size = 0;

if (messageId != null) {
size += messageId.getBytes().length;
}

if (topic != null) {
size += topic.getBytes().length;
}

if (tags != null) {
size += tags.getBytes().length;
}

if (keys != null) {
size += keys.getBytes().length;
}

if (body != null) {
size += body.length;
}

if (properties != null) {
for (Map.Entry<String, String> entry : properties.entrySet()) {
size += entry.getKey().getBytes().length;
size += entry.getValue().getBytes().length;
}
}

return size;
}

/**
* 添加消息属性
* @param key 键
* @param value 值
*/
public void addProperty(String key, String value) {
if (properties == null) {
properties = new HashMap<>();
}
properties.put(key, value);
}

/**
* 添加扩展属性
* @param key 键
* @param value 值
*/
public void addExtendedProperty(String key, Object value) {
if (extendedProperties == null) {
extendedProperties = new HashMap<>();
}
extendedProperties.put(key, value);
}

/**
* 获取扩展属性
* @param key 键
* @return
*/
public Object getExtendedProperty(String key) {
return extendedProperties != null ? extendedProperties.get(key) : null;
}
}

/**
* RocketMQ消息发送结果类
* @author 运维实战
*/
@Data
public class RocketMQSendResult {

private boolean success;
private String messageId;
private String topic;
private String tags;
private String messageQueueId;
private long queueOffset;
private String transactionId;
private String error;
private long startTime;
private long endTime;

public RocketMQSendResult() {
this.success = false;
}

public RocketMQSendResult(String messageId, String topic, String tags) {
this();
this.messageId = messageId;
this.topic = topic;
this.tags = tags;
}

/**
* 获取发送耗时
* @return 发送耗时(毫秒)
*/
public long getDuration() {
return endTime - startTime;
}

/**
* 是否成功
* @return 是否成功
*/
public boolean isSuccess() {
return success;
}
}

/**
* RocketMQ消息消费结果类
* @author 运维实战
*/
@Data
public class RocketMQConsumeResult {

private boolean success;
private String messageId;
private String topic;
private String tags;
private String consumerGroup;
private String error;
private long startTime;
private long endTime;

public RocketMQConsumeResult() {
this.success = false;
}

public RocketMQConsumeResult(String messageId, String topic, String tags, String consumerGroup) {
this();
this.messageId = messageId;
this.topic = topic;
this.tags = tags;
this.consumerGroup = consumerGroup;
}

/**
* 获取消费耗时
* @return 消费耗时(毫秒)
*/
public long getDuration() {
return endTime - startTime;
}

/**
* 是否成功
* @return 是否成功
*/
public boolean isSuccess() {
return success;
}
}

2.4 基础RocketMQ架构服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
/**
* 基础RocketMQ架构服务
* @author 运维实战
*/
@Service
public class RocketMQArchitectureService {

@Autowired
private RocketMQArchitectureProperties properties;

@Autowired
private NameServerService nameServerService;

@Autowired
private BrokerService brokerService;

@Autowired
private ProducerService producerService;

@Autowired
private ConsumerService consumerService;

@Autowired
private RocketMQArchitectureMonitorService rocketMQArchitectureMonitorService;

private static final Logger logger = LoggerFactory.getLogger(RocketMQArchitectureService.class);

/**
* 发送消息
* @param message RocketMQ消息
* @return 发送结果
*/
public RocketMQSendResult sendMessage(RocketMQMessage message) {
logger.info("发送RocketMQ消息,消息ID: {}, 主题: {}", message.getMessageId(), message.getTopic());

RocketMQSendResult result = new RocketMQSendResult(
message.getMessageId(), message.getTopic(), message.getTags());
result.setStartTime(System.currentTimeMillis());

try {
// 验证消息
if (!message.validate()) {
result.setSuccess(false);
result.setError("消息验证失败");
result.setEndTime(System.currentTimeMillis());
return result;
}

// 发送消息
RocketMQSendResult sendResult = producerService.sendMessage(message);

if (sendResult != null && sendResult.isSuccess()) {
result.setSuccess(true);
result.setMessageQueueId(sendResult.getMessageQueueId());
result.setQueueOffset(sendResult.getQueueOffset());
result.setTransactionId(sendResult.getTransactionId());
result.setEndTime(System.currentTimeMillis());

// 记录发送成功指标
rocketMQArchitectureMonitorService.recordMessageSend(message.getTopic(), true);

logger.info("RocketMQ消息发送成功,消息ID: {}, 主题: {}, 耗时: {}ms",
message.getMessageId(), message.getTopic(), result.getDuration());
} else {
result.setSuccess(false);
result.setError(sendResult != null ? sendResult.getError() : "发送失败");
result.setEndTime(System.currentTimeMillis());

// 记录发送失败指标
rocketMQArchitectureMonitorService.recordMessageSend(message.getTopic(), false);
}

return result;

} catch (Exception e) {
logger.error("RocketMQ消息发送异常,消息ID: {}", message.getMessageId(), e);
result.setSuccess(false);
result.setError("RocketMQ消息发送异常: " + e.getMessage());
result.setEndTime(System.currentTimeMillis());

// 记录发送异常指标
rocketMQArchitectureMonitorService.recordMessageSend(message.getTopic(), false);

return result;
}
}

/**
* 消费消息
* @param message RocketMQ消息
* @param consumerGroup 消费者组
* @return 消费结果
*/
public RocketMQConsumeResult consumeMessage(RocketMQMessage message, String consumerGroup) {
logger.info("消费RocketMQ消息,消息ID: {}, 主题: {}, 消费者组: {}",
message.getMessageId(), message.getTopic(), consumerGroup);

RocketMQConsumeResult result = new RocketMQConsumeResult(
message.getMessageId(), message.getTopic(), message.getTags(), consumerGroup);
result.setStartTime(System.currentTimeMillis());

try {
// 验证消息
if (!message.validate()) {
result.setSuccess(false);
result.setError("消息验证失败");
result.setEndTime(System.currentTimeMillis());
return result;
}

// 消费消息
RocketMQConsumeResult consumeResult = consumerService.consumeMessage(message, consumerGroup);

if (consumeResult != null && consumeResult.isSuccess()) {
result.setSuccess(true);
result.setEndTime(System.currentTimeMillis());

// 记录消费成功指标
rocketMQArchitectureMonitorService.recordMessageConsume(message.getTopic(), consumerGroup, true);

logger.info("RocketMQ消息消费成功,消息ID: {}, 主题: {}, 消费者组: {}, 耗时: {}ms",
message.getMessageId(), message.getTopic(), consumerGroup, result.getDuration());
} else {
result.setSuccess(false);
result.setError(consumeResult != null ? consumeResult.getError() : "消费失败");
result.setEndTime(System.currentTimeMillis());

// 记录消费失败指标
rocketMQArchitectureMonitorService.recordMessageConsume(message.getTopic(), consumerGroup, false);
}

return result;

} catch (Exception e) {
logger.error("RocketMQ消息消费异常,消息ID: {}", message.getMessageId(), e);
result.setSuccess(false);
result.setError("RocketMQ消息消费异常: " + e.getMessage());
result.setEndTime(System.currentTimeMillis());

// 记录消费异常指标
rocketMQArchitectureMonitorService.recordMessageConsume(message.getTopic(), consumerGroup, false);

return result;
}
}

/**
* 获取NameServer信息
* @return NameServer信息
*/
public NameServerInfo getNameServerInfo() {
logger.info("获取NameServer信息");

try {
NameServerInfo nameServerInfo = nameServerService.getNameServerInfo();

if (nameServerInfo != null) {
logger.info("获取NameServer信息成功,地址: {}", nameServerInfo.getAddress());
} else {
logger.warn("NameServer信息不存在");
}

return nameServerInfo;

} catch (Exception e) {
logger.error("获取NameServer信息异常", e);
return null;
}
}

/**
* 获取Broker信息
* @return Broker信息列表
*/
public List<BrokerInfo> getBrokerInfo() {
logger.info("获取Broker信息");

try {
List<BrokerInfo> brokerInfoList = brokerService.getBrokerInfo();

logger.info("获取Broker信息成功,Broker数量: {}", brokerInfoList.size());

return brokerInfoList;

} catch (Exception e) {
logger.error("获取Broker信息异常", e);
return new ArrayList<>();
}
}

/**
* 获取主题信息
* @param topic 主题名称
* @return 主题信息
*/
public TopicInfo getTopicInfo(String topic) {
logger.info("获取主题信息,主题: {}", topic);

try {
TopicInfo topicInfo = brokerService.getTopicInfo(topic);

if (topicInfo != null) {
logger.info("获取主题信息成功,主题: {}, 队列数: {}", topic, topicInfo.getQueueCount());
} else {
logger.warn("主题信息不存在,主题: {}", topic);
}

return topicInfo;

} catch (Exception e) {
logger.error("获取主题信息异常,主题: {}", topic, e);
return null;
}
}
}

3. 高级功能实现

3.1 NameServer服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
/**
* NameServer服务
* @author 运维实战
*/
@Service
public class NameServerService {

@Autowired
private RocketMQArchitectureProperties properties;

@Autowired
private RocketMQArchitectureMonitorService rocketMQArchitectureMonitorService;

private final Map<String, NameServerInfo> nameServerMap = new ConcurrentHashMap<>();

private static final Logger logger = LoggerFactory.getLogger(NameServerService.class);

/**
* 注册NameServer
* @param address NameServer地址
* @return 注册结果
*/
public NameServerRegistrationResult registerNameServer(String address) {
logger.info("注册NameServer,地址: {}", address);

NameServerRegistrationResult result = new NameServerRegistrationResult();
result.setAddress(address);
result.setStartTime(System.currentTimeMillis());

try {
// 检查NameServer是否已注册
if (nameServerMap.containsKey(address)) {
result.setSuccess(false);
result.setError("NameServer已存在");
result.setEndTime(System.currentTimeMillis());
return result;
}

// 创建NameServer信息
NameServerInfo nameServerInfo = new NameServerInfo(address);
nameServerInfo.setStatus("ACTIVE");
nameServerInfo.setRegisterTime(System.currentTimeMillis());

// 存储NameServer信息
nameServerMap.put(address, nameServerInfo);

result.setSuccess(true);
result.setNameServerInfo(nameServerInfo);
result.setEndTime(System.currentTimeMillis());

logger.info("NameServer注册成功,地址: {}, 耗时: {}ms", address, result.getDuration());

return result;

} catch (Exception e) {
logger.error("NameServer注册异常,地址: {}", address, e);
result.setSuccess(false);
result.setError("NameServer注册异常: " + e.getMessage());
result.setEndTime(System.currentTimeMillis());
return result;
}
}

/**
* 获取NameServer信息
* @return NameServer信息
*/
public NameServerInfo getNameServerInfo() {
try {
// 返回第一个可用的NameServer
for (NameServerInfo nameServerInfo : nameServerMap.values()) {
if ("ACTIVE".equals(nameServerInfo.getStatus())) {
return nameServerInfo;
}
}
return null;
} catch (Exception e) {
logger.error("获取NameServer信息异常", e);
return null;
}
}

/**
* 获取所有NameServer信息
* @return 所有NameServer信息
*/
public List<NameServerInfo> getAllNameServerInfo() {
try {
return new ArrayList<>(nameServerMap.values());
} catch (Exception e) {
logger.error("获取所有NameServer信息异常", e);
return new ArrayList<>();
}
}

/**
* 检查NameServer健康状态
* @param address NameServer地址
* @return 健康状态
*/
public NameServerHealthStatus checkNameServerHealth(String address) {
logger.info("检查NameServer健康状态,地址: {}", address);

NameServerHealthStatus healthStatus = new NameServerHealthStatus();
healthStatus.setAddress(address);
healthStatus.setStartTime(System.currentTimeMillis());

try {
NameServerInfo nameServerInfo = nameServerMap.get(address);
if (nameServerInfo != null) {
// 检查NameServer状态
if ("ACTIVE".equals(nameServerInfo.getStatus())) {
healthStatus.setHealthStatus("HEALTHY");
healthStatus.setHealthScore(100.0);
} else {
healthStatus.setHealthStatus("UNHEALTHY");
healthStatus.setHealthScore(0.0);
}
} else {
healthStatus.setHealthStatus("NOT_FOUND");
healthStatus.setHealthScore(0.0);
}

healthStatus.setEndTime(System.currentTimeMillis());

logger.info("NameServer健康状态检查完成,地址: {}, 健康状态: {}, 健康分数: {}",
address, healthStatus.getHealthStatus(), healthStatus.getHealthScore());

return healthStatus;

} catch (Exception e) {
logger.error("检查NameServer健康状态异常,地址: {}", address, e);
healthStatus.setHealthStatus("ERROR");
healthStatus.setHealthScore(0.0);
healthStatus.setEndTime(System.currentTimeMillis());
return healthStatus;
}
}
}

/**
* NameServer信息类
* @author 运维实战
*/
@Data
public class NameServerInfo {

private String address;
private String status;
private Long registerTime;
private Long lastHeartbeatTime;
private Map<String, Object> properties;

public NameServerInfo() {
this.status = "ACTIVE";
this.registerTime = System.currentTimeMillis();
this.lastHeartbeatTime = System.currentTimeMillis();
this.properties = new HashMap<>();
}

public NameServerInfo(String address) {
this();
this.address = address;
}
}

3.2 Broker服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
/**
* Broker服务
* @author 运维实战
*/
@Service
public class BrokerService {

@Autowired
private RocketMQArchitectureProperties properties;

@Autowired
private RocketMQArchitectureMonitorService rocketMQArchitectureMonitorService;

private final Map<String, BrokerInfo> brokerMap = new ConcurrentHashMap<>();

private static final Logger logger = LoggerFactory.getLogger(BrokerService.class);

/**
* 注册Broker
* @param brokerName Broker名称
* @param brokerId Broker ID
* @param brokerAddress Broker地址
* @return 注册结果
*/
public BrokerRegistrationResult registerBroker(String brokerName, int brokerId, String brokerAddress) {
logger.info("注册Broker,名称: {}, ID: {}, 地址: {}", brokerName, brokerId, brokerAddress);

BrokerRegistrationResult result = new BrokerRegistrationResult();
result.setBrokerName(brokerName);
result.setBrokerId(brokerId);
result.setBrokerAddress(brokerAddress);
result.setStartTime(System.currentTimeMillis());

try {
String brokerKey = brokerName + "-" + brokerId;

// 检查Broker是否已注册
if (brokerMap.containsKey(brokerKey)) {
result.setSuccess(false);
result.setError("Broker已存在");
result.setEndTime(System.currentTimeMillis());
return result;
}

// 创建Broker信息
BrokerInfo brokerInfo = new BrokerInfo(brokerName, brokerId, brokerAddress);
brokerInfo.setStatus("ACTIVE");
brokerInfo.setRegisterTime(System.currentTimeMillis());

// 存储Broker信息
brokerMap.put(brokerKey, brokerInfo);

result.setSuccess(true);
result.setBrokerInfo(brokerInfo);
result.setEndTime(System.currentTimeMillis());

logger.info("Broker注册成功,名称: {}, ID: {}, 地址: {}, 耗时: {}ms",
brokerName, brokerId, brokerAddress, result.getDuration());

return result;

} catch (Exception e) {
logger.error("Broker注册异常,名称: {}, ID: {}", brokerName, brokerId, e);
result.setSuccess(false);
result.setError("Broker注册异常: " + e.getMessage());
result.setEndTime(System.currentTimeMillis());
return result;
}
}

/**
* 获取Broker信息
* @return Broker信息列表
*/
public List<BrokerInfo> getBrokerInfo() {
try {
return new ArrayList<>(brokerMap.values());
} catch (Exception e) {
logger.error("获取Broker信息异常", e);
return new ArrayList<>();
}
}

/**
* 获取主题信息
* @param topic 主题名称
* @return 主题信息
*/
public TopicInfo getTopicInfo(String topic) {
logger.info("获取主题信息,主题: {}", topic);

try {
TopicInfo topicInfo = new TopicInfo(topic);
topicInfo.setQueueCount(4); // 默认4个队列
topicInfo.setCreateTime(System.currentTimeMillis());

logger.info("获取主题信息成功,主题: {}, 队列数: {}", topic, topicInfo.getQueueCount());

return topicInfo;

} catch (Exception e) {
logger.error("获取主题信息异常,主题: {}", topic, e);
return null;
}
}

/**
* 检查Broker健康状态
* @param brokerName Broker名称
* @param brokerId Broker ID
* @return 健康状态
*/
public BrokerHealthStatus checkBrokerHealth(String brokerName, int brokerId) {
logger.info("检查Broker健康状态,名称: {}, ID: {}", brokerName, brokerId);

BrokerHealthStatus healthStatus = new BrokerHealthStatus();
healthStatus.setBrokerName(brokerName);
healthStatus.setBrokerId(brokerId);
healthStatus.setStartTime(System.currentTimeMillis());

try {
String brokerKey = brokerName + "-" + brokerId;
BrokerInfo brokerInfo = brokerMap.get(brokerKey);

if (brokerInfo != null) {
// 检查Broker状态
if ("ACTIVE".equals(brokerInfo.getStatus())) {
healthStatus.setHealthStatus("HEALTHY");
healthStatus.setHealthScore(100.0);
} else {
healthStatus.setHealthStatus("UNHEALTHY");
healthStatus.setHealthScore(0.0);
}
} else {
healthStatus.setHealthStatus("NOT_FOUND");
healthStatus.setHealthScore(0.0);
}

healthStatus.setEndTime(System.currentTimeMillis());

logger.info("Broker健康状态检查完成,名称: {}, ID: {}, 健康状态: {}, 健康分数: {}",
brokerName, brokerId, healthStatus.getHealthStatus(), healthStatus.getHealthScore());

return healthStatus;

} catch (Exception e) {
logger.error("检查Broker健康状态异常,名称: {}, ID: {}", brokerName, brokerId, e);
healthStatus.setHealthStatus("ERROR");
healthStatus.setHealthScore(0.0);
healthStatus.setEndTime(System.currentTimeMillis());
return healthStatus;
}
}
}

/**
* Broker信息类
* @author 运维实战
*/
@Data
public class BrokerInfo {

private String brokerName;
private int brokerId;
private String brokerAddress;
private String status;
private Long registerTime;
private Long lastHeartbeatTime;
private Map<String, Object> properties;

public BrokerInfo() {
this.status = "ACTIVE";
this.registerTime = System.currentTimeMillis();
this.lastHeartbeatTime = System.currentTimeMillis();
this.properties = new HashMap<>();
}

public BrokerInfo(String brokerName, int brokerId, String brokerAddress) {
this();
this.brokerName = brokerName;
this.brokerId = brokerId;
this.brokerAddress = brokerAddress;
}
}

/**
* 主题信息类
* @author 运维实战
*/
@Data
public class TopicInfo {

private String topicName;
private int queueCount;
private Long createTime;
private Map<String, Object> properties;

public TopicInfo() {
this.createTime = System.currentTimeMillis();
this.properties = new HashMap<>();
}

public TopicInfo(String topicName) {
this();
this.topicName = topicName;
}
}

4. RocketMQ架构控制器

4.1 RocketMQ架构REST控制器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
/**
* RocketMQ架构REST控制器
* @author 运维实战
*/
@RestController
@RequestMapping("/api/rocketmq/architecture")
public class RocketMQArchitectureController {

@Autowired
private RocketMQArchitectureService rocketMQArchitectureService;

@Autowired
private RocketMQArchitectureMonitorService rocketMQArchitectureMonitorService;

private static final Logger logger = LoggerFactory.getLogger(RocketMQArchitectureController.class);

/**
* 发送消息
* @param message RocketMQ消息
* @return 发送结果
*/
@PostMapping("/send")
public ResponseEntity<RocketMQSendResult> sendMessage(@RequestBody RocketMQMessage message) {
try {
logger.info("接收到RocketMQ消息发送请求,消息ID: {}", message.getMessageId());

RocketMQSendResult result = rocketMQArchitectureService.sendMessage(message);

return ResponseEntity.ok(result);

} catch (Exception e) {
logger.error("RocketMQ消息发送失败", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
}

/**
* 消费消息
* @param message RocketMQ消息
* @param consumerGroup 消费者组
* @return 消费结果
*/
@PostMapping("/consume")
public ResponseEntity<RocketMQConsumeResult> consumeMessage(
@RequestBody RocketMQMessage message,
@RequestParam String consumerGroup) {
try {
logger.info("接收到RocketMQ消息消费请求,消息ID: {}, 消费者组: {}", message.getMessageId(), consumerGroup);

RocketMQConsumeResult result = rocketMQArchitectureService.consumeMessage(message, consumerGroup);

return ResponseEntity.ok(result);

} catch (Exception e) {
logger.error("RocketMQ消息消费失败", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
}

/**
* 获取NameServer信息
* @return NameServer信息
*/
@GetMapping("/nameserver")
public ResponseEntity<NameServerInfo> getNameServerInfo() {
try {
logger.info("接收到获取NameServer信息请求");

NameServerInfo nameServerInfo = rocketMQArchitectureService.getNameServerInfo();

return ResponseEntity.ok(nameServerInfo);

} catch (Exception e) {
logger.error("获取NameServer信息失败", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
}

/**
* 获取Broker信息
* @return Broker信息列表
*/
@GetMapping("/broker")
public ResponseEntity<List<BrokerInfo>> getBrokerInfo() {
try {
logger.info("接收到获取Broker信息请求");

List<BrokerInfo> brokerInfoList = rocketMQArchitectureService.getBrokerInfo();

return ResponseEntity.ok(brokerInfoList);

} catch (Exception e) {
logger.error("获取Broker信息失败", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
}

/**
* 获取主题信息
* @param topic 主题名称
* @return 主题信息
*/
@GetMapping("/topic/{topic}")
public ResponseEntity<TopicInfo> getTopicInfo(@PathVariable String topic) {
try {
logger.info("接收到获取主题信息请求,主题: {}", topic);

TopicInfo topicInfo = rocketMQArchitectureService.getTopicInfo(topic);

return ResponseEntity.ok(topicInfo);

} catch (Exception e) {
logger.error("获取主题信息失败", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
}

/**
* 获取RocketMQ架构监控指标
* @return 监控指标
*/
@GetMapping("/metrics")
public ResponseEntity<RocketMQArchitectureMetrics> getRocketMQArchitectureMetrics() {
try {
RocketMQArchitectureMetrics metrics = rocketMQArchitectureMonitorService.getMetrics();
return ResponseEntity.ok(metrics);
} catch (Exception e) {
logger.error("获取RocketMQ架构监控指标失败", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
}
}

5. 总结

5.1 RocketMQ架构最佳实践

  1. 合理设计主题和队列: 根据业务需求合理设计主题和队列数量
  2. 选择合适的消费模式: 根据业务场景选择合适的消费模式
  3. 优化消息存储: 合理配置消息存储参数
  4. 实现高可用: 部署多个NameServer和Broker实现高可用
  5. 监控告警: 实时监控RocketMQ架构状态和性能

5.2 性能优化建议

  • 消息存储优化: 优化CommitLog和ConsumeQueue存储
  • 网络优化: 优化网络配置和连接池
  • 内存优化: 合理配置JVM参数
  • 磁盘优化: 使用SSD和优化磁盘IO
  • 集群优化: 合理配置集群参数

5.3 运维管理要点

  • 实时监控: 监控RocketMQ架构状态和性能
  • 集群管理: 管理NameServer和Broker集群
  • 主题管理: 管理主题和队列
  • 日志管理: 完善日志记录和分析
  • 性能调优: 根据监控数据优化RocketMQ性能

通过本文的RocketMQ的架构Java实战指南,您可以掌握RocketMQ架构的原理、实现方法、性能优化技巧以及在企业级应用中的最佳实践,构建高效、可靠的RocketMQ消息中间件系统!