1. Overview of Kafka Message Backlog Handling

Message backlog is a common problem in distributed messaging systems: when messages are produced faster than they are consumed, they accumulate inside Kafka. A backlog not only degrades system performance, it can also lead to data loss and ever-growing latency. This article walks through the causes of Kafka message backlog, strategies for handling it, and best practices drawn from operations experience.

1.1 Core Problems Caused by Backlog

  1. Performance degradation: throughput drops as brokers and consumers fall behind
  2. Increased latency: end-to-end processing delay grows significantly
  3. Resource consumption: accumulated messages occupy disk and memory
  4. Data loss: backlogged messages may exceed the retention window and be deleted before they are consumed
  5. System instability: a sustained backlog can destabilize brokers and downstream services
  6. Business impact: normal business operation is disrupted

1.2 Causes of Message Backlog

  • Slow consumption: insufficient consumer processing capacity
  • Excessive production rate: producers send faster than consumers can drain
  • Partition imbalance: uneven partition assignment concentrates load on a few consumers
  • Network problems: network latency or packet loss
  • Insufficient resources: server CPU, memory, or disk is exhausted
  • Misconfiguration: inappropriate Kafka configuration parameters

1.3 Backlog Handling Strategies

  • Consumer optimization: increase consumer processing capacity
  • Producer tuning: optimize the producer sending strategy
  • Partitioning strategy: assign partitions sensibly
  • Resource scaling: add server resources
  • Monitoring and alerting: monitor the backlog in real time (see the lag-checking sketch after this list)
  • Emergency handling: have a tested procedure for severe backlog
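
All of these strategies start from knowing how large the backlog actually is. Below is a minimal sketch that computes per-partition consumer lag with the Kafka AdminClient; the bootstrap address and group ID are placeholder assumptions, and kafka-consumer-groups.sh --describe gives the same view from the command line:

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class ConsumerLagChecker {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Placeholder address; point this at your cluster
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // Committed offsets of the consumer group (group ID is an assumption)
            Map<TopicPartition, OffsetAndMetadata> committed =
                    admin.listConsumerGroupOffsets("default-group")
                         .partitionsToOffsetAndMetadata().get();

            // Latest (log-end) offsets for the same partitions
            Map<TopicPartition, OffsetSpec> request = new HashMap<>();
            committed.keySet().forEach(tp -> request.put(tp, OffsetSpec.latest()));
            Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> ends =
                    admin.listOffsets(request).all().get();

            // Lag = log-end offset minus committed offset, per partition
            for (TopicPartition tp : committed.keySet()) {
                long lag = ends.get(tp).offset() - committed.get(tp).offset();
                System.out.printf("%s lag=%d%n", tp, lag);
            }
        }
    }
}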

2. Kafka Consumer Optimization

2.1 Kafka Consumer Configuration

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.kafka.support.serializer.JsonDeserializer;

/**
 * Kafka consumer configuration
 * @author Java实战
 */
@Configuration
@EnableConfigurationProperties(KafkaConsumerProperties.class)
public class KafkaConsumerConfig {

    @Autowired
    private KafkaConsumerProperties properties;

    /**
     * Kafka consumer factory
     * @return consumer factory
     */
    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> configProps = new HashMap<>();

        // Basic settings
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getGroupId());
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

        // Throughput tuning
        configProps.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, properties.getFetchMinBytes());
        configProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, properties.getFetchMaxWaitMs());
        configProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, properties.getMaxPollRecords());
        configProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, properties.getMaxPollIntervalMs());

        // Offset management
        configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, properties.getAutoOffsetReset());
        configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, properties.isEnableAutoCommit());
        configProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, properties.getAutoCommitIntervalMs());

        // Session and heartbeat
        configProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, properties.getSessionTimeoutMs());
        configProps.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, properties.getHeartbeatIntervalMs());

        // Partition assignment
        configProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, properties.getPartitionAssignmentStrategy());

        return new DefaultKafkaConsumerFactory<>(configProps);
    }

    /**
     * Kafka listener container factory
     * @return listener container factory
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        // Concurrency: up to one consumer thread per partition
        factory.setConcurrency(properties.getConcurrency());

        // Batch listening
        factory.setBatchListener(properties.isBatchListener());

        // Error handling: re-seek to the failed record and retry (spring-kafka 2.x API)
        factory.setErrorHandler(new SeekToCurrentErrorHandler());

        // Acknowledgement mode; the property holds the enum name (e.g. "BATCH", "MANUAL")
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.valueOf(properties.getAckMode()));

        return factory;
    }
}
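
With the container factory above in place, consuming is a matter of annotating a method. A minimal batch-listener sketch, assuming batchListener=true and ackMode=MANUAL in the properties above; the topic name and the processing body are illustrative:

import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class OrderEventListener {

    // Batch listener; requires batchListener=true and ackMode=MANUAL above
    @KafkaListener(topics = "order-events", containerFactory = "kafkaListenerContainerFactory")
    public void onMessages(List<ConsumerRecord<String, Object>> records, Acknowledgment ack) {
        for (ConsumerRecord<String, Object> record : records) {
            // Keep per-record work short so the whole batch finishes
            // within max.poll.interval.ms
            handle(record);
        }
        // Commit the batch's offsets only after everything succeeded
        ack.acknowledge();
    }

    private void handle(ConsumerRecord<String, Object> record) {
        // Business processing goes here (placeholder)
    }
}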

2.2 Kafka Consumer Properties

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;

/**
 * Kafka consumer properties
 * @author Java实战
 */
@Data
@ConfigurationProperties(prefix = "kafka.consumer")
public class KafkaConsumerProperties {

    /**
     * Kafka bootstrap servers
     */
    private String bootstrapServers = "localhost:9092";

    /**
     * Consumer group ID
     */
    private String groupId = "default-group";

    /**
     * Minimum bytes per fetch (fetch.min.bytes)
     */
    private int fetchMinBytes = 1;

    /**
     * Maximum fetch wait time in milliseconds (fetch.max.wait.ms)
     */
    private int fetchMaxWaitMs = 500;

    /**
     * Maximum records per poll (max.poll.records)
     */
    private int maxPollRecords = 500;

    /**
     * Maximum interval between polls in milliseconds (max.poll.interval.ms)
     */
    private int maxPollIntervalMs = 300000;

    /**
     * Auto offset reset policy ("latest" or "earliest")
     */
    private String autoOffsetReset = "latest";

    /**
     * Whether auto-commit is enabled
     */
    private boolean enableAutoCommit = true;

    /**
     * Auto-commit interval in milliseconds
     */
    private int autoCommitIntervalMs = 5000;

    /**
     * Session timeout in milliseconds
     */
    private int sessionTimeoutMs = 30000;

    /**
     * Heartbeat interval in milliseconds
     */
    private int heartbeatIntervalMs = 3000;

    /**
     * Partition assignment strategy
     */
    private String partitionAssignmentStrategy = "org.apache.kafka.clients.consumer.RangeAssignor";

    /**
     * Listener container concurrency
     */
    private int concurrency = 1;

    /**
     * Whether batch listening is enabled
     */
    private boolean batchListener = false;

    /**
     * Acknowledgement mode (name of a ContainerProperties.AckMode constant)
     */
    private String ackMode = "BATCH";

    /**
     * Whether the consumer is enabled
     */
    private boolean enable = true;

    /**
     * Number of consumer threads
     */
    private int threadCount = 1;

    /**
     * Whether compression is enabled
     */
    private boolean enableCompression = false;

    /**
     * Compression type
     */
    private String compressionType = "gzip";
}

2.3 Kafka Consumer Service

import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.stereotype.Service;

/**
 * Kafka consumer service
 * @author Java实战
 */
@Service
@Slf4j
public class KafkaConsumerService {

    @Autowired
    private KafkaConsumerProperties properties;

    @Autowired
    private MessageProcessingService messageProcessingService;

    @Autowired
    private KafkaBacklogMonitorService backlogMonitorService;

    private final Map<String, KafkaConsumer<String, Object>> consumers = new ConcurrentHashMap<>();

    /**
     * Start a consumer
     * @param topic topic
     * @param partition partition (null subscribes to all partitions)
     */
    public void startConsumer(String topic, Integer partition) {
        log.info("Starting Kafka consumer, topic: {}, partition: {}", topic, partition);

        try {
            if (topic == null || topic.isEmpty()) {
                throw new IllegalArgumentException("Topic must not be empty");
            }

            String consumerKey = buildConsumerKey(topic, partition);

            // Skip if a consumer for this key already exists
            if (consumers.containsKey(consumerKey)) {
                log.warn("Consumer already exists, topic: {}, partition: {}", topic, partition);
                return;
            }

            // Create the consumer and register it before starting the thread,
            // so a concurrent start for the same key is rejected above
            KafkaConsumer<String, Object> consumer = createConsumer(topic, partition);
            consumers.put(consumerKey, consumer);

            // The polling thread closes and deregisters the consumer on exit
            Thread consumerThread = new Thread(() -> {
                try {
                    consumeMessages(consumer, topic, partition);
                } catch (Exception e) {
                    log.error("Consumer thread failed, topic: {}, partition: {}", topic, partition, e);
                } finally {
                    consumer.close();
                    consumers.remove(consumerKey);
                }
            });

            consumerThread.setName("kafka-consumer-" + topic + "-" + partition);
            consumerThread.start();

            log.info("Kafka consumer started, topic: {}, partition: {}", topic, partition);

        } catch (Exception e) {
            log.error("Failed to start Kafka consumer, topic: {}, partition: {}", topic, partition, e);
            throw new RuntimeException("Failed to start consumer: " + e.getMessage(), e);
        }
    }

    /**
     * Stop a consumer
     * @param topic topic
     * @param partition partition
     */
    public void stopConsumer(String topic, Integer partition) {
        log.info("Stopping Kafka consumer, topic: {}, partition: {}", topic, partition);

        try {
            String consumerKey = buildConsumerKey(topic, partition);

            KafkaConsumer<String, Object> consumer = consumers.get(consumerKey);
            if (consumer != null) {
                // KafkaConsumer is not thread-safe, so it must not be closed from
                // this thread. wakeup() makes the polling thread throw
                // WakeupException; that thread then closes and deregisters itself.
                consumer.wakeup();

                log.info("Kafka consumer stop requested, topic: {}, partition: {}", topic, partition);
            } else {
                log.warn("Consumer does not exist, topic: {}, partition: {}", topic, partition);
            }

        } catch (Exception e) {
            log.error("Failed to stop Kafka consumer, topic: {}, partition: {}", topic, partition, e);
        }
    }

    /**
     * Poll and process messages
     * @param consumer consumer
     * @param topic topic
     * @param partition partition
     */
    private void consumeMessages(KafkaConsumer<String, Object> consumer, String topic, Integer partition) {
        log.info("Start consuming messages, topic: {}, partition: {}", topic, partition);

        try {
            while (true) {
                ConsumerRecords<String, Object> records = consumer.poll(Duration.ofMillis(1000));

                if (records.isEmpty()) {
                    continue;
                }

                // Process the batch
                processMessages(records, topic, partition);

                // Commit manually only when auto-commit is disabled
                if (!properties.isEnableAutoCommit()) {
                    consumer.commitSync();
                }

                // Feed the backlog monitor
                backlogMonitorService.recordConsumption(topic, partition, records.count());
            }

        } catch (WakeupException e) {
            log.info("Consumer woken up for shutdown, topic: {}, partition: {}", topic, partition);
        } catch (Exception e) {
            log.error("Failed while consuming messages, topic: {}, partition: {}", topic, partition, e);
        }
    }

    /**
     * Process a batch of records
     * @param records records
     * @param topic topic
     * @param partition partition
     */
    private void processMessages(ConsumerRecords<String, Object> records, String topic, Integer partition) {
        log.info("Processing messages, topic: {}, partition: {}, count: {}", topic, partition, records.count());

        for (ConsumerRecord<String, Object> record : records) {
            try {
                // Process a single record
                messageProcessingService.processMessage(record, topic, partition);

                log.debug("Message processed, topic: {}, partition: {}, offset: {}",
                        topic, partition, record.offset());

            } catch (Exception e) {
                log.error("Failed to process message, topic: {}, partition: {}, offset: {}",
                        topic, partition, record.offset(), e);

                // Record the failure for monitoring
                backlogMonitorService.recordProcessingFailure(topic, partition, record.offset());
            }
        }
    }

    /**
     * Create a consumer and attach it to the topic
     * @param topic topic
     * @param partition partition
     * @return consumer
     */
    private KafkaConsumer<String, Object> createConsumer(String topic, Integer partition) {
        Map<String, Object> configProps = new HashMap<>();

        // Basic settings
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getGroupId());
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

        // Throughput tuning
        configProps.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, properties.getFetchMinBytes());
        configProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, properties.getFetchMaxWaitMs());
        configProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, properties.getMaxPollRecords());
        configProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, properties.getMaxPollIntervalMs());

        // Offset management
        configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, properties.getAutoOffsetReset());
        configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, properties.isEnableAutoCommit());
        configProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, properties.getAutoCommitIntervalMs());

        // Session and heartbeat
        configProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, properties.getSessionTimeoutMs());
        configProps.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, properties.getHeartbeatIntervalMs());

        // Partition assignment
        configProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, properties.getPartitionAssignmentStrategy());

        KafkaConsumer<String, Object> consumer = new KafkaConsumer<>(configProps);

        // Attach the consumer to the topic: assign the specific partition when
        // given, otherwise subscribe and let the group coordinator assign
        if (partition != null) {
            consumer.assign(Collections.singletonList(new TopicPartition(topic, partition)));
        } else {
            consumer.subscribe(Collections.singletonList(topic));
        }

        return consumer;
    }

    /**
     * Build the consumer registry key
     * @param topic topic
     * @param partition partition
     * @return registry key
     */
    private String buildConsumerKey(String topic, Integer partition) {
        return topic + "-" + (partition != null ? partition : "all");
    }

    /**
     * Consumer status snapshot
     * @return consumer status
     */
    public Map<String, Object> getConsumerStatus() {
        Map<String, Object> status = new HashMap<>();

        status.put("totalConsumers", consumers.size());
        status.put("consumers", consumers.keySet());
        status.put("lastUpdateTime", LocalDateTime.now());

        return status;
    }
}
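
When a single consumer cannot keep up and adding partitions is not an option, a common pattern is to parallelize processing inside each polled batch while preserving per-partition order. A minimal sketch, with an assumed pool size of 8 and a placeholder processing body:

import java.time.Duration;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class ParallelBatchProcessor {

    // Bounded pool; CallerRunsPolicy applies backpressure instead of dropping work
    private final ExecutorService pool = new ThreadPoolExecutor(
            8, 8, 60, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(64),
            new ThreadPoolExecutor.CallerRunsPolicy());

    public void drain(KafkaConsumer<String, Object> consumer) {
        ConsumerRecords<String, Object> records = consumer.poll(Duration.ofMillis(1000));

        // One task per partition keeps intra-partition ordering intact
        List<CompletableFuture<Void>> tasks = records.partitions().stream()
                .map(tp -> CompletableFuture.runAsync(
                        () -> processPartition(tp, records.records(tp)), pool))
                .collect(Collectors.toList());

        // Wait for the whole batch before committing, so committed offsets
        // never run ahead of actual processing
        CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0])).join();
        consumer.commitSync();
    }

    private void processPartition(TopicPartition tp, List<ConsumerRecord<String, Object>> records) {
        for (ConsumerRecord<String, Object> record : records) {
            // Business processing goes here (placeholder)
        }
    }
}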

3. Kafka Producer Optimization

3.1 Kafka Producer Configuration

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

/**
 * Kafka producer configuration
 * @author Java实战
 */
@Configuration
@EnableConfigurationProperties(KafkaProducerProperties.class)
public class KafkaProducerConfig {

    @Autowired
    private KafkaProducerProperties properties;

    /**
     * Kafka producer factory
     * @return producer factory
     */
    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();

        // Basic settings
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        // Throughput tuning
        configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, properties.getBatchSize());
        configProps.put(ProducerConfig.LINGER_MS_CONFIG, properties.getLingerMs());
        configProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, properties.getBufferMemory());
        configProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, properties.getCompressionType());

        // Reliability
        configProps.put(ProducerConfig.ACKS_CONFIG, properties.getAcks());
        configProps.put(ProducerConfig.RETRIES_CONFIG, properties.getRetries());
        configProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, properties.getRetryBackoffMs());
        configProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, properties.getRequestTimeoutMs());

        // Partitioning
        configProps.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, properties.getPartitionerClass());

        return new DefaultKafkaProducerFactory<>(configProps);
    }

    /**
     * Kafka template
     * @return Kafka template
     */
    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

3.2 Kafka Producer Properties

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;

/**
 * Kafka producer properties
 * @author Java实战
 */
@Data
@ConfigurationProperties(prefix = "kafka.producer")
public class KafkaProducerProperties {

    /**
     * Kafka bootstrap servers
     */
    private String bootstrapServers = "localhost:9092";

    /**
     * Batch size in bytes (batch.size)
     */
    private int batchSize = 16384;

    /**
     * Linger time in milliseconds (linger.ms)
     */
    private int lingerMs = 0;

    /**
     * Buffer memory in bytes (buffer.memory)
     */
    private long bufferMemory = 33554432;

    /**
     * Compression type (compression.type)
     */
    private String compressionType = "none";

    /**
     * Acknowledgement setting (acks)
     */
    private String acks = "1";

    /**
     * Number of retries
     */
    private int retries = 3;

    /**
     * Retry backoff in milliseconds
     */
    private int retryBackoffMs = 100;

    /**
     * Request timeout in milliseconds
     */
    private int requestTimeoutMs = 30000;

    /**
     * Partitioner class
     */
    private String partitionerClass = "org.apache.kafka.clients.producer.internals.DefaultPartitioner";

    /**
     * Whether the producer is enabled
     */
    private boolean enable = true;

    /**
     * Number of producer threads
     */
    private int threadCount = 1;

    /**
     * Whether compression is enabled
     */
    private boolean enableCompression = false;

    /**
     * Compression threshold in bytes
     */
    private int compressionThreshold = 1024;
}

3.3 Kafka Producer Service

import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;

/**
 * Kafka producer service
 * @author Java实战
 */
@Service
@Slf4j
public class KafkaProducerService {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    @Autowired
    private KafkaProducerProperties properties;

    @Autowired
    private KafkaBacklogMonitorService backlogMonitorService;

    /**
     * Send a message
     * @param topic topic
     * @param key key
     * @param value value
     * @return true if the send was handed to the client buffer; delivery itself
     *         is confirmed asynchronously in the callback
     */
    public boolean sendMessage(String topic, String key, Object value) {
        log.info("Sending message, topic: {}, key: {}", topic, key);

        try {
            if (topic == null || topic.isEmpty()) {
                throw new IllegalArgumentException("Topic must not be empty");
            }

            if (value == null) {
                throw new IllegalArgumentException("Message value must not be null");
            }

            // Send the message (asynchronous)
            ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, key, value);

            // Attach callbacks for the asynchronous result
            future.addCallback(
                    result -> {
                        log.info("Message sent, topic: {}, key: {}, partition: {}, offset: {}",
                                topic, key, result.getRecordMetadata().partition(), result.getRecordMetadata().offset());

                        // Record the successful send
                        backlogMonitorService.recordProduction(topic, result.getRecordMetadata().partition());
                    },
                    failure -> {
                        log.error("Message send failed, topic: {}, key: {}", topic, key, failure);

                        // Record the failed send
                        backlogMonitorService.recordProductionFailure(topic, key);
                    }
            );

            return true;

        } catch (Exception e) {
            log.error("Failed to send message, topic: {}, key: {}", topic, key, e);
            return false;
        }
    }

    /**
     * Send a message to a specific partition
     * @param topic topic
     * @param partition partition
     * @param key key
     * @param value value
     * @return true if the send was handed off
     */
    public boolean sendMessage(String topic, Integer partition, String key, Object value) {
        log.info("Sending message, topic: {}, partition: {}, key: {}", topic, partition, key);

        try {
            if (topic == null || topic.isEmpty()) {
                throw new IllegalArgumentException("Topic must not be empty");
            }

            if (value == null) {
                throw new IllegalArgumentException("Message value must not be null");
            }

            // Send the message to the given partition
            ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, partition, key, value);

            // Attach callbacks for the asynchronous result
            future.addCallback(
                    result -> {
                        log.info("Message sent, topic: {}, partition: {}, key: {}, offset: {}",
                                topic, partition, key, result.getRecordMetadata().offset());

                        // Record the successful send
                        backlogMonitorService.recordProduction(topic, partition);
                    },
                    failure -> {
                        log.error("Message send failed, topic: {}, partition: {}, key: {}", topic, partition, key, failure);

                        // Record the failed send
                        backlogMonitorService.recordProductionFailure(topic, key);
                    }
            );

            return true;

        } catch (Exception e) {
            log.error("Failed to send message, topic: {}, partition: {}, key: {}", topic, partition, key, e);
            return false;
        }
    }

    /**
     * Send a batch of messages
     * @param topic topic
     * @param messages message list
     * @return number of messages handed off successfully
     */
    public int sendBatchMessages(String topic, List<MessageData> messages) {
        try {
            if (topic == null || topic.isEmpty()) {
                throw new IllegalArgumentException("Topic must not be empty");
            }

            // Validate before touching messages.size() to avoid a NullPointerException
            if (messages == null || messages.isEmpty()) {
                throw new IllegalArgumentException("Message list must not be empty");
            }

            log.info("Sending message batch, topic: {}, count: {}", topic, messages.size());

            int successCount = 0;

            for (MessageData message : messages) {
                try {
                    boolean success = sendMessage(topic, message.getKey(), message.getValue());
                    if (success) {
                        successCount++;
                    }
                } catch (Exception e) {
                    log.error("Failed to send message in batch, topic: {}, key: {}", topic, message.getKey(), e);
                }
            }

            log.info("Batch send finished, topic: {}, success: {}, total: {}", topic, successCount, messages.size());

            return successCount;

        } catch (Exception e) {
            log.error("Failed to send message batch, topic: {}", topic, e);
            return 0;
        }
    }

    /**
     * Send a message asynchronously with a caller-supplied callback
     * @param topic topic
     * @param key key
     * @param value value
     * @param callback application-defined callback (interface not shown here)
     */
    public void sendMessageAsync(String topic, String key, Object value, SendCallback callback) {
        log.info("Sending message asynchronously, topic: {}, key: {}", topic, key);

        try {
            if (topic == null || topic.isEmpty()) {
                throw new IllegalArgumentException("Topic must not be empty");
            }

            if (value == null) {
                throw new IllegalArgumentException("Message value must not be null");
            }

            // Send the message asynchronously
            ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, key, value);

            // Attach callbacks for the asynchronous result
            future.addCallback(
                    result -> {
                        log.info("Async message sent, topic: {}, key: {}, partition: {}, offset: {}",
                                topic, key, result.getRecordMetadata().partition(), result.getRecordMetadata().offset());

                        // Record the successful send
                        backlogMonitorService.recordProduction(topic, result.getRecordMetadata().partition());

                        if (callback != null) {
                            callback.onSuccess(result);
                        }
                    },
                    failure -> {
                        log.error("Async message send failed, topic: {}, key: {}", topic, key, failure);

                        // Record the failed send
                        backlogMonitorService.recordProductionFailure(topic, key);

                        if (callback != null) {
                            callback.onFailure(failure);
                        }
                    }
            );

        } catch (Exception e) {
            log.error("Failed to send message asynchronously, topic: {}, key: {}", topic, key, e);

            if (callback != null) {
                callback.onFailure(e);
            }
        }
    }

    /**
     * Producer status snapshot
     * @return producer status
     */
    public Map<String, Object> getProducerStatus() {
        Map<String, Object> status = new HashMap<>();

        status.put("enable", properties.isEnable());
        status.put("threadCount", properties.getThreadCount());
        status.put("batchSize", properties.getBatchSize());
        status.put("lingerMs", properties.getLingerMs());
        status.put("compressionType", properties.getCompressionType());
        status.put("lastUpdateTime", LocalDateTime.now());

        return status;
    }
}
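
Note that sendMessage returns true as soon as the record is handed to the client's buffer; delivery is only confirmed in the callback. For messages that must not be lost, one option is to block on the returned future, as in this sketch (the 10-second timeout is an assumption):

import java.util.concurrent.TimeUnit;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;

public class SyncSendExample {

    private final KafkaTemplate<String, Object> kafkaTemplate;

    public SyncSendExample(KafkaTemplate<String, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public long sendAndWait(String topic, String key, Object value) throws Exception {
        // Block until the broker acknowledges (per the acks setting) or the
        // timeout fires; the spring-kafka 2.x ListenableFuture extends Future
        SendResult<String, Object> result =
                kafkaTemplate.send(topic, key, value).get(10, TimeUnit.SECONDS);
        return result.getRecordMetadata().offset();
    }
}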

4. Kafka Backlog Monitoring

4.1 Kafka Backlog Monitoring Service

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * Kafka backlog monitoring service
 * @author Java实战
 */
@Service
@Slf4j
public class KafkaBacklogMonitorService {

    @Autowired
    private KafkaConsumerProperties consumerProperties;

    @Autowired
    private KafkaProducerProperties producerProperties;

    // Per topic-partition counters. Note that these are in-process
    // approximations; authoritative lag comes from comparing committed offsets
    // with broker log-end offsets (see the AdminClient sketch in section 1.3)
    private final Map<String, BacklogMetrics> backlogMetrics = new ConcurrentHashMap<>();

    /**
     * Record a produced message
     * @param topic topic
     * @param partition partition
     */
    public void recordProduction(String topic, Integer partition) {
        log.debug("Recording production, topic: {}, partition: {}", topic, partition);

        try {
            String key = buildKey(topic, partition);
            BacklogMetrics metrics = backlogMetrics.computeIfAbsent(key, k -> new BacklogMetrics(topic, partition));

            metrics.incrementProductionCount();
            metrics.setLastProductionTime(LocalDateTime.now());

        } catch (Exception e) {
            log.error("Failed to record production, topic: {}, partition: {}", topic, partition, e);
        }
    }

    /**
     * Record a failed send
     * @param topic topic
     * @param key message key
     */
    public void recordProductionFailure(String topic, String key) {
        log.debug("Recording production failure, topic: {}, key: {}", topic, key);

        try {
            // Hook for failure handling (dead-letter queue, retry, ...)
            log.warn("Message production failed, topic: {}, key: {}", topic, key);

        } catch (Exception e) {
            log.error("Failed to record production failure, topic: {}, key: {}", topic, key, e);
        }
    }

    /**
     * Record consumed messages
     * @param topic topic
     * @param partition partition
     * @param count number of messages consumed
     */
    public void recordConsumption(String topic, Integer partition, int count) {
        log.debug("Recording consumption, topic: {}, partition: {}, count: {}", topic, partition, count);

        try {
            String key = buildKey(topic, partition);
            BacklogMetrics metrics = backlogMetrics.computeIfAbsent(key, k -> new BacklogMetrics(topic, partition));

            metrics.incrementConsumptionCount(count);
            metrics.setLastConsumptionTime(LocalDateTime.now());

        } catch (Exception e) {
            log.error("Failed to record consumption, topic: {}, partition: {}", topic, partition, e);
        }
    }

    /**
     * Record a processing failure
     * @param topic topic
     * @param partition partition
     * @param offset offset
     */
    public void recordProcessingFailure(String topic, Integer partition, long offset) {
        log.debug("Recording processing failure, topic: {}, partition: {}, offset: {}", topic, partition, offset);

        try {
            String key = buildKey(topic, partition);
            BacklogMetrics metrics = backlogMetrics.computeIfAbsent(key, k -> new BacklogMetrics(topic, partition));

            metrics.incrementFailureCount();
            metrics.setLastFailureTime(LocalDateTime.now());

        } catch (Exception e) {
            log.error("Failed to record processing failure, topic: {}, partition: {}", topic, partition, e);
        }
    }

    /**
     * Get the backlog status of a topic-partition
     * @param topic topic
     * @param partition partition
     * @return backlog status
     */
    public BacklogStatus getBacklogStatus(String topic, Integer partition) {
        log.info("Getting backlog status, topic: {}, partition: {}", topic, partition);

        try {
            String key = buildKey(topic, partition);
            BacklogMetrics metrics = backlogMetrics.get(key);

            if (metrics == null) {
                return new BacklogStatus(topic, partition, 0, 0, 0, "UNKNOWN");
            }

            BacklogStatus status = new BacklogStatus();
            status.setTopic(topic);
            status.setPartition(partition);
            status.setProductionCount(metrics.getProductionCount());
            status.setConsumptionCount(metrics.getConsumptionCount());
            status.setFailureCount(metrics.getFailureCount());

            // Backlog = produced minus consumed
            long backlogCount = metrics.getProductionCount() - metrics.getConsumptionCount();
            status.setBacklogCount(backlogCount);

            // Classify the backlog level
            if (backlogCount > 10000) {
                status.setStatus("CRITICAL");
            } else if (backlogCount > 1000) {
                status.setStatus("WARNING");
            } else if (backlogCount > 0) {
                status.setStatus("NORMAL");
            } else {
                status.setStatus("HEALTHY");
            }

            status.setLastUpdateTime(LocalDateTime.now());

            log.info("Backlog status resolved, topic: {}, partition: {}, backlog: {}, status: {}",
                    topic, partition, backlogCount, status.getStatus());

            return status;

        } catch (Exception e) {
            log.error("Failed to get backlog status, topic: {}, partition: {}", topic, partition, e);
            return null;
        }
    }

    /**
     * Get the backlog status of all tracked topic-partitions
     * @return backlog status list
     */
    public List<BacklogStatus> getAllBacklogStatus() {
        log.info("Getting all backlog statuses");

        try {
            List<BacklogStatus> statusList = new ArrayList<>();

            for (BacklogMetrics metrics : backlogMetrics.values()) {
                BacklogStatus status = getBacklogStatus(metrics.getTopic(), metrics.getPartition());
                if (status != null) {
                    statusList.add(status);
                }
            }

            log.info("Resolved all backlog statuses, count: {}", statusList.size());

            return statusList;

        } catch (Exception e) {
            log.error("Failed to get all backlog statuses", e);
            return new ArrayList<>();
        }
    }

    /**
     * Check for backlog alerts
     * @return alert list
     */
    public List<BacklogAlert> checkBacklogAlerts() {
        log.info("Checking backlog alerts");

        try {
            List<BacklogAlert> alerts = new ArrayList<>();

            for (BacklogMetrics metrics : backlogMetrics.values()) {
                BacklogStatus status = getBacklogStatus(metrics.getTopic(), metrics.getPartition());

                if (status != null && ("CRITICAL".equals(status.getStatus()) || "WARNING".equals(status.getStatus()))) {
                    BacklogAlert alert = new BacklogAlert();
                    alert.setTopic(status.getTopic());
                    alert.setPartition(status.getPartition());
                    alert.setBacklogCount(status.getBacklogCount());
                    alert.setStatus(status.getStatus());
                    alert.setAlertTime(LocalDateTime.now());
                    alert.setMessage("Message backlog alert: " + status.getStatus());

                    alerts.add(alert);
                }
            }

            log.info("Backlog alert check finished, alerts: {}", alerts.size());

            return alerts;

        } catch (Exception e) {
            log.error("Failed to check backlog alerts", e);
            return new ArrayList<>();
        }
    }

    /**
     * Build the metrics key
     * @param topic topic
     * @param partition partition
     * @return metrics key
     */
    private String buildKey(String topic, Integer partition) {
        return topic + "-" + (partition != null ? partition : "all");
    }
}
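
checkBacklogAlerts only computes alerts when it is called, so something has to invoke it periodically. A minimal scheduling sketch, assuming @EnableScheduling is present on a configuration class; the 30-second rate and the log-based notification are placeholders for a real alerting channel:

import java.util.List;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class BacklogAlertScheduler {

    @Autowired
    private KafkaBacklogMonitorService backlogMonitorService;

    // Runs every 30 seconds; requires @EnableScheduling somewhere in the application
    @Scheduled(fixedRate = 30_000)
    public void checkAndNotify() {
        List<BacklogAlert> alerts = backlogMonitorService.checkBacklogAlerts();
        for (BacklogAlert alert : alerts) {
            // Placeholder: forward to your alerting channel (mail, SMS, chat-ops, ...)
            log.warn("Backlog alert, topic: {}, partition: {}, backlog: {}, level: {}",
                    alert.getTopic(), alert.getPartition(), alert.getBacklogCount(), alert.getStatus());
        }
    }
}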

4.2 Kafka Backlog Monitoring Controller

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
 * Kafka backlog monitoring controller
 * @author Java实战
 */
@RestController
@RequestMapping("/api/kafka/backlog")
@Slf4j
public class KafkaBacklogMonitorController {

    @Autowired
    private KafkaBacklogMonitorService backlogMonitorService;

    @Autowired
    private KafkaConsumerService consumerService;

    @Autowired
    private KafkaProducerService producerService;

    /**
     * Get the backlog status of a topic-partition
     * @param topic topic
     * @param partition partition
     * @return backlog status
     */
    @GetMapping("/status")
    public ResponseEntity<BacklogStatus> getBacklogStatus(
            @RequestParam String topic,
            @RequestParam(required = false) Integer partition) {
        try {
            log.info("Received backlog status request, topic: {}, partition: {}", topic, partition);

            BacklogStatus status = backlogMonitorService.getBacklogStatus(topic, partition);

            if (status != null) {
                return ResponseEntity.ok(status);
            } else {
                return ResponseEntity.status(HttpStatus.NOT_FOUND).build();
            }

        } catch (Exception e) {
            log.error("Failed to get backlog status", e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
        }
    }

    /**
     * Get all backlog statuses
     * @return backlog status list
     */
    @GetMapping("/status/all")
    public ResponseEntity<List<BacklogStatus>> getAllBacklogStatus() {
        try {
            log.info("Received request for all backlog statuses");

            List<BacklogStatus> statusList = backlogMonitorService.getAllBacklogStatus();

            return ResponseEntity.ok(statusList);

        } catch (Exception e) {
            log.error("Failed to get all backlog statuses", e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
        }
    }

    /**
     * Check backlog alerts
     * @return alert list
     */
    @GetMapping("/alerts")
    public ResponseEntity<List<BacklogAlert>> checkBacklogAlerts() {
        try {
            log.info("Received backlog alert check request");

            List<BacklogAlert> alerts = backlogMonitorService.checkBacklogAlerts();

            return ResponseEntity.ok(alerts);

        } catch (Exception e) {
            log.error("Failed to check backlog alerts", e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
        }
    }

    /**
     * Get consumer status
     * @return consumer status
     */
    @GetMapping("/consumer/status")
    public ResponseEntity<Map<String, Object>> getConsumerStatus() {
        try {
            log.info("Received consumer status request");

            Map<String, Object> status = consumerService.getConsumerStatus();

            return ResponseEntity.ok(status);

        } catch (Exception e) {
            log.error("Failed to get consumer status", e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
        }
    }

    /**
     * Get producer status
     * @return producer status
     */
    @GetMapping("/producer/status")
    public ResponseEntity<Map<String, Object>> getProducerStatus() {
        try {
            log.info("Received producer status request");

            Map<String, Object> status = producerService.getProducerStatus();

            return ResponseEntity.ok(status);

        } catch (Exception e) {
            log.error("Failed to get producer status", e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
        }
    }

    /**
     * Health check
     * @return health status
     */
    @GetMapping("/health")
    public ResponseEntity<Map<String, Object>> healthCheck() {
        try {
            log.info("Received backlog health check request");

            Map<String, Object> health = new HashMap<>();

            // Report DOWN when any tracked partition has a critical backlog
            List<BacklogStatus> statusList = backlogMonitorService.getAllBacklogStatus();
            boolean hasCriticalBacklog = statusList.stream()
                    .anyMatch(status -> "CRITICAL".equals(status.getStatus()));

            health.put("status", hasCriticalBacklog ? "DOWN" : "UP");
            health.put("timestamp", System.currentTimeMillis());
            health.put("backlogStatus", statusList);
            health.put("consumerStatus", consumerService.getConsumerStatus());
            health.put("producerStatus", producerService.getProducerStatus());

            return ResponseEntity.ok(health);

        } catch (Exception e) {
            log.error("Backlog health check failed", e);

            Map<String, Object> health = new HashMap<>();
            health.put("status", "DOWN");
            health.put("timestamp", System.currentTimeMillis());
            health.put("error", e.getMessage());

            return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).body(health);
        }
    }
}

5. Summary

5.1 Kafka Backlog Handling Summary

  1. Consumer optimization: raise consumer processing capacity and concurrency
  2. Producer tuning: optimize the sending strategy and batching
  3. Partitioning strategy: distribute partitions and load evenly
  4. Monitoring and alerting: track backlog and performance metrics in real time
  5. Emergency handling: have procedures for emergency drain and recovery
  6. Resource scaling: scale resources dynamically based on the backlog

5.2 Best-Practice Recommendations

  • Configure sensibly: tune Kafka parameters to the business scenario
  • Monitor the backlog: watch backlog levels in real time
  • Optimize consumption logic: streamline message-processing code
  • Process in batches: use batching to improve efficiency
  • Handle errors: build solid error-handling and retry mechanisms
  • Manage resources: provision and manage server resources deliberately

This operations guide to Kafka message backlog has covered the causes of backlog, handling strategies, monitoring and management, and best practices for enterprise applications, giving you the pieces needed to build an efficient, stable, and scalable Kafka messaging system.