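1. Overview of Elasticsearch Trajectory Search

Elasticsearch trajectory search is a core technique for location-based services and trajectory analysis. Building on Elasticsearch's search capabilities and geospatial indexing, it supports efficient and accurate retrieval and analysis of trajectory data. The system described here covers trajectory indexing, geospatial queries, trajectory analysis, performance tuning, and monitoring with alerting. This article walks through the principles, implementation, performance tuning techniques, and operational use of Elasticsearch trajectory search.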

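1.1 Core Value of Elasticsearch Trajectory Search

  1. Efficient retrieval: trajectory lookups can return in milliseconds on a properly sized cluster
  2. Geospatial queries: support for complex geospatial queries and analysis
  3. Trajectory analysis: in-depth analysis and mining of trajectory data
  4. Scalability: storage and retrieval of large-scale trajectory data
  5. Near-real-time operation: trajectory data can be indexed and queried as it arrives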

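1.2 Use Cases for Elasticsearch Trajectory Search

  • Vehicle trajectory search: querying and analyzing vehicle routes
  • Personnel trajectory analysis: analyzing and aggregating the movement of people
  • Geofencing: fence services based on geographic location
  • Trajectory mining: mining trajectory data and recognizing patterns
  • Real-time monitoring: monitoring live trajectory data and raising alerts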
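
To make the geofencing use case concrete, here is a minimal client-side sketch of a point-in-polygon test using ray casting. It is only an illustration: the class and method names are hypothetical, the test treats longitude and latitude as planar coordinates (reasonable only for small fences that do not cross the antimeridian), and in practice the check would usually be pushed down to Elasticsearch with a geo query.

```java
/** Sketch only: ray-casting point-in-polygon test for a small geofence (hypothetical names). */
final class GeoFenceSketch {

    /**
     * @param polygon fence vertices in order, each as {longitude, latitude} in degrees
     * @return true if the point lies inside the polygon (planar approximation)
     */
    static boolean contains(double[][] polygon, double lon, double lat) {
        boolean inside = false;
        int n = polygon.length;
        for (int i = 0, j = n - 1; i < n; j = i++) {
            double aLon = polygon[i][0], aLat = polygon[i][1];
            double bLon = polygon[j][0], bLat = polygon[j][1];
            // Does the edge (a, b) straddle the horizontal line through the point?
            boolean straddles = (aLat > lat) != (bLat > lat);
            if (straddles) {
                // Longitude at which the edge crosses that horizontal line
                double crossingLon = (bLon - aLon) * (lat - aLat) / (bLat - aLat) + aLon;
                if (lon < crossingLon) {
                    inside = !inside; // each crossing to the east flips the state
                }
            }
        }
        return inside;
    }

    public static void main(String[] args) {
        double[][] fence = {{116.0, 39.0}, {117.0, 39.0}, {117.0, 40.0}, {116.0, 40.0}};
        System.out.println(contains(fence, 116.5, 39.5));
        System.out.println(contains(fence, 118.0, 39.5));
    }
}
```

Points exactly on an edge are not handled specially in this sketch; a production geofence would need an explicit rule for them.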

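1.3 Elasticsearch Geospatial Features

  • Geospatial indexing: the geo_point and geo_shape field types
  • Spatial queries: geographic range queries and distance queries
  • Aggregations: aggregation and analysis of geospatial data
  • Near-real-time indexing: trajectory data becomes searchable shortly after it is indexed
  • Sharded clusters: horizontal scaling through sharding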
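
As a rough illustration of the distance queries listed above, the sketch below assembles the JSON body of a `geo_distance` filter as a plain string. The class and method names are hypothetical, the field name is assumed to be a simple identifier that needs no JSON escaping, and a real application would more likely build the query through a client library than by string formatting.

```java
import java.util.Locale;

/** Sketch only: builds the request body of a geo_distance filter query (hypothetical helper). */
final class GeoDistanceQuerySketch {

    /**
     * @param field        name of a geo_point field, e.g. "location" (assumed to need no escaping)
     * @param lat          latitude of the center point in degrees
     * @param lon          longitude of the center point in degrees
     * @param radiusMeters search radius in meters
     */
    static String geoDistanceQuery(String field, double lat, double lon, double radiusMeters) {
        // Locale.ROOT keeps the decimal separator a dot regardless of the JVM default locale
        return String.format(Locale.ROOT,
                "{\"query\":{\"bool\":{\"filter\":{\"geo_distance\":"
                        + "{\"distance\":\"%.0fm\",\"%s\":{\"lat\":%.6f,\"lon\":%.6f}}}}}}",
                radiusMeters, field, lat, lon);
    }

    public static void main(String[] args) {
        System.out.println(geoDistanceQuery("location", 39.9042, 116.4074, 500));
    }
}
```

Placing the clause under `bool.filter` rather than `must` skips relevance scoring, which a pure "is it within this radius" check does not need.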

2. Basic Implementation of Elasticsearch Trajectory Search

2.1 Configuration Class for Elasticsearch Trajectory Search

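import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Configuration class for Elasticsearch trajectory search.
 * Note: the service classes in this article are also annotated with @Service.
 * If component scanning picks them up, the @Bean methods below register a
 * second instance of each service, so keep only one of the two registration styles.
 * @author 运维实战
 */
@Configuration
@EnableConfigurationProperties(ESTrajectorySearchProperties.class)
public class ESTrajectorySearchConfig {

    private static final Logger logger = LoggerFactory.getLogger(ESTrajectorySearchConfig.class);

    @Autowired
    private ESTrajectorySearchProperties properties;

    /**
     * Trajectory search service (facade over indexing, querying, and analysis).
     * @return the trajectory search service
     */
    @Bean
    public ESTrajectorySearchService esTrajectorySearchService() {
        return new ESTrajectorySearchService();
    }

    /**
     * Trajectory index service.
     * @return the trajectory index service
     */
    @Bean
    public ESTrajectoryIndexService esTrajectoryIndexService() {
        return new ESTrajectoryIndexService();
    }

    /**
     * Trajectory query service.
     * @return the trajectory query service
     */
    @Bean
    public ESTrajectoryQueryService esTrajectoryQueryService() {
        return new ESTrajectoryQueryService();
    }

    /**
     * Trajectory analysis service.
     * @return the trajectory analysis service
     */
    @Bean
    public ESTrajectoryAnalysisService esTrajectoryAnalysisService() {
        return new ESTrajectoryAnalysisService();
    }

    /**
     * Trajectory monitoring service.
     * @return the trajectory monitoring service
     */
    @Bean
    public ESTrajectoryMonitorService esTrajectoryMonitorService() {
        return new ESTrajectoryMonitorService();
    }
}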

2.2 Configuration Properties for Elasticsearch Trajectory Search

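import java.util.Arrays;
import java.util.List;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;

/**
 * Configuration properties for Elasticsearch trajectory search.
 * @author 运维实战
 */
@Data
@ConfigurationProperties(prefix = "elasticsearch.trajectory")
public class ESTrajectorySearchProperties {

    /** Whether Elasticsearch trajectory search is enabled. */
    private boolean enableESTrajectorySearch = true;

    /** Addresses of the Elasticsearch cluster nodes (host:port). */
    private List<String> clusterNodes = Arrays.asList("localhost:9200");

    /** Name of the trajectory data index. */
    private String trajectoryIndexName = "trajectory_data";

    /** Alias of the trajectory data index. */
    private String trajectoryIndexAlias = "trajectory_alias";

    /** Number of primary shards for the trajectory data index. */
    private int trajectoryIndexShards = 3;

    /** Number of replicas for the trajectory data index. */
    private int trajectoryIndexReplicas = 1;

    /** Refresh interval of the trajectory data index. */
    private String trajectoryIndexRefreshInterval = "1s";

    /** Number of days trajectory data is retained. */
    private int trajectoryDataRetentionDays = 30;

    /** Whether geospatial indexing is enabled. */
    private boolean enableGeospatialIndex = true;

    /** Precision of the geospatial index. */
    private String geospatialIndexPrecision = "1km";

    /** Whether trajectory analysis is enabled. */
    private boolean enableTrajectoryAnalysis = true;

    /** Bucket size used by trajectory analysis aggregations. */
    private int trajectoryAnalysisAggSize = 1000;

    /** Whether real-time indexing is enabled. */
    private boolean enableRealTimeIndexing = true;

    /** Batch size for real-time indexing. */
    private int realTimeIndexBatchSize = 1000;

    /** Refresh interval for real-time indexing. */
    private String realTimeIndexRefreshInterval = "5s";

    /** Whether monitoring is enabled. */
    private boolean enableMonitoring = true;

    /** Monitoring interval in milliseconds. */
    private long monitorInterval = 30000;

    /** Whether alerting is enabled. */
    private boolean enableAlert = true;

    /** Alert threshold for indexing latency, in milliseconds. */
    private long indexPerformanceAlertThreshold = 1000;

    /** Alert threshold for query latency, in milliseconds. */
    private long queryPerformanceAlertThreshold = 500;

    /** Cluster health status at which an alert is raised. */
    private String clusterHealthAlertThreshold = "yellow";
}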
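
The `trajectoryDataRetentionDays` property is declared here, but none of the code shown in this section uses it. One way it might be applied, sketched under that assumption, is to compute a cutoff instant and use it in a range query on `timestamp` (for example as the body of a delete-by-query request). The class and method names below are hypothetical.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;

/** Sketch only: derives a retention cutoff and a matching range query (hypothetical helper). */
final class RetentionCutoffSketch {

    /** Returns the instant before which trajectory points count as expired. */
    static Instant cutoff(Instant now, int retentionDays) {
        if (retentionDays < 0) {
            throw new IllegalArgumentException("retentionDays must be >= 0");
        }
        return now.minus(retentionDays, ChronoUnit.DAYS);
    }

    /** Builds a query body matching documents whose timestamp is older than the cutoff. */
    static String expiredRangeQuery(Instant cutoff) {
        // Instant.toString() yields ISO-8601, which the default Elasticsearch date format accepts
        return "{\"query\":{\"range\":{\"timestamp\":{\"lt\":\"" + cutoff + "\"}}}}";
    }

    public static void main(String[] args) {
        Instant limit = cutoff(Instant.now(), 30);
        System.out.println(expiredRangeQuery(limit));
    }
}
```

Passing `now` as a parameter rather than calling `Instant.now()` inside the helper keeps the calculation easy to test with a fixed clock.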

2.3 Trajectory Data Model Class

import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import lombok.Data;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;
import org.springframework.data.elasticsearch.annotations.GeoPointField;
import org.springframework.data.elasticsearch.core.geo.GeoPoint;

/**
 * Trajectory data model: one document per trajectory point.
 * @author 运维实战
 */
@Data
@Document(indexName = "trajectory_data")
public class TrajectoryData {

    @Id
    private String id;

    /** Trajectory ID. */
    @Field(name = "trajectory_id", type = FieldType.Keyword)
    private String trajectoryId;

    /** Object ID (vehicle ID, person ID, and so on). */
    @Field(name = "object_id", type = FieldType.Keyword)
    private String objectId;

    /** Object type. */
    @Field(name = "object_type", type = FieldType.Keyword)
    private String objectType;

    /** Geographic location. Spring Data Elasticsearch maps geo_point fields with @GeoPointField. */
    @GeoPointField
    private GeoPoint location;

    /** Longitude in degrees. */
    @Field(name = "longitude", type = FieldType.Double)
    private Double longitude;

    /** Latitude in degrees. */
    @Field(name = "latitude", type = FieldType.Double)
    private Double latitude;

    /** Altitude. */
    @Field(name = "altitude", type = FieldType.Double)
    private Double altitude;

    /** Speed (km/h). */
    @Field(name = "speed", type = FieldType.Double)
    private Double speed;

    /** Heading in degrees. */
    @Field(name = "direction", type = FieldType.Double)
    private Double direction;

    /** Timestamp of the point. */
    @Field(name = "timestamp", type = FieldType.Date)
    private Date timestamp;

    /** Timestamp reported by the GPS device. */
    @Field(name = "gps_timestamp", type = FieldType.Date)
    private Date gpsTimestamp;

    /** Timestamp at which the point was received. */
    @Field(name = "receive_timestamp", type = FieldType.Date)
    private Date receiveTimestamp;

    /** Data quality. */
    @Field(name = "data_quality", type = FieldType.Integer)
    private Integer dataQuality;

    /** Number of satellites. */
    @Field(name = "satellite_count", type = FieldType.Integer)
    private Integer satelliteCount;

    /** Whether the point is valid. */
    @Field(name = "is_valid", type = FieldType.Boolean)
    private Boolean isValid;

    /** Extended properties. */
    @Field(name = "extended_properties", type = FieldType.Object)
    private Map<String, Object> extendedProperties;

    public TrajectoryData() {
        this.id = UUID.randomUUID().toString();
        this.timestamp = new Date();
        this.receiveTimestamp = new Date();
        this.isValid = true;
        this.extendedProperties = new HashMap<>();
    }

    public TrajectoryData(String objectId, String objectType, Double longitude, Double latitude) {
        this();
        this.objectId = objectId;
        this.objectType = objectType;
        this.longitude = longitude;
        this.latitude = latitude;
        // GeoPoint takes latitude first, then longitude
        this.location = new GeoPoint(latitude, longitude);
    }

    /**
     * Validates the trajectory point.
     * @return true if the coordinates and timestamp are present and in range
     */
    public boolean validate() {
        if (longitude == null || latitude == null) {
            return false;
        }
        // Written as a positive range check so that NaN is rejected as well
        if (!(longitude >= -180 && longitude <= 180)) {
            return false;
        }
        if (!(latitude >= -90 && latitude <= 90)) {
            return false;
        }
        return timestamp != null;
    }

    /**
     * Computes the great-circle distance to another point with the haversine formula.
     * @param other the other trajectory point
     * @return the distance in meters, or 0.0 if either point lacks coordinates
     */
    public double calculateDistance(TrajectoryData other) {
        if (other == null || this.longitude == null || this.latitude == null ||
                other.longitude == null || other.latitude == null) {
            return 0.0;
        }

        final double R = 6371000.0; // mean Earth radius in meters
        double lat1Rad = Math.toRadians(this.latitude);
        double lat2Rad = Math.toRadians(other.latitude);
        double deltaLatRad = Math.toRadians(other.latitude - this.latitude);
        double deltaLonRad = Math.toRadians(other.longitude - this.longitude);

        double a = Math.sin(deltaLatRad / 2) * Math.sin(deltaLatRad / 2) +
                Math.cos(lat1Rad) * Math.cos(lat2Rad) *
                Math.sin(deltaLonRad / 2) * Math.sin(deltaLonRad / 2);
        // Clamp 1 - a at zero so rounding error near antipodal points cannot produce NaN
        double c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(Math.max(0.0, 1 - a)));

        return R * c;
    }

    /**
     * Computes the initial bearing from this point to another point.
     * @param other the other trajectory point
     * @return the bearing in degrees in [0, 360), or 0.0 if either point lacks coordinates
     */
    public double calculateBearing(TrajectoryData other) {
        if (other == null || this.longitude == null || this.latitude == null ||
                other.longitude == null || other.latitude == null) {
            return 0.0;
        }

        double lat1Rad = Math.toRadians(this.latitude);
        double lat2Rad = Math.toRadians(other.latitude);
        double deltaLonRad = Math.toRadians(other.longitude - this.longitude);

        double y = Math.sin(deltaLonRad) * Math.cos(lat2Rad);
        double x = Math.cos(lat1Rad) * Math.sin(lat2Rad) -
                Math.sin(lat1Rad) * Math.cos(lat2Rad) * Math.cos(deltaLonRad);

        double bearing = Math.toDegrees(Math.atan2(y, x));
        return (bearing + 360) % 360;
    }

    /**
     * Adds an extended property.
     * @param key the property key
     * @param value the property value
     */
    public void addExtendedProperty(String key, Object value) {
        if (extendedProperties == null) {
            extendedProperties = new HashMap<>();
        }
        extendedProperties.put(key, value);
    }

    /**
     * Returns an extended property.
     * @param key the property key
     * @return the property value, or null if it is not set
     */
    public Object getExtendedProperty(String key) {
        return extendedProperties != null ? extendedProperties.get(key) : null;
    }
}
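
The distance and bearing calculations in the model can be checked in isolation. The sketch below restates the same haversine and initial-bearing formulas as standalone static methods (the class name is hypothetical) and uses the same mean Earth radius of 6,371,000 m as the model class.

```java
/** Sketch only: standalone haversine distance and initial bearing (hypothetical class name). */
final class HaversineSketch {

    static final double EARTH_RADIUS_M = 6_371_000.0; // mean Earth radius in meters

    /** Great-circle distance in meters between two points given in degrees. */
    static double distanceMeters(double lat1, double lon1, double lat2, double lon2) {
        double phi1 = Math.toRadians(lat1);
        double phi2 = Math.toRadians(lat2);
        double dPhi = Math.toRadians(lat2 - lat1);
        double dLambda = Math.toRadians(lon2 - lon1);
        double a = Math.sin(dPhi / 2) * Math.sin(dPhi / 2)
                + Math.cos(phi1) * Math.cos(phi2) * Math.sin(dLambda / 2) * Math.sin(dLambda / 2);
        // Clamp at zero to guard against tiny negative values caused by rounding
        double c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(Math.max(0.0, 1 - a)));
        return EARTH_RADIUS_M * c;
    }

    /** Initial bearing in degrees, normalized to [0, 360). */
    static double bearingDegrees(double lat1, double lon1, double lat2, double lon2) {
        double phi1 = Math.toRadians(lat1);
        double phi2 = Math.toRadians(lat2);
        double dLambda = Math.toRadians(lon2 - lon1);
        double y = Math.sin(dLambda) * Math.cos(phi2);
        double x = Math.cos(phi1) * Math.sin(phi2)
                - Math.sin(phi1) * Math.cos(phi2) * Math.cos(dLambda);
        return (Math.toDegrees(Math.atan2(y, x)) + 360) % 360;
    }

    public static void main(String[] args) {
        // One degree of longitude along the equator: roughly 111.2 km, heading due east
        System.out.println(distanceMeters(0, 0, 0, 1));
        System.out.println(bearingDegrees(0, 0, 0, 1));
    }
}
```

One degree of longitude on the equator is a convenient sanity check, because the expected distance is simply the Earth radius times π/180.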

2.4 Basic Elasticsearch Trajectory Search Service

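import java.util.ArrayList;
import java.util.List;

import org.elasticsearch.client.RestHighLevelClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * Basic Elasticsearch trajectory search service.
 * Acts as a facade that validates input, delegates to the index, query, and
 * analysis services, and records metrics.
 * @author 运维实战
 */
@Service
public class ESTrajectorySearchService {

    private static final Logger logger = LoggerFactory.getLogger(ESTrajectorySearchService.class);

    @Autowired
    private ESTrajectorySearchProperties properties;

    @Autowired
    private RestHighLevelClient elasticsearchClient;

    @Autowired
    private ESTrajectoryIndexService esTrajectoryIndexService;

    @Autowired
    private ESTrajectoryQueryService esTrajectoryQueryService;

    @Autowired
    private ESTrajectoryAnalysisService esTrajectoryAnalysisService;

    @Autowired
    private ESTrajectoryMonitorService esTrajectoryMonitorService;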

    /**
     * Indexes a single trajectory point.
     * @param trajectoryData the trajectory point
     * @return the indexing result
     */
    public ESTrajectoryIndexResult indexTrajectoryData(TrajectoryData trajectoryData) {
        logger.info("Indexing trajectory data, trajectory ID: {}, object ID: {}", trajectoryData.getTrajectoryId(), trajectoryData.getObjectId());

        ESTrajectoryIndexResult result = new ESTrajectoryIndexResult();
        result.setTrajectoryId(trajectoryData.getTrajectoryId());
        result.setObjectId(trajectoryData.getObjectId());
        result.setStartTime(System.currentTimeMillis());

        try {
            // Validate the trajectory point
            if (!trajectoryData.validate()) {
                result.setSuccess(false);
                result.setError("Trajectory data validation failed");
                result.setEndTime(System.currentTimeMillis());
                return result;
            }

            // Index the trajectory point
            result = esTrajectoryIndexService.indexTrajectoryData(trajectoryData);

            // Record the metric with the actual outcome: the index service reports
            // failures through the result object rather than by throwing
            esTrajectoryMonitorService.recordTrajectoryIndexing(trajectoryData.getObjectId(), result.isSuccess());

            logger.info("Trajectory data indexing finished, trajectory ID: {}, success: {}, duration: {}ms",
                    trajectoryData.getTrajectoryId(), result.isSuccess(), result.getDuration());

            return result;

        } catch (Exception e) {
            logger.error("Trajectory data indexing failed, trajectory ID: {}", trajectoryData.getTrajectoryId(), e);
            result.setSuccess(false);
            result.setError("Trajectory data indexing failed: " + e.getMessage());
            result.setEndTime(System.currentTimeMillis());

            // Record the indexing failure
            esTrajectoryMonitorService.recordTrajectoryIndexing(trajectoryData.getObjectId(), false);

            return result;
        }
    }

    /**
     * Indexes a list of trajectory points in bulk.
     * @param trajectoryDataList the trajectory points
     * @return the bulk indexing result
     */
    public ESTrajectoryBatchIndexResult batchIndexTrajectoryData(List<TrajectoryData> trajectoryDataList) {
        logger.info("Bulk indexing trajectory data, count: {}", trajectoryDataList.size());

        ESTrajectoryBatchIndexResult result = new ESTrajectoryBatchIndexResult();
        result.setTotalCount(trajectoryDataList.size());
        result.setStartTime(System.currentTimeMillis());

        try {
            // Validate the points and separate valid from invalid ones
            List<TrajectoryData> validDataList = new ArrayList<>();
            List<TrajectoryData> invalidDataList = new ArrayList<>();

            for (TrajectoryData trajectoryData : trajectoryDataList) {
                if (trajectoryData.validate()) {
                    validDataList.add(trajectoryData);
                } else {
                    invalidDataList.add(trajectoryData);
                    logger.warn("Trajectory data validation failed, object ID: {}", trajectoryData.getObjectId());
                }
            }

            // Bulk index the valid points
            if (!validDataList.isEmpty()) {
                ESTrajectoryBatchIndexResult indexResult = esTrajectoryIndexService.batchIndexTrajectoryData(validDataList);

                result.setSuccessCount(indexResult.getSuccessCount());
                // Everything that was not indexed counts as failed; this also covers the
                // case where the bulk call itself failed and reported no per-item counts
                result.setFailureCount(trajectoryDataList.size() - indexResult.getSuccessCount());
                result.setError(indexResult.getError());
            } else {
                result.setFailureCount(trajectoryDataList.size());
            }

            // The original never set the success flag on this path; treat the batch
            // as successful only when no point failed
            result.setSuccess(result.getFailureCount() == 0);
            result.setEndTime(System.currentTimeMillis());

            // Record the bulk indexing metric
            esTrajectoryMonitorService.recordTrajectoryBatchIndexing(trajectoryDataList.size(), result.getSuccessCount());

            logger.info("Bulk trajectory indexing finished, total: {}, succeeded: {}, failed: {}, duration: {}ms",
                    result.getTotalCount(), result.getSuccessCount(), result.getFailureCount(), result.getDuration());

            return result;

        } catch (Exception e) {
            logger.error("Bulk trajectory indexing failed", e);
            result.setSuccess(false);
            result.setError("Bulk trajectory indexing failed: " + e.getMessage());
            result.setEndTime(System.currentTimeMillis());
            return result;
        }
    }

    /**
     * Queries trajectory data.
     * @param query the query criteria
     * @return the query result
     */
    public ESTrajectoryQueryResult queryTrajectoryData(TrajectoryQuery query) {
        logger.info("Querying trajectory data, object ID: {}, time range: {} - {}",
                query.getObjectId(), query.getStartTime(), query.getEndTime());

        ESTrajectoryQueryResult result = new ESTrajectoryQueryResult();
        result.setQuery(query);
        result.setStartTime(System.currentTimeMillis());

        try {
            // Run the query
            result = esTrajectoryQueryService.queryTrajectoryData(query);

            // Record the query metric with the actual outcome reported by the query service
            esTrajectoryMonitorService.recordTrajectoryQuery(query.getObjectId(), result.getTrajectoryDataList().size(), result.isSuccess());

            logger.info("Trajectory query finished, object ID: {}, result count: {}, duration: {}ms",
                    query.getObjectId(), result.getTrajectoryDataList().size(), result.getDuration());

            return result;

        } catch (Exception e) {
            logger.error("Trajectory query failed, object ID: {}", query.getObjectId(), e);
            result.setSuccess(false);
            result.setError("Trajectory query failed: " + e.getMessage());
            result.setEndTime(System.currentTimeMillis());

            // Record the query failure
            esTrajectoryMonitorService.recordTrajectoryQuery(query.getObjectId(), 0, false);

            return result;
        }
    }

    /**
     * Runs a geospatial query.
     * @param geoQuery the geospatial query criteria
     * @return the query result
     */
    public ESTrajectoryQueryResult geoQueryTrajectoryData(GeoTrajectoryQuery geoQuery) {
        logger.info("Geospatial trajectory query, center: ({}, {}), radius: {}m",
                geoQuery.getCenterLongitude(), geoQuery.getCenterLatitude(), geoQuery.getRadiusMeters());

        ESTrajectoryQueryResult result = new ESTrajectoryQueryResult();
        result.setStartTime(System.currentTimeMillis());

        try {
            // Run the geospatial query
            result = esTrajectoryQueryService.geoQueryTrajectoryData(geoQuery);

            // Record the geospatial query metric with the actual outcome
            esTrajectoryMonitorService.recordTrajectoryGeoQuery(geoQuery.getCenterLongitude(),
                    geoQuery.getCenterLatitude(), result.getTrajectoryDataList().size(), result.isSuccess());

            logger.info("Geospatial query finished, center: ({}, {}), result count: {}, duration: {}ms",
                    geoQuery.getCenterLongitude(), geoQuery.getCenterLatitude(),
                    result.getTrajectoryDataList().size(), result.getDuration());

            return result;

        } catch (Exception e) {
            logger.error("Geospatial query failed, center: ({}, {})",
                    geoQuery.getCenterLongitude(), geoQuery.getCenterLatitude(), e);
            result.setSuccess(false);
            result.setError("Geospatial query failed: " + e.getMessage());
            result.setEndTime(System.currentTimeMillis());

            // Record the geospatial query failure
            esTrajectoryMonitorService.recordTrajectoryGeoQuery(geoQuery.getCenterLongitude(),
                    geoQuery.getCenterLatitude(), 0, false);

            return result;
        }
    }

    /**
     * Analyzes trajectory data.
     * @param analysisQuery the analysis criteria
     * @return the analysis result
     */
    public ESTrajectoryAnalysisResult analyzeTrajectoryData(TrajectoryAnalysisQuery analysisQuery) {
        logger.info("Analyzing trajectory data, object ID: {}, analysis type: {}",
                analysisQuery.getObjectId(), analysisQuery.getAnalysisType());

        ESTrajectoryAnalysisResult result = new ESTrajectoryAnalysisResult();
        result.setAnalysisQuery(analysisQuery);
        result.setStartTime(System.currentTimeMillis());

        try {
            // Run the trajectory analysis
            result = esTrajectoryAnalysisService.analyzeTrajectoryData(analysisQuery);

            // Record the analysis metric with the actual outcome
            esTrajectoryMonitorService.recordTrajectoryAnalysis(analysisQuery.getObjectId(),
                    analysisQuery.getAnalysisType(), result.isSuccess());

            logger.info("Trajectory analysis finished, object ID: {}, analysis type: {}, duration: {}ms",
                    analysisQuery.getObjectId(), analysisQuery.getAnalysisType(), result.getDuration());

            return result;

        } catch (Exception e) {
            logger.error("Trajectory analysis failed, object ID: {}", analysisQuery.getObjectId(), e);
            result.setSuccess(false);
            result.setError("Trajectory analysis failed: " + e.getMessage());
            result.setEndTime(System.currentTimeMillis());

            // Record the analysis failure
            esTrajectoryMonitorService.recordTrajectoryAnalysis(analysisQuery.getObjectId(),
                    analysisQuery.getAnalysisType(), false);

            return result;
        }
    }

    /**
     * Creates the trajectory index.
     * @return the creation result
     */
    public ESIndexCreateResult createTrajectoryIndex() {
        logger.info("Creating trajectory index");

        ESIndexCreateResult result = new ESIndexCreateResult();
        result.setIndexName(properties.getTrajectoryIndexName());
        result.setStartTime(System.currentTimeMillis());

        try {
            // Create the index
            result = esTrajectoryIndexService.createTrajectoryIndex();

            logger.info("Trajectory index creation finished, index: {}, success: {}, duration: {}ms",
                    properties.getTrajectoryIndexName(), result.isSuccess(), result.getDuration());

            return result;

        } catch (Exception e) {
            logger.error("Trajectory index creation failed", e);
            result.setSuccess(false);
            result.setError("Trajectory index creation failed: " + e.getMessage());
            result.setEndTime(System.currentTimeMillis());
            return result;
        }
    }
}
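
The properties class defines `realTimeIndexBatchSize`, but the bulk method above sends the whole list in a single request. A minimal sketch of splitting a large list into fixed-size batches before indexing might look like the following; the class and method names are hypothetical and the helper is independent of Elasticsearch.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch only: splits a list into consecutive batches of at most batchSize items (hypothetical helper). */
final class BatchPartitionSketch {

    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < items.size(); start += batchSize) {
            int end = Math.min(start + batchSize, items.size());
            // Copy the slice so later changes to the source list do not affect the batch
            batches.add(new ArrayList<>(items.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> points = new ArrayList<>();
        for (int i = 0; i < 2500; i++) {
            points.add(i);
        }
        for (List<Integer> batch : partition(points, 1000)) {
            System.out.println("batch of " + batch.size());
        }
    }
}
```

Each batch could then be passed to the bulk indexing method in turn, which bounds the size of any single bulk request.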

2.5 Result Classes for Elasticsearch Trajectory Search

// Each public class below belongs in its own source file. Lombok's @Data already
// generates isSuccess() for the boolean field, so the hand-written copies are omitted.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import lombok.Data;

/**
 * Result of indexing a single trajectory point.
 * @author 运维实战
 */
@Data
public class ESTrajectoryIndexResult {

    private boolean success = false;
    private String trajectoryId;
    private String objectId;
    private TrajectoryData indexedData;
    private String error;
    private long startTime;
    private long endTime;

    /**
     * Returns the indexing duration.
     * @return the duration in milliseconds
     */
    public long getDuration() {
        return endTime - startTime;
    }
}

/**
 * Result of a bulk indexing operation.
 * @author 运维实战
 */
@Data
public class ESTrajectoryBatchIndexResult {

    private boolean success = false;
    private int totalCount;
    private int successCount = 0;
    private int failureCount = 0;
    private String error;
    private long startTime;
    private long endTime;

    /**
     * Returns the indexing duration.
     * @return the duration in milliseconds
     */
    public long getDuration() {
        return endTime - startTime;
    }
}

/**
 * Result of a trajectory query.
 * @author 运维实战
 */
@Data
public class ESTrajectoryQueryResult {

    private boolean success = false;
    private TrajectoryQuery query;
    private List<TrajectoryData> trajectoryDataList = new ArrayList<>();
    private int totalCount = 0;
    private String error;
    private long startTime;
    private long endTime;

    /**
     * Returns the query duration.
     * @return the duration in milliseconds
     */
    public long getDuration() {
        return endTime - startTime;
    }
}

/**
 * Result of a trajectory analysis.
 * @author 运维实战
 */
@Data
public class ESTrajectoryAnalysisResult {

    private boolean success = false;
    private TrajectoryAnalysisQuery analysisQuery;
    private Map<String, Object> analysisResult = new HashMap<>();
    private String error;
    private long startTime;
    private long endTime;

    /**
     * Returns the analysis duration.
     * @return the duration in milliseconds
     */
    public long getDuration() {
        return endTime - startTime;
    }
}

/**
 * Result of creating an index.
 * @author 运维实战
 */
@Data
public class ESIndexCreateResult {

    private boolean success = false;
    private String indexName;
    private String message;
    private String error;
    private long startTime;
    private long endTime;

    /**
     * Returns the creation duration.
     * @return the duration in milliseconds
     */
    public long getDuration() {
        return endTime - startTime;
    }
}

3. Advanced Features

3.1 Elasticsearch Trajectory Index Service

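import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.settings.Settings;
// The XContent classes live in org.elasticsearch.xcontent in newer 7.x client releases
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * Elasticsearch trajectory index service.
 * @author 运维实战
 */
@Service
public class ESTrajectoryIndexService {

    private static final Logger logger = LoggerFactory.getLogger(ESTrajectoryIndexService.class);

    @Autowired
    private ESTrajectorySearchProperties properties;

    @Autowired
    private RestHighLevelClient elasticsearchClient;

    @Autowired
    private ESTrajectoryMonitorService esTrajectoryMonitorService;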

    /**
     * Indexes a single trajectory point.
     * @param trajectoryData the trajectory point
     * @return the indexing result
     */
    public ESTrajectoryIndexResult indexTrajectoryData(TrajectoryData trajectoryData) {
        logger.info("Indexing trajectory data into Elasticsearch, trajectory ID: {}", trajectoryData.getTrajectoryId());

        ESTrajectoryIndexResult result = new ESTrajectoryIndexResult();
        result.setTrajectoryId(trajectoryData.getTrajectoryId());
        result.setObjectId(trajectoryData.getObjectId());
        result.setStartTime(System.currentTimeMillis());

        try {
            // Build the index request
            IndexRequest indexRequest = new IndexRequest(properties.getTrajectoryIndexName())
                    .id(trajectoryData.getId())
                    .source(convertToMap(trajectoryData), XContentType.JSON);

            // Execute the index operation
            IndexResponse indexResponse = elasticsearchClient.index(indexRequest, RequestOptions.DEFAULT);

            result.setSuccess(true);
            result.setIndexedData(trajectoryData);
            result.setEndTime(System.currentTimeMillis());

            logger.info("Trajectory data indexed into Elasticsearch, trajectory ID: {}, document ID: {}, duration: {}ms",
                    trajectoryData.getTrajectoryId(), indexResponse.getId(), result.getDuration());

            return result;

        } catch (Exception e) {
            logger.error("Failed to index trajectory data into Elasticsearch, trajectory ID: {}", trajectoryData.getTrajectoryId(), e);
            result.setSuccess(false);
            result.setError("Failed to index trajectory data into Elasticsearch: " + e.getMessage());
            result.setEndTime(System.currentTimeMillis());
            return result;
        }
    }

    /**
     * Indexes a list of trajectory points with a single bulk request.
     * @param trajectoryDataList the trajectory points
     * @return the bulk indexing result
     */
    public ESTrajectoryBatchIndexResult batchIndexTrajectoryData(List<TrajectoryData> trajectoryDataList) {
        logger.info("Bulk indexing trajectory data into Elasticsearch, count: {}", trajectoryDataList.size());

        ESTrajectoryBatchIndexResult result = new ESTrajectoryBatchIndexResult();
        result.setTotalCount(trajectoryDataList.size());
        result.setStartTime(System.currentTimeMillis());

        try {
            // Build the bulk request
            BulkRequest bulkRequest = new BulkRequest();

            for (TrajectoryData trajectoryData : trajectoryDataList) {
                IndexRequest indexRequest = new IndexRequest(properties.getTrajectoryIndexName())
                        .id(trajectoryData.getId())
                        .source(convertToMap(trajectoryData), XContentType.JSON);
                bulkRequest.add(indexRequest);
            }

            // Execute the bulk operation
            BulkResponse bulkResponse = elasticsearchClient.bulk(bulkRequest, RequestOptions.DEFAULT);

            // Count per-item successes and failures
            int successCount = 0;
            int failureCount = 0;

            for (BulkItemResponse itemResponse : bulkResponse.getItems()) {
                if (itemResponse.isFailed()) {
                    failureCount++;
                    logger.warn("Bulk item failed, document ID: {}, error: {}",
                            itemResponse.getId(), itemResponse.getFailureMessage());
                } else {
                    successCount++;
                }
            }

            result.setSuccessCount(successCount);
            result.setFailureCount(failureCount);
            // The original never set the success flag on this path; the batch
            // succeeds only when every item was indexed
            result.setSuccess(failureCount == 0);
            result.setEndTime(System.currentTimeMillis());

            logger.info("Bulk trajectory indexing into Elasticsearch finished, total: {}, succeeded: {}, failed: {}, duration: {}ms",
                    result.getTotalCount(), result.getSuccessCount(), result.getFailureCount(), result.getDuration());

            return result;

        } catch (Exception e) {
            logger.error("Bulk trajectory indexing into Elasticsearch failed", e);
            result.setSuccess(false);
            result.setError("Bulk trajectory indexing into Elasticsearch failed: " + e.getMessage());
            result.setEndTime(System.currentTimeMillis());
            return result;
        }
    }

    /**
     * Creates the trajectory index with its settings and mapping.
     * @return the creation result
     */
    public ESIndexCreateResult createTrajectoryIndex() {
        logger.info("Creating trajectory index, index: {}", properties.getTrajectoryIndexName());

        ESIndexCreateResult result = new ESIndexCreateResult();
        result.setIndexName(properties.getTrajectoryIndexName());
        result.setStartTime(System.currentTimeMillis());

        try {
            // Skip creation if the index already exists
            if (indexExists(properties.getTrajectoryIndexName())) {
                result.setSuccess(true);
                result.setMessage("Index already exists");
                result.setEndTime(System.currentTimeMillis());
                return result;
            }

            CreateIndexRequest createIndexRequest = new CreateIndexRequest(properties.getTrajectoryIndexName());

            // Index settings
            createIndexRequest.settings(Settings.builder()
                    .put("number_of_shards", properties.getTrajectoryIndexShards())
                    .put("number_of_replicas", properties.getTrajectoryIndexReplicas())
                    .put("refresh_interval", properties.getTrajectoryIndexRefreshInterval()));

            // Index mapping
            XContentBuilder mappingBuilder = XContentFactory.jsonBuilder()
                    .startObject()
                    .startObject("properties")
                    .startObject("trajectory_id")
                    .field("type", "keyword")
                    .endObject()
                    .startObject("object_id")
                    .field("type", "keyword")
                    .endObject()
                    .startObject("object_type")
                    .field("type", "keyword")
                    .endObject()
                    .startObject("location")
                    .field("type", "geo_point")
                    .endObject()
                    .startObject("longitude")
                    .field("type", "double")
                    .endObject()
                    .startObject("latitude")
                    .field("type", "double")
                    .endObject()
                    .startObject("altitude")
                    .field("type", "double")
                    .endObject()
                    .startObject("speed")
                    .field("type", "double")
                    .endObject()
                    .startObject("direction")
                    .field("type", "double")
                    .endObject()
                    .startObject("timestamp")
                    .field("type", "date")
                    .endObject()
                    .startObject("gps_timestamp")
                    .field("type", "date")
                    .endObject()
                    .startObject("receive_timestamp")
                    .field("type", "date")
                    .endObject()
                    .startObject("data_quality")
                    .field("type", "integer")
                    .endObject()
                    .startObject("satellite_count")
                    .field("type", "integer")
                    .endObject()
                    .startObject("is_valid")
                    .field("type", "boolean")
                    .endObject()
                    .startObject("extended_properties")
                    .field("type", "object")
                    .endObject()
                    .endObject()
                    .endObject();

            createIndexRequest.mapping(mappingBuilder);

            // Execute the create-index operation
            CreateIndexResponse createIndexResponse = elasticsearchClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);

            boolean acknowledged = createIndexResponse.isAcknowledged();
            result.setSuccess(acknowledged);
            result.setMessage(acknowledged ? "Index created" : "Index creation was not acknowledged");
            result.setEndTime(System.currentTimeMillis());

            logger.info("Trajectory index creation finished, index: {}, acknowledged: {}, duration: {}ms",
                    properties.getTrajectoryIndexName(), acknowledged, result.getDuration());

            return result;

        } catch (Exception e) {
            logger.error("Failed to create trajectory index, index: {}", properties.getTrajectoryIndexName(), e);
            result.setSuccess(false);
            result.setError("Failed to create trajectory index: " + e.getMessage());
            result.setEndTime(System.currentTimeMillis());
            return result;
        }
    }

    /**
     * Checks whether an index exists.
     * @param indexName the index name
     * @return true if the index exists; false if it does not or the check failed
     */
    private boolean indexExists(String indexName) {
        try {
            GetIndexRequest getIndexRequest = new GetIndexRequest(indexName);
            return elasticsearchClient.indices().exists(getIndexRequest, RequestOptions.DEFAULT);
        } catch (Exception e) {
            logger.error("Failed to check whether the index exists, index: {}", indexName, e);
            return false;
        }
    }

    /**
     * Converts a trajectory point into the document source map.
     * @param trajectoryData the trajectory point
     * @return the document source as a map
     */
    private Map<String, Object> convertToMap(TrajectoryData trajectoryData) {
        Map<String, Object> map = new HashMap<>();
        map.put("trajectory_id", trajectoryData.getTrajectoryId());
        map.put("object_id", trajectoryData.getObjectId());
        map.put("object_type", trajectoryData.getObjectType());
        // Write the geo_point as a plain {lat, lon} object built from the coordinate
        // fields, so serialization does not depend on which GeoPoint class the model
        // uses and still works when only latitude and longitude were set
        if (trajectoryData.getLatitude() != null && trajectoryData.getLongitude() != null) {
            Map<String, Object> location = new HashMap<>();
            location.put("lat", trajectoryData.getLatitude());
            location.put("lon", trajectoryData.getLongitude());
            map.put("location", location);
        }
        map.put("longitude", trajectoryData.getLongitude());
        map.put("latitude", trajectoryData.getLatitude());
        map.put("altitude", trajectoryData.getAltitude());
        map.put("speed", trajectoryData.getSpeed());
        map.put("direction", trajectoryData.getDirection());
        map.put("timestamp", trajectoryData.getTimestamp());
        map.put("gps_timestamp", trajectoryData.getGpsTimestamp());
        map.put("receive_timestamp", trajectoryData.getReceiveTimestamp());
        map.put("data_quality", trajectoryData.getDataQuality());
        map.put("satellite_count", trajectoryData.getSatelliteCount());
        map.put("is_valid", trajectoryData.getIsValid());
        map.put("extended_properties", trajectoryData.getExtendedProperties());
        return map;
    }
}
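
The long builder chain in `createTrajectoryIndex` is easier to follow next to the JSON it is meant to produce. The sketch below generates that shape from a field-to-type table using plain Java; the class and method names are hypothetical, only a subset of the fields is listed, and the field names are assumed to need no JSON escaping.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

/** Sketch only: renders a {"properties": {...}} mapping from a field-to-type table (hypothetical helper). */
final class TrajectoryMappingSketch {

    /** A subset of the trajectory fields and their Elasticsearch types, in declaration order. */
    static Map<String, String> trajectoryFields() {
        Map<String, String> fields = new LinkedHashMap<>();
        fields.put("trajectory_id", "keyword");
        fields.put("object_id", "keyword");
        fields.put("location", "geo_point");
        fields.put("speed", "double");
        fields.put("timestamp", "date");
        return fields;
    }

    /** Renders the mapping JSON; every field becomes {"type": "<type>"}. */
    static String mappingJson(Map<String, String> fieldTypes) {
        String properties = fieldTypes.entrySet().stream()
                .map(e -> "\"" + e.getKey() + "\":{\"type\":\"" + e.getValue() + "\"}")
                .collect(Collectors.joining(","));
        return "{\"properties\":{" + properties + "}}";
    }

    public static void main(String[] args) {
        System.out.println(mappingJson(trajectoryFields()));
    }
}
```

Keeping the fields in a table makes it harder for the mapping and the document conversion to drift apart, since both can iterate over the same list.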

3.2 Elasticsearch Trajectory Query Service

/**
* Elasticsearch轨迹查询服务
* @author 运维实战
*/
@Service
public class ESTrajectoryQueryService {

@Autowired
private ESTrajectorySearchProperties properties;

@Autowired
private RestHighLevelClient elasticsearchClient;

@Autowired
private ESTrajectoryMonitorService esTrajectoryMonitorService;

private static final Logger logger = LoggerFactory.getLogger(ESTrajectoryQueryService.class);

    /**
     * Queries trajectory points by object, time range, coordinate range, speed, and data quality.
     * @param query query conditions; null fields are skipped
     * @return query result
     */
    public ESTrajectoryQueryResult queryTrajectoryData(TrajectoryQuery query) {
        logger.info("Querying trajectory data, object ID: {}", query.getObjectId());

        ESTrajectoryQueryResult result = new ESTrajectoryQueryResult();
        result.setQuery(query);
        result.setStartTime(System.currentTimeMillis());

        try {
            // Build the search request
            SearchRequest searchRequest = new SearchRequest(properties.getTrajectoryIndexName());
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();

            // Build the query conditions
            BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();

            if (query.getObjectId() != null) {
                boolQuery.must(QueryBuilders.termQuery("object_id", query.getObjectId()));
            }

            if (query.getObjectType() != null) {
                boolQuery.must(QueryBuilders.termQuery("object_type", query.getObjectType()));
            }

            // Range conditions apply only when both bounds are set
            if (query.getStartTime() != null && query.getEndTime() != null) {
                boolQuery.must(QueryBuilders.rangeQuery("timestamp")
                        .gte(query.getStartTime())
                        .lte(query.getEndTime()));
            }

            if (query.getMinLongitude() != null && query.getMaxLongitude() != null) {
                boolQuery.must(QueryBuilders.rangeQuery("longitude")
                        .gte(query.getMinLongitude())
                        .lte(query.getMaxLongitude()));
            }

            if (query.getMinLatitude() != null && query.getMaxLatitude() != null) {
                boolQuery.must(QueryBuilders.rangeQuery("latitude")
                        .gte(query.getMinLatitude())
                        .lte(query.getMaxLatitude()));
            }

            if (query.getMinSpeed() != null && query.getMaxSpeed() != null) {
                boolQuery.must(QueryBuilders.rangeQuery("speed")
                        .gte(query.getMinSpeed())
                        .lte(query.getMaxSpeed()));
            }

            if (query.getMinDataQuality() != null) {
                boolQuery.must(QueryBuilders.rangeQuery("data_quality")
                        .gte(query.getMinDataQuality()));
            }

            if (query.getIsValid() != null) {
                boolQuery.must(QueryBuilders.termQuery("is_valid", query.getIsValid()));
            }

            searchSourceBuilder.query(boolQuery);

            // Sorting: ascending unless the direction is "DESC"
            if (query.getSortField() != null) {
                SortOrder sortOrder = "DESC".equalsIgnoreCase(query.getSortDirection()) ?
                        SortOrder.DESC : SortOrder.ASC;
                searchSourceBuilder.sort(query.getSortField(), sortOrder);
            }

            // Pagination: pageNumber is zero-based
            if (query.getPageSize() != null && query.getPageNumber() != null) {
                searchSourceBuilder.from(query.getPageNumber() * query.getPageSize());
                searchSourceBuilder.size(query.getPageSize());
            }

            searchRequest.source(searchSourceBuilder);

            // Execute the search
            SearchResponse searchResponse = elasticsearchClient.search(searchRequest, RequestOptions.DEFAULT);

            // Convert the hits
            List<TrajectoryData> trajectoryDataList = new ArrayList<>();
            for (SearchHit hit : searchResponse.getHits().getHits()) {
                TrajectoryData trajectoryData = convertToTrajectoryData(hit.getSourceAsMap());
                trajectoryDataList.add(trajectoryData);
            }

            result.setSuccess(true);
            result.setTrajectoryDataList(trajectoryDataList);
            // In Elasticsearch 7.x the reported total is capped at 10,000 unless track_total_hits is enabled
            result.setTotalCount((int) searchResponse.getHits().getTotalHits().value);
            result.setEndTime(System.currentTimeMillis());

            // Feed the monitoring counters (the service was injected but never called)
            esTrajectoryMonitorService.recordTrajectoryQuery(query.getObjectId(), trajectoryDataList.size(), true);

            logger.info("Trajectory query succeeded, object ID: {}, result count: {}, elapsed: {}ms",
                    query.getObjectId(), result.getTotalCount(), result.getDuration());

            return result;

        } catch (Exception e) {
            logger.error("Trajectory query failed, object ID: {}", query.getObjectId(), e);
            esTrajectoryMonitorService.recordTrajectoryQuery(query.getObjectId(), 0, false);
            result.setSuccess(false);
            result.setError("Trajectory query failed: " + e.getMessage());
            result.setEndTime(System.currentTimeMillis());
            return result;
        }
    }

    /**
     * Geospatial query: trajectory points within a radius of a center point, nearest first.
     * @param geoQuery geospatial query conditions
     * @return query result
     */
    public ESTrajectoryQueryResult geoQueryTrajectoryData(GeoTrajectoryQuery geoQuery) {
        logger.info("Geo query for trajectory data, center: ({}, {})", geoQuery.getCenterLongitude(), geoQuery.getCenterLatitude());

        ESTrajectoryQueryResult result = new ESTrajectoryQueryResult();
        result.setStartTime(System.currentTimeMillis());

        try {
            // Build the search request
            SearchRequest searchRequest = new SearchRequest(properties.getTrajectoryIndexName());
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();

            // Build the geospatial query
            BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();

            boolean hasCenter = geoQuery.getCenterLongitude() != null && geoQuery.getCenterLatitude() != null;

            // Distance condition on the geo_point field "location" (latitude first, then longitude)
            if (hasCenter && geoQuery.getRadiusMeters() != null) {
                GeoDistanceQueryBuilder geoDistanceQuery = QueryBuilders.geoDistanceQuery("location")
                        .point(geoQuery.getCenterLatitude(), geoQuery.getCenterLongitude())
                        .distance(geoQuery.getRadiusMeters() + "m");
                boolQuery.must(geoDistanceQuery);
            }

            if (geoQuery.getObjectType() != null) {
                boolQuery.must(QueryBuilders.termQuery("object_type", geoQuery.getObjectType()));
            }

            if (geoQuery.getStartTime() != null && geoQuery.getEndTime() != null) {
                boolQuery.must(QueryBuilders.rangeQuery("timestamp")
                        .gte(geoQuery.getStartTime())
                        .lte(geoQuery.getEndTime()));
            }

            searchSourceBuilder.query(boolQuery);

            // Sort by distance from the center. The sort is built with SortBuilders.geoDistanceSort
            // and only when a center is present, which avoids unboxing a null coordinate.
            if (hasCenter) {
                searchSourceBuilder.sort(SortBuilders.geoDistanceSort("location",
                        geoQuery.getCenterLatitude(), geoQuery.getCenterLongitude()).order(SortOrder.ASC));
            }

            // Pagination: pageNumber is zero-based
            if (geoQuery.getPageSize() != null && geoQuery.getPageNumber() != null) {
                searchSourceBuilder.from(geoQuery.getPageNumber() * geoQuery.getPageSize());
                searchSourceBuilder.size(geoQuery.getPageSize());
            }

            searchRequest.source(searchSourceBuilder);

            // Execute the search
            SearchResponse searchResponse = elasticsearchClient.search(searchRequest, RequestOptions.DEFAULT);

            // Convert the hits
            List<TrajectoryData> trajectoryDataList = new ArrayList<>();
            for (SearchHit hit : searchResponse.getHits().getHits()) {
                TrajectoryData trajectoryData = convertToTrajectoryData(hit.getSourceAsMap());
                trajectoryDataList.add(trajectoryData);
            }

            result.setSuccess(true);
            result.setTrajectoryDataList(trajectoryDataList);
            result.setTotalCount((int) searchResponse.getHits().getTotalHits().value);
            result.setEndTime(System.currentTimeMillis());

            // Feed the monitoring counters
            esTrajectoryMonitorService.recordTrajectoryGeoQuery(geoQuery.getCenterLongitude(),
                    geoQuery.getCenterLatitude(), trajectoryDataList.size(), true);

            logger.info("Geo query succeeded, center: ({}, {}), result count: {}, elapsed: {}ms",
                    geoQuery.getCenterLongitude(), geoQuery.getCenterLatitude(), result.getTotalCount(), result.getDuration());

            return result;

        } catch (Exception e) {
            logger.error("Geo query failed, center: ({}, {})",
                    geoQuery.getCenterLongitude(), geoQuery.getCenterLatitude(), e);
            esTrajectoryMonitorService.recordTrajectoryGeoQuery(geoQuery.getCenterLongitude(),
                    geoQuery.getCenterLatitude(), 0, false);
            result.setSuccess(false);
            result.setError("Geo query failed: " + e.getMessage());
            result.setEndTime(System.currentTimeMillis());
            return result;
        }
    }

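    /**
     * Converts an Elasticsearch source map into a TrajectoryData object.
     * Numeric fields are read through Number because the parsed source may hold an
     * Integer, Long, or Double for the same field depending on the stored value;
     * a direct (Double) or (Long) cast would then throw ClassCastException.
     * @param map source map of a search hit
     * @return trajectory data
     */
    @SuppressWarnings("unchecked")
    private TrajectoryData convertToTrajectoryData(Map<String, Object> map) {
        TrajectoryData trajectoryData = new TrajectoryData();
        trajectoryData.setId((String) map.get("id"));
        trajectoryData.setTrajectoryId((String) map.get("trajectory_id"));
        trajectoryData.setObjectId((String) map.get("object_id"));
        trajectoryData.setObjectType((String) map.get("object_type"));
        trajectoryData.setLongitude(toDouble(map.get("longitude")));
        trajectoryData.setLatitude(toDouble(map.get("latitude")));
        trajectoryData.setAltitude(toDouble(map.get("altitude")));
        trajectoryData.setSpeed(toDouble(map.get("speed")));
        trajectoryData.setDirection(toDouble(map.get("direction")));
        trajectoryData.setDataQuality(toInteger(map.get("data_quality")));
        trajectoryData.setSatelliteCount(toInteger(map.get("satellite_count")));
        trajectoryData.setIsValid((Boolean) map.get("is_valid"));
        trajectoryData.setExtendedProperties((Map<String, Object>) map.get("extended_properties"));

        // Time fields are assumed to be stored as epoch milliseconds
        Date timestamp = toDate(map.get("timestamp"));
        if (timestamp != null) {
            trajectoryData.setTimestamp(timestamp);
        }
        Date gpsTimestamp = toDate(map.get("gps_timestamp"));
        if (gpsTimestamp != null) {
            trajectoryData.setGpsTimestamp(gpsTimestamp);
        }
        Date receiveTimestamp = toDate(map.get("receive_timestamp"));
        if (receiveTimestamp != null) {
            trajectoryData.setReceiveTimestamp(receiveTimestamp);
        }

        return trajectoryData;
    }

    private static Double toDouble(Object value) {
        return value instanceof Number ? Double.valueOf(((Number) value).doubleValue()) : null;
    }

    private static Integer toInteger(Object value) {
        return value instanceof Number ? Integer.valueOf(((Number) value).intValue()) : null;
    }

    private static Date toDate(Object value) {
        return value instanceof Number ? new Date(((Number) value).longValue()) : null;
    }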
}
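
Both query methods compute the `from` offset as `pageNumber * pageSize`. Elasticsearch rejects a search whose `from + size` exceeds the index setting `index.max_result_window` (10,000 by default), so deep pages fail at request time. The following is a minimal sketch of a guard that could run before the request is built. The `PageWindow` class and its method are hypothetical names introduced for illustration, and the sketch assumes zero-based page numbers and the default window size.

```java
/** Hypothetical helper, not part of the original service. */
final class PageWindow {

    /** Default value of the index.max_result_window index setting. */
    static final int DEFAULT_MAX_RESULT_WINDOW = 10_000;

    private PageWindow() {
    }

    /**
     * Returns the "from" offset for a zero-based page, or throws if the page
     * would reach past the result window.
     */
    static int offset(int pageNumber, int pageSize) {
        if (pageNumber < 0 || pageSize <= 0) {
            throw new IllegalArgumentException("pageNumber must be >= 0 and pageSize > 0");
        }
        // Use long arithmetic so a large page number cannot overflow int
        long from = (long) pageNumber * pageSize;
        if (from + pageSize > DEFAULT_MAX_RESULT_WINDOW) {
            throw new IllegalArgumentException(
                    "from + size = " + (from + pageSize) + " exceeds " + DEFAULT_MAX_RESULT_WINDOW);
        }
        return (int) from;
    }
}
```

For example, `PageWindow.offset(3, 50)` yields 150, while page 100 with a page size of 100 is rejected. For exports that need to go deeper, `search_after` or the scroll API are the usual alternatives.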

3.3 Elasticsearch Trajectory Analysis Service

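import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.metrics.Avg;
import org.elasticsearch.search.aggregations.metrics.Cardinality;
import org.elasticsearch.search.aggregations.metrics.Max;
import org.elasticsearch.search.aggregations.metrics.Min;
import org.elasticsearch.search.aggregations.metrics.Stats;
import org.elasticsearch.search.aggregations.metrics.ValueCount;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * Elasticsearch trajectory analysis service.
 * @author 运维实战
 */
@Service
public class ESTrajectoryAnalysisService {

    @Autowired
    private ESTrajectorySearchProperties properties;

    @Autowired
    private RestHighLevelClient elasticsearchClient;

    @Autowired
    private ESTrajectoryMonitorService esTrajectoryMonitorService;

    private static final Logger logger = LoggerFactory.getLogger(ESTrajectoryAnalysisService.class);

    /**
     * Analyzes trajectory data by dispatching to the analysis that matches the requested type.
     * @param analysisQuery analysis query conditions
     * @return analysis result
     */
    public ESTrajectoryAnalysisResult analyzeTrajectoryData(TrajectoryAnalysisQuery analysisQuery) {
        logger.info("Analyzing trajectory data, object ID: {}, analysis type: {}",
                analysisQuery.getObjectId(), analysisQuery.getAnalysisType());

        ESTrajectoryAnalysisResult result = new ESTrajectoryAnalysisResult();
        result.setAnalysisQuery(analysisQuery);
        result.setStartTime(System.currentTimeMillis());

        try {
            // Dispatch on the analysis type. A missing or unknown type runs the default analysis.
            // Locale.ROOT keeps the upper-casing independent of the JVM's default locale.
            String analysisType = analysisQuery.getAnalysisType() == null
                    ? "" : analysisQuery.getAnalysisType().toUpperCase(Locale.ROOT);
            switch (analysisType) {
                case "SPEED_ANALYSIS":
                    result = analyzeSpeed(analysisQuery);
                    break;
                case "DISTANCE_ANALYSIS":
                    result = analyzeDistance(analysisQuery);
                    break;
                case "TIME_ANALYSIS":
                    result = analyzeTime(analysisQuery);
                    break;
                case "QUALITY_ANALYSIS":
                    result = analyzeQuality(analysisQuery);
                    break;
                case "AGGREGATION_ANALYSIS":
                    result = analyzeAggregation(analysisQuery);
                    break;
                default:
                    result = analyzeDefault(analysisQuery);
                    break;
            }

            result.setEndTime(System.currentTimeMillis());

            logger.info("Trajectory analysis finished, object ID: {}, analysis type: {}, elapsed: {}ms",
                    analysisQuery.getObjectId(), analysisQuery.getAnalysisType(), result.getDuration());

            return result;

        } catch (Exception e) {
            logger.error("Trajectory analysis failed, object ID: {}", analysisQuery.getObjectId(), e);
            result.setSuccess(false);
            result.setError("Trajectory analysis failed: " + e.getMessage());
            result.setEndTime(System.currentTimeMillis());
            return result;
        }
    }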

    /**
     * Speed analysis: average, maximum, minimum, sum, and sample count of the speed field.
     * @param analysisQuery analysis query conditions
     * @return analysis result
     */
    private ESTrajectoryAnalysisResult analyzeSpeed(TrajectoryAnalysisQuery analysisQuery) {
        logger.info("Running speed analysis, object ID: {}", analysisQuery.getObjectId());

        ESTrajectoryAnalysisResult result = new ESTrajectoryAnalysisResult();
        result.setAnalysisQuery(analysisQuery);
        result.setStartTime(System.currentTimeMillis());

        try {
            // Build the search request
            SearchRequest searchRequest = new SearchRequest(properties.getTrajectoryIndexName());
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();

            // Build the query conditions
            BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
            boolQuery.must(QueryBuilders.termQuery("object_id", analysisQuery.getObjectId()));

            if (analysisQuery.getStartTime() != null && analysisQuery.getEndTime() != null) {
                boolQuery.must(QueryBuilders.rangeQuery("timestamp")
                        .gte(analysisQuery.getStartTime())
                        .lte(analysisQuery.getEndTime()));
            }

            searchSourceBuilder.query(boolQuery);
            // Only the aggregation is needed, so no hits are fetched
            searchSourceBuilder.size(0);

            // A single stats aggregation already returns avg, max, min, sum, and count,
            // so the separate avg/max/min aggregations (never read) are dropped
            searchSourceBuilder.aggregation(AggregationBuilders.stats("speed_stats").field("speed"));

            searchRequest.source(searchSourceBuilder);

            // Execute the search
            SearchResponse searchResponse = elasticsearchClient.search(searchRequest, RequestOptions.DEFAULT);

            // Read the aggregation result
            Map<String, Object> analysisResult = new HashMap<>();

            Stats speedStats = searchResponse.getAggregations().get("speed_stats");
            if (speedStats != null) {
                analysisResult.put("avg_speed", speedStats.getAvg());
                analysisResult.put("max_speed", speedStats.getMax());
                analysisResult.put("min_speed", speedStats.getMin());
                analysisResult.put("total_speed", speedStats.getSum());
                analysisResult.put("speed_count", speedStats.getCount());
            }

            result.setSuccess(true);
            result.setAnalysisResult(analysisResult);
            result.setEndTime(System.currentTimeMillis());

            logger.info("Speed analysis finished, object ID: {}, elapsed: {}ms", analysisQuery.getObjectId(), result.getDuration());

            return result;

        } catch (Exception e) {
            logger.error("Speed analysis failed, object ID: {}", analysisQuery.getObjectId(), e);
            result.setSuccess(false);
            result.setError("Speed analysis failed: " + e.getMessage());
            result.setEndTime(System.currentTimeMillis());
            return result;
        }
    }

    /**
     * Distance analysis: sums the distance between consecutive points ordered by time.
     * @param analysisQuery analysis query conditions
     * @return analysis result
     */
    private ESTrajectoryAnalysisResult analyzeDistance(TrajectoryAnalysisQuery analysisQuery) {
        logger.info("Running distance analysis, object ID: {}", analysisQuery.getObjectId());

        ESTrajectoryAnalysisResult result = new ESTrajectoryAnalysisResult();
        result.setAnalysisQuery(analysisQuery);
        result.setStartTime(System.currentTimeMillis());

        try {
            // Build the search request
            SearchRequest searchRequest = new SearchRequest(properties.getTrajectoryIndexName());
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();

            // Build the query conditions
            BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
            boolQuery.must(QueryBuilders.termQuery("object_id", analysisQuery.getObjectId()));

            if (analysisQuery.getStartTime() != null && analysisQuery.getEndTime() != null) {
                boolQuery.must(QueryBuilders.rangeQuery("timestamp")
                        .gte(analysisQuery.getStartTime())
                        .lte(analysisQuery.getEndTime()));
            }

            searchSourceBuilder.query(boolQuery);
            searchSourceBuilder.sort("timestamp", SortOrder.ASC);
            // At most 10,000 points are read; a longer trajectory is truncated
            // and its total distance is underestimated
            searchSourceBuilder.size(10000);

            searchRequest.source(searchSourceBuilder);

            // Execute the search
            SearchResponse searchResponse = elasticsearchClient.search(searchRequest, RequestOptions.DEFAULT);
            SearchHit[] hits = searchResponse.getHits().getHits();

            // Accumulate the distance between each point and its predecessor
            double totalDistance = 0.0;
            TrajectoryData previousPoint = null;

            for (SearchHit hit : hits) {
                TrajectoryData currentPoint = convertToTrajectoryData(hit.getSourceAsMap());

                if (previousPoint != null) {
                    double distance = previousPoint.calculateDistance(currentPoint);
                    totalDistance += distance;
                }

                previousPoint = currentPoint;
            }

            // Assemble the analysis result; n points form n - 1 segments
            Map<String, Object> analysisResult = new HashMap<>();
            analysisResult.put("total_distance", totalDistance);
            analysisResult.put("point_count", hits.length);
            analysisResult.put("avg_distance_per_point", totalDistance / Math.max(1, hits.length - 1));

            result.setSuccess(true);
            result.setAnalysisResult(analysisResult);
            result.setEndTime(System.currentTimeMillis());

            logger.info("Distance analysis finished, object ID: {}, total distance: {}m, elapsed: {}ms",
                    analysisQuery.getObjectId(), totalDistance, result.getDuration());

            return result;

        } catch (Exception e) {
            logger.error("Distance analysis failed, object ID: {}", analysisQuery.getObjectId(), e);
            result.setSuccess(false);
            result.setError("Distance analysis failed: " + e.getMessage());
            result.setEndTime(System.currentTimeMillis());
            return result;
        }
    }

    /**
     * Time analysis: covered time span and number of distinct timestamps.
     * @param analysisQuery analysis query conditions
     * @return analysis result
     */
    private ESTrajectoryAnalysisResult analyzeTime(TrajectoryAnalysisQuery analysisQuery) {
        logger.info("Running time analysis, object ID: {}", analysisQuery.getObjectId());

        ESTrajectoryAnalysisResult result = new ESTrajectoryAnalysisResult();
        result.setAnalysisQuery(analysisQuery);
        result.setStartTime(System.currentTimeMillis());

        try {
            // Build the search request
            SearchRequest searchRequest = new SearchRequest(properties.getTrajectoryIndexName());
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();

            // Build the query conditions
            BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
            boolQuery.must(QueryBuilders.termQuery("object_id", analysisQuery.getObjectId()));

            if (analysisQuery.getStartTime() != null && analysisQuery.getEndTime() != null) {
                boolQuery.must(QueryBuilders.rangeQuery("timestamp")
                        .gte(analysisQuery.getStartTime())
                        .lte(analysisQuery.getEndTime()));
            }

            searchSourceBuilder.query(boolQuery);
            // Only the aggregations are needed, so no hits are fetched
            searchSourceBuilder.size(0);

            // Time aggregations (cardinality is an approximate distinct count)
            searchSourceBuilder.aggregation(AggregationBuilders.min("min_timestamp").field("timestamp"));
            searchSourceBuilder.aggregation(AggregationBuilders.max("max_timestamp").field("timestamp"));
            searchSourceBuilder.aggregation(AggregationBuilders.cardinality("unique_timestamps").field("timestamp"));

            searchRequest.source(searchSourceBuilder);

            // Execute the search
            SearchResponse searchResponse = elasticsearchClient.search(searchRequest, RequestOptions.DEFAULT);

            // Read the aggregation results
            Map<String, Object> analysisResult = new HashMap<>();

            Min minTimestamp = searchResponse.getAggregations().get("min_timestamp");
            Max maxTimestamp = searchResponse.getAggregations().get("max_timestamp");
            Cardinality uniqueTimestamps = searchResponse.getAggregations().get("unique_timestamps");

            // getValue() returns epoch milliseconds as a double. getValueAsString() returns a
            // formatted date or a floating-point string, neither of which Long.parseLong accepts.
            // With no matching documents min/max are infinite, so the span is skipped.
            if (minTimestamp != null && maxTimestamp != null
                    && Double.isFinite(minTimestamp.getValue()) && Double.isFinite(maxTimestamp.getValue())) {
                long duration = (long) (maxTimestamp.getValue() - minTimestamp.getValue());
                analysisResult.put("duration_ms", duration);
                analysisResult.put("duration_hours", duration / (1000.0 * 60 * 60));
            }

            if (uniqueTimestamps != null) {
                analysisResult.put("unique_timestamps", uniqueTimestamps.getValue());
            }

            result.setSuccess(true);
            result.setAnalysisResult(analysisResult);
            result.setEndTime(System.currentTimeMillis());

            logger.info("Time analysis finished, object ID: {}, elapsed: {}ms", analysisQuery.getObjectId(), result.getDuration());

            return result;

        } catch (Exception e) {
            logger.error("Time analysis failed, object ID: {}", analysisQuery.getObjectId(), e);
            result.setSuccess(false);
            result.setError("Time analysis failed: " + e.getMessage());
            result.setEndTime(System.currentTimeMillis());
            return result;
        }
    }

    /**
     * Quality analysis: statistics and value distribution of the data_quality field.
     * @param analysisQuery analysis query conditions
     * @return analysis result
     */
    private ESTrajectoryAnalysisResult analyzeQuality(TrajectoryAnalysisQuery analysisQuery) {
        logger.info("Running quality analysis, object ID: {}", analysisQuery.getObjectId());

        ESTrajectoryAnalysisResult result = new ESTrajectoryAnalysisResult();
        result.setAnalysisQuery(analysisQuery);
        result.setStartTime(System.currentTimeMillis());

        try {
            // Build the search request
            SearchRequest searchRequest = new SearchRequest(properties.getTrajectoryIndexName());
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();

            // Build the query conditions
            BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
            boolQuery.must(QueryBuilders.termQuery("object_id", analysisQuery.getObjectId()));

            if (analysisQuery.getStartTime() != null && analysisQuery.getEndTime() != null) {
                boolQuery.must(QueryBuilders.rangeQuery("timestamp")
                        .gte(analysisQuery.getStartTime())
                        .lte(analysisQuery.getEndTime()));
            }

            searchSourceBuilder.query(boolQuery);
            // Only the aggregations are needed, so no hits are fetched
            searchSourceBuilder.size(0);

            // Quality aggregations; stats already includes the average
            searchSourceBuilder.aggregation(AggregationBuilders.stats("quality_stats").field("data_quality"));
            searchSourceBuilder.aggregation(AggregationBuilders.terms("quality_distribution").field("data_quality"));

            searchRequest.source(searchSourceBuilder);

            // Execute the search
            SearchResponse searchResponse = elasticsearchClient.search(searchRequest, RequestOptions.DEFAULT);

            // Read the aggregation results
            Map<String, Object> analysisResult = new HashMap<>();

            Stats qualityStats = searchResponse.getAggregations().get("quality_stats");
            if (qualityStats != null) {
                analysisResult.put("avg_quality", qualityStats.getAvg());
                analysisResult.put("max_quality", qualityStats.getMax());
                analysisResult.put("min_quality", qualityStats.getMin());
                analysisResult.put("quality_count", qualityStats.getCount());
            }

            // The distribution was requested in the original but never read; expose it as value -> document count
            Terms qualityDistribution = searchResponse.getAggregations().get("quality_distribution");
            if (qualityDistribution != null) {
                Map<String, Long> distribution = new LinkedHashMap<>();
                for (Terms.Bucket bucket : qualityDistribution.getBuckets()) {
                    distribution.put(bucket.getKeyAsString(), bucket.getDocCount());
                }
                analysisResult.put("quality_distribution", distribution);
            }

            result.setSuccess(true);
            result.setAnalysisResult(analysisResult);
            result.setEndTime(System.currentTimeMillis());

            logger.info("Quality analysis finished, object ID: {}, elapsed: {}ms", analysisQuery.getObjectId(), result.getDuration());

            return result;

        } catch (Exception e) {
            logger.error("Quality analysis failed, object ID: {}", analysisQuery.getObjectId(), e);
            result.setSuccess(false);
            result.setError("Quality analysis failed: " + e.getMessage());
            result.setEndTime(System.currentTimeMillis());
            return result;
        }
    }

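    /**
     * Aggregation analysis: document count, average speed and quality, object types, and hourly buckets.
     * @param analysisQuery analysis query conditions
     * @return analysis result
     */
    private ESTrajectoryAnalysisResult analyzeAggregation(TrajectoryAnalysisQuery analysisQuery) {
        logger.info("Running aggregation analysis, object ID: {}", analysisQuery.getObjectId());

        ESTrajectoryAnalysisResult result = new ESTrajectoryAnalysisResult();
        result.setAnalysisQuery(analysisQuery);
        result.setStartTime(System.currentTimeMillis());

        try {
            // Build the search request
            SearchRequest searchRequest = new SearchRequest(properties.getTrajectoryIndexName());
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();

            // Build the query conditions
            BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
            boolQuery.must(QueryBuilders.termQuery("object_id", analysisQuery.getObjectId()));

            if (analysisQuery.getStartTime() != null && analysisQuery.getEndTime() != null) {
                boolQuery.must(QueryBuilders.rangeQuery("timestamp")
                        .gte(analysisQuery.getStartTime())
                        .lte(analysisQuery.getEndTime()));
            }

            searchSourceBuilder.query(boolQuery);
            // Only the aggregations are needed, so no hits are fetched
            searchSourceBuilder.size(0);

            // Aggregations. The count runs on object_id instead of _id, because aggregating
            // on _id is discouraged; every matched document carries an object_id.
            searchSourceBuilder.aggregation(AggregationBuilders.count("total_count").field("object_id"));
            searchSourceBuilder.aggregation(AggregationBuilders.avg("avg_speed").field("speed"));
            searchSourceBuilder.aggregation(AggregationBuilders.avg("avg_quality").field("data_quality"));
            searchSourceBuilder.aggregation(AggregationBuilders.terms("object_types").field("object_type"));
            searchSourceBuilder.aggregation(AggregationBuilders.dateHistogram("time_buckets")
                    .field("timestamp").calendarInterval(DateHistogramInterval.HOUR));

            searchRequest.source(searchSourceBuilder);

            // Execute the search
            SearchResponse searchResponse = elasticsearchClient.search(searchRequest, RequestOptions.DEFAULT);

            // Read the aggregation results
            Map<String, Object> analysisResult = new HashMap<>();

            // The count aggregation returns a ValueCount (there is no Count result type)
            ValueCount totalCount = searchResponse.getAggregations().get("total_count");
            if (totalCount != null) {
                analysisResult.put("total_count", totalCount.getValue());
            }

            Avg avgSpeed = searchResponse.getAggregations().get("avg_speed");
            if (avgSpeed != null) {
                analysisResult.put("avg_speed", avgSpeed.getValue());
            }

            Avg avgQuality = searchResponse.getAggregations().get("avg_quality");
            if (avgQuality != null) {
                analysisResult.put("avg_quality", avgQuality.getValue());
            }

            // The two bucket aggregations were requested in the original but never read
            Terms objectTypes = searchResponse.getAggregations().get("object_types");
            if (objectTypes != null) {
                Map<String, Long> typeCounts = new LinkedHashMap<>();
                for (Terms.Bucket bucket : objectTypes.getBuckets()) {
                    typeCounts.put(bucket.getKeyAsString(), bucket.getDocCount());
                }
                analysisResult.put("object_types", typeCounts);
            }

            Histogram timeBuckets = searchResponse.getAggregations().get("time_buckets");
            if (timeBuckets != null) {
                Map<String, Long> hourlyCounts = new LinkedHashMap<>();
                for (Histogram.Bucket bucket : timeBuckets.getBuckets()) {
                    hourlyCounts.put(bucket.getKeyAsString(), bucket.getDocCount());
                }
                analysisResult.put("time_buckets", hourlyCounts);
            }

            result.setSuccess(true);
            result.setAnalysisResult(analysisResult);
            result.setEndTime(System.currentTimeMillis());

            logger.info("Aggregation analysis finished, object ID: {}, elapsed: {}ms", analysisQuery.getObjectId(), result.getDuration());

            return result;

        } catch (Exception e) {
            logger.error("Aggregation analysis failed, object ID: {}", analysisQuery.getObjectId(), e);
            result.setSuccess(false);
            result.setError("Aggregation analysis failed: " + e.getMessage());
            result.setEndTime(System.currentTimeMillis());
            return result;
        }
    }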

    /**
     * Default analysis, used when the analysis type is missing or not recognized.
     * @param analysisQuery analysis query conditions
     * @return analysis result
     */
    private ESTrajectoryAnalysisResult analyzeDefault(TrajectoryAnalysisQuery analysisQuery) {
        logger.info("Running default analysis, object ID: {}", analysisQuery.getObjectId());

        ESTrajectoryAnalysisResult result = new ESTrajectoryAnalysisResult();
        result.setAnalysisQuery(analysisQuery);
        result.setStartTime(System.currentTimeMillis());

        try {
            // Default analysis: echoes the request without querying Elasticsearch
            Map<String, Object> analysisResult = new HashMap<>();
            analysisResult.put("analysis_type", "default");
            analysisResult.put("object_id", analysisQuery.getObjectId());
            analysisResult.put("timestamp", System.currentTimeMillis());

            result.setSuccess(true);
            result.setAnalysisResult(analysisResult);
            result.setEndTime(System.currentTimeMillis());

            logger.info("Default analysis finished, object ID: {}, elapsed: {}ms", analysisQuery.getObjectId(), result.getDuration());

            return result;

        } catch (Exception e) {
            logger.error("Default analysis failed, object ID: {}", analysisQuery.getObjectId(), e);
            result.setSuccess(false);
            result.setError("Default analysis failed: " + e.getMessage());
            result.setEndTime(System.currentTimeMillis());
            return result;
        }
    }

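    /**
     * Converts an Elasticsearch source map into a TrajectoryData object.
     * Numeric fields are read through Number because the parsed source may hold an
     * Integer, Long, or Double for the same field depending on the stored value;
     * a direct (Double) or (Long) cast would then throw ClassCastException.
     * @param map source map of a search hit
     * @return trajectory data
     */
    @SuppressWarnings("unchecked")
    private TrajectoryData convertToTrajectoryData(Map<String, Object> map) {
        TrajectoryData trajectoryData = new TrajectoryData();
        trajectoryData.setId((String) map.get("id"));
        trajectoryData.setTrajectoryId((String) map.get("trajectory_id"));
        trajectoryData.setObjectId((String) map.get("object_id"));
        trajectoryData.setObjectType((String) map.get("object_type"));
        trajectoryData.setLongitude(toDouble(map.get("longitude")));
        trajectoryData.setLatitude(toDouble(map.get("latitude")));
        trajectoryData.setAltitude(toDouble(map.get("altitude")));
        trajectoryData.setSpeed(toDouble(map.get("speed")));
        trajectoryData.setDirection(toDouble(map.get("direction")));
        trajectoryData.setDataQuality(toInteger(map.get("data_quality")));
        trajectoryData.setSatelliteCount(toInteger(map.get("satellite_count")));
        trajectoryData.setIsValid((Boolean) map.get("is_valid"));
        trajectoryData.setExtendedProperties((Map<String, Object>) map.get("extended_properties"));

        // Time fields are assumed to be stored as epoch milliseconds
        Date timestamp = toDate(map.get("timestamp"));
        if (timestamp != null) {
            trajectoryData.setTimestamp(timestamp);
        }
        Date gpsTimestamp = toDate(map.get("gps_timestamp"));
        if (gpsTimestamp != null) {
            trajectoryData.setGpsTimestamp(gpsTimestamp);
        }
        Date receiveTimestamp = toDate(map.get("receive_timestamp"));
        if (receiveTimestamp != null) {
            trajectoryData.setReceiveTimestamp(receiveTimestamp);
        }

        return trajectoryData;
    }

    private static Double toDouble(Object value) {
        return value instanceof Number ? Double.valueOf(((Number) value).doubleValue()) : null;
    }

    private static Integer toInteger(Object value) {
        return value instanceof Number ? Integer.valueOf(((Number) value).intValue()) : null;
    }

    private static Date toDate(Object value) {
        return value instanceof Number ? new Date(((Number) value).longValue()) : null;
    }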
}
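
The distance analysis relies on `TrajectoryData.calculateDistance`, which is not shown in this section. A common choice for the distance between two GPS fixes is the haversine formula on a spherical Earth. The sketch below shows that formula and the same "sum over consecutive points" loop in plain Java. The `PathLength` class, its method names, and the `{latitude, longitude}` array layout are assumptions for illustration and may differ from the article's actual implementation.

```java
import java.util.List;

/** Hypothetical helper; TrajectoryData.calculateDistance may be implemented differently. */
final class PathLength {

    /** Mean Earth radius in meters (spherical approximation). */
    static final double EARTH_RADIUS_M = 6_371_000.0;

    private PathLength() {
    }

    /** Great-circle distance in meters between two points given in degrees. */
    static double haversineMeters(double lat1, double lon1, double lat2, double lon2) {
        double dLat = Math.toRadians(lat2 - lat1);
        double dLon = Math.toRadians(lon2 - lon1);
        double a = Math.sin(dLat / 2) * Math.sin(dLat / 2)
                + Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2))
                * Math.sin(dLon / 2) * Math.sin(dLon / 2);
        return 2 * EARTH_RADIUS_M * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a));
    }

    /** Sums segment lengths over points ordered by time; each point is {latitude, longitude}. */
    static double totalMeters(List<double[]> points) {
        double total = 0.0;
        for (int i = 1; i < points.size(); i++) {
            double[] prev = points.get(i - 1);
            double[] cur = points.get(i);
            total += haversineMeters(prev[0], prev[1], cur[0], cur[1]);
        }
        return total;
    }
}
```

With this radius, one degree of latitude comes out at roughly 111.2 km. The spherical model is usually accurate enough for vehicle or pedestrian tracks; GPS jitter on stationary objects tends to inflate the total, which is one reason to filter on `data_quality` or `is_valid` before summing.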

3.4 Elasticsearch Trajectory Monitoring Service

import java.util.concurrent.atomic.AtomicLong;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

/**
 * Elasticsearch trajectory monitoring service.
 * @author 运维实战
 */
@Service
public class ESTrajectoryMonitorService {

    private final AtomicLong totalTrajectoryIndexed = new AtomicLong(0);
    private final AtomicLong totalTrajectoryQueried = new AtomicLong(0);
    private final AtomicLong totalTrajectoryAnalyzed = new AtomicLong(0);
    private final AtomicLong totalSuccessfulIndexings = new AtomicLong(0);
    private final AtomicLong totalFailedIndexings = new AtomicLong(0);
    private final AtomicLong totalSuccessfulQueries = new AtomicLong(0);
    private final AtomicLong totalFailedQueries = new AtomicLong(0);
    private final AtomicLong totalSuccessfulAnalyses = new AtomicLong(0);
    private final AtomicLong totalFailedAnalyses = new AtomicLong(0);

    // volatile: written by the scheduler thread and read by any thread that calls getMetrics()
    private volatile long lastResetTime = System.currentTimeMillis();
    private final long resetInterval = 300000; // counters are reset every 5 minutes

    private static final Logger logger = LoggerFactory.getLogger(ESTrajectoryMonitorService.class);

    /**
     * Records one trajectory indexing operation.
     * @param objectId object ID
     * @param success whether the operation succeeded
     */
    public void recordTrajectoryIndexing(String objectId, boolean success) {
        totalTrajectoryIndexed.incrementAndGet();

        if (success) {
            totalSuccessfulIndexings.incrementAndGet();
        } else {
            totalFailedIndexings.incrementAndGet();
        }

        logger.debug("Recorded trajectory indexing: objectId={}, success={}", objectId, success);
    }

    /**
     * Records one trajectory query.
     * @param objectId object ID
     * @param resultCount number of results
     * @param success whether the query succeeded
     */
    public void recordTrajectoryQuery(String objectId, int resultCount, boolean success) {
        totalTrajectoryQueried.incrementAndGet();

        if (success) {
            totalSuccessfulQueries.incrementAndGet();
        } else {
            totalFailedQueries.incrementAndGet();
        }

        logger.debug("Recorded trajectory query: objectId={}, resultCount={}, success={}", objectId, resultCount, success);
    }

    /**
     * Records one trajectory analysis.
     * @param objectId object ID
     * @param analysisType analysis type
     * @param success whether the analysis succeeded
     */
    public void recordTrajectoryAnalysis(String objectId, String analysisType, boolean success) {
        totalTrajectoryAnalyzed.incrementAndGet();

        if (success) {
            totalSuccessfulAnalyses.incrementAndGet();
        } else {
            totalFailedAnalyses.incrementAndGet();
        }

        logger.debug("Recorded trajectory analysis: objectId={}, analysisType={}, success={}", objectId, analysisType, success);
    }

    /**
     * Records one bulk indexing operation.
     * @param totalCount number of documents submitted
     * @param successCount number of documents indexed successfully
     */
    public void recordTrajectoryBatchIndexing(int totalCount, int successCount) {
        totalTrajectoryIndexed.addAndGet(totalCount);
        totalSuccessfulIndexings.addAndGet(successCount);
        totalFailedIndexings.addAndGet(totalCount - successCount);

        logger.debug("Recorded bulk trajectory indexing: total={}, success={}", totalCount, successCount);
    }

    /**
     * Records one geospatial trajectory query. It shares the query counters with regular queries.
     * @param longitude center longitude
     * @param latitude center latitude
     * @param resultCount number of results
     * @param success whether the query succeeded
     */
    public void recordTrajectoryGeoQuery(Double longitude, Double latitude, int resultCount, boolean success) {
        totalTrajectoryQueried.incrementAndGet();

        if (success) {
            totalSuccessfulQueries.incrementAndGet();
        } else {
            totalFailedQueries.incrementAndGet();
        }

        logger.debug("Recorded geo trajectory query: longitude={}, latitude={}, resultCount={}, success={}", longitude, latitude, resultCount, success);
    }

    /**
     * Returns a snapshot of the monitoring metrics.
     * @return monitoring metrics
     */
    public ESTrajectoryMetrics getMetrics() {
        // Take the snapshot first. Resetting before reading, as the original did,
        // returns all zeros every time a window ends and loses that window's numbers.
        ESTrajectoryMetrics metrics = new ESTrajectoryMetrics();
        metrics.setTotalTrajectoryIndexed(totalTrajectoryIndexed.get());
        metrics.setTotalTrajectoryQueried(totalTrajectoryQueried.get());
        metrics.setTotalTrajectoryAnalyzed(totalTrajectoryAnalyzed.get());
        metrics.setTotalSuccessfulIndexings(totalSuccessfulIndexings.get());
        metrics.setTotalFailedIndexings(totalFailedIndexings.get());
        metrics.setTotalSuccessfulQueries(totalSuccessfulQueries.get());
        metrics.setTotalFailedQueries(totalFailedQueries.get());
        metrics.setTotalSuccessfulAnalyses(totalSuccessfulAnalyses.get());
        metrics.setTotalFailedAnalyses(totalFailedAnalyses.get());
        metrics.setTimestamp(System.currentTimeMillis());

        // Start a new window once the reset interval has elapsed
        if (System.currentTimeMillis() - lastResetTime > resetInterval) {
            resetMetrics();
        }

        return metrics;
    }

    /**
     * Resets all counters and starts a new window.
     */
    private void resetMetrics() {
        totalTrajectoryIndexed.set(0);
        totalTrajectoryQueried.set(0);
        totalTrajectoryAnalyzed.set(0);
        totalSuccessfulIndexings.set(0);
        totalFailedIndexings.set(0);
        totalSuccessfulQueries.set(0);
        totalFailedQueries.set(0);
        totalSuccessfulAnalyses.set(0);
        totalFailedAnalyses.set(0);
        lastResetTime = System.currentTimeMillis();

        logger.info("Elasticsearch trajectory monitoring metrics reset");
    }

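    /**
     * Periodically logs the trajectory search metrics and warns on low success rates.
     * Requires scheduling to be enabled (@EnableScheduling on a configuration class).
     */
    @Scheduled(fixedRate = 30000) // runs every 30 seconds
    public void monitorESTrajectorySearchStatus() {
        try {
            ESTrajectoryMetrics metrics = getMetrics();

            logger.info("Elasticsearch trajectory search monitor: indexed={}, queried={}, analyzed={}, indexing ok={}, indexing failed={}, queries ok={}, queries failed={}, analyses ok={}, analyses failed={}, indexing success rate={}%, query success rate={}%, analysis success rate={}%",
                    metrics.getTotalTrajectoryIndexed(), metrics.getTotalTrajectoryQueried(), metrics.getTotalTrajectoryAnalyzed(),
                    metrics.getTotalSuccessfulIndexings(), metrics.getTotalFailedIndexings(),
                    metrics.getTotalSuccessfulQueries(), metrics.getTotalFailedQueries(),
                    metrics.getTotalSuccessfulAnalyses(), metrics.getTotalFailedAnalyses(),
                    String.format("%.2f", metrics.getIndexingSuccessRate()),
                    String.format("%.2f", metrics.getQuerySuccessRate()),
                    String.format("%.2f", metrics.getAnalysisSuccessRate()));

            // Warn on low success rates. A rate is 0.0 when nothing has been recorded,
            // so each check is skipped until its category has at least one sample.
            if (metrics.getTotalSuccessfulIndexings() + metrics.getTotalFailedIndexings() > 0
                    && metrics.getIndexingSuccessRate() < 95) {
                logger.warn("Elasticsearch trajectory indexing success rate is too low: {}%", String.format("%.2f", metrics.getIndexingSuccessRate()));
            }

            if (metrics.getTotalSuccessfulQueries() + metrics.getTotalFailedQueries() > 0
                    && metrics.getQuerySuccessRate() < 90) {
                logger.warn("Elasticsearch trajectory query success rate is too low: {}%", String.format("%.2f", metrics.getQuerySuccessRate()));
            }

            if (metrics.getTotalSuccessfulAnalyses() + metrics.getTotalFailedAnalyses() > 0
                    && metrics.getAnalysisSuccessRate() < 85) {
                logger.warn("Elasticsearch trajectory analysis success rate is too low: {}%", String.format("%.2f", metrics.getAnalysisSuccessRate()));
            }

        } catch (Exception e) {
            logger.error("Elasticsearch trajectory search status monitoring failed", e);
        }
    }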
}
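
The monitoring service reads its counters with `get()` and later clears them with `set(0)`. Any increment that lands between those two calls is lost. One way to close that gap for a single counter is `AtomicLong.getAndSet(0)`, which reads and clears in one atomic step. The sketch below illustrates the idea with a success/failure pair; the `WindowedCounter` class and its methods are hypothetical and not part of the article's service.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical windowed counter pair; a sketch, not the article's implementation. */
final class WindowedCounter {

    private final AtomicLong success = new AtomicLong();
    private final AtomicLong failure = new AtomicLong();

    /** Records one operation outcome. */
    void record(boolean ok) {
        if (ok) {
            success.incrementAndGet();
        } else {
            failure.incrementAndGet();
        }
    }

    /**
     * Returns {successCount, failureCount} for the window that just ended and
     * starts a new window. Each counter is read and cleared atomically, so no
     * increment is dropped; the two counters are still cleared one after the
     * other, so a concurrent record() may land in either window.
     */
    long[] snapshotAndReset() {
        return new long[] {success.getAndSet(0), failure.getAndSet(0)};
    }
}
```

A scheduled task could call `snapshotAndReset()` once per window and log or export the returned pair, which keeps the reset out of the read path used by other callers.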

3.5 Elasticsearch Trajectory Metrics Class

/**
* Elasticsearch轨迹指标类
* @author 运维实战
*/
@Data
public class ESTrajectoryMetrics {

private long totalTrajectoryIndexed;
private long totalTrajectoryQueried;
private long totalTrajectoryAnalyzed;
private long totalSuccessfulIndexings;
private long totalFailedIndexings;
private long totalSuccessfulQueries;
private long totalFailedQueries;
private long totalSuccessfulAnalyses;
private long totalFailedAnalyses;
private long timestamp;

public ESTrajectoryMetrics() {
this.timestamp = System.currentTimeMillis();
}

    /**
     * Gets the indexing success rate
     * @return indexing success rate as a percentage; 0.0 if no indexing has completed
     */
    public double getIndexingSuccessRate() {
        long total = totalSuccessfulIndexings + totalFailedIndexings;
        if (total == 0) return 0.0;
        return (double) totalSuccessfulIndexings / total * 100;
    }

    /**
     * Gets the indexing failure rate
     * @return indexing failure rate as a percentage; 0.0 if no indexing has completed
     */
    public double getIndexingFailureRate() {
        long total = totalSuccessfulIndexings + totalFailedIndexings;
        if (total == 0) return 0.0;
        return (double) totalFailedIndexings / total * 100;
    }

    /**
     * Gets the query success rate
     * @return query success rate as a percentage; 0.0 if no query has completed
     */
    public double getQuerySuccessRate() {
        long total = totalSuccessfulQueries + totalFailedQueries;
        if (total == 0) return 0.0;
        return (double) totalSuccessfulQueries / total * 100;
    }

    /**
     * Gets the query failure rate
     * @return query failure rate as a percentage; 0.0 if no query has completed
     */
    public double getQueryFailureRate() {
        long total = totalSuccessfulQueries + totalFailedQueries;
        if (total == 0) return 0.0;
        return (double) totalFailedQueries / total * 100;
    }

    /**
     * Gets the analysis success rate
     * @return analysis success rate as a percentage; 0.0 if no analysis has completed
     */
    public double getAnalysisSuccessRate() {
        long total = totalSuccessfulAnalyses + totalFailedAnalyses;
        if (total == 0) return 0.0;
        return (double) totalSuccessfulAnalyses / total * 100;
    }

    /**
     * Gets the analysis failure rate
     * @return analysis failure rate as a percentage; 0.0 if no analysis has completed
     */
    public double getAnalysisFailureRate() {
        long total = totalSuccessfulAnalyses + totalFailedAnalyses;
        if (total == 0) return 0.0;
        return (double) totalFailedAnalyses / total * 100;
    }

    /**
     * Gets the indexing efficiency
     * @return successful indexings as a percentage of all indexed trajectories
     */
    public double getIndexingEfficiency() {
        if (totalTrajectoryIndexed == 0) return 0.0;
        return (double) totalSuccessfulIndexings / totalTrajectoryIndexed * 100;
    }

    /**
     * Gets the query efficiency
     * @return successful queries as a percentage of all queried trajectories
     */
    public double getQueryEfficiency() {
        if (totalTrajectoryQueried == 0) return 0.0;
        return (double) totalSuccessfulQueries / totalTrajectoryQueried * 100;
    }

    /**
     * Gets the analysis efficiency
     * @return successful analyses as a percentage of all analyzed trajectories
     */
    public double getAnalysisEfficiency() {
        if (totalTrajectoryAnalyzed == 0) return 0.0;
        return (double) totalSuccessfulAnalyses / totalTrajectoryAnalyzed * 100;
    }

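    /**
     * Whether the service is healthy. Uses >= so that a rate sitting exactly on a
     * threshold is healthy, matching the monitor, which warns only below 95 / 90 / 85.
     * @return true if all three success rates meet their thresholds
     */
    public boolean isHealthy() {
        return getIndexingSuccessRate() >= 95 &&
               getQuerySuccessRate() >= 90 &&
               getAnalysisSuccessRate() >= 85;
    }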
}
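
The class exposes both a success rate and an efficiency figure for each operation, and the two can diverge. The sketch below copies the two formulas into a stand-alone class so it runs without Lombok, and evaluates them for one made-up scenario. It assumes that `totalTrajectoryIndexed` counts every submitted trajectory, including ones that have not finished yet; the original code does not say so. `MetricsFormulaExample` and the sample numbers are hypothetical.

```java
import java.util.Locale;

/** Hypothetical stand-alone copy of two formulas from ESTrajectoryMetrics, for illustration only. */
class MetricsFormulaExample {

    /** successes / (successes + failures) * 100, or 0.0 when nothing has completed. */
    static double successRate(long successes, long failures) {
        long total = successes + failures;
        if (total == 0) return 0.0;
        return (double) successes / total * 100;
    }

    /** successes / submitted * 100, or 0.0 when nothing has been submitted. */
    static double efficiency(long successes, long submitted) {
        if (submitted == 0) return 0.0;
        return (double) successes / submitted * 100;
    }

    public static void main(String[] args) {
        // Assumed scenario: 1000 submitted, 940 succeeded, 10 failed, 50 not yet finished.
        System.out.println(String.format(Locale.ROOT, "success rate: %.2f%%", successRate(940, 10)));
        System.out.println(String.format(Locale.ROOT, "efficiency:   %.2f%%", efficiency(940, 1000)));
    }
}
```

Under that assumption the success rate only reflects completed operations, while the efficiency also drops when work is pending, so a gap between the two may point to a backlog rather than to failures.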

4. Elasticsearch Trajectory Search Controller

4.1 Elasticsearch Trajectory Search REST Controller

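import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * Elasticsearch trajectory search REST controller
 * @author 运维实战
 */
@RestController
@RequestMapping("/api/elasticsearch/trajectory")
public class ESTrajectorySearchController {

    @Autowired
    private ESTrajectorySearchService esTrajectorySearchService;

    @Autowired
    private ESTrajectoryMonitorService esTrajectoryMonitorService;

    private static final Logger logger = LoggerFactory.getLogger(ESTrajectorySearchController.class);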

    /**
     * Indexes a single trajectory
     * @param trajectoryData trajectory data
     * @return indexing result
     */
    @PostMapping("/index")
    public ResponseEntity<ESTrajectoryIndexResult> indexTrajectoryData(@RequestBody TrajectoryData trajectoryData) {
        try {
            logger.info("Received trajectory data index request, trajectory ID: {}", trajectoryData.getTrajectoryId());

            ESTrajectoryIndexResult result = esTrajectorySearchService.indexTrajectoryData(trajectoryData);

            return ResponseEntity.ok(result);

        } catch (Exception e) {
            logger.error("Failed to index trajectory data", e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
        }
    }

    /**
     * Indexes trajectory data in bulk
     * @param trajectoryDataList list of trajectory data
     * @return bulk indexing result
     */
    @PostMapping("/batch-index")
    public ResponseEntity<ESTrajectoryBatchIndexResult> batchIndexTrajectoryData(@RequestBody List<TrajectoryData> trajectoryDataList) {
        try {
            logger.info("Received trajectory data batch index request, count: {}", trajectoryDataList.size());

            ESTrajectoryBatchIndexResult result = esTrajectorySearchService.batchIndexTrajectoryData(trajectoryDataList);

            return ResponseEntity.ok(result);

        } catch (Exception e) {
            logger.error("Failed to batch index trajectory data", e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
        }
    }

    /**
     * Queries trajectory data
     * @param query query conditions
     * @return query result
     */
    @PostMapping("/query")
    public ResponseEntity<ESTrajectoryQueryResult> queryTrajectoryData(@RequestBody TrajectoryQuery query) {
        try {
            logger.info("Received trajectory data query request, object ID: {}", query.getObjectId());

            ESTrajectoryQueryResult result = esTrajectorySearchService.queryTrajectoryData(query);

            return ResponseEntity.ok(result);

        } catch (Exception e) {
            logger.error("Failed to query trajectory data", e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
        }
    }

    /**
     * Queries trajectory data by geographic location
     * @param geoQuery geospatial query conditions
     * @return query result
     */
    @PostMapping("/geo-query")
    public ResponseEntity<ESTrajectoryQueryResult> geoQueryTrajectoryData(@RequestBody GeoTrajectoryQuery geoQuery) {
        try {
            logger.info("Received trajectory data geospatial query request, center (lon, lat): ({}, {})", geoQuery.getCenterLongitude(), geoQuery.getCenterLatitude());

            ESTrajectoryQueryResult result = esTrajectorySearchService.geoQueryTrajectoryData(geoQuery);

            return ResponseEntity.ok(result);

        } catch (Exception e) {
            logger.error("Failed to run geospatial query on trajectory data", e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
        }
    }

    /**
     * Analyzes trajectory data
     * @param analysisQuery analysis query conditions
     * @return analysis result
     */
    @PostMapping("/analyze")
    public ResponseEntity<ESTrajectoryAnalysisResult> analyzeTrajectoryData(@RequestBody TrajectoryAnalysisQuery analysisQuery) {
        try {
            logger.info("Received trajectory data analysis request, object ID: {}, analysis type: {}", analysisQuery.getObjectId(), analysisQuery.getAnalysisType());

            ESTrajectoryAnalysisResult result = esTrajectorySearchService.analyzeTrajectoryData(analysisQuery);

            return ResponseEntity.ok(result);

        } catch (Exception e) {
            logger.error("Failed to analyze trajectory data", e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
        }
    }

    /**
     * Creates the trajectory index
     * @return creation result
     */
    @PostMapping("/create-index")
    public ResponseEntity<ESIndexCreateResult> createTrajectoryIndex() {
        try {
            logger.info("Received trajectory index creation request");

            ESIndexCreateResult result = esTrajectorySearchService.createTrajectoryIndex();

            return ResponseEntity.ok(result);

        } catch (Exception e) {
            logger.error("Failed to create trajectory index", e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
        }
    }

    /**
     * Gets the Elasticsearch trajectory search monitoring metrics
     * @return monitoring metrics
     */
    @GetMapping("/metrics")
    public ResponseEntity<ESTrajectoryMetrics> getESTrajectoryMetrics() {
        try {
            ESTrajectoryMetrics metrics = esTrajectoryMonitorService.getMetrics();
            return ResponseEntity.ok(metrics);
        } catch (Exception e) {
            logger.error("Failed to get Elasticsearch trajectory search monitoring metrics", e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
        }
    }
}
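
To show how a client might call the controller, here is a minimal sketch that builds a POST request for the `/geo-query` endpoint with the JDK 11+ `java.net.http` API. The JSON property names `centerLongitude` and `centerLatitude` are inferred from the getters used in the log statement and assume default JSON property naming. Any further fields of `GeoTrajectoryQuery` are not shown in this section and are left out. The base URL and the class name `GeoQueryRequestExample` are hypothetical.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

/** Hypothetical client-side sketch: builds (but does not send) a geo-query request. */
class GeoQueryRequestExample {

    /** JSON body with the two center-point properties inferred from the controller's getters. */
    static String geoQueryBody(double longitude, double latitude) {
        return "{\"centerLongitude\": " + longitude + ", \"centerLatitude\": " + latitude + "}";
    }

    /** Builds a POST request against the controller's /geo-query mapping. */
    static HttpRequest buildGeoQuery(String baseUrl, double longitude, double latitude) {
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/api/elasticsearch/trajectory/geo-query"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(geoQueryBody(longitude, latitude), StandardCharsets.UTF_8))
                .build();
    }

    public static void main(String[] args) {
        // Assumed base URL of a locally running service.
        HttpRequest request = buildGeoQuery("http://localhost:8080", 116.4, 39.9);
        System.out.println(request.method() + " " + request.uri());
        System.out.println(geoQueryBody(116.4, 39.9));
    }
}
```

Sending the request requires a running service; with one available, `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())` would return the response. Note that on failure the controller answers with HTTP 500 and an empty body, so a client should check the status code before parsing.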

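5. Summary

5.1 Elasticsearch Trajectory Search Best Practices

  1. Design the index structure carefully: use Elasticsearch's geospatial field types to model trajectory data efficiently
  2. Optimize query performance: choose suitable query types and aggregations
  3. Create geospatial indexes: map location fields as geo_point so that geospatial queries are supported
  4. Monitor trajectory search: track indexing and query performance in real time
  5. Handle exceptions: implement thorough exception handling and user-friendly error messages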
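
As an illustration of items 1 to 3, the sketch below builds two JSON request bodies: an index mapping with a `geo_point` field, and a `geo_distance` filter that finds points within a radius of a center. The field names `objectId`, `timestamp`, and `location`, and the class name `TrajectoryMappingExample`, are assumptions for this example; the article's actual mapping may differ.

```java
/** Hypothetical sketch: JSON bodies for a geo_point mapping and a geo_distance query. */
class TrajectoryMappingExample {

    /** Body for index creation: keyword ID, date timestamp, geo_point location (assumed field names). */
    static String indexMapping() {
        return "{"
                + "\"mappings\":{\"properties\":{"
                + "\"objectId\":{\"type\":\"keyword\"},"
                + "\"timestamp\":{\"type\":\"date\"},"
                + "\"location\":{\"type\":\"geo_point\"}"
                + "}}}";
    }

    /** Body for a search that keeps only points within the given distance of (lat, lon). */
    static String geoDistanceQuery(double lat, double lon, String distance) {
        // Placed in a bool filter so the clause is not scored and can be cached by Elasticsearch.
        return "{\"query\":{\"bool\":{\"filter\":{\"geo_distance\":{"
                + "\"distance\":\"" + distance + "\","
                + "\"location\":{\"lat\":" + lat + ",\"lon\":" + lon + "}"
                + "}}}}}";
    }

    public static void main(String[] args) {
        System.out.println(indexMapping());
        System.out.println(geoDistanceQuery(39.9, 116.4, "500m"));
    }
}
```

The strings are assembled by hand only to keep the example free of dependencies; in a real service a JSON library or the Elasticsearch client's query builders would be the safer choice.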

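5.2 Performance Optimization Recommendations

  • Index optimization: set the number of shards and replicas appropriately
  • Query optimization: choose suitable query types and filter conditions
  • Aggregation optimization: tune aggregation performance
  • Geospatial optimization: tune geospatial query performance
  • Caching strategy: cache query results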
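
The caching recommendation can be sketched with a small least-recently-used cache built on `LinkedHashMap`. This is one possible approach, not the article's implementation: `QueryResultCache` is a hypothetical name, the cache is not thread-safe as written, and it has no time-based expiry, which a system indexing trajectories in real time would likely need.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical LRU cache for query results, keyed by some representation of the query. */
class QueryResultCache<K, V> extends LinkedHashMap<K, V> {

    private final int maxEntries;

    QueryResultCache(int maxEntries) {
        super(16, 0.75f, true); // accessOrder = true: iteration order is least recently used first
        this.maxEntries = maxEntries;
    }

    /** Called by LinkedHashMap after each insertion; returning true evicts the eldest entry. */
    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > maxEntries;
    }

    public static void main(String[] args) {
        QueryResultCache<String, String> cache = new QueryResultCache<>(2);
        cache.put("query-a", "result-a");
        cache.put("query-b", "result-b");
        cache.get("query-a");             // touch query-a so query-b becomes the eldest
        cache.put("query-c", "result-c"); // exceeds capacity, evicts query-b
        System.out.println(cache.keySet());
    }
}
```

For concurrent access the map could be wrapped with `Collections.synchronizedMap`, or replaced by a dedicated caching library.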

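5.3 Operations Management Essentials

  • Real-time monitoring: monitor trajectory indexing and query performance
  • Dynamic adjustment: adjust the Elasticsearch configuration according to load
  • Exception handling: establish exception handling and alerting mechanisms
  • Log management: keep thorough logs and analyze them
  • Performance tuning: tune Elasticsearch based on monitoring data

With this practical operations guide to trajectory search with Elasticsearch, you can learn the principles, implementation methods, and performance optimization techniques of Elasticsearch trajectory search, along with best practices for enterprise applications, and build an efficient, accurate trajectory search system.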