1. Overview of Kafka Ordered Message Consumption

Ordered message consumption is a key capability of a distributed messaging system: in high-concurrency scenarios, processing messages in the correct order is essential for business correctness. Kafka provides ordering through its partitioning mechanism and consumer groups, but preserving that order under heavy load requires deliberate design and configuration. The solution described here covers partitioning strategy, consumer-group management, ordered-consumption strategies, performance optimization, and monitoring/alerting. This article walks through the principles behind ordered consumption in Kafka, how to implement it, how to tune it, and how to apply it in day-to-day operations.

1.1 Core Value of Ordered Consumption

  1. Business-logic correctness: messages are processed in the order the business logic expects
  2. Data consistency: data is updated in a time-consistent order
  3. State-machine processing: supports workflows driven by state machines that depend on event order
  4. Transaction processing: preserves the ordering of transactional operations
  5. Event sourcing: enables ordered replay in event-sourcing architectures

1.2 Typical Scenarios

  • Order processing: sequential handling of order status changes
  • Inventory management: sequential handling of stock movements
  • User behavior: ordered analysis of user behavior events
  • Financial transactions: sequential processing of transaction records
  • Log processing: ordered handling of log events

1.3 Technical Characteristics

  • Per-partition ordering: messages within a single partition are consumed in order
  • Partition-key strategy: related messages stay ordered by routing them with the same key (see the sketch after this list)
  • Consumer groups: consumption progress is managed per consumer group
  • Offset management: offsets track and guarantee the consumption position
  • Rebalancing: consumer rebalancing is supported
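
The first two points above are the foundation of everything that follows: Kafka only guarantees ordering inside a single partition, and the producer routes every record carrying the same key to the same partition. The snippet below is a minimal, self-contained sketch (not part of the original solution) that demonstrates this with the plain Kafka producer client; the broker address and topic name are assumptions matching the configuration used later in this article, and max.in.flight.requests.per.connection is limited so retries cannot reorder in-flight batches.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class SameKeySamePartitionDemo {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");            // assumed local broker
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        props.put("acks", "all");
        props.put("enable.idempotence", "true");                      // retries do not duplicate
        props.put("max.in.flight.requests.per.connection", "1");      // retries cannot reorder

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            String businessKey = "ORDER_1001";                        // same key => same partition
            for (int i = 1; i <= 3; i++) {
                RecordMetadata meta = producer
                        .send(new ProducerRecord<>("order-consumption-topic", businessKey, "step-" + i))
                        .get();                                       // block so the metadata prints in order
                System.out.printf("key=%s partition=%d offset=%d%n",
                        businessKey, meta.partition(), meta.offset());
            }
        }
    }
}

All three records report the same partition and strictly increasing offsets, which is exactly the guarantee the rest of this article builds on.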

2. Basic Implementation of Kafka Ordered Consumption

2.1 Kafka Ordered-Consumption Configuration Class

/**
* Kafka消息顺序消费配置类
* @author 运维实战
*/
@Configuration
@EnableConfigurationProperties(KafkaOrderConsumptionProperties.class)
public class KafkaOrderConsumptionConfig {

@Autowired
private KafkaOrderConsumptionProperties properties;

/**
* Kafka消息顺序消费服务
* @return Kafka消息顺序消费服务
*/
@Bean
public KafkaOrderConsumptionService kafkaOrderConsumptionService() {
return new KafkaOrderConsumptionService();
}

/**
* Kafka分区策略服务
* @return Kafka分区策略服务
*/
@Bean
public KafkaPartitionStrategyService kafkaPartitionStrategyService() {
return new KafkaPartitionStrategyService();
}

/**
* Kafka消费者组管理服务
* @return Kafka消费者组管理服务
*/
@Bean
public KafkaConsumerGroupService kafkaConsumerGroupService() {
return new KafkaConsumerGroupService();
}

/**
* Kafka顺序消费策略服务
* @return Kafka顺序消费策略服务
*/
@Bean
public KafkaOrderConsumptionStrategyService kafkaOrderConsumptionStrategyService() {
return new KafkaOrderConsumptionStrategyService();
}

/**
* Kafka顺序消费监控服务
* @return Kafka顺序消费监控服务
*/
@Bean
public KafkaOrderConsumptionMonitorService kafkaOrderConsumptionMonitorService() {
return new KafkaOrderConsumptionMonitorService();
}

private static final Logger logger = LoggerFactory.getLogger(KafkaOrderConsumptionConfig.class);
}
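
The configuration class above only registers the custom services. To actually drive ordered consumption through Spring Kafka, a consumer factory and a listener container factory are also needed. The following is a minimal sketch of two extra bean methods to add inside KafkaOrderConsumptionConfig, assuming spring-kafka is on the classpath and OrderMessage travels as JSON (ConsumerConfig and StringDeserializer come from the Kafka client, the remaining types from spring-kafka): it disables auto-commit, uses manual acknowledgments, and sets the container concurrency to the partition count so each partition is processed by exactly one thread.

@Bean
public ConsumerFactory<String, OrderMessage> orderConsumerFactory() {
    Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());
    config.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getConsumerGroupId());
    config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, properties.isEnableAutoCommit());
    config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, properties.getMaxPollRecords());
    config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, (int) properties.getSessionTimeoutMs());
    config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, (int) properties.getHeartbeatIntervalMs());
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    config.put(JsonDeserializer.TRUSTED_PACKAGES, "*"); // assumption: OrderMessage is JSON-serialized
    return new DefaultKafkaConsumerFactory<>(config);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, OrderMessage> orderListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, OrderMessage> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(orderConsumerFactory());
    // at most one consumer thread per partition, so in-partition ordering is preserved
    factory.setConcurrency(properties.getPartitionCount());
    // commit offsets manually, only after a record has been fully processed
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    return factory;
}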

2.2 Kafka Ordered-Consumption Properties

/**
* Kafka消息顺序消费属性配置
* @author 运维实战
*/
@Data
@ConfigurationProperties(prefix = "kafka.order-consumption")
public class KafkaOrderConsumptionProperties {

/**
* 是否启用Kafka消息顺序消费
*/
private boolean enableOrderConsumption = true;

/**
* Kafka集群地址
*/
private List<String> bootstrapServers = Arrays.asList("localhost:9092");

/**
* 主题名称
*/
private String topicName = "order-consumption-topic";

/**
* 消费者组ID
*/
private String consumerGroupId = "order-consumption-group";

/**
* 分区数量
*/
private int partitionCount = 3;

/**
* 副本数量
*/
private int replicationFactor = 1;

/**
* 分区策略
*/
private String partitionStrategy = "KEY_BASED"; // KEY_BASED, ROUND_ROBIN, CUSTOM

/**
* 顺序消费策略
*/
private String orderConsumptionStrategy = "PARTITION_ORDER"; // PARTITION_ORDER, GLOBAL_ORDER, CUSTOM_ORDER

/**
* 是否启用自动提交
*/
private boolean enableAutoCommit = false;

/**
* 自动提交间隔(毫秒)
*/
private long autoCommitInterval = 1000;

/**
* 最大拉取记录数
*/
private int maxPollRecords = 500;

/**
* 会话超时时间(毫秒)
*/
private long sessionTimeoutMs = 30000;

/**
* 心跳间隔(毫秒)
*/
private long heartbeatIntervalMs = 3000;

/**
* 拉取超时时间(毫秒)
*/
private long fetchMaxWaitMs = 500;

/**
* 最小拉取字节数
*/
private int fetchMinBytes = 1;

/**
* 最大拉取字节数
*/
private int fetchMaxBytes = 52428800;

/**
* 是否启用顺序保证
*/
private boolean enableOrderGuarantee = true;

/**
* 顺序保证级别
*/
private String orderGuaranteeLevel = "STRICT"; // STRICT, RELAXED, NONE

/**
* 顺序检查间隔(毫秒)
*/
private long orderCheckInterval = 1000;

/**
* 顺序检查超时(毫秒)
*/
private long orderCheckTimeout = 5000;

/**
* 是否启用重试机制
*/
private boolean enableRetry = true;

/**
* 最大重试次数
*/
private int maxRetryCount = 3;

/**
* 重试间隔(毫秒)
*/
private long retryInterval = 1000;

/**
* 是否启用死信队列
*/
private boolean enableDeadLetterQueue = true;

/**
* 死信队列主题
*/
private String deadLetterTopic = "order-consumption-dlq";

/**
* 是否启用监控
*/
private boolean enableMonitoring = true;

/**
* 监控间隔(毫秒)
*/
private long monitorInterval = 30000;

/**
* 是否启用告警
*/
private boolean enableAlert = true;

/**
* 消费延迟告警阈值(毫秒)
*/
private long consumptionDelayAlertThreshold = 10000;

/**
* 顺序消费失败告警阈值
*/
private int orderConsumptionFailureAlertThreshold = 5;

/**
* 分区不平衡告警阈值
*/
private double partitionImbalanceAlertThreshold = 0.2;
}
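
Several of these properties constrain each other: Kafka's own guidance is that heartbeat.interval.ms should be at most one third of session.timeout.ms, and a STRICT ordering guarantee only makes sense when offsets are committed manually after successful processing. The small helper below is a hypothetical sketch (the class name and the exact rules are assumptions, not part of the original properties class) showing how such relationships could be checked at startup.

public final class KafkaOrderConsumptionPropertiesValidator {

    private KafkaOrderConsumptionPropertiesValidator() {
    }

    public static void validate(KafkaOrderConsumptionProperties p) {
        if (p.getPartitionCount() <= 0) {
            throw new IllegalArgumentException("partitionCount must be greater than 0");
        }
        // Kafka recommendation: heartbeat interval no larger than one third of the session timeout
        if (p.getHeartbeatIntervalMs() * 3 > p.getSessionTimeoutMs()) {
            throw new IllegalArgumentException("heartbeatIntervalMs should be <= sessionTimeoutMs / 3");
        }
        // strict ordering relies on committing offsets only after a record is fully processed
        if ("STRICT".equalsIgnoreCase(p.getOrderGuaranteeLevel()) && p.isEnableAutoCommit()) {
            throw new IllegalArgumentException("STRICT order guarantee requires enableAutoCommit=false");
        }
        // the imbalance threshold is a ratio and must stay within [0, 1]
        double threshold = p.getPartitionImbalanceAlertThreshold();
        if (threshold < 0.0 || threshold > 1.0) {
            throw new IllegalArgumentException("partitionImbalanceAlertThreshold must be within [0, 1]");
        }
    }
}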

2.3 Ordered-Message Data Model Classes

/**
* 顺序消息数据模型类
* @author 运维实战
*/
@Data
public class OrderMessage {

/**
* 消息ID
*/
private String messageId;

/**
* 业务键(用于分区)
*/
private String businessKey;

/**
* 消息类型
*/
private String messageType;

/**
* 消息内容
*/
private String messageContent;

/**
* 消息顺序号
*/
private Long sequenceNumber;

/**
* 时间戳
*/
private Long timestamp;

/**
* 分区键
*/
private String partitionKey;

/**
* 消息优先级
*/
private Integer priority;

/**
* 消息状态
*/
private String messageStatus;

/**
* 重试次数
*/
private Integer retryCount;

/**
* 原始数据
*/
private Map<String, Object> rawData;

/**
* 扩展属性
*/
private Map<String, Object> extendedProperties;

public OrderMessage() {
this.messageId = UUID.randomUUID().toString();
this.timestamp = System.currentTimeMillis();
this.messageStatus = "PENDING";
this.retryCount = 0;
this.priority = 0;
this.rawData = new HashMap<>();
this.extendedProperties = new HashMap<>();
}

public OrderMessage(String businessKey, String messageType, String messageContent) {
this();
this.businessKey = businessKey;
this.messageType = messageType;
this.messageContent = messageContent;
this.partitionKey = generatePartitionKey();
}

/**
* 验证顺序消息
* @return 是否有效
*/
public boolean validate() {
if (messageId == null || messageId.isEmpty()) {
return false;
}

if (businessKey == null || businessKey.isEmpty()) {
return false;
}

if (messageType == null || messageType.isEmpty()) {
return false;
}

if (messageContent == null || messageContent.isEmpty()) {
return false;
}

if (timestamp == null || timestamp <= 0) {
return false;
}

return true;
}

/**
* 生成分区键
* @return 分区键
*/
public String generatePartitionKey() {
if (businessKey != null) {
return businessKey;
}
return "default";
}

/**
* 计算消息优先级
* @return 消息优先级
*/
public int calculatePriority() {
int priority = 0;

// 根据消息类型调整优先级
if ("URGENT".equals(messageType)) {
priority += 100;
} else if ("HIGH".equals(messageType)) {
priority += 50;
} else if ("NORMAL".equals(messageType)) {
priority += 0;
} else if ("LOW".equals(messageType)) {
priority -= 50;
}

// 根据重试次数调整优先级
if (retryCount != null) {
priority -= retryCount * 10;
}

return priority;
}

/**
* 添加原始数据
* @param key 键
* @param value 值
*/
public void addRawData(String key, Object value) {
if (rawData == null) {
rawData = new HashMap<>();
}
rawData.put(key, value);
}

/**
* 添加扩展属性
* @param key 键
* @param value 值
*/
public void addExtendedProperty(String key, Object value) {
if (extendedProperties == null) {
extendedProperties = new HashMap<>();
}
extendedProperties.put(key, value);
}

/**
* 获取扩展属性
* @param key 键
* @return 扩展属性值
*/
public Object getExtendedProperty(String key) {
return extendedProperties != null ? extendedProperties.get(key) : null;
}
}

/**
* 顺序消费结果类
* @author 运维实战
*/
@Data
public class OrderConsumptionResult {

private boolean success;
private String messageId;
private String businessKey;
private String messageType;
private Long sequenceNumber;
private String error;
private long startTime;
private long endTime;

public OrderConsumptionResult() {
this.success = false;
}

public OrderConsumptionResult(String messageId, String businessKey, String messageType) {
this();
this.messageId = messageId;
this.businessKey = businessKey;
this.messageType = messageType;
}

/**
* 获取消费耗时
* @return 消费耗时(毫秒)
*/
public long getDuration() {
return endTime - startTime;
}

/**
* 是否成功
* @return 是否成功
*/
public boolean isSuccess() {
return success;
}
}

/**
* 顺序消费批次结果类
* @author 运维实战
*/
@Data
public class OrderConsumptionBatchResult {

private boolean success;
private int totalCount;
private int successCount;
private int failureCount;
private List<OrderConsumptionResult> results;
private String error;
private long startTime;
private long endTime;

public OrderConsumptionBatchResult() {
this.success = false;
this.successCount = 0;
this.failureCount = 0;
this.results = new ArrayList<>();
}

/**
* 获取消费耗时
* @return 消费耗时(毫秒)
*/
public long getDuration() {
return endTime - startTime;
}

/**
* 是否成功
* @return 是否成功
*/
public boolean isSuccess() {
return success;
}
}
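
A short usage sketch of the model above: two messages for the same business key share a partition key (and therefore a partition), and their sequence numbers express the intended processing order. The values are illustrative only.

public class OrderMessageDemo {

    public static void main(String[] args) {
        // first event for order ORDER_1001
        OrderMessage created = new OrderMessage("ORDER_1001", "ORDER_CREATE", "{\"amount\":100}");
        created.setSequenceNumber(1L);
        created.addExtendedProperty("source", "web");

        // second event for the same order: same business key, later sequence number
        OrderMessage updated = new OrderMessage("ORDER_1001", "ORDER_UPDATE", "{\"amount\":120}");
        updated.setSequenceNumber(2L);

        System.out.println(created.validate() && updated.validate());                    // true
        System.out.println(created.getPartitionKey().equals(updated.getPartitionKey())); // true: same partition
    }
}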

2.4 Basic Kafka Ordered-Consumption Service

/**
* 基础Kafka消息顺序消费服务
* @author 运维实战
*/
@Service
public class KafkaOrderConsumptionService {

@Autowired
private KafkaOrderConsumptionProperties properties;

@Autowired
private KafkaTemplate<String, OrderMessage> kafkaTemplate;

@Autowired
private KafkaPartitionStrategyService kafkaPartitionStrategyService;

@Autowired
private KafkaConsumerGroupService kafkaConsumerGroupService;

@Autowired
private KafkaOrderConsumptionStrategyService kafkaOrderConsumptionStrategyService;

@Autowired
private KafkaOrderConsumptionMonitorService kafkaOrderConsumptionMonitorService;

private static final Logger logger = LoggerFactory.getLogger(KafkaOrderConsumptionService.class);

/**
* 发送顺序消息
* @param orderMessage 顺序消息
* @return 发送结果
*/
public KafkaOrderSendResult sendOrderMessage(OrderMessage orderMessage) {
logger.info("发送顺序消息,消息ID: {}, 业务键: {}", orderMessage.getMessageId(), orderMessage.getBusinessKey());

KafkaOrderSendResult result = new KafkaOrderSendResult();
result.setMessageId(orderMessage.getMessageId());
result.setBusinessKey(orderMessage.getBusinessKey());
result.setStartTime(System.currentTimeMillis());

try {
// 验证顺序消息
if (!orderMessage.validate()) {
result.setSuccess(false);
result.setError("顺序消息验证失败");
result.setEndTime(System.currentTimeMillis());
return result;
}

// 生成分区键
String partitionKey = orderMessage.generatePartitionKey();
orderMessage.setPartitionKey(partitionKey);

// 发送消息
ListenableFuture<SendResult<String, OrderMessage>> future = kafkaTemplate.send(
properties.getTopicName(), partitionKey, orderMessage);

// 添加回调
future.addCallback(
sendSuccess -> {
logger.debug("顺序消息发送成功,消息ID: {}, 分区: {}, 偏移量: {}",
orderMessage.getMessageId(), sendSuccess.getRecordMetadata().partition(),
sendSuccess.getRecordMetadata().offset());
},
failure -> {
logger.error("顺序消息发送失败,消息ID: {}", orderMessage.getMessageId(), failure);
}
);

// 等待发送完成
SendResult<String, OrderMessage> sendResult = future.get(5, TimeUnit.SECONDS);

result.setSuccess(true);
result.setPartition(sendResult.getRecordMetadata().partition());
result.setOffset(sendResult.getRecordMetadata().offset());
result.setEndTime(System.currentTimeMillis());

// 记录发送成功指标
kafkaOrderConsumptionMonitorService.recordOrderMessageSend(orderMessage.getBusinessKey(), true);

logger.info("顺序消息发送成功,消息ID: {}, 分区: {}, 偏移量: {}, 耗时: {}ms",
orderMessage.getMessageId(), result.getPartition(), result.getOffset(), result.getDuration());

return result;

} catch (Exception e) {
logger.error("顺序消息发送异常,消息ID: {}", orderMessage.getMessageId(), e);
result.setSuccess(false);
result.setError("顺序消息发送异常: " + e.getMessage());
result.setEndTime(System.currentTimeMillis());

// 记录发送失败指标
kafkaOrderConsumptionMonitorService.recordOrderMessageSend(orderMessage.getBusinessKey(), false);

return result;
}
}

/**
* 批量发送顺序消息
* @param orderMessages 顺序消息列表
* @return 批量发送结果
*/
public KafkaOrderBatchSendResult batchSendOrderMessages(List<OrderMessage> orderMessages) {
logger.info("批量发送顺序消息,数量: {}", orderMessages.size());

KafkaOrderBatchSendResult result = new KafkaOrderBatchSendResult();
result.setTotalCount(orderMessages.size());
result.setStartTime(System.currentTimeMillis());

try {
// 验证和预处理消息
List<OrderMessage> validMessages = new ArrayList<>();
List<OrderMessage> invalidMessages = new ArrayList<>();

for (OrderMessage orderMessage : orderMessages) {
if (orderMessage.validate()) {
// 生成分区键
String partitionKey = orderMessage.generatePartitionKey();
orderMessage.setPartitionKey(partitionKey);
validMessages.add(orderMessage);
} else {
invalidMessages.add(orderMessage);
logger.warn("顺序消息验证失败,消息ID: {}", orderMessage.getMessageId());
}
}

// 批量发送有效消息
if (!validMessages.isEmpty()) {
KafkaOrderBatchSendResult sendResult = executeBatchSend(validMessages);

result.setSuccessCount(sendResult.getSuccessCount());
result.setFailureCount(sendResult.getFailureCount() + invalidMessages.size());
} else {
result.setFailureCount(orderMessages.size());
}

result.setSuccess(result.getFailureCount() == 0);
result.setEndTime(System.currentTimeMillis());

// 记录批量发送指标
kafkaOrderConsumptionMonitorService.recordOrderMessageBatchSend(orderMessages.size(), result.getSuccessCount());

logger.info("批量顺序消息发送完成,总数: {}, 成功: {}, 失败: {}, 耗时: {}ms",
result.getTotalCount(), result.getSuccessCount(), result.getFailureCount(), result.getDuration());

return result;

} catch (Exception e) {
logger.error("批量顺序消息发送异常", e);
result.setSuccess(false);
result.setError("批量顺序消息发送异常: " + e.getMessage());
result.setEndTime(System.currentTimeMillis());
return result;
}
}

/**
* 消费顺序消息
* @param orderMessage 顺序消息
* @return 消费结果
*/
public OrderConsumptionResult consumeOrderMessage(OrderMessage orderMessage) {
logger.info("消费顺序消息,消息ID: {}, 业务键: {}", orderMessage.getMessageId(), orderMessage.getBusinessKey());

OrderConsumptionResult result = new OrderConsumptionResult(
orderMessage.getMessageId(), orderMessage.getBusinessKey(), orderMessage.getMessageType());
result.setSequenceNumber(orderMessage.getSequenceNumber());
result.setStartTime(System.currentTimeMillis());

try {
// 验证顺序消息
if (!orderMessage.validate()) {
result.setSuccess(false);
result.setError("顺序消息验证失败");
result.setEndTime(System.currentTimeMillis());
return result;
}

// 根据顺序消费策略处理消息
OrderConsumptionResult consumptionResult = kafkaOrderConsumptionStrategyService.processOrderMessage(orderMessage);

if (consumptionResult != null && consumptionResult.isSuccess()) {
result.setSuccess(true);
result.setEndTime(System.currentTimeMillis());

// 记录消费成功指标
kafkaOrderConsumptionMonitorService.recordOrderMessageConsume(orderMessage.getBusinessKey(), true);

logger.info("顺序消息消费成功,消息ID: {}, 业务键: {}, 耗时: {}ms",
orderMessage.getMessageId(), orderMessage.getBusinessKey(), result.getDuration());
} else {
result.setSuccess(false);
result.setError(consumptionResult != null ? consumptionResult.getError() : "消费失败");
result.setEndTime(System.currentTimeMillis());

// 记录消费失败指标
kafkaOrderConsumptionMonitorService.recordOrderMessageConsume(orderMessage.getBusinessKey(), false);
}

return result;

} catch (Exception e) {
logger.error("顺序消息消费异常,消息ID: {}", orderMessage.getMessageId(), e);
result.setSuccess(false);
result.setError("顺序消息消费异常: " + e.getMessage());
result.setEndTime(System.currentTimeMillis());

// 记录消费异常指标
kafkaOrderConsumptionMonitorService.recordOrderMessageConsume(orderMessage.getBusinessKey(), false);

return result;
}
}

/**
* 批量消费顺序消息
* @param orderMessages 顺序消息列表
* @return 批量消费结果
*/
public OrderConsumptionBatchResult batchConsumeOrderMessages(List<OrderMessage> orderMessages) {
logger.info("批量消费顺序消息,数量: {}", orderMessages.size());

OrderConsumptionBatchResult result = new OrderConsumptionBatchResult();
result.setTotalCount(orderMessages.size());
result.setStartTime(System.currentTimeMillis());

try {
// 验证和预处理消息
List<OrderMessage> validMessages = new ArrayList<>();
List<OrderMessage> invalidMessages = new ArrayList<>();

for (OrderMessage orderMessage : orderMessages) {
if (orderMessage.validate()) {
validMessages.add(orderMessage);
} else {
invalidMessages.add(orderMessage);
logger.warn("顺序消息验证失败,消息ID: {}", orderMessage.getMessageId());
}
}

// 批量消费有效消息
if (!validMessages.isEmpty()) {
OrderConsumptionBatchResult consumptionResult = kafkaOrderConsumptionStrategyService.batchProcessOrderMessages(validMessages);

result.setSuccessCount(consumptionResult.getSuccessCount());
result.setFailureCount(consumptionResult.getFailureCount() + invalidMessages.size());
result.setResults(consumptionResult.getResults());
} else {
result.setFailureCount(orderMessages.size());
}

result.setSuccess(result.getFailureCount() == 0);
result.setEndTime(System.currentTimeMillis());

// 记录批量消费指标
kafkaOrderConsumptionMonitorService.recordOrderMessageBatchConsume(orderMessages.size(), result.getSuccessCount());

logger.info("批量顺序消息消费完成,总数: {}, 成功: {}, 失败: {}, 耗时: {}ms",
result.getTotalCount(), result.getSuccessCount(), result.getFailureCount(), result.getDuration());

return result;

} catch (Exception e) {
logger.error("批量顺序消息消费异常", e);
result.setSuccess(false);
result.setError("批量顺序消息消费异常: " + e.getMessage());
result.setEndTime(System.currentTimeMillis());
return result;
}
}

/**
* 执行批量发送
* @param orderMessages 顺序消息列表
* @return 批量发送结果
*/
private KafkaOrderBatchSendResult executeBatchSend(List<OrderMessage> orderMessages) {
KafkaOrderBatchSendResult result = new KafkaOrderBatchSendResult();
result.setTotalCount(orderMessages.size());
result.setStartTime(System.currentTimeMillis());

try {
int successCount = 0;
int failureCount = 0;

for (OrderMessage orderMessage : orderMessages) {
try {
KafkaOrderSendResult sendResult = sendOrderMessage(orderMessage);
if (sendResult.isSuccess()) {
successCount++;
} else {
failureCount++;
}
} catch (Exception e) {
logger.error("批量发送单条消息异常,消息ID: {}", orderMessage.getMessageId(), e);
failureCount++;
}
}

result.setSuccessCount(successCount);
result.setFailureCount(failureCount);
result.setSuccess(result.getFailureCount() == 0);
result.setEndTime(System.currentTimeMillis());

return result;

} catch (Exception e) {
logger.error("执行批量发送异常", e);
result.setSuccess(false);
result.setError("执行批量发送异常: " + e.getMessage());
result.setEndTime(System.currentTimeMillis());
return result;
}
}
}
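
The service above sends and processes OrderMessage instances, but it still needs to be wired to an actual Kafka listener. The sketch below is one possible wiring (not part of the original service), assuming the orderListenerContainerFactory from the sketch in section 2.1 and Spring Kafka's manual acknowledgment mode; the property placeholders rely on relaxed binding of the kafka.order-consumption prefix defined in KafkaOrderConsumptionProperties.

@Component
public class OrderMessageListener {

    private static final Logger logger = LoggerFactory.getLogger(OrderMessageListener.class);

    @Autowired
    private KafkaOrderConsumptionService kafkaOrderConsumptionService;

    @KafkaListener(
            topics = "${kafka.order-consumption.topic-name:order-consumption-topic}",
            groupId = "${kafka.order-consumption.consumer-group-id:order-consumption-group}",
            containerFactory = "orderListenerContainerFactory")
    public void onMessage(ConsumerRecord<String, OrderMessage> record, Acknowledgment ack) {
        OrderConsumptionResult result = kafkaOrderConsumptionService.consumeOrderMessage(record.value());
        if (result.isSuccess()) {
            // acknowledge only after successful processing, so a crash replays the record in order
            ack.acknowledge();
        } else {
            // not acknowledged: the offset is not committed and the record is redelivered later,
            // or handed to a dead-letter topic if an error handler is configured
            logger.warn("ordered message failed and will be retried, messageId: {}", record.value().getMessageId());
        }
    }
}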

2.5 Kafka Ordered-Consumption Result Classes

/**
* Kafka顺序发送结果类
* @author 运维实战
*/
@Data
public class KafkaOrderSendResult {

private boolean success;
private String messageId;
private String businessKey;
private int partition;
private long offset;
private String error;
private long startTime;
private long endTime;

public KafkaOrderSendResult() {
this.success = false;
}

/**
* 获取发送耗时
* @return 发送耗时(毫秒)
*/
public long getDuration() {
return endTime - startTime;
}

/**
* 是否成功
* @return 是否成功
*/
public boolean isSuccess() {
return success;
}
}

/**
* Kafka顺序批量发送结果类
* @author 运维实战
*/
@Data
public class KafkaOrderBatchSendResult {

private boolean success;
private int totalCount;
private int successCount;
private int failureCount;
private String error;
private long startTime;
private long endTime;

public KafkaOrderBatchSendResult() {
this.success = false;
this.successCount = 0;
this.failureCount = 0;
}

/**
* 获取发送耗时
* @return 发送耗时(毫秒)
*/
public long getDuration() {
return endTime - startTime;
}

/**
* 是否成功
* @return 是否成功
*/
public boolean isSuccess() {
return success;
}
}

3. Advanced Features

3.1 Kafka Partition Strategy Service

/**
* Kafka分区策略服务
* @author 运维实战
*/
@Service
public class KafkaPartitionStrategyService {

@Autowired
private KafkaOrderConsumptionProperties properties;

@Autowired
private KafkaOrderConsumptionMonitorService kafkaOrderConsumptionMonitorService;

private static final Logger logger = LoggerFactory.getLogger(KafkaPartitionStrategyService.class);

/**
* 计算分区
* @param partitionKey 分区键
* @return 分区号
*/
public int calculatePartition(String partitionKey) {
logger.debug("计算分区,分区键: {}", partitionKey);

try {
int partition = 0;

switch (properties.getPartitionStrategy().toUpperCase()) {
case "KEY_BASED":
partition = calculateKeyBasedPartition(partitionKey);
break;
case "ROUND_ROBIN":
partition = calculateRoundRobinPartition();
break;
case "CUSTOM":
partition = calculateCustomPartition(partitionKey);
break;
default:
partition = calculateKeyBasedPartition(partitionKey);
break;
}

logger.debug("分区计算完成,分区键: {}, 分区号: {}", partitionKey, partition);

return partition;

} catch (Exception e) {
logger.error("分区计算异常,分区键: {}", partitionKey, e);
return 0;
}
}

/**
* 基于键的分区计算
* @param partitionKey 分区键
* @return 分区号
*/
private int calculateKeyBasedPartition(String partitionKey) {
if (partitionKey == null || partitionKey.isEmpty()) {
return 0;
}

// 使用哈希取模计算分区,floorMod避免hashCode为Integer.MIN_VALUE时Math.abs仍为负值导致分区号越界
return Math.floorMod(partitionKey.hashCode(), properties.getPartitionCount());
}

private final AtomicInteger roundRobinCounter = new AtomicInteger(0);

/**
* 轮询分区计算
* @return 分区号
*/
private int calculateRoundRobinPartition() {
// 使用线程安全的计数器实现轮询,避免基于时间戳取模导致的分区倾斜
return Math.floorMod(roundRobinCounter.getAndIncrement(), properties.getPartitionCount());
}

/**
* 自定义分区计算
* @param partitionKey 分区键
* @return 分区号
*/
private int calculateCustomPartition(String partitionKey) {
// 实现自定义分区逻辑
if (partitionKey == null || partitionKey.isEmpty()) {
return 0;
}

// 根据业务规则计算分区
if (partitionKey.startsWith("ORDER_")) {
return 0;
} else if (partitionKey.startsWith("PAYMENT_")) {
return 1;
} else if (partitionKey.startsWith("INVENTORY_")) {
return 2;
} else {
return calculateKeyBasedPartition(partitionKey);
}
}

/**
* 验证分区策略
* @param partitionStrategy 分区策略
* @return 是否有效
*/
public boolean validatePartitionStrategy(String partitionStrategy) {
return "KEY_BASED".equals(partitionStrategy) ||
"ROUND_ROBIN".equals(partitionStrategy) ||
"CUSTOM".equals(partitionStrategy);
}

/**
* 获取分区信息
* @return 分区信息
*/
public Map<String, Object> getPartitionInfo() {
Map<String, Object> info = new HashMap<>();
info.put("partitionStrategy", properties.getPartitionStrategy());
info.put("partitionCount", properties.getPartitionCount());
info.put("replicationFactor", properties.getReplicationFactor());
info.put("topicName", properties.getTopicName());
return info;
}
}
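
The strategy service above computes partitions on the application side. If the decision should live in the producer itself, Kafka's Partitioner SPI can host the same KEY_BASED logic; the class below is a sketch of that idea (the murmur2 hashing mirrors what Kafka's default partitioner does for keyed records), registered through the producer's partitioner.class setting.

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

public class BusinessKeyPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (keyBytes == null) {
            return 0; // keyless records: ordering is not meaningful, fall back to partition 0
        }
        // stable hash so the same business key always maps to the same partition
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}

Registration is a single producer property, for example props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, BusinessKeyPartitioner.class.getName()).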

3.2 Kafka Consumer Group Management Service

/**
* Kafka消费者组管理服务
* @author 运维实战
*/
@Service
public class KafkaConsumerGroupService {

@Autowired
private KafkaOrderConsumptionProperties properties;

@Autowired
private KafkaOrderConsumptionMonitorService kafkaOrderConsumptionMonitorService;

private final Map<String, ConsumerGroupInfo> consumerGroupMap = new ConcurrentHashMap<>();

private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerGroupService.class);

/**
* 注册消费者组
* @param consumerGroupId 消费者组ID
* @param consumerId 消费者ID
* @return 注册结果
*/
public ConsumerGroupRegistrationResult registerConsumerGroup(String consumerGroupId, String consumerId) {
logger.info("注册消费者组,消费者组ID: {}, 消费者ID: {}", consumerGroupId, consumerId);

ConsumerGroupRegistrationResult result = new ConsumerGroupRegistrationResult();
result.setConsumerGroupId(consumerGroupId);
result.setConsumerId(consumerId);
result.setStartTime(System.currentTimeMillis());

try {
// 获取或创建消费者组信息
ConsumerGroupInfo consumerGroupInfo = consumerGroupMap.computeIfAbsent(consumerGroupId,
k -> new ConsumerGroupInfo(consumerGroupId));

// 添加消费者
ConsumerInfo consumerInfo = new ConsumerInfo(consumerId);
consumerGroupInfo.addConsumer(consumerInfo);

result.setSuccess(true);
result.setConsumerGroupInfo(consumerGroupInfo);
result.setEndTime(System.currentTimeMillis());

logger.info("消费者组注册成功,消费者组ID: {}, 消费者ID: {}, 耗时: {}ms",
consumerGroupId, consumerId, result.getDuration());

return result;

} catch (Exception e) {
logger.error("消费者组注册异常,消费者组ID: {}, 消费者ID: {}", consumerGroupId, consumerId, e);
result.setSuccess(false);
result.setError("消费者组注册异常: " + e.getMessage());
result.setEndTime(System.currentTimeMillis());
return result;
}
}

/**
* 注销消费者组
* @param consumerGroupId 消费者组ID
* @param consumerId 消费者ID
* @return 注销结果
*/
public ConsumerGroupUnregistrationResult unregisterConsumerGroup(String consumerGroupId, String consumerId) {
logger.info("注销消费者组,消费者组ID: {}, 消费者ID: {}", consumerGroupId, consumerId);

ConsumerGroupUnregistrationResult result = new ConsumerGroupUnregistrationResult();
result.setConsumerGroupId(consumerGroupId);
result.setConsumerId(consumerId);
result.setStartTime(System.currentTimeMillis());

try {
ConsumerGroupInfo consumerGroupInfo = consumerGroupMap.get(consumerGroupId);
if (consumerGroupInfo != null) {
consumerGroupInfo.removeConsumer(consumerId);

// 如果消费者组为空,则移除
if (consumerGroupInfo.getConsumers().isEmpty()) {
consumerGroupMap.remove(consumerGroupId);
}
}

result.setSuccess(true);
result.setEndTime(System.currentTimeMillis());

logger.info("消费者组注销成功,消费者组ID: {}, 消费者ID: {}, 耗时: {}ms",
consumerGroupId, consumerId, result.getDuration());

return result;

} catch (Exception e) {
logger.error("消费者组注销异常,消费者组ID: {}, 消费者ID: {}", consumerGroupId, consumerId, e);
result.setSuccess(false);
result.setError("消费者组注销异常: " + e.getMessage());
result.setEndTime(System.currentTimeMillis());
return result;
}
}

/**
* 获取消费者组信息
* @param consumerGroupId 消费者组ID
* @return 消费者组信息
*/
public ConsumerGroupInfo getConsumerGroupInfo(String consumerGroupId) {
try {
return consumerGroupMap.get(consumerGroupId);
} catch (Exception e) {
logger.error("获取消费者组信息异常,消费者组ID: {}", consumerGroupId, e);
return null;
}
}

/**
* 获取所有消费者组信息
* @return 所有消费者组信息
*/
public List<ConsumerGroupInfo> getAllConsumerGroupInfo() {
try {
return new ArrayList<>(consumerGroupMap.values());
} catch (Exception e) {
logger.error("获取所有消费者组信息异常", e);
return new ArrayList<>();
}
}

/**
* 更新消费者组状态
* @param consumerGroupId 消费者组ID
* @param status 状态
*/
public void updateConsumerGroupStatus(String consumerGroupId, String status) {
try {
ConsumerGroupInfo consumerGroupInfo = consumerGroupMap.get(consumerGroupId);
if (consumerGroupInfo != null) {
consumerGroupInfo.setStatus(status);
consumerGroupInfo.setLastUpdateTime(System.currentTimeMillis());
}
} catch (Exception e) {
logger.error("更新消费者组状态异常,消费者组ID: {}", consumerGroupId, e);
}
}

/**
* 检查消费者组健康状态
* @param consumerGroupId 消费者组ID
* @return 健康状态
*/
public ConsumerGroupHealthStatus checkConsumerGroupHealth(String consumerGroupId) {
logger.info("检查消费者组健康状态,消费者组ID: {}", consumerGroupId);

ConsumerGroupHealthStatus healthStatus = new ConsumerGroupHealthStatus();
healthStatus.setConsumerGroupId(consumerGroupId);
healthStatus.setStartTime(System.currentTimeMillis());

try {
ConsumerGroupInfo consumerGroupInfo = consumerGroupMap.get(consumerGroupId);
if (consumerGroupInfo != null) {
// 检查消费者数量
int consumerCount = consumerGroupInfo.getConsumers().size();
healthStatus.setConsumerCount(consumerCount);

// 检查消费者状态
long activeConsumerCount = consumerGroupInfo.getConsumers().values().stream()
.filter(consumer -> "ACTIVE".equals(consumer.getStatus()))
.count();
healthStatus.setActiveConsumerCount((int) activeConsumerCount);

// 计算健康分数
double healthScore = calculateHealthScore(consumerCount, activeConsumerCount);
healthStatus.setHealthScore(healthScore);

// 确定健康状态
if (healthScore >= 90) {
healthStatus.setHealthStatus("HEALTHY");
} else if (healthScore >= 70) {
healthStatus.setHealthStatus("WARNING");
} else if (healthScore >= 50) {
healthStatus.setHealthStatus("UNHEALTHY");
} else {
healthStatus.setHealthStatus("CRITICAL");
}
} else {
healthStatus.setHealthStatus("NOT_FOUND");
healthStatus.setHealthScore(0.0);
}

healthStatus.setEndTime(System.currentTimeMillis());

logger.info("消费者组健康状态检查完成,消费者组ID: {}, 健康状态: {}, 健康分数: {}",
consumerGroupId, healthStatus.getHealthStatus(), healthStatus.getHealthScore());

return healthStatus;

} catch (Exception e) {
logger.error("检查消费者组健康状态异常,消费者组ID: {}", consumerGroupId, e);
healthStatus.setHealthStatus("ERROR");
healthStatus.setHealthScore(0.0);
healthStatus.setEndTime(System.currentTimeMillis());
return healthStatus;
}
}

/**
* 计算健康分数
* @param consumerCount 消费者数量
* @param activeConsumerCount 活跃消费者数量
* @return 健康分数
*/
private double calculateHealthScore(int consumerCount, long activeConsumerCount) {
if (consumerCount == 0) {
return 0.0;
}

double activeRatio = (double) activeConsumerCount / consumerCount;
return activeRatio * 100.0;
}
}

/**
* 消费者组信息类
* @author 运维实战
*/
@Data
public class ConsumerGroupInfo {

private String consumerGroupId;
private String status;
private Long createTime;
private Long lastUpdateTime;
private Map<String, ConsumerInfo> consumers;
private Map<String, Object> properties;

public ConsumerGroupInfo() {
this.status = "ACTIVE";
this.createTime = System.currentTimeMillis();
this.lastUpdateTime = System.currentTimeMillis();
this.consumers = new ConcurrentHashMap<>();
this.properties = new HashMap<>();
}

public ConsumerGroupInfo(String consumerGroupId) {
this();
this.consumerGroupId = consumerGroupId;
}

/**
* 添加消费者
* @param consumerInfo 消费者信息
*/
public void addConsumer(ConsumerInfo consumerInfo) {
consumers.put(consumerInfo.getConsumerId(), consumerInfo);
lastUpdateTime = System.currentTimeMillis();
}

/**
* 移除消费者
* @param consumerId 消费者ID
*/
public void removeConsumer(String consumerId) {
consumers.remove(consumerId);
lastUpdateTime = System.currentTimeMillis();
}

/**
* 获取消费者数量
* @return 消费者数量
*/
public int getConsumerCount() {
return consumers.size();
}
}

/**
* 消费者信息类
* @author 运维实战
*/
@Data
public class ConsumerInfo {

private String consumerId;
private String status;
private Long createTime;
private Long lastUpdateTime;
private Map<String, Object> properties;

public ConsumerInfo() {
this.status = "ACTIVE";
this.createTime = System.currentTimeMillis();
this.lastUpdateTime = System.currentTimeMillis();
this.properties = new HashMap<>();
}

public ConsumerInfo(String consumerId) {
this();
this.consumerId = consumerId;
}
}
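
The registry above tracks consumers only in application memory. For operational checks against the broker's view of a group (members, committed offsets, lag), Kafka's AdminClient can be queried directly; the sketch below (assuming Kafka clients 2.5 or later) computes per-partition lag as the gap between the latest offset and the group's committed offset.

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class ConsumerGroupLagChecker {

    public static Map<TopicPartition, Long> lagOf(String bootstrapServers, String groupId) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

        try (AdminClient admin = AdminClient.create(props)) {
            // the group's committed offset per partition
            Map<TopicPartition, OffsetAndMetadata> committed =
                    admin.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get();

            // the latest (end) offset of each of those partitions
            Map<TopicPartition, OffsetSpec> request = new HashMap<>();
            committed.keySet().forEach(tp -> request.put(tp, OffsetSpec.latest()));
            Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets =
                    admin.listOffsets(request).all().get();

            // lag = end offset - committed offset
            Map<TopicPartition, Long> lag = new HashMap<>();
            committed.forEach((tp, meta) -> lag.put(tp, endOffsets.get(tp).offset() - meta.offset()));
            return lag;
        }
    }
}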

3.3 Kafka Ordered-Consumption Strategy Service

/**
* Kafka顺序消费策略服务
* @author 运维实战
*/
@Service
public class KafkaOrderConsumptionStrategyService {

@Autowired
private KafkaOrderConsumptionProperties properties;

@Autowired
private KafkaOrderConsumptionMonitorService kafkaOrderConsumptionMonitorService;

private final Map<String, Long> lastProcessedSequence = new ConcurrentHashMap<>();
private final Map<String, Queue<OrderMessage>> messageQueues = new ConcurrentHashMap<>();

private static final Logger logger = LoggerFactory.getLogger(KafkaOrderConsumptionStrategyService.class);

/**
* 处理顺序消息
* @param orderMessage 顺序消息
* @return 处理结果
*/
public OrderConsumptionResult processOrderMessage(OrderMessage orderMessage) {
logger.info("处理顺序消息,消息ID: {}, 业务键: {}, 顺序号: {}",
orderMessage.getMessageId(), orderMessage.getBusinessKey(), orderMessage.getSequenceNumber());

OrderConsumptionResult result = new OrderConsumptionResult(
orderMessage.getMessageId(), orderMessage.getBusinessKey(), orderMessage.getMessageType());
result.setSequenceNumber(orderMessage.getSequenceNumber());
result.setStartTime(System.currentTimeMillis());

try {
// 验证顺序消息
if (!orderMessage.validate()) {
result.setSuccess(false);
result.setError("顺序消息验证失败");
result.setEndTime(System.currentTimeMillis());
return result;
}

// 根据顺序消费策略处理消息
OrderConsumptionResult processResult = null;
switch (properties.getOrderConsumptionStrategy().toUpperCase()) {
case "PARTITION_ORDER":
processResult = processPartitionOrderMessage(orderMessage);
break;
case "GLOBAL_ORDER":
processResult = processGlobalOrderMessage(orderMessage);
break;
case "CUSTOM_ORDER":
processResult = processCustomOrderMessage(orderMessage);
break;
default:
processResult = processPartitionOrderMessage(orderMessage);
break;
}

if (processResult != null && processResult.isSuccess()) {
result.setSuccess(true);
result.setEndTime(System.currentTimeMillis());

logger.info("顺序消息处理成功,消息ID: {}, 业务键: {}, 耗时: {}ms",
orderMessage.getMessageId(), orderMessage.getBusinessKey(), result.getDuration());
} else {
result.setSuccess(false);
result.setError(processResult != null ? processResult.getError() : "处理失败");
result.setEndTime(System.currentTimeMillis());
}

return result;

} catch (Exception e) {
logger.error("顺序消息处理异常,消息ID: {}", orderMessage.getMessageId(), e);
result.setSuccess(false);
result.setError("顺序消息处理异常: " + e.getMessage());
result.setEndTime(System.currentTimeMillis());
return result;
}
}

/**
* 批量处理顺序消息
* @param orderMessages 顺序消息列表
* @return 批量处理结果
*/
public OrderConsumptionBatchResult batchProcessOrderMessages(List<OrderMessage> orderMessages) {
logger.info("批量处理顺序消息,数量: {}", orderMessages.size());

OrderConsumptionBatchResult result = new OrderConsumptionBatchResult();
result.setTotalCount(orderMessages.size());
result.setStartTime(System.currentTimeMillis());

try {
// 按业务键分组
Map<String, List<OrderMessage>> groupedMessages = orderMessages.stream()
.collect(Collectors.groupingBy(OrderMessage::getBusinessKey));

List<OrderConsumptionResult> results = new ArrayList<>();
int successCount = 0;
int failureCount = 0;

// 处理每个业务键的消息
for (Map.Entry<String, List<OrderMessage>> entry : groupedMessages.entrySet()) {
String businessKey = entry.getKey();
List<OrderMessage> messages = entry.getValue();

// 按顺序号排序
messages.sort(Comparator.comparing(OrderMessage::getSequenceNumber));

// 处理排序后的消息
for (OrderMessage orderMessage : messages) {
try {
OrderConsumptionResult processResult = processOrderMessage(orderMessage);
results.add(processResult);

if (processResult.isSuccess()) {
successCount++;
} else {
failureCount++;
}
} catch (Exception e) {
logger.error("批量处理单条消息异常,消息ID: {}", orderMessage.getMessageId(), e);
failureCount++;
}
}
}

result.setSuccessCount(successCount);
result.setFailureCount(failureCount);
result.setResults(results);
result.setSuccess(result.getFailureCount() == 0);
result.setEndTime(System.currentTimeMillis());

logger.info("批量顺序消息处理完成,总数: {}, 成功: {}, 失败: {}, 耗时: {}ms",
result.getTotalCount(), result.getSuccessCount(), result.getFailureCount(), result.getDuration());

return result;

} catch (Exception e) {
logger.error("批量顺序消息处理异常", e);
result.setSuccess(false);
result.setError("批量顺序消息处理异常: " + e.getMessage());
result.setEndTime(System.currentTimeMillis());
return result;
}
}

/**
* 处理分区顺序消息
* @param orderMessage 顺序消息
* @return 处理结果
*/
private OrderConsumptionResult processPartitionOrderMessage(OrderMessage orderMessage) {
logger.info("处理分区顺序消息,消息ID: {}, 业务键: {}", orderMessage.getMessageId(), orderMessage.getBusinessKey());

OrderConsumptionResult result = new OrderConsumptionResult(
orderMessage.getMessageId(), orderMessage.getBusinessKey(), orderMessage.getMessageType());
result.setSequenceNumber(orderMessage.getSequenceNumber());
result.setStartTime(System.currentTimeMillis());

try {
String businessKey = orderMessage.getBusinessKey();
Long sequenceNumber = orderMessage.getSequenceNumber();

// 检查顺序
if (sequenceNumber != null) {
Long lastSequence = lastProcessedSequence.get(businessKey);
if (lastSequence != null && sequenceNumber <= lastSequence) {
result.setSuccess(false);
result.setError("消息顺序号无效");
result.setEndTime(System.currentTimeMillis());
return result;
}
}

// 执行业务逻辑
boolean success = executeBusinessLogic(orderMessage);

if (success) {
// 更新最后处理的顺序号
if (sequenceNumber != null) {
lastProcessedSequence.put(businessKey, sequenceNumber);
}

result.setSuccess(true);
result.setEndTime(System.currentTimeMillis());

logger.info("分区顺序消息处理成功,消息ID: {}, 业务键: {}, 耗时: {}ms",
orderMessage.getMessageId(), businessKey, result.getDuration());
} else {
result.setSuccess(false);
result.setError("业务逻辑执行失败");
result.setEndTime(System.currentTimeMillis());
}

return result;

} catch (Exception e) {
logger.error("分区顺序消息处理异常,消息ID: {}", orderMessage.getMessageId(), e);
result.setSuccess(false);
result.setError("分区顺序消息处理异常: " + e.getMessage());
result.setEndTime(System.currentTimeMillis());
return result;
}
}

/**
* 处理全局顺序消息
* @param orderMessage 顺序消息
* @return 处理结果
*/
private OrderConsumptionResult processGlobalOrderMessage(OrderMessage orderMessage) {
logger.info("处理全局顺序消息,消息ID: {}, 业务键: {}", orderMessage.getMessageId(), orderMessage.getBusinessKey());

OrderConsumptionResult result = new OrderConsumptionResult(
orderMessage.getMessageId(), orderMessage.getBusinessKey(), orderMessage.getMessageType());
result.setSequenceNumber(orderMessage.getSequenceNumber());
result.setStartTime(System.currentTimeMillis());

try {
String businessKey = orderMessage.getBusinessKey();
Long sequenceNumber = orderMessage.getSequenceNumber();

// 获取消息队列
Queue<OrderMessage> messageQueue = messageQueues.computeIfAbsent(businessKey, k -> new ConcurrentLinkedQueue<>());

// 添加消息到队列
messageQueue.offer(orderMessage);

// 处理队列中的消息
OrderConsumptionResult processResult = processMessageQueue(businessKey, messageQueue);

if (processResult != null && processResult.isSuccess()) {
result.setSuccess(true);
result.setEndTime(System.currentTimeMillis());

logger.info("全局顺序消息处理成功,消息ID: {}, 业务键: {}, 耗时: {}ms",
orderMessage.getMessageId(), businessKey, result.getDuration());
} else {
result.setSuccess(false);
result.setError(processResult != null ? processResult.getError() : "处理失败");
result.setEndTime(System.currentTimeMillis());
}

return result;

} catch (Exception e) {
logger.error("全局顺序消息处理异常,消息ID: {}", orderMessage.getMessageId(), e);
result.setSuccess(false);
result.setError("全局顺序消息处理异常: " + e.getMessage());
result.setEndTime(System.currentTimeMillis());
return result;
}
}

/**
* 处理自定义顺序消息
* @param orderMessage 顺序消息
* @return 处理结果
*/
private OrderConsumptionResult processCustomOrderMessage(OrderMessage orderMessage) {
logger.info("处理自定义顺序消息,消息ID: {}, 业务键: {}", orderMessage.getMessageId(), orderMessage.getBusinessKey());

OrderConsumptionResult result = new OrderConsumptionResult(
orderMessage.getMessageId(), orderMessage.getBusinessKey(), orderMessage.getMessageType());
result.setSequenceNumber(orderMessage.getSequenceNumber());
result.setStartTime(System.currentTimeMillis());

try {
// 实现自定义顺序处理逻辑
boolean success = executeCustomBusinessLogic(orderMessage);

if (success) {
result.setSuccess(true);
result.setEndTime(System.currentTimeMillis());

logger.info("自定义顺序消息处理成功,消息ID: {}, 业务键: {}, 耗时: {}ms",
orderMessage.getMessageId(), orderMessage.getBusinessKey(), result.getDuration());
} else {
result.setSuccess(false);
result.setError("自定义业务逻辑执行失败");
result.setEndTime(System.currentTimeMillis());
}

return result;

} catch (Exception e) {
logger.error("自定义顺序消息处理异常,消息ID: {}", orderMessage.getMessageId(), e);
result.setSuccess(false);
result.setError("自定义顺序消息处理异常: " + e.getMessage());
result.setEndTime(System.currentTimeMillis());
return result;
}
}

/**
* 处理消息队列
* @param businessKey 业务键
* @param messageQueue 消息队列
* @return 处理结果
*/
private OrderConsumptionResult processMessageQueue(String businessKey, Queue<OrderMessage> messageQueue) {
try {
while (!messageQueue.isEmpty()) {
OrderMessage message = messageQueue.peek();
if (message == null) {
break;
}

// 检查顺序
Long sequenceNumber = message.getSequenceNumber();
if (sequenceNumber != null) {
Long lastSequence = lastProcessedSequence.get(businessKey);
if (lastSequence != null && sequenceNumber <= lastSequence) {
messageQueue.poll();
continue;
}
}

// 处理消息
boolean success = executeBusinessLogic(message);
if (success) {
messageQueue.poll();
if (sequenceNumber != null) {
lastProcessedSequence.put(businessKey, sequenceNumber);
}
} else {
break;
}
}

// 队列处理完成即标记成功,否则默认success=false会导致全局顺序消息始终被判定为消费失败
OrderConsumptionResult queueResult = new OrderConsumptionResult("QUEUE_PROCESS", businessKey, "QUEUE");
queueResult.setSuccess(true);
return queueResult;

} catch (Exception e) {
logger.error("处理消息队列异常,业务键: {}", businessKey, e);
return null;
}
}

/**
* 执行业务逻辑
* @param orderMessage 顺序消息
* @return 是否成功
*/
private boolean executeBusinessLogic(OrderMessage orderMessage) {
try {
// 模拟业务逻辑处理
Thread.sleep(100);

// 根据消息类型执行不同的业务逻辑
switch (orderMessage.getMessageType()) {
case "ORDER_CREATE":
return processOrderCreate(orderMessage);
case "ORDER_UPDATE":
return processOrderUpdate(orderMessage);
case "ORDER_CANCEL":
return processOrderCancel(orderMessage);
default:
return true;
}

} catch (Exception e) {
logger.error("执行业务逻辑异常,消息ID: {}", orderMessage.getMessageId(), e);
return false;
}
}

/**
* 执行自定义业务逻辑
* @param orderMessage 顺序消息
* @return 是否成功
*/
private boolean executeCustomBusinessLogic(OrderMessage orderMessage) {
try {
// 实现自定义业务逻辑
// 这里可以添加具体的业务处理逻辑

return true;

} catch (Exception e) {
logger.error("执行自定义业务逻辑异常,消息ID: {}", orderMessage.getMessageId(), e);
return false;
}
}

/**
* 处理订单创建
* @param orderMessage 顺序消息
* @return 是否成功
*/
private boolean processOrderCreate(OrderMessage orderMessage) {
logger.info("处理订单创建,消息ID: {}", orderMessage.getMessageId());
return true;
}

/**
* 处理订单更新
* @param orderMessage 顺序消息
* @return 是否成功
*/
private boolean processOrderUpdate(OrderMessage orderMessage) {
logger.info("处理订单更新,消息ID: {}", orderMessage.getMessageId());
return true;
}

/**
* 处理订单取消
* @param orderMessage 顺序消息
* @return 是否成功
*/
private boolean processOrderCancel(OrderMessage orderMessage) {
logger.info("处理订单取消,消息ID: {}", orderMessage.getMessageId());
return true;
}
}
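
Both the PARTITION_ORDER and GLOBAL_ORDER strategies above ultimately process one business key at a time. A common way to keep that guarantee while still using multiple threads is to serialize work per key and parallelize across keys. The class below is a sketch of that pattern (the class name and pool size are assumptions, not part of the original services): tasks for the same business key are chained onto one CompletableFuture, while tasks for different keys run concurrently.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class KeyOrderedExecutor {

    private final ConcurrentMap<String, CompletableFuture<Void>> tails = new ConcurrentHashMap<>();
    private final ExecutorService pool = Executors.newFixedThreadPool(8); // assumed pool size

    public CompletableFuture<Void> submit(String businessKey, Runnable task) {
        // atomically append the task to the tail of this key's chain
        return tails.compute(businessKey, (key, tail) -> {
            CompletableFuture<?> previous = (tail != null) ? tail : CompletableFuture.completedFuture(null);
            // handle() keeps the chain alive even if the previous task threw an exception
            return previous.handle((ignored, error) -> null).thenRunAsync(task, pool);
        });
    }

    public void shutdown() {
        pool.shutdown();
    }
}

In a long-running service the tails map should also be pruned for idle keys (for example via a scheduled cleanup or a bounded cache), otherwise it grows with the number of distinct business keys.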

3.4 Kafka Ordered-Consumption Monitoring Service

/**
* Kafka顺序消费监控服务
* @author 运维实战
*/
@Service
public class KafkaOrderConsumptionMonitorService {

private final AtomicLong totalOrderMessagesSent = new AtomicLong(0);
private final AtomicLong totalOrderMessagesConsumed = new AtomicLong(0);
private final AtomicLong totalOrderMessagesSentSuccessfully = new AtomicLong(0);
private final AtomicLong totalOrderMessagesSentFailed = new AtomicLong(0);
private final AtomicLong totalOrderMessagesConsumedSuccessfully = new AtomicLong(0);
private final AtomicLong totalOrderMessagesConsumedFailed = new AtomicLong(0);
private final AtomicLong totalBatchSends = new AtomicLong(0);
private final AtomicLong totalBatchConsumes = new AtomicLong(0);

private long lastResetTime = System.currentTimeMillis();
private final long resetInterval = 300000; // 5分钟重置一次

private static final Logger logger = LoggerFactory.getLogger(KafkaOrderConsumptionMonitorService.class);

/**
* 记录顺序消息发送
* @param businessKey 业务键
* @param success 是否成功
*/
public void recordOrderMessageSend(String businessKey, boolean success) {
totalOrderMessagesSent.incrementAndGet();

if (success) {
totalOrderMessagesSentSuccessfully.incrementAndGet();
} else {
totalOrderMessagesSentFailed.incrementAndGet();
}

logger.debug("记录顺序消息发送: 业务键={}, 成功={}", businessKey, success);
}

/**
* 记录顺序消息消费
* @param businessKey 业务键
* @param success 是否成功
*/
public void recordOrderMessageConsume(String businessKey, boolean success) {
totalOrderMessagesConsumed.incrementAndGet();

if (success) {
totalOrderMessagesConsumedSuccessfully.incrementAndGet();
} else {
totalOrderMessagesConsumedFailed.incrementAndGet();
}

logger.debug("记录顺序消息消费: 业务键={}, 成功={}", businessKey, success);
}

/**
* 记录顺序消息批量发送
* @param totalCount 总数量
* @param successCount 成功数量
*/
public void recordOrderMessageBatchSend(int totalCount, int successCount) {
totalBatchSends.addAndGet(totalCount);
totalOrderMessagesSent.addAndGet(totalCount);
totalOrderMessagesSentSuccessfully.addAndGet(successCount);
totalOrderMessagesSentFailed.addAndGet(totalCount - successCount);

logger.debug("记录顺序消息批量发送: 总数={}, 成功={}", totalCount, successCount);
}

/**
* 记录顺序消息批量消费
* @param totalCount 总数量
* @param successCount 成功数量
*/
public void recordOrderMessageBatchConsume(int totalCount, int successCount) {
totalBatchConsumes.addAndGet(totalCount);
totalOrderMessagesConsumed.addAndGet(totalCount);
totalOrderMessagesConsumedSuccessfully.addAndGet(successCount);
totalOrderMessagesConsumedFailed.addAndGet(totalCount - successCount);

logger.debug("记录顺序消息批量消费: 总数={}, 成功={}", totalCount, successCount);
}

/**
* 获取监控指标
* @return 监控指标
*/
public KafkaOrderConsumptionMetrics getMetrics() {
// 检查是否需要重置
if (System.currentTimeMillis() - lastResetTime > resetInterval) {
resetMetrics();
}

KafkaOrderConsumptionMetrics metrics = new KafkaOrderConsumptionMetrics();
metrics.setTotalOrderMessagesSent(totalOrderMessagesSent.get());
metrics.setTotalOrderMessagesConsumed(totalOrderMessagesConsumed.get());
metrics.setTotalOrderMessagesSentSuccessfully(totalOrderMessagesSentSuccessfully.get());
metrics.setTotalOrderMessagesSentFailed(totalOrderMessagesSentFailed.get());
metrics.setTotalOrderMessagesConsumedSuccessfully(totalOrderMessagesConsumedSuccessfully.get());
metrics.setTotalOrderMessagesConsumedFailed(totalOrderMessagesConsumedFailed.get());
metrics.setTotalBatchSends(totalBatchSends.get());
metrics.setTotalBatchConsumes(totalBatchConsumes.get());
metrics.setTimestamp(System.currentTimeMillis());

return metrics;
}

/**
* 重置指标
*/
private void resetMetrics() {
totalOrderMessagesSent.set(0);
totalOrderMessagesConsumed.set(0);
totalOrderMessagesSentSuccessfully.set(0);
totalOrderMessagesSentFailed.set(0);
totalOrderMessagesConsumedSuccessfully.set(0);
totalOrderMessagesConsumedFailed.set(0);
totalBatchSends.set(0);
totalBatchConsumes.set(0);
lastResetTime = System.currentTimeMillis();

logger.info("Kafka顺序消费监控指标重置");
}

/**
* 定期监控Kafka顺序消费状态
*/
@Scheduled(fixedRate = 30000) // 每30秒监控一次
public void monitorKafkaOrderConsumptionStatus() {
try {
KafkaOrderConsumptionMetrics metrics = getMetrics();

logger.info("Kafka顺序消费监控: 发送={}, 消费={}, 发送成功={}, 发送失败={}, 消费成功={}, 消费失败={}, 批量发送={}, 批量消费={}, 发送成功率={}%, 消费成功率={}%",
metrics.getTotalOrderMessagesSent(), metrics.getTotalOrderMessagesConsumed(),
metrics.getTotalOrderMessagesSentSuccessfully(), metrics.getTotalOrderMessagesSentFailed(),
metrics.getTotalOrderMessagesConsumedSuccessfully(), metrics.getTotalOrderMessagesConsumedFailed(),
metrics.getTotalBatchSends(), metrics.getTotalBatchConsumes(),
String.format("%.2f", metrics.getSendSuccessRate()),
String.format("%.2f", metrics.getConsumeSuccessRate()));

// 检查异常情况
if (metrics.getSendSuccessRate() < 95) {
logger.warn("Kafka顺序消息发送成功率过低: {}%", String.format("%.2f", metrics.getSendSuccessRate()));
}

if (metrics.getConsumeSuccessRate() < 90) {
logger.warn("Kafka顺序消息消费成功率过低: {}%", String.format("%.2f", metrics.getConsumeSuccessRate()));
}

} catch (Exception e) {
logger.error("Kafka顺序消费状态监控失败", e);
}
}
}
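
The monitor above exposes its counters only through log lines and a polling endpoint. If Micrometer is available (for example via spring-boot-starter-actuator), the same events can also be published as scrapeable metrics; the component below is a sketch of that idea, with the metric names chosen here as assumptions, and would be invoked from recordOrderMessageSend/recordOrderMessageConsume.

@Component
public class KafkaOrderConsumptionMicrometerMetrics {

    private final Counter sendSuccess;
    private final Counter sendFailure;
    private final Counter consumeSuccess;
    private final Counter consumeFailure;

    public KafkaOrderConsumptionMicrometerMetrics(MeterRegistry registry) {
        this.sendSuccess = Counter.builder("kafka.order.send").tag("result", "success").register(registry);
        this.sendFailure = Counter.builder("kafka.order.send").tag("result", "failure").register(registry);
        this.consumeSuccess = Counter.builder("kafka.order.consume").tag("result", "success").register(registry);
        this.consumeFailure = Counter.builder("kafka.order.consume").tag("result", "failure").register(registry);
    }

    public void recordSend(boolean success) {
        (success ? sendSuccess : sendFailure).increment();
    }

    public void recordConsume(boolean success) {
        (success ? consumeSuccess : consumeFailure).increment();
    }
}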

3.5 Kafka Ordered-Consumption Metrics Class

/**
* Kafka顺序消费指标类
* @author 运维实战
*/
@Data
public class KafkaOrderConsumptionMetrics {

private long totalOrderMessagesSent;
private long totalOrderMessagesConsumed;
private long totalOrderMessagesSentSuccessfully;
private long totalOrderMessagesSentFailed;
private long totalOrderMessagesConsumedSuccessfully;
private long totalOrderMessagesConsumedFailed;
private long totalBatchSends;
private long totalBatchConsumes;
private long timestamp;

public KafkaOrderConsumptionMetrics() {
this.timestamp = System.currentTimeMillis();
}

/**
* 获取发送成功率
* @return 发送成功率
*/
public double getSendSuccessRate() {
long total = totalOrderMessagesSentSuccessfully + totalOrderMessagesSentFailed;
if (total == 0) return 0.0;
return (double) totalOrderMessagesSentSuccessfully / total * 100;
}

/**
* 获取发送失败率
* @return 发送失败率
*/
public double getSendFailureRate() {
long total = totalOrderMessagesSentSuccessfully + totalOrderMessagesSentFailed;
if (total == 0) return 0.0;
return (double) totalOrderMessagesSentFailed / total * 100;
}

/**
* 获取消费成功率
* @return 消费成功率
*/
public double getConsumeSuccessRate() {
long total = totalOrderMessagesConsumedSuccessfully + totalOrderMessagesConsumedFailed;
if (total == 0) return 0.0;
return (double) totalOrderMessagesConsumedSuccessfully / total * 100;
}

/**
* 获取消费失败率
* @return 消费失败率
*/
public double getConsumeFailureRate() {
long total = totalOrderMessagesConsumedSuccessfully + totalOrderMessagesConsumedFailed;
if (total == 0) return 0.0;
return (double) totalOrderMessagesConsumedFailed / total * 100;
}

/**
* 获取发送效率
* @return 发送效率
*/
public double getSendEfficiency() {
if (totalOrderMessagesSent == 0) return 0.0;
return (double) totalOrderMessagesSentSuccessfully / totalOrderMessagesSent * 100;
}

/**
* 获取消费效率
* @return 消费效率
*/
public double getConsumeEfficiency() {
if (totalOrderMessagesConsumed == 0) return 0.0;
return (double) totalOrderMessagesConsumedSuccessfully / totalOrderMessagesConsumed * 100;
}

/**
* 是否健康
* @return 是否健康
*/
public boolean isHealthy() {
return getSendSuccessRate() > 95 &&
getConsumeSuccessRate() > 90;
}
}

4. Kafka Ordered-Consumption Controller

4.1 Kafka Ordered-Consumption REST Controller

/**
* Kafka顺序消费REST控制器
* @author 运维实战
*/
@RestController
@RequestMapping("/api/kafka/order-consumption")
public class KafkaOrderConsumptionController {

@Autowired
private KafkaOrderConsumptionService kafkaOrderConsumptionService;

@Autowired
private KafkaOrderConsumptionMonitorService kafkaOrderConsumptionMonitorService;

private static final Logger logger = LoggerFactory.getLogger(KafkaOrderConsumptionController.class);

/**
* 发送顺序消息
* @param orderMessage 顺序消息
* @return 发送结果
*/
@PostMapping("/send")
public ResponseEntity<KafkaOrderSendResult> sendOrderMessage(@RequestBody OrderMessage orderMessage) {
try {
logger.info("接收到顺序消息发送请求,消息ID: {}", orderMessage.getMessageId());

KafkaOrderSendResult result = kafkaOrderConsumptionService.sendOrderMessage(orderMessage);

return ResponseEntity.ok(result);

} catch (Exception e) {
logger.error("顺序消息发送失败", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
}

/**
* 批量发送顺序消息
* @param orderMessages 顺序消息列表
* @return 批量发送结果
*/
@PostMapping("/batch-send")
public ResponseEntity<KafkaOrderBatchSendResult> batchSendOrderMessages(@RequestBody List<OrderMessage> orderMessages) {
try {
logger.info("接收到顺序消息批量发送请求,数量: {}", orderMessages.size());

KafkaOrderBatchSendResult result = kafkaOrderConsumptionService.batchSendOrderMessages(orderMessages);

return ResponseEntity.ok(result);

} catch (Exception e) {
logger.error("顺序消息批量发送失败", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
}

/**
* 消费顺序消息
* @param orderMessage 顺序消息
* @return 消费结果
*/
@PostMapping("/consume")
public ResponseEntity<OrderConsumptionResult> consumeOrderMessage(@RequestBody OrderMessage orderMessage) {
try {
logger.info("接收到顺序消息消费请求,消息ID: {}", orderMessage.getMessageId());

OrderConsumptionResult result = kafkaOrderConsumptionService.consumeOrderMessage(orderMessage);

return ResponseEntity.ok(result);

} catch (Exception e) {
logger.error("顺序消息消费失败", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
}

/**
* 批量消费顺序消息
* @param orderMessages 顺序消息列表
* @return 批量消费结果
*/
@PostMapping("/batch-consume")
public ResponseEntity<OrderConsumptionBatchResult> batchConsumeOrderMessages(@RequestBody List<OrderMessage> orderMessages) {
try {
logger.info("接收到顺序消息批量消费请求,数量: {}", orderMessages.size());

OrderConsumptionBatchResult result = kafkaOrderConsumptionService.batchConsumeOrderMessages(orderMessages);

return ResponseEntity.ok(result);

} catch (Exception e) {
logger.error("顺序消息批量消费失败", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
}

/**
* 获取Kafka顺序消费监控指标
* @return 监控指标
*/
@GetMapping("/metrics")
public ResponseEntity<KafkaOrderConsumptionMetrics> getKafkaOrderConsumptionMetrics() {
try {
KafkaOrderConsumptionMetrics metrics = kafkaOrderConsumptionMonitorService.getMetrics();
return ResponseEntity.ok(metrics);
} catch (Exception e) {
logger.error("获取Kafka顺序消费监控指标失败", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
}
}
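
A quick way to exercise these endpoints from another Java service or an integration test is Spring's RestTemplate; the snippet below is an illustrative call against the /send endpoint, with the host and port as assumptions.

public class KafkaOrderConsumptionApiDemo {

    public static void main(String[] args) {
        RestTemplate restTemplate = new RestTemplate();

        OrderMessage message = new OrderMessage("ORDER_1001", "ORDER_CREATE", "{\"amount\":100}");
        message.setSequenceNumber(1L);

        // assumed local address of the service exposing the controller above
        KafkaOrderSendResult result = restTemplate.postForObject(
                "http://localhost:8080/api/kafka/order-consumption/send",
                message,
                KafkaOrderSendResult.class);

        System.out.println("partition=" + result.getPartition() + ", offset=" + result.getOffset());
    }
}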

5. Summary

5.1 Best Practices for Ordered Consumption

  1. Design the partitioning strategy carefully: use partition keys so that related messages land in the same partition and stay ordered
  2. Tune the consumer group configuration: choose consumer-group parameters that preserve ordered consumption
  3. Implement an ordered-consumption strategy: pick partition-level, global, or custom ordering according to business needs
  4. Exception handling: implement solid error handling and retry mechanisms
  5. Performance monitoring: monitor ordered-consumption performance in real time

5.2 Performance Tuning Suggestions

  • Partition tuning: choose a sensible partition count and partitioning strategy
  • Consumer tuning: optimize consumer configuration and concurrent processing
  • Batching: use batch send and batch consume to improve throughput
  • Caching: apply a reasonable data-caching strategy
  • Error handling: strengthen error handling and retry mechanisms

5.3 Operations Management Highlights

  • Real-time monitoring: track ordered-consumption status and performance
  • Partition management: establish thorough partition management and monitoring
  • Consumer group management: implement registration, discovery, and management of consumer groups
  • Log management: maintain complete logging and log analysis
  • Performance tuning: optimize Kafka based on monitoring data

With this hands-on Java guide to guaranteeing ordered Kafka message consumption under high concurrency, you can master the principles, implementation techniques, performance optimizations, and enterprise best practices needed to build an efficient and reliable ordered-consumption system.