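1. Overview: Ingesting High-Frequency Vehicle GPS Data with Kafka

Ingesting high-frequency vehicle GPS data through Kafka is a common building block of intelligent transportation systems. Kafka's high throughput, low latency, horizontal scalability, and durable message log make it practical to process GPS reports from a large fleet in real time. The system described here covers high-frequency data ingestion, message persistence, horizontal scaling, load balancing, and monitoring with alerting. This article walks through the underlying principles, a Java implementation, and performance tuning tips.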

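1.1 Core Value

  1. High-frequency data processing: GPS reports from a large fleet are processed in real time
  2. Horizontal scaling: adding brokers and partitions to the cluster raises processing capacity
  3. Message persistence: messages are written to disk, so they survive failures and can be recovered
  4. High throughput: a suitably sized cluster can handle on the order of millions of messages per second
  5. Low latency: messages are typically delivered within milliseconds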
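
To make the throughput figures above concrete, here is a rough sizing sketch. All inputs (100,000 vehicles reporting once per second, about 200 bytes per message, 10,000 messages per second per partition) are assumed example values rather than measurements, and the class and method names are hypothetical.

```java
// Back-of-the-envelope sizing; every input below is an assumed example value.
final class GpsCapacityEstimate {

    /** Messages per second for a fleet in which each vehicle reports at a fixed rate. */
    static long messagesPerSecond(long vehicles, double reportsPerSecond) {
        return Math.round(vehicles * reportsPerSecond);
    }

    /** Raw ingress bandwidth in bytes per second, before compression and replication. */
    static long bytesPerSecond(long messagesPerSecond, int avgMessageBytes) {
        return messagesPerSecond * avgMessageBytes;
    }

    /** Smallest partition count that keeps every partition at or below the assumed rate. */
    static int minPartitions(long messagesPerSecond, long perPartitionRate) {
        return (int) ((messagesPerSecond + perPartitionRate - 1) / perPartitionRate);
    }

    public static void main(String[] args) {
        long rate = messagesPerSecond(100_000, 1.0);   // 100,000 vehicles at 1 Hz
        long bytes = bytesPerSecond(rate, 200);        // roughly 200-byte JSON payload
        int partitions = minPartitions(rate, 10_000);  // assumed per-partition capacity
        System.out.println(rate + " msg/s, " + bytes + " B/s, at least " + partitions + " partitions");
    }
}
```

The per-partition rate in particular depends heavily on hardware, message size, and consumer logic, so it should be replaced with a value measured on the target cluster.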

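1.2 Use Cases

  • Real-time vehicle positioning: receiving and distributing vehicle GPS data as it arrives
  • Trajectory streams: stream processing of vehicle trajectory data
  • Anomaly detection: detecting abnormal vehicle behavior in real time
  • Data synchronization: keeping GPS data in sync across multiple systems
  • Historical replay: replaying and analyzing historical GPS data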

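1.3 Kafka Technical Features

  • Distributed architecture: deployed as a multi-node cluster
  • Partitioning: a topic is split into partitions that are processed in parallel
  • Replication: each partition is replicated across brokers for reliability
  • Message persistence: messages are persisted to disk
  • Horizontal scaling: brokers can be added to a running cluster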
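
The partitioning feature is what keeps each vehicle's reports in order: records with the same key always land in the same partition. The sketch below illustrates the idea only. Kafka's default partitioner hashes the serialized key with murmur2; `Arrays.hashCode` is used here as a stand-in, and `KeyPartitionSketch` is a hypothetical name.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Illustrates key-based partition assignment. The hash function is a stand-in,
// not the murmur2 hash that Kafka's default partitioner uses.
final class KeyPartitionSketch {

    /** Maps a record key to a partition index in [0, numPartitions). */
    static int partitionFor(String key, int numPartitions) {
        int hash = Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8));
        return (hash & 0x7fffffff) % numPartitions; // clear the sign bit before the modulo
    }

    public static void main(String[] args) {
        int partitions = 12; // matches the default topicPartitions used later in the article
        for (String vehicleId : new String[] {"V-001", "V-002", "V-001"}) {
            System.out.println(vehicleId + " -> partition " + partitionFor(vehicleId, partitions));
        }
    }
}
```

Because the mapping depends on the partition count, changing the number of partitions later moves keys to different partitions, which is one reason to size the topic generously up front.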

2. Basic Implementation

2.1 Kafka GPS Data Configuration Class

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.BatchLoggingErrorHandler;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;

/**
 * Kafka configuration for GPS data processing.
 * Written against Spring Kafka 2.x; the error handler classes used below were removed in 3.0.
 * @author 运维实战
 */
@Configuration
@EnableConfigurationProperties(KafkaGPSDataProperties.class)
public class KafkaGPSDataConfig {

    @Autowired
    private KafkaGPSDataProperties properties;

    /**
     * Producer factory for GPS data.
     * @return Kafka producer factory
     */
    @Bean
    public ProducerFactory<String, GPSData> gpsDataProducerFactory() {
        // Tuning values live in the nested producer properties, not on the top-level class
        ProducerProperties producer = properties.getProducer();
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        configProps.put(ProducerConfig.ACKS_CONFIG, producer.getAcks());
        configProps.put(ProducerConfig.RETRIES_CONFIG, producer.getRetries());
        configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, producer.getBatchSize());
        configProps.put(ProducerConfig.LINGER_MS_CONFIG, producer.getLingerMs());
        configProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, producer.getBufferMemory());
        configProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, producer.getCompressionType());
        configProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, producer.getMaxInFlightRequests());
        configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, producer.isEnableIdempotence());

        return new DefaultKafkaProducerFactory<>(configProps);
    }

    /**
     * KafkaTemplate used to publish GPS data.
     * @return Kafka template
     */
    @Bean
    public KafkaTemplate<String, GPSData> gpsDataKafkaTemplate() {
        return new KafkaTemplate<>(gpsDataProducerFactory());
    }

    /**
     * Consumer factory for GPS data.
     * @return Kafka consumer factory
     */
    @Bean
    public ConsumerFactory<String, GPSData> gpsDataConsumerFactory() {
        ConsumerProperties consumer = properties.getConsumer();
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getConsumerGroupId());
        configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, consumer.getAutoOffsetReset());
        configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, consumer.isEnableAutoCommit());
        configProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, consumer.getAutoCommitIntervalMs());
        configProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, consumer.getSessionTimeoutMs());
        configProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, consumer.getMaxPollRecords());
        configProps.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, consumer.getFetchMinBytes());
        configProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, consumer.getFetchMaxWaitMs());

        // Pass deserializer instances so the JSON payload is bound to GPSData explicitly
        return new DefaultKafkaConsumerFactory<>(configProps,
                new StringDeserializer(), new JsonDeserializer<>(GPSData.class));
    }

    /**
     * Listener container factory for GPS data consumers.
     * @return listener container factory
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, GPSData> gpsDataKafkaListenerContainerFactory() {
        ConsumerProperties consumer = properties.getConsumer();
        ConcurrentKafkaListenerContainerFactory<String, GPSData> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(gpsDataConsumerFactory());
        factory.setConcurrency(consumer.getConcurrency());
        factory.setBatchListener(consumer.isEnableBatchListener());
        // The listeners take an Acknowledgment argument, which requires a manual ack mode
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        // Only one error handler is active at a time, and it must match the listener type
        if (consumer.isEnableBatchListener()) {
            factory.setBatchErrorHandler(new BatchLoggingErrorHandler());
        } else {
            factory.setErrorHandler(new SeekToCurrentErrorHandler());
        }

        return factory;
    }

    // KafkaGPSDataProcessingService and KafkaGPSDataMonitorService are annotated with @Service
    // and found by component scanning, so they are not declared a second time as @Bean methods.
}

2.2 Kafka GPS Data Properties

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;

/**
 * Configuration properties for Kafka GPS data processing (prefix: kafka.gps.data).
 * @author 运维实战
 */
@Data
@ConfigurationProperties(prefix = "kafka.gps.data")
public class KafkaGPSDataProperties {

    /** Kafka cluster bootstrap servers */
    private String bootstrapServers = "localhost:9092";

    /** GPS data topic name */
    private String topicName = "gps-data-topic";

    /** Number of partitions of the GPS data topic */
    private int topicPartitions = 12;

    /** Replication factor of the GPS data topic */
    private int topicReplicationFactor = 3;

    /** Consumer group ID */
    private String consumerGroupId = "gps-data-consumer-group";

    /** Producer settings */
    private ProducerProperties producer = new ProducerProperties();

    /** Consumer settings */
    private ConsumerProperties consumer = new ConsumerProperties();

    /** Whether Kafka GPS data processing is enabled */
    private boolean enableKafkaGPSDataProcessing = true;

    /** Whether message persistence is enabled */
    private boolean enableMessagePersistence = true;

    /** Message retention time in hours (168 hours = 7 days) */
    private int messageRetentionHours = 168;

    /** Whether horizontal scaling is enabled */
    private boolean enableHorizontalScaling = true;

    /** Whether monitoring is enabled */
    private boolean enableMonitoring = true;

    /** Monitoring interval in milliseconds */
    private long monitorInterval = 30000;

    /** Whether alerting is enabled */
    private boolean enableAlert = true;

    /** Message backlog threshold above which an alert is raised */
    private int messageBacklogAlertThreshold = 10000;
}

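/**
 * Producer settings
 * @author 运维实战
 */
@Data
public class ProducerProperties {

    /** Acknowledgement mode ("all" waits for all in-sync replicas) */
    private String acks = "all";

    /** Number of retries */
    private int retries = 3;

    /** Batch size in bytes */
    private int batchSize = 16384;

    /** Linger time in milliseconds before a partially filled batch is sent */
    private int lingerMs = 5;

    /** Producer buffer memory in bytes (32 MB) */
    private long bufferMemory = 33554432L;

    /** Compression type */
    private String compressionType = "snappy";

    /** Maximum in-flight requests per connection */
    private int maxInFlightRequests = 5;

    /** Whether the idempotent producer is enabled */
    private boolean enableIdempotence = true;
}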

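/**
 * Consumer settings
 * @author 运维实战
 */
@Data
public class ConsumerProperties {

    /** Offset reset policy when no committed offset exists */
    private String autoOffsetReset = "latest";

    /** Whether offsets are committed automatically (false: the listener acknowledges manually) */
    private boolean enableAutoCommit = false;

    /** Auto-commit interval in milliseconds (only used when auto commit is enabled) */
    private int autoCommitIntervalMs = 1000;

    /** Session timeout in milliseconds */
    private int sessionTimeoutMs = 30000;

    /** Maximum number of records returned by one poll */
    private int maxPollRecords = 500;

    /** Minimum number of bytes per fetch */
    private int fetchMinBytes = 1;

    /** Maximum fetch wait time in milliseconds */
    private int fetchMaxWaitMs = 500;

    /** Number of concurrent consumers */
    private int concurrency = 3;

    /** Whether the batch listener is enabled */
    private boolean enableBatchListener = true;
}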

2.3 GPS Data Model Class

import java.util.UUID;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.Data;

/**
 * GPS data model
 * @author 运维实战
 */
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class GPSData {

    /** Message ID */
    private String messageId;

    /** Vehicle ID */
    private String vehicleId;

    /** Longitude in degrees */
    private Double longitude;

    /** Latitude in degrees */
    private Double latitude;

    /** Altitude */
    private Double altitude;

    /** Speed (km/h) */
    private Double speed;

    /** Heading in degrees */
    private Double direction;

    /** GPS timestamp (epoch milliseconds) */
    private Long gpsTimestamp;

    /** Receive timestamp (epoch milliseconds) */
    private Long receiveTimestamp;

    /** Data quality score */
    private Integer dataQuality;

    /** Number of satellites */
    private Integer satelliteCount;

    /** Whether the record is valid */
    private Boolean isValid;

    /** Partition key */
    private String partitionKey;

    /** Message sequence number */
    private Long messageSequence;

    public GPSData() {
        this.messageId = UUID.randomUUID().toString();
        this.receiveTimestamp = System.currentTimeMillis();
        this.isValid = true;
        this.partitionKey = "default";
        this.messageSequence = 0L;
    }

    /**
     * Generates the partition key. Using the vehicle ID keeps all reports
     * of one vehicle in the same partition and therefore in order.
     * @return partition key
     */
    public String generatePartitionKey() {
        if (vehicleId != null) {
            return vehicleId;
        }
        return "default";
    }

    /**
     * Validates the GPS data.
     * @return true if the record is valid
     */
    public boolean validate() {
        if (longitude == null || latitude == null) {
            return false;
        }

        if (longitude < -180 || longitude > 180) {
            return false;
        }

        if (latitude < -90 || latitude > 90) {
            return false;
        }

        if (speed != null && (speed < 0 || speed > 300)) {
            return false;
        }

        if (direction != null && (direction < 0 || direction > 360)) {
            return false;
        }

        return true;
    }

    /**
     * Calculates the data quality score (0 to 100).
     * @return data quality score
     */
    public int calculateDataQuality() {
        int quality = 100;

        // Penalize a low satellite count
        if (satelliteCount != null) {
            if (satelliteCount < 4) {
                quality -= 30;
            } else if (satelliteCount < 6) {
                quality -= 15;
            } else if (satelliteCount < 8) {
                quality -= 5;
            }
        }

        // Penalize implausibly high speeds
        if (speed != null) {
            if (speed > 150) {
                quality -= 20;
            } else if (speed > 120) {
                quality -= 10;
            }
        }

        // Penalize GPS timestamps that are far from the current time
        if (gpsTimestamp != null) {
            long timeDiff = Math.abs(System.currentTimeMillis() - gpsTimestamp);
            if (timeDiff > 300000) { // 5 minutes
                quality -= 25;
            } else if (timeDiff > 60000) { // 1 minute
                quality -= 10;
            }
        }

        return Math.max(0, quality);
    }
}
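
As a worked example of the scoring rules in calculateDataQuality, the sketch below restates them as a standalone function. The only change is that the current time is passed in as a parameter (an assumption made here so the result is reproducible); `GpsQualityExample` is a hypothetical name.

```java
// Standalone restatement of the scoring rules, with the clock passed in for determinism.
final class GpsQualityExample {

    static int score(Integer satelliteCount, Double speedKmh, Long gpsTimestamp, long nowMillis) {
        int quality = 100;
        if (satelliteCount != null) {
            if (satelliteCount < 4) quality -= 30;
            else if (satelliteCount < 6) quality -= 15;
            else if (satelliteCount < 8) quality -= 5;
        }
        if (speedKmh != null) {
            if (speedKmh > 150) quality -= 20;
            else if (speedKmh > 120) quality -= 10;
        }
        if (gpsTimestamp != null) {
            long age = Math.abs(nowMillis - gpsTimestamp);
            if (age > 300_000) quality -= 25;      // older than 5 minutes
            else if (age > 60_000) quality -= 10;  // older than 1 minute
        }
        return Math.max(0, quality);
    }

    public static void main(String[] args) {
        long now = 1_700_000_000_000L; // arbitrary fixed reference time
        // 5 satellites (-15), 130 km/h (-10), 2 minutes old (-10)
        System.out.println(score(5, 130.0, now - 120_000, now));
    }
}
```

With these inputs the three penalties add up to 35, so the record scores 65 out of 100.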

2.4 Basic Kafka GPS Data Processing Service

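import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;

/**
 * Basic Kafka GPS data processing service
 * @author 运维实战
 */
@Service
public class KafkaGPSDataProcessingService {

    @Autowired
    private KafkaGPSDataProperties properties;

    @Autowired
    private KafkaTemplate<String, GPSData> gpsDataKafkaTemplate;

    @Autowired
    private KafkaGPSDataMonitorService kafkaGPSDataMonitorService;

    @Autowired
    private KafkaAdmin kafkaAdmin;

    private static final Logger logger = LoggerFactory.getLogger(KafkaGPSDataProcessingService.class);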

    /**
     * Sends one GPS record to Kafka and waits for the broker acknowledgement.
     * @param gpsData GPS data
     * @return send result
     */
    public KafkaGPSDataSendResult sendGPSData(GPSData gpsData) {
        // Debug level: at high message rates a per-record info log becomes a bottleneck
        logger.debug("Sending GPS data to Kafka, vehicle ID: {}, message ID: {}", gpsData.getVehicleId(), gpsData.getMessageId());

        KafkaGPSDataSendResult result = new KafkaGPSDataSendResult();
        result.setMessageId(gpsData.getMessageId());
        result.setVehicleId(gpsData.getVehicleId());
        result.setStartTime(System.currentTimeMillis());

        try {
            // Validate the GPS data
            if (!gpsData.validate()) {
                result.setSuccess(false);
                result.setError("GPS data validation failed");
                result.setEndTime(System.currentTimeMillis());
                return result;
            }

            // Calculate the data quality score
            gpsData.setDataQuality(gpsData.calculateDataQuality());

            // Generate the partition key
            String partitionKey = gpsData.generatePartitionKey();
            gpsData.setPartitionKey(partitionKey);

            // Send to Kafka
            ListenableFuture<SendResult<String, GPSData>> future = gpsDataKafkaTemplate.send(
                    properties.getTopicName(), partitionKey, gpsData);

            // Register callbacks; the parameter is named "sent" because a lambda
            // parameter may not reuse the name of the local variable "result"
            future.addCallback(
                    sent -> logger.debug("GPS data sent, vehicle ID: {}, partition: {}, offset: {}",
                            gpsData.getVehicleId(), sent.getRecordMetadata().partition(),
                            sent.getRecordMetadata().offset()),
                    failure -> logger.error("GPS data send failed, vehicle ID: {}", gpsData.getVehicleId(), failure)
            );

            // Block until the send completes (at most 5 seconds)
            SendResult<String, GPSData> sendResult = future.get(5, TimeUnit.SECONDS);

            result.setSuccess(true);
            result.setPartition(sendResult.getRecordMetadata().partition());
            result.setOffset(sendResult.getRecordMetadata().offset());
            result.setEndTime(System.currentTimeMillis());

            // Record the successful send
            kafkaGPSDataMonitorService.recordGPSDataSend(gpsData.getVehicleId(), true);

            logger.debug("GPS data sent, vehicle ID: {}, partition: {}, offset: {}, duration: {}ms",
                    gpsData.getVehicleId(), result.getPartition(), result.getOffset(), result.getDuration());

            return result;

        } catch (Exception e) {
            logger.error("GPS data send error, vehicle ID: {}", gpsData.getVehicleId(), e);
            result.setSuccess(false);
            result.setError("GPS data send error: " + e.getMessage());
            result.setEndTime(System.currentTimeMillis());

            // Record the failed send
            kafkaGPSDataMonitorService.recordGPSDataSend(gpsData.getVehicleId(), false);

            return result;
        }
    }

    /**
     * Sends a list of GPS records to Kafka in chunks.
     * @param gpsDataList GPS data list
     * @return batch send result
     */
    public KafkaGPSDataBatchSendResult batchSendGPSData(List<GPSData> gpsDataList) {
        logger.info("Batch sending GPS data to Kafka, count: {}", gpsDataList.size());

        KafkaGPSDataBatchSendResult result = new KafkaGPSDataBatchSendResult();
        result.setTotalCount(gpsDataList.size());
        result.setStartTime(System.currentTimeMillis());

        try {
            // Chunk size is derived from the producer batch size (bytes) as a rough heuristic;
            // it is kept at 1 or more so that the chunking loop always advances
            int chunkSize = Math.max(1, properties.getProducer().getBatchSize() / 100);
            List<List<GPSData>> batches = partitionList(gpsDataList, chunkSize);

            for (List<GPSData> batch : batches) {
                // Send the records of one chunk in parallel
                List<CompletableFuture<KafkaGPSDataSendResult>> futures = batch.stream()
                        .map(gpsData -> CompletableFuture.supplyAsync(() -> sendGPSData(gpsData)))
                        .collect(Collectors.toList());

                // Wait for the chunk to complete
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();

                // Count the results
                for (CompletableFuture<KafkaGPSDataSendResult> future : futures) {
                    KafkaGPSDataSendResult sendResult = future.get();
                    if (sendResult.isSuccess()) {
                        result.incrementSuccessCount();
                    } else {
                        result.incrementFailureCount();
                    }
                }
            }

            // The batch counts as successful only if every record was sent
            result.setSuccess(result.getFailureCount() == 0);
            result.setEndTime(System.currentTimeMillis());

            // sendGPSData already records a metric per record, so no batch metric
            // is recorded here; doing so would count every record twice

            logger.info("Batch GPS data send finished, total: {}, succeeded: {}, failed: {}, duration: {}ms",
                    result.getTotalCount(), result.getSuccessCount(), result.getFailureCount(), result.getDuration());

            return result;

        } catch (Exception e) {
            logger.error("Batch GPS data send error", e);
            result.setSuccess(false);
            result.setError("Batch GPS data send error: " + e.getMessage());
            result.setEndTime(System.currentTimeMillis());
            return result;
        }
    }

    /**
     * Creates a short-lived AdminClient from the configured bootstrap servers.
     * Spring's KafkaAdmin does not expose createTopics/listTopics, so the
     * Kafka AdminClient is used directly for topic management.
     */
    private AdminClient newAdminClient() {
        return AdminClient.create(Collections.<String, Object>singletonMap(
                AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers()));
    }

    /**
     * Creates the GPS data topic if it does not exist yet.
     * @return creation result
     */
    public KafkaTopicCreateResult createGPSDataTopic() {
        logger.info("Creating GPS data topic, topic: {}", properties.getTopicName());

        KafkaTopicCreateResult result = new KafkaTopicCreateResult();
        result.setTopicName(properties.getTopicName());
        result.setStartTime(System.currentTimeMillis());

        try (AdminClient adminClient = newAdminClient()) {
            // Skip creation if the topic already exists
            if (topicExists(properties.getTopicName())) {
                result.setSuccess(true);
                result.setMessage("Topic already exists");
                result.setEndTime(System.currentTimeMillis());
                return result;
            }

            // Create the topic
            NewTopic newTopic = new NewTopic(properties.getTopicName(),
                    properties.getTopicPartitions(), (short) properties.getTopicReplicationFactor());

            CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singletonList(newTopic));

            // Wait for the creation to complete
            createTopicsResult.all().get(30, TimeUnit.SECONDS);

            result.setSuccess(true);
            result.setPartitions(properties.getTopicPartitions());
            result.setReplicationFactor(properties.getTopicReplicationFactor());
            result.setEndTime(System.currentTimeMillis());

            logger.info("GPS data topic created, topic: {}, partitions: {}, replication factor: {}",
                    properties.getTopicName(), properties.getTopicPartitions(), properties.getTopicReplicationFactor());

            return result;

        } catch (Exception e) {
            logger.error("GPS data topic creation error, topic: {}", properties.getTopicName(), e);
            result.setSuccess(false);
            result.setError("GPS data topic creation error: " + e.getMessage());
            result.setEndTime(System.currentTimeMillis());
            return result;
        }
    }

    /**
     * Checks whether a topic exists.
     * @param topicName topic name
     * @return true if the topic exists
     */
    private boolean topicExists(String topicName) {
        try (AdminClient adminClient = newAdminClient()) {
            ListTopicsResult listTopicsResult = adminClient.listTopics();
            Set<String> topicNames = listTopicsResult.names().get(10, TimeUnit.SECONDS);
            return topicNames.contains(topicName);
        } catch (Exception e) {
            logger.error("Error while checking whether topic exists, topic: {}", topicName, e);
            return false;
        }
    }

    /**
     * Returns information about a topic.
     * @param topicName topic name
     * @return topic information, or null if it cannot be determined
     */
    public KafkaTopicInfo getTopicInfo(String topicName) {
        try (AdminClient adminClient = newAdminClient()) {
            DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singletonList(topicName));
            Map<String, TopicDescription> topicDescriptions = describeTopicsResult.all().get(10, TimeUnit.SECONDS);

            TopicDescription topicDescription = topicDescriptions.get(topicName);
            if (topicDescription != null) {
                KafkaTopicInfo topicInfo = new KafkaTopicInfo();
                topicInfo.setTopicName(topicName);
                topicInfo.setPartitions(topicDescription.partitions().size());
                topicInfo.setReplicationFactor(topicDescription.partitions().get(0).replicas().size());
                topicInfo.setInternal(topicDescription.isInternal());
                return topicInfo;
            }

            return null;

        } catch (Exception e) {
            logger.error("Error while fetching topic information, topic: {}", topicName, e);
            return null;
        }
    }

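    /**
     * Splits a list into consecutive sublists.
     * @param list source list
     * @param size maximum sublist size (values below 1 are treated as 1)
     * @return list of sublists
     */
    private <T> List<List<T>> partitionList(List<T> list, int size) {
        int step = Math.max(1, size); // a step of 0 would never advance the loop
        List<List<T>> partitions = new ArrayList<>();
        for (int i = 0; i < list.size(); i += step) {
            partitions.add(list.subList(i, Math.min(i + step, list.size())));
        }
        return partitions;
    }
}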
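
The chunking used by the batch send can be tried in isolation. The sketch below is a standalone variant with hypothetical names (`ChunkingSketch`, `recordsPerChunk`); the 200-byte average record size is an assumed value used only to turn a byte budget into a record count.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Standalone sketch of splitting a list into fixed-size chunks.
final class ChunkingSketch {

    /** Splits the list into consecutive chunks of at most `size` elements. */
    static <T> List<List<T>> chunk(List<T> list, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        for (int i = 0; i < list.size(); i += size) {
            // Copy the view so each chunk is independent of the source list
            chunks.add(new ArrayList<>(list.subList(i, Math.min(i + size, list.size()))));
        }
        return chunks;
    }

    /** Records per chunk for a byte budget and an assumed average record size. */
    static int recordsPerChunk(int batchBytes, int avgRecordBytes) {
        return Math.max(1, batchBytes / avgRecordBytes);
    }

    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        System.out.println(chunk(data, 4));
        System.out.println(recordsPerChunk(16384, 200));
    }
}
```

Deriving the chunk size from an estimated record size ties it to the producer's byte-based batch setting more directly than dividing by a fixed constant, though either way it remains a heuristic.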

2.5 Kafka GPS Data Result Classes

/**
 * Result of sending one GPS record to Kafka.
 * Lombok's @Data generates the getters and setters, including isSuccess().
 * @author 运维实战
 */
@Data
public class KafkaGPSDataSendResult {

    private boolean success;
    private String messageId;
    private String vehicleId;
    private int partition;
    private long offset;
    private String error;
    private long startTime;
    private long endTime;

    public KafkaGPSDataSendResult() {
        this.success = false;
        this.partition = -1;
        this.offset = -1;
    }

    /**
     * Returns the send duration.
     * @return send duration in milliseconds
     */
    public long getDuration() {
        return endTime - startTime;
    }
}

/**
 * Result of a batch send of GPS records to Kafka.
 * @author 运维实战
 */
@Data
public class KafkaGPSDataBatchSendResult {

    private boolean success;
    private int totalCount;
    private int successCount;
    private int failureCount;
    private String error;
    private long startTime;
    private long endTime;

    public KafkaGPSDataBatchSendResult() {
        this.success = false;
        this.successCount = 0;
        this.failureCount = 0;
    }

    /**
     * Returns the send duration.
     * @return send duration in milliseconds
     */
    public long getDuration() {
        return endTime - startTime;
    }

    /** Increments the success counter (not thread-safe; call from one thread). */
    public void incrementSuccessCount() {
        this.successCount++;
    }

    /** Increments the failure counter (not thread-safe; call from one thread). */
    public void incrementFailureCount() {
        this.failureCount++;
    }
}

/**
 * Result of creating a Kafka topic.
 * @author 运维实战
 */
@Data
public class KafkaTopicCreateResult {

    private boolean success;
    private String topicName;
    private int partitions;
    private int replicationFactor;
    private String message;
    private String error;
    private long startTime;
    private long endTime;

    public KafkaTopicCreateResult() {
        this.success = false;
    }

    /**
     * Returns the creation duration.
     * @return creation duration in milliseconds
     */
    public long getDuration() {
        return endTime - startTime;
    }
}

/**
 * Kafka topic information.
 * @author 运维实战
 */
@Data
public class KafkaTopicInfo {

    private String topicName;
    private int partitions;
    private int replicationFactor;
    private boolean internal;

    public KafkaTopicInfo() {}
}

3. Advanced Features

3.1 Kafka GPS Data Consumer Service

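import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;

/**
 * Kafka GPS data consumer service
 * @author 运维实战
 */
@Service
public class KafkaGPSDataConsumerService {

    @Autowired
    private KafkaGPSDataProperties properties;

    @Autowired
    private KafkaGPSDataMonitorService kafkaGPSDataMonitorService;

    @Autowired
    private GPSDataStorageService gpsDataStorageService;

    private static final Logger logger = LoggerFactory.getLogger(KafkaGPSDataConsumerService.class);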

    /**
     * Consumes GPS data one record at a time.
     * This listener is started only when the batch listener is disabled, because both
     * listeners share one topic and one consumer group.
     * @param gpsData GPS data
     * @param acknowledgment manual acknowledgment handle
     */
    @KafkaListener(
            topics = "${kafka.gps.data.topic-name:gps-data-topic}",
            groupId = "${kafka.gps.data.consumer-group-id:gps-data-consumer-group}",
            containerFactory = "gpsDataKafkaListenerContainerFactory",
            autoStartup = "#{!${kafka.gps.data.consumer.enable-batch-listener:true}}")
    public void consumeGPSData(GPSData gpsData, Acknowledgment acknowledgment) {
        logger.debug("Consuming GPS data, vehicle ID: {}, message ID: {}", gpsData.getVehicleId(), gpsData.getMessageId());

        // An invalid record can never succeed, so acknowledge it and move on
        if (!gpsData.validate()) {
            logger.warn("GPS data validation failed, vehicle ID: {}, message ID: {}", gpsData.getVehicleId(), gpsData.getMessageId());
            acknowledgment.acknowledge();
            return;
        }

        // Process the GPS data
        GPSDataProcessingResult processingResult = processGPSData(gpsData);

        if (processingResult.isSuccess()) {
            // Acknowledge the message manually
            acknowledgment.acknowledge();

            // Record the successful consumption
            kafkaGPSDataMonitorService.recordGPSDataConsume(gpsData.getVehicleId(), true);

            logger.debug("GPS data consumed, vehicle ID: {}, message ID: {}", gpsData.getVehicleId(), gpsData.getMessageId());
            return;
        }

        logger.error("GPS data processing failed, vehicle ID: {}, message ID: {}, error: {}",
                gpsData.getVehicleId(), gpsData.getMessageId(), processingResult.getError());

        // Record the failed consumption
        kafkaGPSDataMonitorService.recordGPSDataConsume(gpsData.getVehicleId(), false);

        // Skipping acknowledge() does not by itself cause redelivery. Throwing hands the
        // record to the container's error handler, which can seek back and retry it.
        throw new IllegalStateException("GPS data processing failed: " + processingResult.getError());
    }

    /**
     * Consumes GPS data in batches.
     * This listener is started only when the batch listener is enabled.
     * @param gpsDataList GPS data list
     * @param acknowledgment manual acknowledgment handle
     */
    @KafkaListener(
            topics = "${kafka.gps.data.topic-name:gps-data-topic}",
            groupId = "${kafka.gps.data.consumer-group-id:gps-data-consumer-group}",
            containerFactory = "gpsDataKafkaListenerContainerFactory",
            autoStartup = "${kafka.gps.data.consumer.enable-batch-listener:true}")
    public void batchConsumeGPSData(List<GPSData> gpsDataList, Acknowledgment acknowledgment) {
        logger.info("Batch consuming GPS data, count: {}", gpsDataList.size());

        List<GPSData> validDataList = new ArrayList<>();

        // Validate the GPS data
        for (GPSData gpsData : gpsDataList) {
            if (gpsData.validate()) {
                validDataList.add(gpsData);
            } else {
                logger.warn("GPS data validation failed, vehicle ID: {}, message ID: {}",
                        gpsData.getVehicleId(), gpsData.getMessageId());
            }
        }

        if (validDataList.isEmpty()) {
            // Nothing to process, acknowledge right away
            acknowledgment.acknowledge();
            logger.warn("No valid records in GPS data batch, count: {}", gpsDataList.size());
            return;
        }

        // Process the valid records
        GPSDataBatchProcessingResult processingResult = batchProcessGPSData(validDataList);

        if (processingResult.isSuccess()) {
            // Acknowledge the batch manually
            acknowledgment.acknowledge();

            // Record the batch consumption
            kafkaGPSDataMonitorService.recordGPSDataBatchConsume(gpsDataList.size(), processingResult.getSuccessCount());

            logger.info("GPS data batch consumed, total: {}, succeeded: {}, failed: {}",
                    gpsDataList.size(), processingResult.getSuccessCount(), processingResult.getFailureCount());
            return;
        }

        logger.error("GPS data batch processing failed, error: {}", processingResult.getError());

        // Record the failed batch
        kafkaGPSDataMonitorService.recordGPSDataBatchConsume(gpsDataList.size(), 0);

        // Leaving the batch unacknowledged does not redeliver it by itself; throwing
        // hands the decision to the container's batch error handler.
        throw new IllegalStateException("GPS data batch processing failed: " + processingResult.getError());
    }

    /**
     * Processes one GPS record.
     * @param gpsData GPS data
     * @return processing result
     */
    private GPSDataProcessingResult processGPSData(GPSData gpsData) {
        GPSDataProcessingResult result = new GPSDataProcessingResult();
        result.setVehicleId(gpsData.getVehicleId());
        result.setStartTime(System.currentTimeMillis());

        try {
            // Store the GPS data
            GPSDataStorageResult storageResult = gpsDataStorageService.storeGPSData(gpsData);

            if (storageResult.isSuccess()) {
                result.setSuccess(true);
            } else {
                result.setSuccess(false);
                result.setError(storageResult.getError());
            }

            result.setEndTime(System.currentTimeMillis());
            return result;

        } catch (Exception e) {
            logger.error("GPS data processing error, vehicle ID: {}", gpsData.getVehicleId(), e);
            result.setSuccess(false);
            result.setError("GPS data processing error: " + e.getMessage());
            result.setEndTime(System.currentTimeMillis());
            return result;
        }
    }

    /**
     * Processes a batch of GPS records.
     * @param gpsDataList GPS data list
     * @return batch processing result
     */
    private GPSDataBatchProcessingResult batchProcessGPSData(List<GPSData> gpsDataList) {
        GPSDataBatchProcessingResult result = new GPSDataBatchProcessingResult();
        result.setTotalCount(gpsDataList.size());
        result.setStartTime(System.currentTimeMillis());

        try {
            // Store the GPS data in one batch
            GPSDataBatchStorageResult storageResult = gpsDataStorageService.batchStoreGPSData(gpsDataList);

            if (storageResult.isSuccess()) {
                result.setSuccess(true);
                result.setSuccessCount(storageResult.getSuccessCount());
                result.setFailureCount(storageResult.getFailureCount());
            } else {
                result.setSuccess(false);
                result.setError(storageResult.getError());
            }

            result.setEndTime(System.currentTimeMillis());
            return result;

        } catch (Exception e) {
            logger.error("GPS data batch processing error", e);
            result.setSuccess(false);
            result.setError("GPS data batch processing error: " + e.getMessage());
            result.setEndTime(System.currentTimeMillis());
            return result;
        }
    }
}
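
Because a record whose processing fails may be delivered again, the consumer effectively runs with at-least-once semantics, and the storage step can see the same messageId more than once. One common way to cope is to remember recently seen IDs and skip repeats. The following is a minimal in-memory sketch under that assumption; `RecentMessageIds` is a hypothetical helper, and a production system would more likely rely on an idempotent write keyed by messageId.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Bounded, in-memory record of recently seen message IDs (least recently used evicted first).
final class RecentMessageIds {

    private final Map<String, Boolean> seen;

    RecentMessageIds(final int capacity) {
        // accessOrder = true makes the map evict the least recently touched ID
        this.seen = new LinkedHashMap<String, Boolean>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > capacity;
            }
        };
    }

    /** Returns true the first time an ID is seen, false for a repeat still in the window. */
    synchronized boolean firstSeen(String messageId) {
        return seen.put(messageId, Boolean.TRUE) == null;
    }

    public static void main(String[] args) {
        RecentMessageIds ids = new RecentMessageIds(10_000);
        System.out.println(ids.firstSeen("msg-1")); // first delivery
        System.out.println(ids.firstSeen("msg-1")); // redelivery of the same record
    }
}
```

The window only covers duplicates that arrive close together and within one JVM; duplicates after a restart or on another consumer instance still need to be handled by the storage layer.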

3.2 Kafka GPS Data Monitoring Service

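import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

/**
 * Kafka GPS data monitoring service.
 * The @Scheduled method below only runs if scheduling is enabled in the application
 * (for example with @EnableScheduling on a configuration class).
 * @author 运维实战
 */
@Service
public class KafkaGPSDataMonitorService {

    private final AtomicLong totalGPSDataSent = new AtomicLong(0);
    private final AtomicLong totalGPSDataConsumed = new AtomicLong(0);
    private final AtomicLong totalSuccessfulSends = new AtomicLong(0);
    private final AtomicLong totalFailedSends = new AtomicLong(0);
    private final AtomicLong totalSuccessfulConsumes = new AtomicLong(0);
    private final AtomicLong totalFailedConsumes = new AtomicLong(0);

    // volatile: written by the monitoring thread and read by any caller of getMetrics()
    private volatile long lastResetTime = System.currentTimeMillis();
    private final long resetInterval = 300000; // counters are reset every 5 minutes

    private static final Logger logger = LoggerFactory.getLogger(KafkaGPSDataMonitorService.class);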

    /**
     * Records one GPS data send.
     * @param vehicleId vehicle ID
     * @param success whether the send succeeded
     */
    public void recordGPSDataSend(String vehicleId, boolean success) {
        totalGPSDataSent.incrementAndGet();

        if (success) {
            totalSuccessfulSends.incrementAndGet();
        } else {
            totalFailedSends.incrementAndGet();
        }

        logger.debug("Recorded GPS data send: vehicle ID={}, success={}", vehicleId, success);
    }

    /**
     * Records one GPS data consumption.
     * @param vehicleId vehicle ID
     * @param success whether the consumption succeeded
     */
    public void recordGPSDataConsume(String vehicleId, boolean success) {
        totalGPSDataConsumed.incrementAndGet();

        if (success) {
            totalSuccessfulConsumes.incrementAndGet();
        } else {
            totalFailedConsumes.incrementAndGet();
        }

        logger.debug("Recorded GPS data consumption: vehicle ID={}, success={}", vehicleId, success);
    }

    /**
     * Records a batch send of GPS data.
     * @param totalCount total number of records
     * @param successCount number of records sent successfully
     */
    public void recordGPSDataBatchSend(int totalCount, int successCount) {
        totalGPSDataSent.addAndGet(totalCount);
        totalSuccessfulSends.addAndGet(successCount);
        totalFailedSends.addAndGet(totalCount - successCount);

        logger.debug("Recorded GPS data batch send: total={}, succeeded={}", totalCount, successCount);
    }

    /**
     * Records a batch consumption of GPS data.
     * @param totalCount total number of records
     * @param successCount number of records consumed successfully
     */
    public void recordGPSDataBatchConsume(int totalCount, int successCount) {
        totalGPSDataConsumed.addAndGet(totalCount);
        totalSuccessfulConsumes.addAndGet(successCount);
        totalFailedConsumes.addAndGet(totalCount - successCount);

        logger.debug("Recorded GPS data batch consumption: total={}, succeeded={}", totalCount, successCount);
    }

    /**
     * Returns the current monitoring metrics.
     * @return monitoring metrics
     */
    public KafkaGPSDataMetrics getMetrics() {
        long now = System.currentTimeMillis();

        // Take the snapshot first so the values of a window that just ended are still reported
        KafkaGPSDataMetrics metrics = new KafkaGPSDataMetrics();
        metrics.setTotalGPSDataSent(totalGPSDataSent.get());
        metrics.setTotalGPSDataConsumed(totalGPSDataConsumed.get());
        metrics.setTotalSuccessfulSends(totalSuccessfulSends.get());
        metrics.setTotalFailedSends(totalFailedSends.get());
        metrics.setTotalSuccessfulConsumes(totalSuccessfulConsumes.get());
        metrics.setTotalFailedConsumes(totalFailedConsumes.get());
        metrics.setTimestamp(now);

        // Reset the counters once the window has elapsed
        if (now - lastResetTime > resetInterval) {
            resetMetrics();
        }

        return metrics;
    }

    /**
     * Resets all counters.
     */
    private void resetMetrics() {
        totalGPSDataSent.set(0);
        totalGPSDataConsumed.set(0);
        totalSuccessfulSends.set(0);
        totalFailedSends.set(0);
        totalSuccessfulConsumes.set(0);
        totalFailedConsumes.set(0);
        lastResetTime = System.currentTimeMillis();

        logger.info("Kafka GPS data monitoring metrics reset");
    }

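    /**
     * Periodically checks the Kafka GPS data processing status.
     */
    @Scheduled(fixedRate = 30000) // every 30 seconds
    public void monitorKafkaGPSDataProcessingStatus() {
        try {
            KafkaGPSDataMetrics metrics = getMetrics();

            logger.info("Kafka GPS data processing: sent={}, consumed={}, send ok={}, send failed={}, consume ok={}, consume failed={}, send success rate={}%, consume success rate={}%",
                    metrics.getTotalGPSDataSent(), metrics.getTotalGPSDataConsumed(),
                    metrics.getTotalSuccessfulSends(), metrics.getTotalFailedSends(),
                    metrics.getTotalSuccessfulConsumes(), metrics.getTotalFailedConsumes(),
                    String.format("%.2f", metrics.getSendSuccessRate()),
                    String.format("%.2f", metrics.getConsumeSuccessRate()));

            // Only judge the rates when there were samples; an idle window reports 0%
            long sends = metrics.getTotalSuccessfulSends() + metrics.getTotalFailedSends();
            long consumes = metrics.getTotalSuccessfulConsumes() + metrics.getTotalFailedConsumes();

            if (sends > 0 && metrics.getSendSuccessRate() < 95) {
                logger.warn("Kafka GPS data send success rate too low: {}%", String.format("%.2f", metrics.getSendSuccessRate()));
            }

            if (consumes > 0 && metrics.getConsumeSuccessRate() < 90) {
                logger.warn("Kafka GPS data consume success rate too low: {}%", String.format("%.2f", metrics.getConsumeSuccessRate()));
            }

            // Sent minus consumed is only an in-process approximation of the backlog; it is
            // meaningful when producer and consumer run in the same JVM
            long messageBacklog = metrics.getMessageBacklog();
            if (messageBacklog > 10000) {
                logger.warn("Kafka GPS data message backlog too high: {}", messageBacklog);
            }

        } catch (Exception e) {
            logger.error("Kafka GPS data processing status monitoring failed", e);
        }
    }
}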
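
The monitor above reads its counters and resets them in separate steps, so increments that arrive in between can be lost. A small sketch of an alternative, using `AtomicLong.getAndSet(0)` so that each counter is read and cleared in a single atomic step, is shown below. `WindowedCounter` is a hypothetical class, not part of the service above.

```java
import java.util.concurrent.atomic.AtomicLong;

// Success/failure counters that are read and cleared without losing increments.
final class WindowedCounter {

    private final AtomicLong success = new AtomicLong();
    private final AtomicLong failure = new AtomicLong();

    void record(boolean ok) {
        (ok ? success : failure).incrementAndGet();
    }

    /** Reads and clears each counter in one atomic step; returns {success, failure}. */
    long[] snapshotAndReset() {
        return new long[] { success.getAndSet(0), failure.getAndSet(0) };
    }

    /** Success rate in percent, or NaN when the window contained no samples. */
    static double successRate(long ok, long failed) {
        long total = ok + failed;
        return total == 0 ? Double.NaN : 100.0 * ok / total;
    }

    public static void main(String[] args) {
        WindowedCounter counter = new WindowedCounter();
        counter.record(true);
        counter.record(true);
        counter.record(true);
        counter.record(false);
        long[] window = counter.snapshotAndReset();
        System.out.println(successRate(window[0], window[1]) + "%");
    }
}
```

Returning NaN for an empty window keeps "no traffic" distinguishable from "everything failed", which a plain 0% cannot express. The two counters are still cleared one after the other, so a snapshot is consistent per counter rather than across both.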

3.3 Kafka GPS Data Metrics Class

/**
 * Kafka GPS data metrics class
 * @author 运维实战
 */
@Data
public class KafkaGPSDataMetrics {

    private long totalGPSDataSent;
    private long totalGPSDataConsumed;
    private long totalSuccessfulSends;
    private long totalFailedSends;
    private long totalSuccessfulConsumes;
    private long totalFailedConsumes;
    private long timestamp;

    public KafkaGPSDataMetrics() {
        this.timestamp = System.currentTimeMillis();
    }

    /**
     * Get the send success rate.
     * @return send success rate in percent (0.0 when nothing has been sent yet)
     */
    public double getSendSuccessRate() {
        long total = totalSuccessfulSends + totalFailedSends;
        if (total == 0) return 0.0;
        return (double) totalSuccessfulSends / total * 100;
    }

    /**
     * Get the send failure rate.
     * @return send failure rate in percent
     */
    public double getSendFailureRate() {
        long total = totalSuccessfulSends + totalFailedSends;
        if (total == 0) return 0.0;
        return (double) totalFailedSends / total * 100;
    }

    /**
     * Get the consume success rate.
     * @return consume success rate in percent (0.0 when nothing has been consumed yet)
     */
    public double getConsumeSuccessRate() {
        long total = totalSuccessfulConsumes + totalFailedConsumes;
        if (total == 0) return 0.0;
        return (double) totalSuccessfulConsumes / total * 100;
    }

    /**
     * Get the consume failure rate.
     * @return consume failure rate in percent
     */
    public double getConsumeFailureRate() {
        long total = totalSuccessfulConsumes + totalFailedConsumes;
        if (total == 0) return 0.0;
        return (double) totalFailedConsumes / total * 100;
    }

    /**
     * Get the message backlog, computed from the in-process counters.
     * @return number of messages sent but not yet consumed
     */
    public long getMessageBacklog() {
        return totalGPSDataSent - totalGPSDataConsumed;
    }

    /**
     * Get the consume efficiency.
     * @return consumed messages as a percentage of sent messages
     */
    public double getConsumeEfficiency() {
        if (totalGPSDataSent == 0) return 0.0;
        return (double) totalGPSDataConsumed / totalGPSDataSent * 100;
    }

    /**
     * Whether the pipeline is healthy.
     * A rate with no recorded attempts is treated as healthy rather than as 0%.
     * @return true if both success rates and the backlog are within their thresholds
     */
    public boolean isHealthy() {
        boolean sendOk = totalSuccessfulSends + totalFailedSends == 0 || getSendSuccessRate() >= 95;
        boolean consumeOk = totalSuccessfulConsumes + totalFailedConsumes == 0 || getConsumeSuccessRate() >= 90;
        return sendOk && consumeOk && getMessageBacklog() < 10000;
    }
}
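
The backlog above is the difference between two in-process counters, so it only approximates what Kafka operators usually call consumer lag: the log end offset minus the committed offset, summed over partitions. The sketch below shows that arithmetic using only the JDK. `ConsumerLagCalculator` is a hypothetical helper, the offset maps are assumed to be supplied by the caller (in a real application they could be obtained from the Kafka `AdminClient`), and treating a partition without a committed offset as starting from 0 is a simplification.

```java
import java.util.Map;

/** Hypothetical helper: total consumer lag from per-partition offsets. */
final class ConsumerLagCalculator {

    /**
     * Sums max(0, endOffset - committedOffset) over all partitions.
     * Partitions with no committed offset are counted from offset 0 (simplifying assumption).
     */
    static long totalLag(Map<Integer, Long> endOffsets, Map<Integer, Long> committedOffsets) {
        long lag = 0;
        for (Map.Entry<Integer, Long> entry : endOffsets.entrySet()) {
            long committed = committedOffsets.getOrDefault(entry.getKey(), 0L);
            lag += Math.max(0L, entry.getValue() - committed);
        }
        return lag;
    }
}
```

With end offsets of 100, 250 and 40 on partitions 0 to 2 and committed offsets of 90 and 250 on partitions 0 and 1, the helper reports a lag of 10 + 0 + 40 = 50.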

3.4 GPS Data Storage Service

/**
 * GPS data storage service
 * @author 运维实战
 */
@Service
public class GPSDataStorageService {

    private static final Logger logger = LoggerFactory.getLogger(GPSDataStorageService.class);

    // Cached positions expire after one hour, for both single and batch writes
    private static final Duration CACHE_TTL = Duration.ofHours(1);

    @Autowired
    private MongoTemplate mongoTemplate;

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    /**
     * Store one GPS record.
     * @param gpsData GPS data
     * @return storage result
     */
    public GPSDataStorageResult storeGPSData(GPSData gpsData) {
        // Per-message logging stays at debug level to keep the hot path cheap at high frequency
        logger.debug("Storing GPS data, vehicle ID: {}, message ID: {}", gpsData.getVehicleId(), gpsData.getMessageId());

        GPSDataStorageResult result = new GPSDataStorageResult();
        result.setVehicleId(gpsData.getVehicleId());
        result.setMessageId(gpsData.getMessageId());
        result.setStartTime(System.currentTimeMillis());

        try {
            // Persist to MongoDB
            GPSData savedData = mongoTemplate.save(gpsData, "gps_data");

            // Cache the latest position of the vehicle in Redis
            String cacheKey = "gps:vehicle:" + gpsData.getVehicleId();
            redisTemplate.opsForValue().set(cacheKey, gpsData, CACHE_TTL);

            result.setSuccess(true);
            result.setStoredData(savedData);
            result.setEndTime(System.currentTimeMillis());

            logger.debug("GPS data stored, vehicle ID: {}, message ID: {}, duration: {}ms",
                    gpsData.getVehicleId(), gpsData.getMessageId(), result.getDuration());

            return result;

        } catch (Exception e) {
            logger.error("Failed to store GPS data, vehicle ID: {}, message ID: {}", gpsData.getVehicleId(), gpsData.getMessageId(), e);
            result.setSuccess(false);
            result.setError("Failed to store GPS data: " + e.getMessage());
            result.setEndTime(System.currentTimeMillis());
            return result;
        }
    }

    /**
     * Store GPS records in batch.
     * @param gpsDataList list of GPS data
     * @return batch storage result
     */
    public GPSDataBatchStorageResult batchStoreGPSData(List<GPSData> gpsDataList) {
        GPSDataBatchStorageResult result = new GPSDataBatchStorageResult();
        result.setStartTime(System.currentTimeMillis());

        if (gpsDataList == null || gpsDataList.isEmpty()) {
            // Nothing to store: report an empty, successful batch
            result.setSuccess(true);
            result.setEndTime(System.currentTimeMillis());
            return result;
        }

        logger.info("Batch storing GPS data, count: {}", gpsDataList.size());
        result.setTotalCount(gpsDataList.size());

        try {
            // Bulk insert into MongoDB
            Collection<GPSData> savedData = mongoTemplate.insert(gpsDataList, "gps_data");
            result.setSuccessCount(savedData.size());
            result.setFailureCount(gpsDataList.size() - savedData.size());

            // Cache in Redis; a later record in the list overwrites an earlier one for the same vehicle
            Map<String, GPSData> cacheData = new HashMap<>();
            for (GPSData gpsData : gpsDataList) {
                String cacheKey = "gps:vehicle:" + gpsData.getVehicleId();
                cacheData.put(cacheKey, gpsData);
            }
            // multiSet cannot attach an expiry, so each key is written with the same TTL as single writes
            cacheData.forEach((key, value) -> redisTemplate.opsForValue().set(key, value, CACHE_TTL));

            // The original never marked a batch as successful; do so when nothing failed
            result.setSuccess(result.getFailureCount() == 0);
            result.setEndTime(System.currentTimeMillis());

            logger.info("Batch GPS data storage finished, total: {}, succeeded: {}, failed: {}, duration: {}ms",
                    result.getTotalCount(), result.getSuccessCount(), result.getFailureCount(), result.getDuration());

            return result;

        } catch (Exception e) {
            logger.error("Batch GPS data storage failed", e);
            result.setSuccess(false);
            result.setError("Batch GPS data storage failed: " + e.getMessage());
            result.setEndTime(System.currentTimeMillis());
            return result;
        }
    }
}
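
Because the batch method keeps one cache entry per vehicle, the position that ends up cached depends on list order when a batch contains several points for the same vehicle. A possible refinement, sketched below with only the JDK, is to pick the point with the newest GPS timestamp per vehicle before writing to the cache. `LatestPositionSelector` and its nested `Point` class are hypothetical stand-ins for the article's `GPSData`, reduced to the fields the sketch needs.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: keep only the newest point per vehicle before caching. */
final class LatestPositionSelector {

    /** Minimal stand-in for GPSData (assumption: only these fields matter here). */
    static final class Point {
        final String vehicleId;
        final long gpsTimestamp;
        final double longitude;
        final double latitude;

        Point(String vehicleId, long gpsTimestamp, double longitude, double latitude) {
            this.vehicleId = vehicleId;
            this.gpsTimestamp = gpsTimestamp;
            this.longitude = longitude;
            this.latitude = latitude;
        }
    }

    /** Maps each cache key to the point with the largest gpsTimestamp for that vehicle. */
    static Map<String, Point> latestPerVehicle(List<Point> batch) {
        Map<String, Point> latest = new LinkedHashMap<>();
        for (Point p : batch) {
            latest.merge("gps:vehicle:" + p.vehicleId, p,
                    (current, candidate) ->
                            candidate.gpsTimestamp >= current.gpsTimestamp ? candidate : current);
        }
        return latest;
    }
}
```

This matters when messages arrive out of order, for example after a producer retry: without the timestamp comparison an older point could overwrite a newer one in the cache.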

4. Kafka GPS Data Controller

4.1 Kafka GPS Data REST Controller

/**
 * Kafka GPS data REST controller
 * @author 运维实战
 */
@RestController
@RequestMapping("/api/kafka/gps/data")
public class KafkaGPSDataController {

    @Autowired
    private KafkaGPSDataProcessingService kafkaGPSDataProcessingService;

    @Autowired
    private KafkaGPSDataMonitorService kafkaGPSDataMonitorService;

    private static final Logger logger = LoggerFactory.getLogger(KafkaGPSDataController.class);

    /**
     * Send one GPS record to Kafka.
     * @param gpsData GPS data
     * @return send result
     */
    @PostMapping("/send")
    public ResponseEntity<KafkaGPSDataSendResult> sendGPSData(@RequestBody GPSData gpsData) {
        try {
            logger.info("Received GPS data send request, vehicle ID: {}, message ID: {}", gpsData.getVehicleId(), gpsData.getMessageId());

            KafkaGPSDataSendResult result = kafkaGPSDataProcessingService.sendGPSData(gpsData);

            return ResponseEntity.ok(result);

        } catch (Exception e) {
            logger.error("Failed to send GPS data", e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
        }
    }

    /**
     * Send GPS records to Kafka in batch.
     * @param gpsDataList list of GPS data
     * @return batch send result
     */
    @PostMapping("/batch-send")
    public ResponseEntity<KafkaGPSDataBatchSendResult> batchSendGPSData(@RequestBody List<GPSData> gpsDataList) {
        try {
            logger.info("Received GPS data batch send request, count: {}", gpsDataList.size());

            KafkaGPSDataBatchSendResult result = kafkaGPSDataProcessingService.batchSendGPSData(gpsDataList);

            return ResponseEntity.ok(result);

        } catch (Exception e) {
            logger.error("Failed to batch send GPS data", e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
        }
    }

    /**
     * Create the GPS data topic.
     * @return creation result
     */
    @PostMapping("/topic/create")
    public ResponseEntity<KafkaTopicCreateResult> createGPSDataTopic() {
        try {
            logger.info("Received GPS data topic creation request");

            KafkaTopicCreateResult result = kafkaGPSDataProcessingService.createGPSDataTopic();

            return ResponseEntity.ok(result);

        } catch (Exception e) {
            logger.error("Failed to create GPS data topic", e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
        }
    }

    /**
     * Get topic information.
     * @param topicName topic name
     * @return topic information, or 404 if the topic is unknown
     */
    @GetMapping("/topic/info")
    public ResponseEntity<KafkaTopicInfo> getTopicInfo(@RequestParam String topicName) {
        try {
            KafkaTopicInfo topicInfo = kafkaGPSDataProcessingService.getTopicInfo(topicName);

            if (topicInfo != null) {
                return ResponseEntity.ok(topicInfo);
            } else {
                return ResponseEntity.notFound().build();
            }

        } catch (Exception e) {
            logger.error("Failed to get topic information", e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
        }
    }

    /**
     * Get the Kafka GPS data monitoring metrics.
     * @return monitoring metrics
     */
    @GetMapping("/metrics")
    public ResponseEntity<KafkaGPSDataMetrics> getKafkaGPSDataMetrics() {
        try {
            KafkaGPSDataMetrics metrics = kafkaGPSDataMonitorService.getMetrics();
            return ResponseEntity.ok(metrics);
        } catch (Exception e) {
            logger.error("Failed to get Kafka GPS data monitoring metrics", e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
        }
    }
}
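
The send endpoints above pass the request body to Kafka without checking it, so a malformed fix (for example a latitude of 95 degrees) would travel through the whole pipeline. A minimal validation sketch using only the JDK follows. `GpsFixValidator`, its parameter list and the 5-second clock-skew allowance are assumptions made for illustration; they are not part of the article's `GPSData` or controller.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: basic range checks for one GPS fix before it is sent to Kafka. */
final class GpsFixValidator {

    // Assumed tolerance for device clocks running ahead of the server
    static final long MAX_CLOCK_SKEW_MS = 5_000L;

    /** Returns a list of human-readable problems; an empty list means the fix looks valid. */
    static List<String> validate(String vehicleId, double longitude, double latitude,
                                 double speed, long gpsTimestampMs, long nowMs) {
        List<String> errors = new ArrayList<>();
        if (vehicleId == null || vehicleId.trim().isEmpty()) {
            errors.add("vehicleId is required");
        }
        if (Double.isNaN(longitude) || longitude < -180.0 || longitude > 180.0) {
            errors.add("longitude must be within [-180, 180]");
        }
        if (Double.isNaN(latitude) || latitude < -90.0 || latitude > 90.0) {
            errors.add("latitude must be within [-90, 90]");
        }
        if (Double.isNaN(speed) || speed < 0.0) {
            errors.add("speed must be non-negative");
        }
        if (gpsTimestampMs > nowMs + MAX_CLOCK_SKEW_MS) {
            errors.add("gpsTimestamp is in the future");
        }
        return errors;
    }
}
```

A controller could call such a check first and answer with HTTP 400 when the list is not empty, which keeps invalid points out of the topic instead of filtering them in every consumer.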

4.2 GPS Data Storage Result Classes

/**
 * GPS data storage result class
 * @author 运维实战
 */
@Data
public class GPSDataStorageResult {

    private boolean success;
    private String vehicleId;
    private String messageId;
    private GPSData storedData;
    private String error;
    private long startTime;
    private long endTime;

    public GPSDataStorageResult() {
        this.success = false;
    }

    /**
     * Get the storage duration.
     * @return storage duration in milliseconds
     */
    public long getDuration() {
        return endTime - startTime;
    }

    /**
     * Whether the operation succeeded.
     * @return true on success
     */
    public boolean isSuccess() {
        return success;
    }
}

/**
 * GPS data batch storage result class
 * @author 运维实战
 */
@Data
public class GPSDataBatchStorageResult {

    private boolean success;
    private int totalCount;
    private int successCount;
    private int failureCount;
    private String error;
    private long startTime;
    private long endTime;

    public GPSDataBatchStorageResult() {
        this.success = false;
        this.successCount = 0;
        this.failureCount = 0;
    }

    /**
     * Get the storage duration.
     * @return storage duration in milliseconds
     */
    public long getDuration() {
        return endTime - startTime;
    }

    /**
     * Whether the operation succeeded.
     * @return true on success
     */
    public boolean isSuccess() {
        return success;
    }
}

/**
 * GPS data processing result class
 * @author 运维实战
 */
@Data
public class GPSDataProcessingResult {

    private boolean success;
    private String vehicleId;
    private String error;
    private long startTime;
    private long endTime;

    public GPSDataProcessingResult() {
        this.success = false;
    }

    /**
     * Get the processing duration.
     * @return processing duration in milliseconds
     */
    public long getDuration() {
        return endTime - startTime;
    }

    /**
     * Whether the operation succeeded.
     * @return true on success
     */
    public boolean isSuccess() {
        return success;
    }
}

/**
 * GPS data batch processing result class
 * @author 运维实战
 */
@Data
public class GPSDataBatchProcessingResult {

    private boolean success;
    private int totalCount;
    private int successCount;
    private int failureCount;
    private String error;
    private long startTime;
    private long endTime;

    public GPSDataBatchProcessingResult() {
        this.success = false;
        this.successCount = 0;
        this.failureCount = 0;
    }

    /**
     * Get the processing duration.
     * @return processing duration in milliseconds
     */
    public long getDuration() {
        return endTime - startTime;
    }

    /**
     * Whether the operation succeeded.
     * @return true on success
     */
    public boolean isSuccess() {
        return success;
    }
}

5. Kafka GPS Data Annotation and AOP

5.1 Kafka GPS Data Processing Annotation

/**
 * Kafka GPS data processing annotation
 * @author 运维实战
 */
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface KafkaGPSDataProcessing {

    /**
     * Vehicle ID
     */
    String vehicleId() default "";

    /**
     * Processing type ("SEND" or "CONSUME")
     */
    String processingType() default "SEND";

    /**
     * Whether Kafka GPS data processing is enabled
     */
    boolean enableKafkaGPSDataProcessing() default true;

    /**
     * Whether message persistence is enabled
     */
    boolean enableMessagePersistence() default true;

    /**
     * Whether horizontal scaling is enabled
     */
    boolean enableHorizontalScaling() default true;

    /**
     * Whether monitoring is enabled
     */
    boolean enableMonitoring() default true;

    /**
     * Message returned when the operation fails
     */
    String message() default "Kafka GPS data processing failed, please try again later";

    /**
     * HTTP status code returned when the operation fails
     */
    int statusCode() default 500;
}

5.2 Kafka GPS Data Processing AOP Aspect

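/**
 * Kafka GPS data processing AOP aspect
 * @author 运维实战
 */
@Aspect
@Component
public class KafkaGPSDataProcessingAspect {

    @Autowired
    private KafkaGPSDataMonitorService kafkaGPSDataMonitorService;

    private static final Logger logger = LoggerFactory.getLogger(KafkaGPSDataProcessingAspect.class);

    /**
     * Pointcut matching methods annotated with @KafkaGPSDataProcessing.
     */
    @Pointcut("@annotation(kafkaGPSDataProcessing)")
    public void kafkaGPSDataProcessingPointcut(KafkaGPSDataProcessing kafkaGPSDataProcessing) {}

    /**
     * Around advice for Kafka GPS data processing.
     * @param joinPoint join point
     * @param kafkaGPSDataProcessing the annotation on the intercepted method
     * @return result of the intercepted method
     * @throws Throwable if the intercepted method fails
     */
    @Around("kafkaGPSDataProcessingPointcut(kafkaGPSDataProcessing)")
    public Object around(ProceedingJoinPoint joinPoint, KafkaGPSDataProcessing kafkaGPSDataProcessing) throws Throwable {
        String methodName = joinPoint.getSignature().getName();
        // The vehicle ID comes from the annotation attribute, not from the method arguments
        String vehicleId = kafkaGPSDataProcessing.vehicleId();
        String processingType = kafkaGPSDataProcessing.processingType();

        try {
            if (vehicleId != null && !vehicleId.isEmpty()) {
                logger.info("Kafka GPS data processing started: method={}, vehicleId={}, processingType={}",
                        methodName, vehicleId, processingType);
            }

            // Run the intercepted method first, then record the outcome
            // (the original recorded a success before the method had run)
            Object result = joinPoint.proceed();
            recordOutcome(kafkaGPSDataProcessing, true);
            return result;

        } catch (Exception e) {
            recordOutcome(kafkaGPSDataProcessing, false);
            logger.error("Kafka GPS data processing error: method={}", methodName, e);
            // Keep the original exception as the cause so the stack trace is not lost
            throw new KafkaGPSDataProcessingException(kafkaGPSDataProcessing.message(), e, kafkaGPSDataProcessing.statusCode());
        }
    }

    /**
     * Record a send or consume outcome when monitoring is enabled and a vehicle ID is set.
     */
    private void recordOutcome(KafkaGPSDataProcessing annotation, boolean success) {
        String vehicleId = annotation.vehicleId();
        if (!annotation.enableMonitoring() || vehicleId == null || vehicleId.isEmpty()) {
            return;
        }
        if ("SEND".equals(annotation.processingType())) {
            kafkaGPSDataMonitorService.recordGPSDataSend(vehicleId, success);
        } else if ("CONSUME".equals(annotation.processingType())) {
            kafkaGPSDataMonitorService.recordGPSDataConsume(vehicleId, success);
        }
    }
}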
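
The aspect reads the vehicle ID from the annotation attribute, which is a compile-time constant, so every call to an annotated method is recorded under the same ID. If per-call IDs are wanted, one option is to fall back to the method arguments when the attribute is empty. The sketch below does this with plain reflection. `VehicleIdResolver` and `SamplePoint` are hypothetical names, and looking for a public `getVehicleId()` getter is an assumption about the argument types.

```java
import java.lang.reflect.Method;

/** Hypothetical helper: resolve a vehicle ID from the annotation value or the call arguments. */
final class VehicleIdResolver {

    /** Stand-in for an argument type such as GPSData (assumption: it has getVehicleId()). */
    static final class SamplePoint {
        private final String vehicleId;

        SamplePoint(String vehicleId) {
            this.vehicleId = vehicleId;
        }

        public String getVehicleId() {
            return vehicleId;
        }
    }

    /** Prefers the annotation value; otherwise returns the first argument's getVehicleId() result. */
    static String resolve(String annotationValue, Object[] args) {
        if (annotationValue != null && !annotationValue.isEmpty()) {
            return annotationValue;
        }
        if (args == null) {
            return null;
        }
        for (Object arg : args) {
            String id = fromGetter(arg);
            if (id != null) {
                return id;
            }
        }
        return null;
    }

    private static String fromGetter(Object arg) {
        if (arg == null) {
            return null;
        }
        try {
            Method getter = arg.getClass().getMethod("getVehicleId");
            Object value = getter.invoke(arg);
            return value instanceof String ? (String) value : null;
        } catch (ReflectiveOperationException e) {
            // No accessible getVehicleId() on this argument: skip it
            return null;
        }
    }
}
```

Inside an around advice, the arguments would come from `joinPoint.getArgs()`. Reflection on every call has a cost, so a shared interface or caching the `Method` per class may be preferable on a hot path.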

5.3 Kafka GPS Data Processing Exception Class

/**
 * Kafka GPS data processing exception
 * @author 运维实战
 */
public class KafkaGPSDataProcessingException extends RuntimeException {

    // HTTP status code to return to the client
    private final int statusCode;

    public KafkaGPSDataProcessingException(String message) {
        super(message);
        this.statusCode = 500;
    }

    public KafkaGPSDataProcessingException(String message, int statusCode) {
        super(message);
        this.statusCode = statusCode;
    }

    public KafkaGPSDataProcessingException(String message, Throwable cause) {
        super(message, cause);
        this.statusCode = 500;
    }

    public KafkaGPSDataProcessingException(String message, Throwable cause, int statusCode) {
        super(message, cause);
        this.statusCode = statusCode;
    }

    public int getStatusCode() {
        return statusCode;
    }
}

5.4 Kafka GPS Data Processing Exception Handler

/**
 * Kafka GPS data processing exception handler
 * @author 运维实战
 */
@ControllerAdvice
public class KafkaGPSDataProcessingExceptionHandler {

    private static final Logger logger = LoggerFactory.getLogger(KafkaGPSDataProcessingExceptionHandler.class);

    /**
     * Handle Kafka GPS data processing exceptions.
     * @param e the exception
     * @return error response with the status code carried by the exception
     */
    @ExceptionHandler(KafkaGPSDataProcessingException.class)
    public ResponseEntity<Map<String, Object>> handleKafkaGPSDataProcessingException(KafkaGPSDataProcessingException e) {
        logger.warn("Kafka GPS data processing exception: {}", e.getMessage());

        Map<String, Object> response = new HashMap<>();
        response.put("error", "KAFKA_GPS_DATA_PROCESSING_FAILED");
        response.put("message", e.getMessage());
        response.put("timestamp", System.currentTimeMillis());

        return ResponseEntity.status(e.getStatusCode()).body(response);
    }
}

6. Practical Examples

6.1 A Service Using the Kafka GPS Data Processing Annotation

/**
 * Example service using the Kafka GPS data processing annotation
 * @author 运维实战
 */
@Service
public class KafkaGPSDataProcessingExampleService {

    private static final Logger logger = LoggerFactory.getLogger(KafkaGPSDataProcessingExampleService.class);

    /**
     * Basic Kafka GPS data processing example.
     * @param gpsData GPS data
     * @return processing result
     */
    @KafkaGPSDataProcessing(vehicleId = "test_vehicle", processingType = "SEND",
            enableKafkaGPSDataProcessing = true, enableMessagePersistence = true,
            enableHorizontalScaling = true, message = "Basic Kafka GPS data processing: operation failed")
    public String basicKafkaGPSDataProcessing(GPSData gpsData) {
        logger.info("Running basic Kafka GPS data processing example, vehicle ID: {}", gpsData.getVehicleId());

        // Simulate the Kafka GPS data processing work
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        return "Basic Kafka GPS data processing completed, vehicle ID: " + gpsData.getVehicleId();
    }

    /**
     * Large-batch Kafka GPS data processing example.
     * @param gpsDataList list of GPS data
     * @return processing result
     */
    @KafkaGPSDataProcessing(vehicleId = "batch_vehicle", processingType = "SEND",
            enableKafkaGPSDataProcessing = true, enableMessagePersistence = true,
            enableHorizontalScaling = true, message = "Large-batch Kafka GPS data processing: operation failed")
    public String largeKafkaGPSDataProcessing(List<GPSData> gpsDataList) {
        logger.info("Running large-batch Kafka GPS data processing example, count: {}", gpsDataList.size());

        // Simulate the large-batch Kafka GPS data processing work
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        return "Large-batch Kafka GPS data processing completed, count: " + gpsDataList.size();
    }

    /**
     * High-frequency Kafka GPS data processing example.
     * @param gpsData GPS data
     * @return processing result
     */
    @KafkaGPSDataProcessing(vehicleId = "high_frequency_vehicle", processingType = "SEND",
            enableKafkaGPSDataProcessing = true, enableMessagePersistence = true,
            enableHorizontalScaling = true, message = "High-frequency Kafka GPS data processing: operation failed")
    public String highFrequencyKafkaGPSDataProcessing(GPSData gpsData) {
        logger.info("Running high-frequency Kafka GPS data processing example, vehicle ID: {}", gpsData.getVehicleId());

        // Simulate the high-frequency Kafka GPS data processing work
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        return "High-frequency Kafka GPS data processing completed, vehicle ID: " + gpsData.getVehicleId();
    }
}

6.2 Kafka GPS Data Processing Test Controller

/**
 * Kafka GPS data processing test controller
 * @author 运维实战
 */
@RestController
@RequestMapping("/api/kafka/gps/data/test")
public class KafkaGPSDataProcessingTestController {

    @Autowired
    private KafkaGPSDataProcessingExampleService exampleService;

    @Autowired
    private KafkaGPSDataProcessingService kafkaGPSDataProcessingService;

    @Autowired
    private KafkaGPSDataMonitorService kafkaGPSDataMonitorService;

    private static final Logger logger = LoggerFactory.getLogger(KafkaGPSDataProcessingTestController.class);

    /**
     * Basic Kafka GPS data processing test.
     * @param vehicleId vehicle ID
     * @return test result
     */
    @GetMapping("/basic")
    public ResponseEntity<Map<String, String>> testBasicKafkaGPSDataProcessing(@RequestParam String vehicleId) {
        try {
            // Generate test GPS data
            GPSData gpsData = generateTestGPSData(vehicleId);

            String result = exampleService.basicKafkaGPSDataProcessing(gpsData);

            Map<String, String> response = new HashMap<>();
            response.put("status", "SUCCESS");
            response.put("result", result);
            response.put("timestamp", String.valueOf(System.currentTimeMillis()));

            return ResponseEntity.ok(response);

        } catch (KafkaGPSDataProcessingException e) {
            logger.warn("Basic Kafka GPS data processing test failed: {}", e.getMessage());
            return ResponseEntity.status(e.getStatusCode()).build();
        } catch (Exception e) {
            logger.error("Basic Kafka GPS data processing test failed", e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
        }
    }

    /**
     * Large-batch Kafka GPS data processing test.
     * @param vehicleId vehicle ID
     * @param count number of records to generate
     * @return test result
     */
    @GetMapping("/large")
    public ResponseEntity<Map<String, String>> testLargeKafkaGPSDataProcessing(
            @RequestParam String vehicleId, @RequestParam int count) {
        try {
            // Generate a list of test GPS data
            List<GPSData> gpsDataList = generateTestGPSDataList(vehicleId, count);

            String result = exampleService.largeKafkaGPSDataProcessing(gpsDataList);

            Map<String, String> response = new HashMap<>();
            response.put("status", "SUCCESS");
            response.put("result", result);
            response.put("timestamp", String.valueOf(System.currentTimeMillis()));

            return ResponseEntity.ok(response);

        } catch (KafkaGPSDataProcessingException e) {
            logger.warn("Large-batch Kafka GPS data processing test failed: {}", e.getMessage());
            return ResponseEntity.status(e.getStatusCode()).build();
        } catch (Exception e) {
            logger.error("Large-batch Kafka GPS data processing test failed", e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
        }
    }

    /**
     * High-frequency Kafka GPS data processing test.
     * @param vehicleId vehicle ID
     * @return test result
     */
    @GetMapping("/high-frequency")
    public ResponseEntity<Map<String, String>> testHighFrequencyKafkaGPSDataProcessing(@RequestParam String vehicleId) {
        try {
            // Generate test GPS data
            GPSData gpsData = generateTestGPSData(vehicleId);

            String result = exampleService.highFrequencyKafkaGPSDataProcessing(gpsData);

            Map<String, String> response = new HashMap<>();
            response.put("status", "SUCCESS");
            response.put("result", result);
            response.put("timestamp", String.valueOf(System.currentTimeMillis()));

            return ResponseEntity.ok(response);

        } catch (KafkaGPSDataProcessingException e) {
            logger.warn("High-frequency Kafka GPS data processing test failed: {}", e.getMessage());
            return ResponseEntity.status(e.getStatusCode()).build();
        } catch (Exception e) {
            logger.error("High-frequency Kafka GPS data processing test failed", e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
        }
    }

    /**
     * Get the Kafka GPS data monitoring metrics.
     * @return Kafka GPS data monitoring metrics
     */
    @GetMapping("/metrics")
    public ResponseEntity<KafkaGPSDataMetrics> getKafkaGPSDataMetrics() {
        try {
            KafkaGPSDataMetrics metrics = kafkaGPSDataMonitorService.getMetrics();
            return ResponseEntity.ok(metrics);
        } catch (Exception e) {
            logger.error("Failed to get Kafka GPS data monitoring metrics", e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
        }
    }

    /**
     * Generate one test GPS record.
     * @param vehicleId vehicle ID
     * @return test GPS data
     */
    private GPSData generateTestGPSData(String vehicleId) {
        GPSData gpsData = new GPSData();
        gpsData.setVehicleId(vehicleId);
        gpsData.setLongitude(116.3974 + Math.random() * 0.01); // longitude around central Beijing
        gpsData.setLatitude(39.9093 + Math.random() * 0.01); // latitude around central Beijing
        gpsData.setAltitude(50.0 + Math.random() * 100);
        gpsData.setSpeed(Math.random() * 80);
        gpsData.setDirection(Math.random() * 360);
        gpsData.setGpsTimestamp(System.currentTimeMillis());
        gpsData.setSatelliteCount(8 + (int)(Math.random() * 4));
        gpsData.setDataQuality(80 + (int)(Math.random() * 20));
        return gpsData;
    }

    /**
     * Generate a list of test GPS records, one per synthetic vehicle ID.
     * @param vehicleId vehicle ID prefix
     * @param count number of records
     * @return list of test GPS data
     */
    private List<GPSData> generateTestGPSDataList(String vehicleId, int count) {
        List<GPSData> gpsDataList = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            GPSData gpsData = generateTestGPSData(vehicleId + "_" + i);
            gpsDataList.add(gpsData);
        }
        return gpsDataList;
    }
}

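7. Summary

7.1 Best Practices for Ingesting High-Frequency Vehicle GPS Data with Kafka

  1. Design the Kafka cluster carefully: use a multi-node cluster for high availability
  2. Choose suitable topic settings: set the partition count and replication factor according to business needs
  3. Monitor Kafka GPS data processing: track message sends and consumption in real time
  4. Adjust parameters dynamically: tune Kafka parameters based on monitoring data
  5. Handle exceptions: implement thorough exception handling with user-friendly messages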
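
One concrete aspect of topic design is the choice of message key. When the vehicle ID is used as the key, all points of one vehicle land on the same partition and are therefore consumed in order. The sketch below illustrates the idea of mapping a key to a partition using only the JDK. `VehiclePartitionSketch` is a hypothetical name, and the hash used here is for illustration only: Kafka's default partitioner applies a murmur2 hash to the serialized key, so the partition numbers will differ from a real cluster.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Hypothetical illustration of key-based partitioning (not Kafka's actual hash function). */
final class VehiclePartitionSketch {

    /** Maps a vehicle ID to a partition in [0, numPartitions); equal IDs always map to the same partition. */
    static int partitionFor(String vehicleId, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        int hash = Arrays.hashCode(vehicleId.getBytes(StandardCharsets.UTF_8));
        // floorMod keeps the result non-negative even when the hash is negative
        return Math.floorMod(hash, numPartitions);
    }
}
```

A consequence of this scheme is that changing the partition count changes the mapping, so per-vehicle ordering across the change is no longer guaranteed. That is one reason to size the partition count with some headroom up front.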

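7.2 Performance Optimization Suggestions

  • Partition tuning: set the partition count so that processing can run in parallel
  • Batch processing: use batched sends and batched consumption to improve throughput
  • Compression: use a suitable compression algorithm to reduce network overhead
  • Persistence: tune the message persistence strategy
  • Horizontal scaling: add nodes to scale out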
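
For partition tuning, a common rule of thumb is to take the target throughput and divide it by the measured per-partition throughput of the producer and of the consumer, then use the larger of the two results. The sketch below encodes that arithmetic. `PartitionSizing` is a hypothetical helper, and the throughput figures in the usage note are made-up inputs that would have to be replaced by measurements from the actual cluster.

```java
/** Hypothetical helper: rule-of-thumb lower bound for a topic's partition count. */
final class PartitionSizing {

    /**
     * Returns ceil(max(target / producerPerPartition, target / consumerPerPartition)).
     * All rates are in messages per second and must be positive.
     */
    static int minPartitions(double targetMsgsPerSec,
                             double producerMsgsPerSecPerPartition,
                             double consumerMsgsPerSecPerPartition) {
        if (targetMsgsPerSec <= 0 || producerMsgsPerSecPerPartition <= 0
                || consumerMsgsPerSecPerPartition <= 0) {
            throw new IllegalArgumentException("all rates must be positive");
        }
        double byProducer = targetMsgsPerSec / producerMsgsPerSecPerPartition;
        double byConsumer = targetMsgsPerSec / consumerMsgsPerSecPerPartition;
        return (int) Math.ceil(Math.max(byProducer, byConsumer));
    }
}
```

As an example with assumed numbers: 100,000 vehicles reporting once per second give a target of 100,000 messages per second. If one partition sustains 50,000 messages per second on the producer side and 8,000 on the consumer side, the bound is max(2, 12.5), rounded up to 13 partitions. The consumer side usually dominates because each message is also written to MongoDB and Redis.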

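7.3 Operations and Maintenance Essentials

  • Real-time monitoring: monitor the Kafka cluster state and message processing
  • Dynamic adjustment: adjust the cluster configuration according to load
  • Exception handling: set up exception handling and alerting
  • Log management: keep logging and log analysis complete
  • Performance tuning: optimize Kafka performance based on monitoring data

This Java walkthrough of ingesting high-frequency vehicle GPS data with Kafka (with horizontal scaling and message persistence) covered Kafka cluster deployment, message persistence, horizontal scaling, performance tuning, and practices for enterprise applications. Together they form a starting point for building an efficient and stable high-frequency GPS data processing system.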