前言

MongoDB微服务跨库查询是NoSQL分布式系统中的重要挑战,直接影响数据一致性和查询性能。通过合理的MongoDB跨库查询策略和数据聚合方案,可以在不破坏微服务边界的前提下满足复杂的数据查询需求,保障系统稳定运行。本文从MongoDB跨库查询策略到数据聚合方案,从基础实现到企业级应用,系统梳理MongoDB微服务跨库查询的完整解决方案。

一、MongoDB微服务跨库查询架构设计

1.1 MongoDB微服务跨库查询整体架构

1.2 MongoDB跨库查询策略架构

二、MongoDB跨库查询策略实现

2.1 MongoDB聚合管道策略

2.1.1 MongoDB聚合服务

聚合服务的代码实现如下:
/**
* MongoDB聚合服务
*/
@Service
public class MongoAggregationService {

// Template used by this class for the "users" collection (and for executeComplexAggregation).
// NOTE(review): four MongoTemplate fields of the same type are injected here; presumably each
// is resolved by a bean whose name matches the field name — confirm the configuration.
@Autowired
private MongoTemplate userMongoTemplate;

// Template used by this class for the "orders" collection.
@Autowired
private MongoTemplate orderMongoTemplate;

// Template used by this class for the "products" collection.
@Autowired
private MongoTemplate productMongoTemplate;

// Template used by this class for the "payments" collection.
@Autowired
private MongoTemplate paymentMongoTemplate;

// Redis client used to cache aggregation results.
@Autowired
private RedisTemplate<String, Object> redisTemplate;

// Key prefix for every aggregation result this service stores in Redis.
private final String AGGREGATION_CACHE_PREFIX = "mongo_aggregation_cache:";
// Cache time-to-live in seconds.
private final long AGGREGATION_CACHE_EXPIRE = 1800; // 30 minutes

/**
 * Builds the combined user / order / product aggregation for one user, with Redis acting as a
 * read-through cache in front of the MongoDB queries.
 *
 * <p>Cache access is best-effort: a Redis read or write failure is logged and the method falls
 * back to (or simply returns) the freshly computed result instead of failing the query.
 *
 * @param userId id of the user whose data is aggregated
 * @return the aggregated result, served from cache when present
 * @throws MongoCrossDatabaseQueryException if the user does not exist or any MongoDB query fails
 */
public UserOrderAggregationResult aggregateUserOrderInfo(Long userId) {
    String cacheKey = AGGREGATION_CACHE_PREFIX + "user_order:" + userId;

    UserOrderAggregationResult cachedResult = readCachedUserOrderAggregation(cacheKey);
    if (cachedResult != null) {
        return cachedResult;
    }

    try {
        // 1. Load the user first: without it the result cannot be built, so fail fast with a
        //    clear cause before running the more expensive aggregations.
        UserInfo userInfo = getUserInfo(userId);
        if (userInfo == null) {
            throw new MongoCrossDatabaseQueryException("跨库聚合查询用户订单信息失败",
                    new IllegalStateException("User not found: " + userId));
        }

        // 2. Per-status order aggregation.
        OrderAggregationInfo orderAggregation = getOrderAggregationInfo(userId);

        // 3. Per-product aggregation.
        ProductAggregationInfo productAggregation = getProductAggregationInfo(userId);

        // 4. Merge everything into a single result.
        UserOrderAggregationResult result =
                aggregateUserOrderData(userInfo, orderAggregation, productAggregation);

        writeCachedUserOrderAggregation(cacheKey, result);
        return result;

    } catch (MongoCrossDatabaseQueryException e) {
        // Already logged and wrapped where it was raised; do not wrap a second time.
        throw e;
    } catch (Exception e) {
        log.error("跨库聚合查询用户订单信息失败", e);
        throw new MongoCrossDatabaseQueryException("跨库聚合查询用户订单信息失败", e);
    }
}

/** Returns the cached aggregation for the key, or null on a miss, type mismatch or Redis error. */
private UserOrderAggregationResult readCachedUserOrderAggregation(String cacheKey) {
    try {
        Object cached = redisTemplate.opsForValue().get(cacheKey);
        if (cached instanceof UserOrderAggregationResult) {
            return (UserOrderAggregationResult) cached;
        }
    } catch (Exception e) {
        log.warn("读取聚合缓存失败,回退到数据库查询: {}", cacheKey, e);
    }
    return null;
}

/** Stores the aggregation under the key with the configured TTL; failures are only logged. */
private void writeCachedUserOrderAggregation(String cacheKey, UserOrderAggregationResult result) {
    try {
        redisTemplate.opsForValue().set(cacheKey, result, Duration.ofSeconds(AGGREGATION_CACHE_EXPIRE));
    } catch (Exception e) {
        log.warn("写入聚合缓存失败: {}", cacheKey, e);
    }
}

/**
 * Looks up one user document in the "users" collection by its {@code _id}.
 *
 * @param userId value matched against {@code _id}
 * @return the result of {@code findOne} for that id
 * @throws MongoCrossDatabaseQueryException if the lookup throws
 */
private UserInfo getUserInfo(Long userId) {
    try {
        final Criteria byId = Criteria.where("_id").is(userId);
        final Query lookup = new Query(byId);
        return userMongoTemplate.findOne(lookup, UserInfo.class, "users");
    } catch (Exception ex) {
        log.error("获取用户信息失败", ex);
        throw new MongoCrossDatabaseQueryException("获取用户信息失败", ex);
    }
}

/**
 * Aggregates a user's orders per status and rolls the buckets up into user-level totals.
 *
 * <p>Pipeline on the "orders" collection: match on {@code userId}, group by {@code status}
 * (count, sum of {@code amount}, average of {@code amount}), sort by bucket size descending.
 *
 * <p>The user-level average is total amount divided by total order count. Averaging the
 * per-status averages would weight every status equally no matter how many orders it holds,
 * which gives a wrong figure as soon as the buckets differ in size.
 *
 * @param userId id of the user whose orders are aggregated
 * @return totals plus the per-status buckets; all totals are zero when the user has no orders
 * @throws MongoCrossDatabaseQueryException if the aggregation throws
 */
private OrderAggregationInfo getOrderAggregationInfo(Long userId) {
    try {
        Aggregation aggregation = Aggregation.newAggregation(
                Aggregation.match(Criteria.where("userId").is(userId)),
                Aggregation.group("status")
                        .count().as("count")
                        .sum("amount").as("totalAmount")
                        .avg("amount").as("avgAmount"),
                Aggregation.sort(Sort.Direction.DESC, "count")
        );

        AggregationResults<OrderAggregationInfo> results = orderMongoTemplate.aggregate(
                aggregation, "orders", OrderAggregationInfo.class);

        List<OrderAggregationInfo> aggregationList = results.getMappedResults();

        int totalOrders = aggregationList.stream().mapToInt(OrderAggregationInfo::getCount).sum();
        double totalAmount = aggregationList.stream().mapToDouble(OrderAggregationInfo::getTotalAmount).sum();

        OrderAggregationInfo aggregationInfo = new OrderAggregationInfo();
        aggregationInfo.setUserId(userId);
        aggregationInfo.setTotalOrders(totalOrders);
        aggregationInfo.setTotalAmount(totalAmount);
        // Count-weighted average across all statuses; guarded against division by zero.
        aggregationInfo.setAvgAmount(totalOrders > 0 ? totalAmount / totalOrders : 0.0);
        aggregationInfo.setStatusAggregation(aggregationList);

        return aggregationInfo;

    } catch (Exception e) {
        log.error("获取订单聚合信息失败", e);
        throw new MongoCrossDatabaseQueryException("获取订单聚合信息失败", e);
    }
}

/**
 * Aggregates, per product, what a user has bought.
 *
 * <p>{@code $lookup} can only join collections that live in the same database, and in this
 * service "orders" is read through {@code orderMongoTemplate} while "products" is read through
 * {@code productMongoTemplate}. The join is therefore done in the application in two steps:
 * <ol>
 *   <li>order database: match the user's orders and group them by {@code productId}
 *       (order count, summed quantity, summed amount), sorted by amount descending;</li>
 *   <li>product database: fetch name and price for exactly those product ids.</li>
 * </ol>
 * Orders whose product no longer exists are left out, matching inner-join semantics. Matching
 * on the user first also avoids scanning every product.
 *
 * @param userId id of the user whose purchases are aggregated
 * @return totals plus one entry per purchased product, ordered by total amount descending
 * @throws MongoCrossDatabaseQueryException if either query or the result mapping throws
 */
private ProductAggregationInfo getProductAggregationInfo(Long userId) {
    try {
        // Step 1: per-product purchase statistics, computed where the orders live.
        Aggregation orderStats = Aggregation.newAggregation(
                Aggregation.match(Criteria.where("userId").is(userId)),
                Aggregation.group("productId")
                        .count().as("orderCount")
                        .sum("quantity").as("totalQuantity")
                        .sum("amount").as("totalAmount"),
                Aggregation.sort(Sort.Direction.DESC, "totalAmount")
        );

        List<org.bson.Document> statDocs = orderMongoTemplate
                .aggregate(orderStats, "orders", org.bson.Document.class)
                .getMappedResults();

        List<ProductAggregationInfo> aggregationList = new ArrayList<>();

        if (!statDocs.isEmpty()) {
            // Step 2: load only the products that actually appear in the user's orders.
            List<Object> productIds = new ArrayList<>();
            for (org.bson.Document stat : statDocs) {
                Object productId = stat.get("_id");
                if (productId != null) {
                    productIds.add(productId);
                }
            }

            Query productQuery = new Query(Criteria.where("_id").in(productIds));
            productQuery.fields().include("name").include("price");

            java.util.Map<Object, org.bson.Document> productsById = new java.util.HashMap<>();
            for (org.bson.Document product
                    : productMongoTemplate.find(productQuery, org.bson.Document.class, "products")) {
                productsById.put(normalizeProductId(product.get("_id")), product);
            }

            // Step 3: merge in the order of the sorted statistics and map to the result type.
            for (org.bson.Document stat : statDocs) {
                org.bson.Document product = productsById.get(normalizeProductId(stat.get("_id")));
                if (product == null) {
                    continue;
                }
                org.bson.Document merged = new org.bson.Document("_id", product.get("_id"))
                        .append("productName", product.get("name"))
                        .append("productPrice", product.get("price"))
                        .append("orderCount", stat.get("orderCount"))
                        .append("totalQuantity", stat.get("totalQuantity"))
                        .append("totalAmount", stat.get("totalAmount"));
                aggregationList.add(
                        productMongoTemplate.getConverter().read(ProductAggregationInfo.class, merged));
            }
        }

        ProductAggregationInfo aggregationInfo = new ProductAggregationInfo();
        aggregationInfo.setUserId(userId);
        aggregationInfo.setTotalProducts(aggregationList.size());
        aggregationInfo.setTotalQuantity(aggregationList.stream().mapToInt(ProductAggregationInfo::getTotalQuantity).sum());
        aggregationInfo.setTotalAmount(aggregationList.stream().mapToDouble(ProductAggregationInfo::getTotalAmount).sum());
        aggregationInfo.setProductAggregation(aggregationList);

        return aggregationInfo;

    } catch (Exception e) {
        log.error("获取商品聚合信息失败", e);
        throw new MongoCrossDatabaseQueryException("获取商品聚合信息失败", e);
    }
}

/** Maps numeric ids to Long so an Int32 id in one database still matches an Int64 id in the other. */
private static Object normalizeProductId(Object id) {
    return (id instanceof Number) ? Long.valueOf(((Number) id).longValue()) : id;
}

/**
 * Merges the user record with its order and product aggregations into one result object and
 * then lets {@code calculateUserOrderStatistics} attach the derived statistics.
 *
 * @param userInfo           source of id, name, email and phone
 * @param orderAggregation   source of the order totals and the per-status list
 * @param productAggregation source of the product totals and the per-product list
 * @return the populated result
 * @throws MongoCrossDatabaseQueryException if anything throws while copying (for example a
 *         null argument)
 */
private UserOrderAggregationResult aggregateUserOrderData(UserInfo userInfo,
        OrderAggregationInfo orderAggregation, ProductAggregationInfo productAggregation) {
    try {
        final UserOrderAggregationResult merged = new UserOrderAggregationResult();

        // User section.
        merged.setUserId(userInfo.getId());
        merged.setUserName(userInfo.getName());
        merged.setUserEmail(userInfo.getEmail());
        merged.setUserPhone(userInfo.getPhone());

        // Order section.
        merged.setTotalOrders(orderAggregation.getTotalOrders());
        merged.setTotalOrderAmount(orderAggregation.getTotalAmount());
        merged.setAvgOrderAmount(orderAggregation.getAvgAmount());
        merged.setOrderStatusAggregation(orderAggregation.getStatusAggregation());

        // Product section.
        merged.setTotalProducts(productAggregation.getTotalProducts());
        merged.setTotalQuantity(productAggregation.getTotalQuantity());
        merged.setTotalProductAmount(productAggregation.getTotalAmount());
        merged.setProductAggregation(productAggregation.getProductAggregation());

        // Derived figures are computed last, once every total is in place.
        calculateUserOrderStatistics(merged);
        return merged;
    } catch (Exception ex) {
        log.error("聚合用户订单数据失败", ex);
        throw new MongoCrossDatabaseQueryException("聚合用户订单数据失败", ex);
    }
}

/**
 * Derives a {@code UserOrderStatistics} object from the totals already present on the result
 * and attaches it via {@code setStatistics}.
 *
 * <p>Best-effort: any exception is logged and swallowed, in which case no statistics are set.
 *
 * @param result aggregation result to read totals from and to attach the statistics to
 */
private void calculateUserOrderStatistics(UserOrderAggregationResult result) {
    try {
        final UserOrderStatistics stats = new UserOrderStatistics();

        // Order figures are copied straight from the result.
        stats.setOrderCount(result.getTotalOrders());
        stats.setOrderAmount(result.getTotalOrderAmount());
        stats.setAvgOrderAmount(result.getAvgOrderAmount());

        // Product figures likewise.
        stats.setProductCount(result.getTotalProducts());
        stats.setProductQuantity(result.getTotalQuantity());
        stats.setProductAmount(result.getTotalProductAmount());

        // Combined figures: sum of both amounts, and amount per unit (0.0 when nothing was bought).
        stats.setTotalAmount(result.getTotalOrderAmount() + result.getTotalProductAmount());
        if (result.getTotalQuantity() > 0) {
            stats.setAvgProductPrice(result.getTotalProductAmount() / result.getTotalQuantity());
        } else {
            stats.setAvgProductPrice(0.0);
        }

        result.setStatistics(stats);
    } catch (Exception ex) {
        log.error("计算用户订单统计信息失败", ex);
    }
}

/**
 * Runs the user lookup and the order, product and payment aggregations concurrently and
 * combines them into one result.
 *
 * <p>The combined stage uses {@code handle} rather than {@code thenApply}: when any of the
 * four queries fails, {@code allOf} completes exceptionally and a {@code thenApply} callback is
 * never invoked, so a FAILED result could never be produced. With {@code handle} the returned
 * future completes normally with status FAILED and the root-cause message.
 *
 * @param request carries the user id to aggregate for
 * @return a future yielding a result with status SUCCESS or FAILED; it completes exceptionally
 *         only if the tasks cannot be submitted at all
 */
public CompletableFuture<MultiCollectionAggregationResult> aggregateMultipleCollectionsAsync(
        MultiCollectionAggregationRequest request) {
    try {
        CompletableFuture<UserInfo> userFuture =
                CompletableFuture.supplyAsync(() -> getUserInfo(request.getUserId()));

        CompletableFuture<OrderAggregationInfo> orderFuture =
                CompletableFuture.supplyAsync(() -> getOrderAggregationInfo(request.getUserId()));

        CompletableFuture<ProductAggregationInfo> productFuture =
                CompletableFuture.supplyAsync(() -> getProductAggregationInfo(request.getUserId()));

        CompletableFuture<PaymentAggregationInfo> paymentFuture =
                CompletableFuture.supplyAsync(() -> getPaymentAggregationInfo(request.getUserId()));

        return CompletableFuture.allOf(userFuture, orderFuture, productFuture, paymentFuture)
                .handle((ignored, failure) -> {
                    MultiCollectionAggregationResult result = new MultiCollectionAggregationResult();
                    if (failure == null) {
                        // All four futures are complete here, so join() returns immediately.
                        result.setUserInfo(userFuture.join());
                        result.setOrderAggregation(orderFuture.join());
                        result.setProductAggregation(productFuture.join());
                        result.setPaymentAggregation(paymentFuture.join());
                        result.setStatus(AggregationStatus.SUCCESS);
                    } else {
                        // allOf reports failures wrapped in CompletionException; surface the real cause.
                        Throwable cause = failure;
                        if (failure instanceof java.util.concurrent.CompletionException
                                && failure.getCause() != null) {
                            cause = failure.getCause();
                        }
                        log.error("并行聚合查询多个集合失败", cause);
                        result.setStatus(AggregationStatus.FAILED);
                        result.setErrorMessage(cause.getMessage());
                    }
                    return result;
                });

    } catch (Exception e) {
        log.error("并行聚合查询多个集合失败", e);
        CompletableFuture<MultiCollectionAggregationResult> failedFuture = new CompletableFuture<>();
        failedFuture.completeExceptionally(e);
        return failedFuture;
    }
}

/**
 * Aggregates a user's payments per payment method and rolls the buckets up into totals.
 *
 * <p>Pipeline on the "payments" collection: match on {@code userId}, group by
 * {@code paymentMethod} (count, sum of {@code amount}, average of {@code amount}), sort by
 * total amount descending.
 *
 * <p>The user-level average is total amount divided by total payment count. Averaging the
 * per-method averages would ignore how many payments each method contains.
 *
 * @param userId id of the user whose payments are aggregated
 * @return totals plus the per-method buckets; all totals are zero when there are no payments
 * @throws MongoCrossDatabaseQueryException if the aggregation throws
 */
private PaymentAggregationInfo getPaymentAggregationInfo(Long userId) {
    try {
        Aggregation aggregation = Aggregation.newAggregation(
                Aggregation.match(Criteria.where("userId").is(userId)),
                Aggregation.group("paymentMethod")
                        .count().as("count")
                        .sum("amount").as("totalAmount")
                        .avg("amount").as("avgAmount"),
                Aggregation.sort(Sort.Direction.DESC, "totalAmount")
        );

        AggregationResults<PaymentAggregationInfo> results = paymentMongoTemplate.aggregate(
                aggregation, "payments", PaymentAggregationInfo.class);

        List<PaymentAggregationInfo> aggregationList = results.getMappedResults();

        int totalPayments = aggregationList.stream().mapToInt(PaymentAggregationInfo::getCount).sum();
        double totalAmount = aggregationList.stream().mapToDouble(PaymentAggregationInfo::getTotalAmount).sum();

        PaymentAggregationInfo aggregationInfo = new PaymentAggregationInfo();
        aggregationInfo.setUserId(userId);
        aggregationInfo.setTotalPayments(totalPayments);
        aggregationInfo.setTotalAmount(totalAmount);
        // Count-weighted average across all payment methods; guarded against division by zero.
        aggregationInfo.setAvgAmount(totalPayments > 0 ? totalAmount / totalPayments : 0.0);
        aggregationInfo.setPaymentMethodAggregation(aggregationList);

        return aggregationInfo;

    } catch (Exception e) {
        log.error("获取支付聚合信息失败", e);
        throw new MongoCrossDatabaseQueryException("获取支付聚合信息失败", e);
    }
}

/**
 * Assembles an aggregation pipeline from the optional parts of the request and executes it.
 *
 * <p>Stages are appended in a fixed order: match, lookups, unwind, groups, sorts, limit. Each
 * stage is only added when the request supplies it. The pipeline always runs through
 * {@code userMongoTemplate} against the collection named by {@code request.getCollection()}.
 *
 * @param request describes the collection and the optional pipeline stages
 * @return a wrapper holding the mapped rows, their count, and the elapsed time measured from
 *         {@code request.getStartTime()}
 * @throws MongoCrossDatabaseQueryException if building or executing the pipeline throws
 */
public ComplexAggregationResult executeComplexAggregation(ComplexAggregationRequest request) {
try {
// Pipeline stages are collected here in execution order.
List<AggregationOperation> operations = new ArrayList<>();

// 1. Match stage (optional).
if (request.getMatchCriteria() != null) {
operations.add(Aggregation.match(request.getMatchCriteria()));
}

// 2. Lookup stages (optional), one per requested lookup.
// NOTE(review): each lookup runs inside the database behind userMongoTemplate — confirm the
// "from" collections actually live there.
if (request.getLookupOperations() != null) {
for (LookupOperation lookup : request.getLookupOperations()) {
operations.add(Aggregation.lookup(lookup.getFrom(), lookup.getLocalField(),
lookup.getForeignField(), lookup.getAs()));
}
}

// 3. Unwind stage (optional).
// NOTE(review): getUnwindField() is not checked for null here — confirm callers always set it
// when isUnwind() is true.
if (request.isUnwind()) {
operations.add(Aggregation.unwind(request.getUnwindField()));
}

// 4. Group stages (optional); every group emits "count", "total" and "average".
if (request.getGroupOperations() != null) {
for (GroupOperation group : request.getGroupOperations()) {
operations.add(Aggregation.group(group.getGroupBy())
.count().as("count")
.sum(group.getSumField()).as("total")
.avg(group.getAvgField()).as("average"));
}
}

// 5. Sort stages (optional).
if (request.getSortOperations() != null) {
for (SortOperation sort : request.getSortOperations()) {
operations.add(Aggregation.sort(sort.getDirection(), sort.getFields()));
}
}

// 6. Limit stage, only for a positive limit.
if (request.getLimit() > 0) {
operations.add(Aggregation.limit(request.getLimit()));
}

// Execute the assembled pipeline.
Aggregation aggregation = Aggregation.newAggregation(operations);
AggregationResults<ComplexAggregationResult> results = userMongoTemplate.aggregate(
aggregation, request.getCollection(), ComplexAggregationResult.class);

List<ComplexAggregationResult> resultList = results.getMappedResults();

// Wrap the rows; totalCount is the number of rows returned by the pipeline.
ComplexAggregationResult result = new ComplexAggregationResult();
result.setTotalCount(resultList.size());
result.setResults(resultList);
// Elapsed milliseconds relative to the start time carried by the request.
result.setExecutionTime(System.currentTimeMillis() - request.getStartTime());

return result;

} catch (Exception e) {
log.error("执行复杂聚合查询失败", e);
throw new MongoCrossDatabaseQueryException("执行复杂聚合查询失败", e);
}
}
}

2.2 MongoDB跨库查询策略

2.2.1 MongoDB跨库查询服务

跨库查询服务的代码实现如下:
/**
* MongoDB跨库查询服务
*/
@Service
public class MongoCrossDatabaseQueryService {

// Template used by this class for the "users" collection.
// NOTE(review): four MongoTemplate fields of the same type are injected here; presumably each
// is resolved by a bean whose name matches the field name — confirm the configuration.
@Autowired
private MongoTemplate userMongoTemplate;

// Template used by this class for the "orders" collection.
@Autowired
private MongoTemplate orderMongoTemplate;

// Template used by this class for the "products" collection.
@Autowired
private MongoTemplate productMongoTemplate;

// Template used by this class for the "payments" collection.
@Autowired
private MongoTemplate paymentMongoTemplate;

// Redis client used to cache the combined query results.
@Autowired
private RedisTemplate<String, Object> redisTemplate;

// Key prefix for every cross-database result this service stores in Redis.
private final String CROSS_DB_CACHE_PREFIX = "cross_db_cache:";
// Cache time-to-live in seconds.
private final long CROSS_DB_CACHE_EXPIRE = 1800; // 30 minutes

/**
 * Collects a user's profile, orders, purchased products and payments by querying the four
 * databases in parallel, then attaches summary statistics and caches the result in Redis.
 *
 * <p>If the waiting thread is interrupted, the interrupt flag is restored before the failure
 * is reported. A failure inside one of the parallel queries is reported with its original
 * cause instead of the {@code ExecutionException} wrapper.
 *
 * @param userId id of the user to load
 * @return the combined view, served from cache when present
 * @throws MongoCrossDatabaseQueryException if any query fails or the wait is interrupted
 */
public UserCompleteInfo crossDatabaseQueryUserCompleteInfo(Long userId) {
    try {
        // Cache lookup first.
        String cacheKey = CROSS_DB_CACHE_PREFIX + "user_complete:" + userId;
        UserCompleteInfo cachedInfo = (UserCompleteInfo) redisTemplate.opsForValue().get(cacheKey);

        if (cachedInfo != null) {
            return cachedInfo;
        }

        // Query the four databases concurrently.
        CompletableFuture<UserInfo> userFuture =
                CompletableFuture.supplyAsync(() -> getUserInfoFromUserDB(userId));

        CompletableFuture<List<OrderInfo>> ordersFuture =
                CompletableFuture.supplyAsync(() -> getOrdersFromOrderDB(userId));

        CompletableFuture<List<ProductInfo>> productsFuture =
                CompletableFuture.supplyAsync(() -> getProductsFromProductDB(userId));

        CompletableFuture<List<PaymentInfo>> paymentsFuture =
                CompletableFuture.supplyAsync(() -> getPaymentsFromPaymentDB(userId));

        // Block until every query has finished (or one has failed).
        CompletableFuture.allOf(userFuture, ordersFuture, productsFuture, paymentsFuture).get();

        // Assemble the combined view; the futures are complete, so get() returns immediately.
        UserCompleteInfo userCompleteInfo = new UserCompleteInfo();
        userCompleteInfo.setUserId(userId);
        userCompleteInfo.setUserInfo(userFuture.get());
        userCompleteInfo.setOrders(ordersFuture.get());
        userCompleteInfo.setProducts(productsFuture.get());
        userCompleteInfo.setPayments(paymentsFuture.get());

        calculateUserCompleteStatistics(userCompleteInfo);

        redisTemplate.opsForValue().set(cacheKey, userCompleteInfo,
                Duration.ofSeconds(CROSS_DB_CACHE_EXPIRE));

        return userCompleteInfo;

    } catch (InterruptedException e) {
        // Preserve the interrupt for callers further up the stack.
        Thread.currentThread().interrupt();
        log.error("跨库查询用户完整信息失败", e);
        throw new MongoCrossDatabaseQueryException("跨库查询用户完整信息失败", e);
    } catch (java.util.concurrent.ExecutionException e) {
        // Report the failure raised inside the parallel query, not the wrapper.
        Exception root = (e.getCause() instanceof Exception) ? (Exception) e.getCause() : e;
        log.error("跨库查询用户完整信息失败", root);
        throw new MongoCrossDatabaseQueryException("跨库查询用户完整信息失败", root);
    } catch (Exception e) {
        log.error("跨库查询用户完整信息失败", e);
        throw new MongoCrossDatabaseQueryException("跨库查询用户完整信息失败", e);
    }
}

/**
 * Looks up one user document in the "users" collection by its {@code _id}.
 *
 * @param userId value matched against {@code _id}
 * @return the result of {@code findOne} for that id
 * @throws MongoCrossDatabaseQueryException if the lookup throws
 */
private UserInfo getUserInfoFromUserDB(Long userId) {
    try {
        final Criteria byId = Criteria.where("_id").is(userId);
        final Query lookup = new Query(byId);
        return userMongoTemplate.findOne(lookup, UserInfo.class, "users");
    } catch (Exception ex) {
        log.error("从用户数据库获取用户信息失败", ex);
        throw new MongoCrossDatabaseQueryException("从用户数据库获取用户信息失败", ex);
    }
}

/**
 * Lists a user's orders from the "orders" collection, sorted by {@code createTime} descending.
 *
 * @param userId value matched against the {@code userId} field
 * @return the matching orders
 * @throws MongoCrossDatabaseQueryException if the query throws
 */
private List<OrderInfo> getOrdersFromOrderDB(Long userId) {
    try {
        final Query ordersOfUser = new Query(Criteria.where("userId").is(userId));
        final Sort newestFirst = Sort.by(Sort.Direction.DESC, "createTime");
        ordersOfUser.with(newestFirst);
        return orderMongoTemplate.find(ordersOfUser, OrderInfo.class, "orders");
    } catch (Exception ex) {
        log.error("从订单数据库获取订单信息失败", ex);
        throw new MongoCrossDatabaseQueryException("从订单数据库获取订单信息失败", ex);
    }
}

/**
 * Loads the products a user has ordered.
 *
 * <p>Two steps across two databases: first the user's orders are read from the order database
 * (only the {@code productId} field is projected), then the distinct product ids are resolved
 * against the "products" collection of the product database.
 *
 * <p>Orders without a product id are ignored so that no {@code null} ends up in the
 * {@code $in} list, and the product database is not queried when there is nothing to look up.
 *
 * @param userId id of the user whose purchased products are loaded
 * @return the matching products; an empty mutable list when the user has no usable orders
 * @throws MongoCrossDatabaseQueryException if either query throws
 */
private List<ProductInfo> getProductsFromProductDB(Long userId) {
    try {
        // Step 1: product ids referenced by the user's orders.
        Query orderQuery = new Query(Criteria.where("userId").is(userId));
        orderQuery.fields().include("productId");
        List<OrderInfo> orders = orderMongoTemplate.find(orderQuery, OrderInfo.class, "orders");

        if (orders.isEmpty()) {
            return new ArrayList<>();
        }

        List<Long> productIds = orders.stream()
                .map(OrderInfo::getProductId)
                .filter(id -> id != null)
                .distinct()
                .collect(Collectors.toList());

        if (productIds.isEmpty()) {
            return new ArrayList<>();
        }

        // Step 2: resolve the ids in the product database.
        Query productQuery = new Query(Criteria.where("_id").in(productIds));
        return productMongoTemplate.find(productQuery, ProductInfo.class, "products");

    } catch (Exception e) {
        log.error("从商品数据库获取商品信息失败", e);
        throw new MongoCrossDatabaseQueryException("从商品数据库获取商品信息失败", e);
    }
}

/**
 * Lists a user's payments from the "payments" collection, sorted by {@code createTime}
 * descending.
 *
 * @param userId value matched against the {@code userId} field
 * @return the matching payments
 * @throws MongoCrossDatabaseQueryException if the query throws
 */
private List<PaymentInfo> getPaymentsFromPaymentDB(Long userId) {
    try {
        final Query paymentsOfUser = new Query(Criteria.where("userId").is(userId));
        final Sort newestFirst = Sort.by(Sort.Direction.DESC, "createTime");
        paymentsOfUser.with(newestFirst);
        return paymentMongoTemplate.find(paymentsOfUser, PaymentInfo.class, "payments");
    } catch (Exception ex) {
        log.error("从支付数据库获取支付信息失败", ex);
        throw new MongoCrossDatabaseQueryException("从支付数据库获取支付信息失败", ex);
    }
}

/**
 * Computes summary statistics over the orders, products and payments already attached to the
 * given object and stores them via {@code setStatistics}.
 *
 * <p>Each section is skipped when its list is null or empty. Best-effort: any exception is
 * logged and swallowed, in which case no statistics object is attached at all.
 *
 * @param userCompleteInfo combined view to read from and to attach the statistics to
 */
private void calculateUserCompleteStatistics(UserCompleteInfo userCompleteInfo) {
    try {
        final UserCompleteStatistics stats = new UserCompleteStatistics();

        // Orders: count, amount sum, count per status, count per calendar day (system zone).
        final List<OrderInfo> orderList = userCompleteInfo.getOrders();
        if (orderList != null && !orderList.isEmpty()) {
            stats.setTotalOrders(orderList.size());

            final double orderAmountSum = orderList.stream()
                    .mapToDouble(OrderInfo::getAmount)
                    .sum();
            stats.setTotalOrderAmount(orderAmountSum);

            final Map<String, Long> perStatus = orderList.stream()
                    .collect(Collectors.groupingBy(OrderInfo::getStatus, Collectors.counting()));
            stats.setOrderStatusCount(perStatus);

            final Map<String, Long> perDay = orderList.stream()
                    .collect(Collectors.groupingBy(
                            o -> o.getCreateTime().toInstant().atZone(ZoneId.systemDefault()).toLocalDate().toString(),
                            Collectors.counting()));
            stats.setOrderTimeCount(perDay);
        }

        // Products: count, price sum, count per category.
        final List<ProductInfo> productList = userCompleteInfo.getProducts();
        if (productList != null && !productList.isEmpty()) {
            stats.setTotalProducts(productList.size());

            final double priceSum = productList.stream()
                    .mapToDouble(ProductInfo::getPrice)
                    .sum();
            stats.setTotalProductAmount(priceSum);

            final Map<String, Long> perCategory = productList.stream()
                    .collect(Collectors.groupingBy(ProductInfo::getCategory, Collectors.counting()));
            stats.setProductCategoryCount(perCategory);
        }

        // Payments: count, amount sum, count per payment method.
        final List<PaymentInfo> paymentList = userCompleteInfo.getPayments();
        if (paymentList != null && !paymentList.isEmpty()) {
            stats.setTotalPayments(paymentList.size());

            final double paymentAmountSum = paymentList.stream()
                    .mapToDouble(PaymentInfo::getAmount)
                    .sum();
            stats.setTotalPaymentAmount(paymentAmountSum);

            final Map<String, Long> perMethod = paymentList.stream()
                    .collect(Collectors.groupingBy(PaymentInfo::getPaymentMethod, Collectors.counting()));
            stats.setPaymentMethodCount(perMethod);
        }

        userCompleteInfo.setStatistics(stats);
    } catch (Exception ex) {
        log.error("计算用户完整统计信息失败", ex);
    }
}

/**
 * Loads an order together with its product, its buyer and its payments from the four
 * databases, and caches the combined view in Redis.
 *
 * <p>The product and user lookups need fields of the order, so they are chained onto the
 * order future with {@code thenApplyAsync}; they start only once the order is available and
 * never park a pool thread waiting for it. The payment lookup needs only the order id and
 * runs in parallel with the order lookup.
 *
 * <p>A missing order is reported with an explicit cause rather than a
 * {@code NullPointerException} from a dependent lookup. If the waiting thread is interrupted,
 * the interrupt flag is restored before the failure is reported.
 *
 * @param orderId id of the order to load
 * @return the combined order view, served from cache when present
 * @throws MongoCrossDatabaseQueryException if the order does not exist, any query fails, or
 *         the wait is interrupted
 */
public OrderDetailInfo crossDatabaseQueryOrderDetailInfo(Long orderId) {
    try {
        // Cache lookup first.
        String cacheKey = CROSS_DB_CACHE_PREFIX + "order_detail:" + orderId;
        OrderDetailInfo cachedInfo = (OrderDetailInfo) redisTemplate.opsForValue().get(cacheKey);

        if (cachedInfo != null) {
            return cachedInfo;
        }

        CompletableFuture<OrderInfo> orderFuture = CompletableFuture.supplyAsync(() -> {
            OrderInfo order = getOrderInfoFromOrderDB(orderId);
            if (order == null) {
                throw new MongoCrossDatabaseQueryException("跨库查询订单详细信息失败",
                        new IllegalStateException("Order not found: " + orderId));
            }
            return order;
        });

        // Dependent lookups: scheduled only after the order has been loaded.
        CompletableFuture<List<ProductInfo>> productsFuture = orderFuture.thenApplyAsync(
                order -> getProductsByOrderIdFromProductDB(order.getProductId()));

        CompletableFuture<UserInfo> userFuture = orderFuture.thenApplyAsync(
                order -> getUserInfoFromUserDB(order.getUserId()));

        // Independent lookup: payments are keyed by the order id itself.
        CompletableFuture<List<PaymentInfo>> paymentsFuture = CompletableFuture.supplyAsync(
                () -> getPaymentsByOrderIdFromPaymentDB(orderId));

        CompletableFuture.allOf(orderFuture, productsFuture, userFuture, paymentsFuture).get();

        // Assemble the combined view; the futures are complete, so get() returns immediately.
        OrderDetailInfo orderDetailInfo = new OrderDetailInfo();
        orderDetailInfo.setOrderInfo(orderFuture.get());
        orderDetailInfo.setProducts(productsFuture.get());
        orderDetailInfo.setUserInfo(userFuture.get());
        orderDetailInfo.setPayments(paymentsFuture.get());

        redisTemplate.opsForValue().set(cacheKey, orderDetailInfo,
                Duration.ofSeconds(CROSS_DB_CACHE_EXPIRE));

        return orderDetailInfo;

    } catch (InterruptedException e) {
        // Preserve the interrupt for callers further up the stack.
        Thread.currentThread().interrupt();
        log.error("跨库查询订单详细信息失败", e);
        throw new MongoCrossDatabaseQueryException("跨库查询订单详细信息失败", e);
    } catch (java.util.concurrent.ExecutionException e) {
        // Report the failure raised inside the parallel query, not the wrapper.
        Exception root = (e.getCause() instanceof Exception) ? (Exception) e.getCause() : e;
        log.error("跨库查询订单详细信息失败", root);
        throw new MongoCrossDatabaseQueryException("跨库查询订单详细信息失败", root);
    } catch (Exception e) {
        log.error("跨库查询订单详细信息失败", e);
        throw new MongoCrossDatabaseQueryException("跨库查询订单详细信息失败", e);
    }
}

/**
 * Looks up one order document in the "orders" collection by its {@code _id}.
 *
 * @param orderId value matched against {@code _id}
 * @return the result of {@code findOne} for that id
 * @throws MongoCrossDatabaseQueryException if the lookup throws
 */
private OrderInfo getOrderInfoFromOrderDB(Long orderId) {
    try {
        final Criteria byId = Criteria.where("_id").is(orderId);
        final Query lookup = new Query(byId);
        return orderMongoTemplate.findOne(lookup, OrderInfo.class, "orders");
    } catch (Exception ex) {
        log.error("从订单数据库获取订单信息失败", ex);
        throw new MongoCrossDatabaseQueryException("从订单数据库获取订单信息失败", ex);
    }
}

/**
 * Finds the product documents in the "products" collection whose {@code _id} equals the given
 * product id. Despite the method name, the argument is a product id, not an order id.
 *
 * @param productId value matched against {@code _id}
 * @return the matching products
 * @throws MongoCrossDatabaseQueryException if the query throws
 */
private List<ProductInfo> getProductsByOrderIdFromProductDB(Long productId) {
    try {
        final Criteria byId = Criteria.where("_id").is(productId);
        final Query lookup = new Query(byId);
        return productMongoTemplate.find(lookup, ProductInfo.class, "products");
    } catch (Exception ex) {
        log.error("根据订单ID从商品数据库获取商品信息失败", ex);
        throw new MongoCrossDatabaseQueryException("根据订单ID从商品数据库获取商品信息失败", ex);
    }
}

/**
 * Lists the payments in the "payments" collection whose {@code orderId} equals the given id.
 *
 * @param orderId value matched against the {@code orderId} field
 * @return the matching payments
 * @throws MongoCrossDatabaseQueryException if the query throws
 */
private List<PaymentInfo> getPaymentsByOrderIdFromPaymentDB(Long orderId) {
    try {
        final Criteria byOrder = Criteria.where("orderId").is(orderId);
        final Query lookup = new Query(byOrder);
        return paymentMongoTemplate.find(lookup, PaymentInfo.class, "payments");
    } catch (Exception ex) {
        log.error("根据订单ID从支付数据库获取支付信息失败", ex);
        throw new MongoCrossDatabaseQueryException("根据订单ID从支付数据库获取支付信息失败", ex);
    }
}

/**
 * Produces sales statistics for one product over a time window by querying the product, order
 * and payment databases in parallel; the result is cached in Redis under a key built from the
 * product id and the window's epoch-millisecond bounds.
 *
 * @param productId product to report on
 * @param startTime inclusive lower bound on {@code createTime}
 * @param endTime   inclusive upper bound on {@code createTime}
 * @return the statistics, served from cache when present
 * @throws MongoCrossDatabaseQueryException if anything in the method throws
 */
public ProductSalesStatistics crossDatabaseQueryProductSalesStatistics(Long productId, Date startTime, Date endTime) {
    try {
        final String cacheKey = CROSS_DB_CACHE_PREFIX + "product_sales:" + productId + ":" +
                startTime.getTime() + ":" + endTime.getTime();

        final ProductSalesStatistics fromCache =
                (ProductSalesStatistics) redisTemplate.opsForValue().get(cacheKey);
        if (fromCache != null) {
            return fromCache;
        }

        // Fire the three independent queries.
        final CompletableFuture<ProductInfo> productTask =
                CompletableFuture.supplyAsync(() -> getProductInfoFromProductDB(productId));
        final CompletableFuture<List<OrderInfo>> ordersTask =
                CompletableFuture.supplyAsync(() -> getOrdersByProductIdFromOrderDB(productId, startTime, endTime));
        final CompletableFuture<List<PaymentInfo>> paymentsTask =
                CompletableFuture.supplyAsync(() -> getPaymentsByProductIdFromPaymentDB(productId, startTime, endTime));

        CompletableFuture.allOf(productTask, ordersTask, paymentsTask).get();

        // All tasks are complete; read their results once.
        final ProductInfo product = productTask.get();
        final List<OrderInfo> orders = ordersTask.get();
        final List<PaymentInfo> payments = paymentsTask.get();

        final ProductSalesStatistics stats = new ProductSalesStatistics();
        stats.setProductInfo(product);
        stats.setTotalOrders(orders.size());
        stats.setTotalSales(orders.stream()
                .mapToDouble(OrderInfo::getAmount)
                .sum());
        stats.setTotalPayments(payments.size());
        stats.setTotalPaymentAmount(payments.stream()
                .mapToDouble(PaymentInfo::getAmount)
                .sum());

        // Daily / weekly / monthly breakdown of the order amounts.
        calculateSalesTrend(stats, orders);

        redisTemplate.opsForValue().set(cacheKey, stats,
                Duration.ofSeconds(CROSS_DB_CACHE_EXPIRE));

        return stats;
    } catch (Exception ex) {
        log.error("跨库查询商品销售统计失败", ex);
        throw new MongoCrossDatabaseQueryException("跨库查询商品销售统计失败", ex);
    }
}

/**
 * Looks up one product document in the "products" collection by its {@code _id}.
 *
 * @param productId value matched against {@code _id}
 * @return the result of {@code findOne} for that id
 * @throws MongoCrossDatabaseQueryException if the lookup throws
 */
private ProductInfo getProductInfoFromProductDB(Long productId) {
    try {
        final Criteria byId = Criteria.where("_id").is(productId);
        final Query lookup = new Query(byId);
        return productMongoTemplate.findOne(lookup, ProductInfo.class, "products");
    } catch (Exception ex) {
        log.error("从商品数据库获取商品信息失败", ex);
        throw new MongoCrossDatabaseQueryException("从商品数据库获取商品信息失败", ex);
    }
}

/**
 * Lists the orders in the "orders" collection for one product whose {@code createTime} lies
 * within the given bounds (both inclusive).
 *
 * @param productId value matched against the {@code productId} field
 * @param startTime inclusive lower bound on {@code createTime}
 * @param endTime   inclusive upper bound on {@code createTime}
 * @return the matching orders
 * @throws MongoCrossDatabaseQueryException if the query throws
 */
private List<OrderInfo> getOrdersByProductIdFromOrderDB(Long productId, Date startTime, Date endTime) {
    try {
        final Criteria filter = Criteria.where("productId").is(productId)
                .and("createTime").gte(startTime).lte(endTime);
        final Query lookup = new Query(filter);
        return orderMongoTemplate.find(lookup, OrderInfo.class, "orders");
    } catch (Exception ex) {
        log.error("根据商品ID从订单数据库获取订单信息失败", ex);
        throw new MongoCrossDatabaseQueryException("根据商品ID从订单数据库获取订单信息失败", ex);
    }
}

/**
 * Lists the payments in the "payments" collection for one product whose {@code createTime}
 * lies within the given bounds (both inclusive).
 *
 * @param productId value matched against the {@code productId} field
 * @param startTime inclusive lower bound on {@code createTime}
 * @param endTime   inclusive upper bound on {@code createTime}
 * @return the matching payments
 * @throws MongoCrossDatabaseQueryException if the query throws
 */
private List<PaymentInfo> getPaymentsByProductIdFromPaymentDB(Long productId, Date startTime, Date endTime) {
    try {
        final Criteria filter = Criteria.where("productId").is(productId)
                .and("createTime").gte(startTime).lte(endTime);
        final Query lookup = new Query(filter);
        return paymentMongoTemplate.find(lookup, PaymentInfo.class, "payments");
    } catch (Exception ex) {
        log.error("根据商品ID从支付数据库获取支付信息失败", ex);
        throw new MongoCrossDatabaseQueryException("根据商品ID从支付数据库获取支付信息失败", ex);
    }
}

/**
 * Sums order amounts per day, per ISO week and per month (dates resolved in the system default
 * time zone) and stores the three maps on the statistics object.
 *
 * <p>Weekly keys use the ISO week-based year together with the ISO week number. Combining the
 * calendar year with {@code weekOfYear()} yields a "week 0" for the first days of January and
 * splits a single ISO week that straddles New Year across two different keys.
 *
 * <p>Key formats: {@code yyyy-MM-dd} (daily), {@code <weekBasedYear>-W<week>} (weekly),
 * {@code yyyy-MM} (monthly).
 *
 * <p>Best-effort: any exception is logged and swallowed; trends set before the failure remain.
 *
 * @param statistics target object receiving the trend maps
 * @param orders     orders whose amounts are summed
 */
private void calculateSalesTrend(ProductSalesStatistics statistics, List<OrderInfo> orders) {
    try {
        // Daily totals.
        Map<String, Double> dailySales = orders.stream()
                .collect(Collectors.groupingBy(order ->
                        order.getCreateTime().toInstant().atZone(ZoneId.systemDefault()).toLocalDate().toString(),
                        Collectors.summingDouble(OrderInfo::getAmount)));

        statistics.setDailySalesTrend(dailySales);

        // Weekly totals, keyed by ISO week-based year and ISO week number.
        Map<String, Double> weeklySales = orders.stream()
                .collect(Collectors.groupingBy(order -> {
                    LocalDate date = order.getCreateTime().toInstant().atZone(ZoneId.systemDefault()).toLocalDate();
                    return date.get(WeekFields.ISO.weekBasedYear()) + "-W"
                            + date.get(WeekFields.ISO.weekOfWeekBasedYear());
                }, Collectors.summingDouble(OrderInfo::getAmount)));

        statistics.setWeeklySalesTrend(weeklySales);

        // Monthly totals.
        Map<String, Double> monthlySales = orders.stream()
                .collect(Collectors.groupingBy(order -> {
                    LocalDate date = order.getCreateTime().toInstant().atZone(ZoneId.systemDefault()).toLocalDate();
                    return date.getYear() + "-" + String.format("%02d", date.getMonthValue());
                }, Collectors.summingDouble(OrderInfo::getAmount)));

        statistics.setMonthlySalesTrend(monthlySales);

    } catch (Exception e) {
        log.error("计算销售趋势失败", e);
    }
}
}

2.3 MongoDB事件驱动策略

2.3.1 MongoDB事件驱动服务

事件驱动服务的代码实现如下:
/**
* MongoDB事件驱动服务
*/
@Service
public class MongoEventDrivenService {

// Client used to publish events to RabbitMQ.
@Autowired
private RabbitTemplate rabbitTemplate;

// Redis client used to look up and delete cache keys.
@Autowired
private RedisTemplate<String, Object> redisTemplate;

// Service called to recompute the per-user aggregation.
@Autowired
private MongoAggregationService mongoAggregationService;

// Service called to recompute the per-order detail view.
@Autowired
private MongoCrossDatabaseQueryService crossDatabaseQueryService;

// Prefix prepended to an event's type to form the routing key used when publishing.
private final String MONGO_EVENT_QUEUE_PREFIX = "mongo_cross_db_query.event.";
// Prefix of the Redis key patterns used by the cache-clearing methods of this class.
private final String MONGO_EVENT_CACHE_PREFIX = "mongo_event_cache:";

/**
 * Publishes an event through RabbitMQ.
 *
 * <p>An event id is generated when the event has none, the timestamp is always overwritten
 * with the current time, and the routing key is the queue prefix followed by the event type.
 *
 * @param event event to publish; its id and timestamp may be modified
 * @throws MongoCrossDatabaseQueryException if preparing or sending the event throws
 */
public void publishMongoCrossDatabaseQueryEvent(MongoCrossDatabaseQueryEvent event) {
    try {
        final boolean idMissing = event.getEventId() == null;
        if (idMissing) {
            event.setEventId(UUID.randomUUID().toString());
        }

        event.setTimestamp(new Date());

        final String routingKey = MONGO_EVENT_QUEUE_PREFIX + event.getEventType();
        rabbitTemplate.convertAndSend(routingKey, event);

        log.info("发布MongoDB跨库查询事件成功: {}", event.getEventId());
    } catch (Exception ex) {
        log.error("发布MongoDB跨库查询事件失败", ex);
        throw new MongoCrossDatabaseQueryException("发布MongoDB跨库查询事件失败", ex);
    }
}

/**
 * Listener for user-updated events: clears the user's cache entries, triggers a recomputation
 * of the user aggregation, and publishes a follow-up "MONGO_USER_DATA_UPDATED" event.
 *
 * <p>Any exception is logged and swallowed, so the listener never throws.
 *
 * @param event the received user-updated event
 */
@RabbitListener(queues = "mongo_cross_db_query.event.user.updated")
public void handleUserUpdatedEvent(MongoUserUpdatedEvent event) {
    try {
        log.info("处理MongoDB用户信息更新事件: {}", event.getUserId());

        // Invalidate first, then recompute.
        clearUserRelatedCache(event.getUserId());
        updateUserAggregatedData(event.getUserId());

        // Tell downstream consumers that the user's data changed.
        final MongoDataUpdatedEvent followUp = new MongoDataUpdatedEvent();
        followUp.setEventId(UUID.randomUUID().toString());
        followUp.setEventType("MONGO_USER_DATA_UPDATED");
        followUp.setUserId(event.getUserId());
        followUp.setTimestamp(new Date());
        publishMongoCrossDatabaseQueryEvent(followUp);
    } catch (Exception ex) {
        log.error("处理MongoDB用户信息更新事件失败", ex);
    }
}

/**
 * Listener for order-created events: invalidates the affected cache entries, triggers
 * recomputation of the user and product aggregations, and publishes a follow-up
 * "MONGO_ORDER_DATA_UPDATED" event.
 *
 * <p>The user's cache entries are cleared as well as the order's. The user recomputation goes
 * through a cached query, so without invalidating first it would return the existing cache
 * entry unchanged and the new order would stay invisible until that entry expires.
 *
 * <p>Any exception is logged and swallowed, so the listener never throws.
 *
 * @param event the received order-created event
 */
@RabbitListener(queues = "mongo_cross_db_query.event.order.created")
public void handleOrderCreatedEvent(MongoOrderCreatedEvent event) {
    try {
        log.info("处理MongoDB订单创建事件: {}", event.getOrderId());

        // Invalidate everything the new order affects before recomputing.
        clearOrderRelatedCache(event.getOrderId());
        clearUserRelatedCache(event.getUserId());

        // Recompute the user aggregation.
        updateUserAggregatedData(event.getUserId());

        // Recompute the product aggregation.
        updateProductAggregatedData(event.getProductId());

        // Tell downstream consumers that order data changed.
        MongoDataUpdatedEvent dataUpdatedEvent = new MongoDataUpdatedEvent();
        dataUpdatedEvent.setEventId(UUID.randomUUID().toString());
        dataUpdatedEvent.setEventType("MONGO_ORDER_DATA_UPDATED");
        dataUpdatedEvent.setOrderId(event.getOrderId());
        dataUpdatedEvent.setUserId(event.getUserId());
        dataUpdatedEvent.setTimestamp(new Date());

        publishMongoCrossDatabaseQueryEvent(dataUpdatedEvent);

    } catch (Exception e) {
        log.error("处理MongoDB订单创建事件失败", e);
    }
}

/**
 * Listener for payment-completed events: invalidates the affected cache entries, triggers
 * recomputation of the order and user aggregations, and publishes a follow-up
 * "MONGO_PAYMENT_DATA_UPDATED" event.
 *
 * <p>The order's and user's cache entries are cleared as well as the payment's. Both
 * recomputations go through cached queries, so without invalidating first they would return
 * the existing cache entries unchanged and the payment would stay invisible until they expire.
 *
 * <p>Any exception is logged and swallowed, so the listener never throws.
 *
 * @param event the received payment-completed event
 */
@RabbitListener(queues = "mongo_cross_db_query.event.payment.completed")
public void handlePaymentCompletedEvent(MongoPaymentCompletedEvent event) {
    try {
        log.info("处理MongoDB支付完成事件: {}", event.getPaymentId());

        // Invalidate everything the payment affects before recomputing.
        clearPaymentRelatedCache(event.getPaymentId());
        clearOrderRelatedCache(event.getOrderId());
        clearUserRelatedCache(event.getUserId());

        // Recompute the order detail view.
        updateOrderAggregatedData(event.getOrderId());

        // Recompute the user aggregation.
        updateUserAggregatedData(event.getUserId());

        // Tell downstream consumers that payment data changed.
        MongoDataUpdatedEvent dataUpdatedEvent = new MongoDataUpdatedEvent();
        dataUpdatedEvent.setEventId(UUID.randomUUID().toString());
        dataUpdatedEvent.setEventType("MONGO_PAYMENT_DATA_UPDATED");
        dataUpdatedEvent.setPaymentId(event.getPaymentId());
        dataUpdatedEvent.setOrderId(event.getOrderId());
        dataUpdatedEvent.setUserId(event.getUserId());
        dataUpdatedEvent.setTimestamp(new Date());

        publishMongoCrossDatabaseQueryEvent(dataUpdatedEvent);

    } catch (Exception e) {
        log.error("处理MongoDB支付完成事件失败", e);
    }
}

/**
 * Removes the Redis entries that hold data derived from the given user.
 *
 * <p>The aggregation and cross-database query services store their per-user results under
 * "mongo_aggregation_cache:user_order:&lt;id&gt;" and "cross_db_cache:user_complete:&lt;id&gt;".
 * Those keys do not start with this class's event-cache prefix, so matching on that prefix
 * alone never removed them. They are now deleted directly by exact key.
 *
 * <p>The original prefix-pattern sweep is kept for any keys that do use the event-cache prefix.
 * NOTE(review): {@code keys(pattern)} scans the whole keyspace; consider dropping the sweep if
 * nothing writes under that prefix.
 *
 * <p>Best-effort: any exception is logged and swallowed.
 *
 * @param userId id of the user whose cache entries are removed
 */
private void clearUserRelatedCache(Long userId) {
    try {
        // Exact keys written by the query services.
        redisTemplate.delete("mongo_aggregation_cache:user_order:" + userId);
        redisTemplate.delete("cross_db_cache:user_complete:" + userId);

        // Entries stored under this service's own prefix, if any.
        String pattern = MONGO_EVENT_CACHE_PREFIX + "user_*:" + userId;
        Set<String> keys = redisTemplate.keys(pattern);

        if (keys != null && !keys.isEmpty()) {
            redisTemplate.delete(keys);
        }

        log.info("清除MongoDB用户相关缓存成功: {}", userId);

    } catch (Exception e) {
        log.error("清除MongoDB用户相关缓存失败", e);
    }
}

/**
 * Deletes every Redis key matching
 * {@code MONGO_EVENT_CACHE_PREFIX + "order_*:" + orderId}.
 * Failures are logged and swallowed.
 *
 * @param orderId id appended to the key pattern
 */
private void clearOrderRelatedCache(Long orderId) {
    try {
        Set<String> matched = redisTemplate.keys(MONGO_EVENT_CACHE_PREFIX + "order_*:" + orderId);

        boolean nothingCached = matched == null || matched.isEmpty();
        if (!nothingCached) {
            redisTemplate.delete(matched);
        }

        // Logged whether or not any key was actually removed.
        log.info("清除MongoDB订单相关缓存成功: {}", orderId);
    } catch (Exception ex) {
        log.error("清除MongoDB订单相关缓存失败", ex);
    }
}

/**
 * Deletes every Redis key matching
 * {@code MONGO_EVENT_CACHE_PREFIX + "payment_*:" + paymentId}.
 * Failures are logged and swallowed.
 *
 * @param paymentId id appended to the key pattern
 */
private void clearPaymentRelatedCache(Long paymentId) {
    try {
        Set<String> matched = redisTemplate.keys(MONGO_EVENT_CACHE_PREFIX + "payment_*:" + paymentId);

        boolean nothingCached = matched == null || matched.isEmpty();
        if (!nothingCached) {
            redisTemplate.delete(matched);
        }

        // Logged whether or not any key was actually removed.
        log.info("清除MongoDB支付相关缓存成功: {}", paymentId);
    } catch (Exception ex) {
        log.error("清除MongoDB支付相关缓存失败", ex);
    }
}

/**
 * Schedules an asynchronous call to
 * {@code mongoAggregationService.aggregateUserOrderInfo(userId)}; the returned
 * aggregate is discarded here. Failures inside the task and failures to schedule
 * it are logged, never propagated.
 *
 * @param userId id of the user whose aggregate is recomputed
 */
private void updateUserAggregatedData(Long userId) {
    Runnable refreshTask = () -> {
        try {
            mongoAggregationService.aggregateUserOrderInfo(userId);
            log.info("更新MongoDB用户聚合数据成功: {}", userId);
        } catch (Exception taskFailure) {
            log.error("更新MongoDB用户聚合数据失败: {}", userId, taskFailure);
        }
    };

    try {
        CompletableFuture.runAsync(refreshTask);
    } catch (Exception schedulingFailure) {
        log.error("更新MongoDB用户聚合数据失败", schedulingFailure);
    }
}

/**
 * Schedules an asynchronous call to
 * {@code crossDatabaseQueryService.crossDatabaseQueryOrderDetailInfo(orderId)};
 * the returned detail object is discarded here. Failures inside the task and
 * failures to schedule it are logged, never propagated.
 *
 * @param orderId id of the order whose detail view is recomputed
 */
private void updateOrderAggregatedData(Long orderId) {
    Runnable refreshTask = () -> {
        try {
            crossDatabaseQueryService.crossDatabaseQueryOrderDetailInfo(orderId);
            log.info("更新MongoDB订单聚合数据成功: {}", orderId);
        } catch (Exception taskFailure) {
            log.error("更新MongoDB订单聚合数据失败: {}", orderId, taskFailure);
        }
    };

    try {
        CompletableFuture.runAsync(refreshTask);
    } catch (Exception schedulingFailure) {
        log.error("更新MongoDB订单聚合数据失败", schedulingFailure);
    }
}

/**
 * Schedules an asynchronous recomputation of a product's sales statistics over
 * the 30 days ending at the moment the task runs. The returned statistics are
 * discarded here. Failures inside the task and failures to schedule it are
 * logged, never propagated.
 *
 * @param productId id of the product whose statistics are recomputed
 */
private void updateProductAggregatedData(Long productId) {
    Runnable refreshTask = () -> {
        try {
            long thirtyDaysMillis = 30L * 24 * 60 * 60 * 1000;
            Date endTime = new Date();
            Date startTime = new Date(endTime.getTime() - thirtyDaysMillis);
            crossDatabaseQueryService.crossDatabaseQueryProductSalesStatistics(productId, startTime, endTime);
            log.info("更新MongoDB商品聚合数据成功: {}", productId);
        } catch (Exception taskFailure) {
            log.error("更新MongoDB商品聚合数据失败: {}", productId, taskFailure);
        }
    };

    try {
        CompletableFuture.runAsync(refreshTask);
    } catch (Exception schedulingFailure) {
        log.error("更新MongoDB商品聚合数据失败", schedulingFailure);
    }
}
}

三、企业级MongoDB微服务数据查询方案

3.1 MongoDB微服务数据查询管理服务

3.1.1 MongoDB微服务数据查询管理服务

示例代码如下:
/**
 * Entry point for MongoDB cross-database queries.
 *
 * <p>Validates a request, derives a {@code MongoQueryStrategy} from its query type,
 * delegates to the matching executor and stamps strategy, status and start/end time
 * on the result. It also reads and writes query results in Redis by request id and
 * returns a statistics report whose figures are currently hard-coded.
 */
@Service
public class MongoMicroserviceDataQueryManagementService {

    /** Redis key prefix under which query results are cached. */
    private static final String MONGO_QUERY_CACHE_PREFIX = "mongo_query_cache:";

    /** Lifetime of a cached query result, in seconds (30 minutes). */
    private static final long MONGO_QUERY_CACHE_EXPIRE = 1800;

    /** Pause after publishing a query event, in milliseconds. */
    private static final long EVENT_DRIVEN_WAIT_MILLIS = 1000L;

    @Autowired
    private MongoAggregationService mongoAggregationService;

    @Autowired
    private MongoCrossDatabaseQueryService crossDatabaseQueryService;

    @Autowired
    private MongoEventDrivenService mongoEventDrivenService;

    /** Template used by {@link #executeMongoSearch} to read the {@code products} collection. */
    @Autowired
    private MongoTemplate productMongoTemplate;

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    /**
     * Runs a cross-database query with the strategy selected for its query type.
     *
     * @param request the query; request id and query type are mandatory, a missing
     *                parameter map is replaced by an empty one
     * @return the delegate's result, stamped with strategy, start/end time and
     *         {@code SUCCESS} status
     * @throws MongoCrossDatabaseQueryException if validation or execution fails
     */
    public MongoCrossDatabaseQueryResult executeMongoCrossDatabaseQuery(MongoCrossDatabaseQueryRequest request) {
        try {
            Date startTime = new Date();

            validateMongoQueryRequest(request);
            MongoQueryStrategy strategy = selectMongoQueryStrategy(request);

            MongoCrossDatabaseQueryResult result;
            switch (strategy) {
                case AGGREGATION_PIPELINE:
                    result = executeAggregationPipelineQuery(request);
                    break;
                case CROSS_DATABASE_QUERY:
                    result = executeCrossDatabaseQuery(request);
                    break;
                case EVENT_DRIVEN:
                    result = executeEventDrivenQuery(request);
                    break;
                case HYBRID:
                    result = executeHybridQuery(request);
                    break;
                default:
                    throw new IllegalArgumentException("不支持的MongoDB查询策略: " + strategy);
            }

            // The executors build their own result object, so timing and strategy
            // are stamped here, after delegation, or they would be lost.
            result.setStartTime(startTime);
            result.setStrategy(strategy);
            result.setStatus(MongoQueryStatus.SUCCESS);
            result.setEndTime(new Date());

            log.info("执行MongoDB跨库查询成功: 请求ID={}, 策略={}", request.getRequestId(), strategy);
            return result;

        } catch (Exception e) {
            log.error("执行MongoDB跨库查询失败", e);
            throw new MongoCrossDatabaseQueryException("执行MongoDB跨库查询失败", e);
        }
    }

    /**
     * Checks the mandatory request fields and defaults the parameter map.
     *
     * @throws IllegalArgumentException if the request id is blank or the query type is null
     */
    private void validateMongoQueryRequest(MongoCrossDatabaseQueryRequest request) {
        if (request.getRequestId() == null || request.getRequestId().trim().isEmpty()) {
            throw new IllegalArgumentException("请求ID不能为空");
        }

        if (request.getQueryType() == null) {
            throw new IllegalArgumentException("查询类型不能为空");
        }

        if (request.getParameters() == null) {
            request.setParameters(new HashMap<>());
        }
    }

    /**
     * Maps the request's query type to a strategy. Unlisted types, and any failure
     * while reading the type, fall back to {@code CROSS_DATABASE_QUERY}.
     */
    private MongoQueryStrategy selectMongoQueryStrategy(MongoCrossDatabaseQueryRequest request) {
        try {
            switch (request.getQueryType()) {
                case USER_ORDER_AGGREGATION:
                case COMPLEX_AGGREGATION:
                    return MongoQueryStrategy.AGGREGATION_PIPELINE;
                case SEARCH_RESULTS:
                    return MongoQueryStrategy.HYBRID;
                case USER_COMPLETE_INFO:
                case ORDER_DETAIL_INFO:
                case PRODUCT_SALES_STATISTICS:
                default:
                    return MongoQueryStrategy.CROSS_DATABASE_QUERY;
            }
        } catch (Exception e) {
            log.error("选择MongoDB查询策略失败", e);
            return MongoQueryStrategy.CROSS_DATABASE_QUERY;
        }
    }

    /**
     * Returns the named request parameter.
     *
     * @throws IllegalArgumentException if the parameter is absent or null
     */
    private static Object requireParameter(MongoCrossDatabaseQueryRequest request, String name) {
        Object value = request.getParameters().get(name);
        if (value == null) {
            throw new IllegalArgumentException("缺少必需的查询参数: " + name);
        }
        return value;
    }

    /** Returns the named request parameter parsed as a {@code Long} from its string form. */
    private static Long requireLongParameter(MongoCrossDatabaseQueryRequest request, String name) {
        return Long.valueOf(requireParameter(request, name).toString());
    }

    /**
     * Executes the aggregation-pipeline query types ({@code USER_ORDER_AGGREGATION},
     * {@code COMPLEX_AGGREGATION}).
     *
     * @throws MongoCrossDatabaseQueryException on any failure, including an unsupported type
     */
    private MongoCrossDatabaseQueryResult executeAggregationPipelineQuery(MongoCrossDatabaseQueryRequest request) {
        try {
            MongoCrossDatabaseQueryResult result = new MongoCrossDatabaseQueryResult();
            result.setRequestId(request.getRequestId());
            result.setStrategy(MongoQueryStrategy.AGGREGATION_PIPELINE);

            switch (request.getQueryType()) {
                case USER_ORDER_AGGREGATION:
                    Long userId = requireLongParameter(request, "userId");
                    UserOrderAggregationResult userOrderAggregation = mongoAggregationService.aggregateUserOrderInfo(userId);
                    result.setData(userOrderAggregation);
                    break;

                case COMPLEX_AGGREGATION:
                    ComplexAggregationRequest complexRequest = (ComplexAggregationRequest) request.getParameters().get("complexRequest");
                    ComplexAggregationResult complexResult = mongoAggregationService.executeComplexAggregation(complexRequest);
                    result.setData(complexResult);
                    break;

                default:
                    throw new IllegalArgumentException("不支持的聚合管道查询类型: " + request.getQueryType());
            }

            return result;

        } catch (Exception e) {
            log.error("执行聚合管道查询失败", e);
            throw new MongoCrossDatabaseQueryException("执行聚合管道查询失败", e);
        }
    }

    /**
     * Executes the cross-database query types ({@code USER_COMPLETE_INFO},
     * {@code ORDER_DETAIL_INFO}, {@code PRODUCT_SALES_STATISTICS}).
     *
     * @throws MongoCrossDatabaseQueryException on any failure, including an unsupported type
     */
    private MongoCrossDatabaseQueryResult executeCrossDatabaseQuery(MongoCrossDatabaseQueryRequest request) {
        try {
            MongoCrossDatabaseQueryResult result = new MongoCrossDatabaseQueryResult();
            result.setRequestId(request.getRequestId());
            result.setStrategy(MongoQueryStrategy.CROSS_DATABASE_QUERY);

            switch (request.getQueryType()) {
                case USER_COMPLETE_INFO:
                    Long userId = requireLongParameter(request, "userId");
                    UserCompleteInfo userCompleteInfo = crossDatabaseQueryService.crossDatabaseQueryUserCompleteInfo(userId);
                    result.setData(userCompleteInfo);
                    break;

                case ORDER_DETAIL_INFO:
                    Long orderId = requireLongParameter(request, "orderId");
                    OrderDetailInfo orderDetailInfo = crossDatabaseQueryService.crossDatabaseQueryOrderDetailInfo(orderId);
                    result.setData(orderDetailInfo);
                    break;

                case PRODUCT_SALES_STATISTICS:
                    Long productId = requireLongParameter(request, "productId");
                    // The time bounds are passed through as supplied, including null.
                    Date startTime = (Date) request.getParameters().get("startTime");
                    Date endTime = (Date) request.getParameters().get("endTime");
                    ProductSalesStatistics statistics = crossDatabaseQueryService.crossDatabaseQueryProductSalesStatistics(
                            productId, startTime, endTime);
                    result.setData(statistics);
                    break;

                default:
                    throw new IllegalArgumentException("不支持的跨库查询类型: " + request.getQueryType());
            }

            return result;

        } catch (Exception e) {
            log.error("执行跨库查询失败", e);
            throw new MongoCrossDatabaseQueryException("执行跨库查询失败", e);
        }
    }

    /**
     * Publishes a {@code MONGO_CROSS_DATABASE_QUERY} event for the request, pauses
     * for {@link #EVENT_DRIVEN_WAIT_MILLIS} and returns a placeholder message as data.
     *
     * @throws MongoCrossDatabaseQueryException on any failure; if the pause is
     *         interrupted, the thread's interrupt flag is restored first
     */
    private MongoCrossDatabaseQueryResult executeEventDrivenQuery(MongoCrossDatabaseQueryRequest request) {
        try {
            MongoCrossDatabaseQueryResult result = new MongoCrossDatabaseQueryResult();
            result.setRequestId(request.getRequestId());
            result.setStrategy(MongoQueryStrategy.EVENT_DRIVEN);

            MongoCrossDatabaseQueryEvent event = new MongoCrossDatabaseQueryEvent();
            event.setEventId(UUID.randomUUID().toString());
            event.setEventType("MONGO_CROSS_DATABASE_QUERY");
            event.setRequestId(request.getRequestId());
            event.setQueryType(request.getQueryType());
            event.setParameters(request.getParameters());
            event.setTimestamp(new Date());

            mongoEventDrivenService.publishMongoCrossDatabaseQueryEvent(event);

            // Fixed pause only; nothing here observes whether the event was handled.
            Thread.sleep(EVENT_DRIVEN_WAIT_MILLIS);

            result.setData("MongoDB事件已发布,请稍后查询结果");

            return result;

        } catch (InterruptedException e) {
            // Keep the interruption visible to the caller's thread management.
            Thread.currentThread().interrupt();
            log.error("执行事件驱动查询失败", e);
            throw new MongoCrossDatabaseQueryException("执行事件驱动查询失败", e);
        } catch (Exception e) {
            log.error("执行事件驱动查询失败", e);
            throw new MongoCrossDatabaseQueryException("执行事件驱动查询失败", e);
        }
    }

    /**
     * Executes the hybrid query types (currently only {@code SEARCH_RESULTS}).
     *
     * @throws MongoCrossDatabaseQueryException on any failure, including an unsupported type
     */
    private MongoCrossDatabaseQueryResult executeHybridQuery(MongoCrossDatabaseQueryRequest request) {
        try {
            MongoCrossDatabaseQueryResult result = new MongoCrossDatabaseQueryResult();
            result.setRequestId(request.getRequestId());
            result.setStrategy(MongoQueryStrategy.HYBRID);

            switch (request.getQueryType()) {
                case SEARCH_RESULTS:
                    MongoSearchRequest searchRequest = (MongoSearchRequest) requireParameter(request, "searchRequest");
                    MongoSearchResult searchResult = executeMongoSearch(searchRequest);
                    result.setData(searchResult);
                    break;

                default:
                    throw new IllegalArgumentException("不支持的混合查询类型: " + request.getQueryType());
            }

            return result;

        } catch (Exception e) {
            log.error("执行混合查询失败", e);
            throw new MongoCrossDatabaseQueryException("执行混合查询失败", e);
        }
    }

    /**
     * Searches the {@code products} collection.
     *
     * <p>The keyword is matched literally and case-insensitively against
     * {@code name} OR {@code description}; category and the price bounds are
     * optional additional filters. Results are sorted by {@code createTime}
     * descending and paged with the request's {@code from}/{@code size}.
     *
     * @return the requested page plus the number of matching products, capped at
     *         {@code Integer.MAX_VALUE}
     * @throws MongoCrossDatabaseQueryException on any failure
     */
    private MongoSearchResult executeMongoSearch(MongoSearchRequest request) {
        try {
            MongoSearchResult result = new MongoSearchResult();
            Query query = new Query();

            String keyword = request.getKeyword();
            if (keyword != null && !keyword.trim().isEmpty()) {
                // Quote the keyword so user input is not interpreted as a regex.
                String literal = java.util.regex.Pattern.quote(keyword);
                query.addCriteria(new Criteria().orOperator(
                        Criteria.where("name").regex(literal, "i"),
                        Criteria.where("description").regex(literal, "i")));
            }

            if (request.getCategory() != null) {
                query.addCriteria(Criteria.where("category").is(request.getCategory()));
            }

            // A Query accepts one criteria per field, so both price bounds have to
            // live on the same Criteria instance.
            if (request.getPriceMin() != null || request.getPriceMax() != null) {
                Criteria price = Criteria.where("price");
                if (request.getPriceMin() != null) {
                    price = price.gte(request.getPriceMin());
                }
                if (request.getPriceMax() != null) {
                    price = price.lte(request.getPriceMax());
                }
                query.addCriteria(price);
            }

            // Count before paging is applied so the total covers every match.
            long totalMatches = productMongoTemplate.count(query, ProductInfo.class, "products");

            query.with(Sort.by(Sort.Direction.DESC, "createTime"));
            query.skip(request.getFrom());
            query.limit(request.getSize());

            List<ProductInfo> products = productMongoTemplate.find(query, ProductInfo.class, "products");

            result.setProducts(products);
            result.setTotalCount((int) Math.min(totalMatches, (long) Integer.MAX_VALUE));
            result.setFrom(request.getFrom());
            result.setSize(request.getSize());

            return result;

        } catch (Exception e) {
            log.error("执行MongoDB搜索失败", e);
            throw new MongoCrossDatabaseQueryException("执行MongoDB搜索失败", e);
        }
    }

    /**
     * Asynchronous variant of {@link #executeMongoCrossDatabaseQuery}.
     *
     * @return a future completed with the result, or completed exceptionally with
     *         the failure
     */
    @Async
    public CompletableFuture<MongoCrossDatabaseQueryResult> executeMongoCrossDatabaseQueryAsync(
            MongoCrossDatabaseQueryRequest request) {
        try {
            MongoCrossDatabaseQueryResult result = executeMongoCrossDatabaseQuery(request);
            return CompletableFuture.completedFuture(result);
        } catch (Exception e) {
            return CompletableFuture.failedFuture(e);
        }
    }

    /**
     * Looks up a cached query result by request id.
     *
     * @throws MongoCrossDatabaseQueryException if the Redis lookup fails or no
     *         result is cached for the id
     */
    public MongoCrossDatabaseQueryResult getMongoQueryResult(String requestId) {
        MongoCrossDatabaseQueryResult cachedResult;
        try {
            String cacheKey = MONGO_QUERY_CACHE_PREFIX + requestId;
            cachedResult = (MongoCrossDatabaseQueryResult) redisTemplate.opsForValue().get(cacheKey);
        } catch (Exception e) {
            log.error("获取MongoDB查询结果失败", e);
            throw new MongoCrossDatabaseQueryException("获取MongoDB查询结果失败", e);
        }

        if (cachedResult == null) {
            // Thrown outside the try so "not found" is not re-wrapped as a lookup failure.
            throw new MongoCrossDatabaseQueryException("MongoDB查询结果不存在: " + requestId);
        }
        return cachedResult;
    }

    /**
     * Stores a query result in Redis for {@link #MONGO_QUERY_CACHE_EXPIRE} seconds.
     * Failures are logged and swallowed.
     */
    public void cacheMongoQueryResult(String requestId, MongoCrossDatabaseQueryResult result) {
        try {
            String cacheKey = MONGO_QUERY_CACHE_PREFIX + requestId;
            redisTemplate.opsForValue().set(cacheKey, result, Duration.ofSeconds(MONGO_QUERY_CACHE_EXPIRE));

        } catch (Exception e) {
            log.error("缓存MongoDB查询结果失败", e);
        }
    }

    /**
     * Builds a statistics report for the given window. Every figure is a
     * hard-coded placeholder; only the window bounds come from the arguments.
     *
     * @throws MongoCrossDatabaseQueryException if building the report fails
     */
    public MongoQueryStatistics getMongoQueryStatistics(Date startTime, Date endTime) {
        try {
            MongoQueryStatistics statistics = new MongoQueryStatistics();
            statistics.setStartTime(startTime);
            statistics.setEndTime(endTime);

            statistics.setTotalQueries(1000);

            Map<MongoQueryStrategy, Long> strategyCount = new HashMap<>();
            strategyCount.put(MongoQueryStrategy.AGGREGATION_PIPELINE, 400L);
            strategyCount.put(MongoQueryStrategy.CROSS_DATABASE_QUERY, 300L);
            strategyCount.put(MongoQueryStrategy.EVENT_DRIVEN, 100L);
            strategyCount.put(MongoQueryStrategy.HYBRID, 200L);
            statistics.setStrategyCount(strategyCount);

            Map<MongoQueryType, Long> typeCount = new HashMap<>();
            typeCount.put(MongoQueryType.USER_ORDER_AGGREGATION, 300L);
            typeCount.put(MongoQueryType.USER_COMPLETE_INFO, 250L);
            typeCount.put(MongoQueryType.ORDER_DETAIL_INFO, 200L);
            typeCount.put(MongoQueryType.PRODUCT_SALES_STATISTICS, 150L);
            typeCount.put(MongoQueryType.COMPLEX_AGGREGATION, 100L);
            statistics.setTypeCount(typeCount);

            statistics.setAverageResponseTime(300.0); // milliseconds
            statistics.setSuccessRate(0.98);

            return statistics;

        } catch (Exception e) {
            log.error("获取MongoDB查询统计信息失败", e);
            throw new MongoCrossDatabaseQueryException("获取MongoDB查询统计信息失败", e);
        }
    }
}

3.2 MongoDB微服务数据查询优化服务

3.2.1 MongoDB微服务数据查询优化服务

示例代码如下:
/**
 * Produces optimization recommendations for MongoDB cross-database queries.
 *
 * <p>A request is profiled (frequency, complexity, data volume, response time) and
 * the profile is turned into cache, query-strategy and aggregation recommendations.
 * All profile figures are fixed estimates per query type; only the frequency
 * estimate is cached in Redis. The service also writes warm-up keys at start-up
 * and runs a periodic cache clean-up.
 */
@Service
public class MongoMicroserviceDataQueryOptimizationService {

    /** Redis key prefix for everything this service stores. */
    private static final String MONGO_OPTIMIZATION_CACHE_PREFIX = "mongo_optimization_cache:";

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Autowired
    private CaffeineCache localCache;

    @Autowired
    private MongoMicroserviceDataQueryManagementService queryManagementService;

    /**
     * Profiles the request and derives cache, strategy and aggregation recommendations.
     *
     * @param request the query to analyse; request id, query type and parameters are read
     * @return the recommendations, with status {@code COMPLETED} and start/end time
     * @throws MongoCrossDatabaseQueryException if any analysis or optimization step fails
     */
    public MongoQueryOptimizationResult optimizeMongoCrossDatabaseQueryPerformance(
            MongoCrossDatabaseQueryRequest request) {
        try {
            MongoQueryOptimizationResult result = new MongoQueryOptimizationResult();
            result.setRequestId(request.getRequestId());
            result.setStartTime(new Date());

            MongoQueryPatternAnalysis patternAnalysis = analyzeMongoQueryPattern(request);
            result.setPatternAnalysis(patternAnalysis);

            result.setCacheOptimization(optimizeMongoCacheStrategy(request, patternAnalysis));
            result.setStrategyOptimization(optimizeMongoQueryStrategy(request, patternAnalysis));
            result.setAggregationOptimization(optimizeMongoAggregationStrategy(request, patternAnalysis));

            result.setStatus(MongoOptimizationStatus.COMPLETED);
            result.setEndTime(new Date());

            log.info("优化MongoDB跨库查询性能完成: 请求ID={}", request.getRequestId());
            return result;

        } catch (Exception e) {
            log.error("优化MongoDB跨库查询性能失败", e);
            throw new MongoCrossDatabaseQueryException("优化MongoDB跨库查询性能失败", e);
        }
    }

    /**
     * Collects the four profile dimensions for a request.
     *
     * @throws MongoCrossDatabaseQueryException if the profile cannot be assembled
     */
    private MongoQueryPatternAnalysis analyzeMongoQueryPattern(MongoCrossDatabaseQueryRequest request) {
        try {
            MongoQueryPatternAnalysis analysis = new MongoQueryPatternAnalysis();
            analysis.setRequestId(request.getRequestId());
            analysis.setQueryType(request.getQueryType());

            analysis.setQueryFrequency(analyzeMongoQueryFrequency(request.getQueryType()));
            analysis.setQueryComplexity(analyzeMongoQueryComplexity(request));
            analysis.setDataVolume(analyzeMongoDataVolume(request));
            analysis.setResponseTime(analyzeMongoResponseTime(request.getQueryType()));

            return analysis;

        } catch (Exception e) {
            log.error("分析MongoDB查询模式失败", e);
            throw new MongoCrossDatabaseQueryException("分析MongoDB查询模式失败", e);
        }
    }

    /**
     * Returns the frequency estimate for a query type, reading it from Redis when
     * present and otherwise caching a fixed estimate for one hour. On failure an
     * unpopulated {@code MongoQueryFrequency} is returned.
     */
    private MongoQueryFrequency analyzeMongoQueryFrequency(MongoQueryType queryType) {
        try {
            String cacheKey = MONGO_OPTIMIZATION_CACHE_PREFIX + "frequency:" + queryType.name();
            MongoQueryFrequency cachedFrequency = (MongoQueryFrequency) redisTemplate.opsForValue().get(cacheKey);

            if (cachedFrequency != null) {
                return cachedFrequency;
            }

            // Fixed placeholder figures, identical for every query type.
            MongoQueryFrequency frequency = new MongoQueryFrequency();
            frequency.setQueryType(queryType);
            frequency.setDailyCount(800);
            frequency.setHourlyCount(80);
            frequency.setMinuteCount(8);

            redisTemplate.opsForValue().set(cacheKey, frequency, Duration.ofHours(1));

            return frequency;

        } catch (Exception e) {
            log.error("分析MongoDB查询频率失败", e);
            return new MongoQueryFrequency();
        }
    }

    /** Writes one complexity estimate (level, collections touched, pipeline stages). */
    private static void fillComplexity(MongoQueryComplexity complexity, MongoComplexityLevel level,
            int collectionCount, int aggregationStageCount) {
        complexity.setComplexityLevel(level);
        complexity.setCollectionCount(collectionCount);
        complexity.setAggregationStageCount(aggregationStageCount);
    }

    /**
     * Estimates query complexity from the parameter count and a fixed table keyed
     * by query type. On failure an unpopulated {@code MongoQueryComplexity} is returned.
     */
    private MongoQueryComplexity analyzeMongoQueryComplexity(MongoCrossDatabaseQueryRequest request) {
        try {
            MongoQueryComplexity complexity = new MongoQueryComplexity();
            complexity.setParameterCount(request.getParameters().size());

            switch (request.getQueryType()) {
                case USER_ORDER_AGGREGATION:
                    fillComplexity(complexity, MongoComplexityLevel.MEDIUM, 3, 5);
                    break;
                case USER_COMPLETE_INFO:
                    fillComplexity(complexity, MongoComplexityLevel.HIGH, 4, 0);
                    break;
                case ORDER_DETAIL_INFO:
                    fillComplexity(complexity, MongoComplexityLevel.MEDIUM, 3, 0);
                    break;
                case PRODUCT_SALES_STATISTICS:
                    fillComplexity(complexity, MongoComplexityLevel.HIGH, 3, 0);
                    break;
                case COMPLEX_AGGREGATION:
                    fillComplexity(complexity, MongoComplexityLevel.VERY_HIGH, 5, 10);
                    break;
                case SEARCH_RESULTS:
                    fillComplexity(complexity, MongoComplexityLevel.LOW, 1, 0);
                    break;
                default:
                    fillComplexity(complexity, MongoComplexityLevel.MEDIUM, 2, 0);
                    break;
            }

            return complexity;

        } catch (Exception e) {
            log.error("分析MongoDB查询复杂度失败", e);
            return new MongoQueryComplexity();
        }
    }

    /** Writes one data-volume estimate (document count, size in KB). */
    private static void fillVolume(MongoDataVolume volume, int estimatedDocuments, int dataSizeKB) {
        volume.setEstimatedDocuments(estimatedDocuments);
        volume.setDataSizeKB(dataSizeKB);
    }

    /**
     * Estimates data volume from a fixed table keyed by query type. On failure an
     * unpopulated {@code MongoDataVolume} is returned.
     */
    private MongoDataVolume analyzeMongoDataVolume(MongoCrossDatabaseQueryRequest request) {
        try {
            MongoDataVolume volume = new MongoDataVolume();

            switch (request.getQueryType()) {
                case USER_ORDER_AGGREGATION:
                    fillVolume(volume, 50, 25);
                    break;
                case USER_COMPLETE_INFO:
                    fillVolume(volume, 200, 100);
                    break;
                case ORDER_DETAIL_INFO:
                    fillVolume(volume, 100, 50);
                    break;
                case PRODUCT_SALES_STATISTICS:
                    fillVolume(volume, 500, 250);
                    break;
                case COMPLEX_AGGREGATION:
                    fillVolume(volume, 1000, 500);
                    break;
                case SEARCH_RESULTS:
                    fillVolume(volume, 20, 10);
                    break;
                default:
                    fillVolume(volume, 100, 50);
                    break;
            }

            return volume;

        } catch (Exception e) {
            log.error("分析MongoDB数据量失败", e);
            return new MongoDataVolume();
        }
    }

    /** Writes one response-time estimate (average, p95, p99). */
    private static void fillResponseTime(MongoResponseTime responseTime, int averageTime, int p95Time, int p99Time) {
        responseTime.setAverageTime(averageTime);
        responseTime.setP95Time(p95Time);
        responseTime.setP99Time(p99Time);
    }

    /**
     * Estimates response times from a fixed table keyed by query type. On failure
     * an unpopulated {@code MongoResponseTime} is returned.
     */
    private MongoResponseTime analyzeMongoResponseTime(MongoQueryType queryType) {
        try {
            MongoResponseTime responseTime = new MongoResponseTime();

            switch (queryType) {
                case USER_ORDER_AGGREGATION:
                    fillResponseTime(responseTime, 200, 400, 600);
                    break;
                case USER_COMPLETE_INFO:
                    fillResponseTime(responseTime, 300, 600, 1000);
                    break;
                case ORDER_DETAIL_INFO:
                    fillResponseTime(responseTime, 250, 500, 800);
                    break;
                case PRODUCT_SALES_STATISTICS:
                    fillResponseTime(responseTime, 400, 800, 1200);
                    break;
                case COMPLEX_AGGREGATION:
                    fillResponseTime(responseTime, 500, 1000, 1500);
                    break;
                case SEARCH_RESULTS:
                    fillResponseTime(responseTime, 100, 200, 300);
                    break;
                default:
                    fillResponseTime(responseTime, 250, 500, 800);
                    break;
            }

            return responseTime;

        } catch (Exception e) {
            log.error("分析MongoDB响应时间失败", e);
            return new MongoResponseTime();
        }
    }

    /**
     * Recommends cache expiry, size and eviction policy from the daily query
     * count, and compression from the estimated payload size.
     *
     * @throws MongoCrossDatabaseQueryException if the recommendation cannot be built
     */
    private MongoCacheOptimizationResult optimizeMongoCacheStrategy(MongoCrossDatabaseQueryRequest request,
            MongoQueryPatternAnalysis analysis) {
        try {
            MongoCacheOptimizationResult result = new MongoCacheOptimizationResult();
            result.setRequestId(request.getRequestId());

            // NOTE(review): the placeholder daily count is exactly 800, so the
            // strict "> 800" tier is never selected from it — confirm the threshold.
            if (analysis.getQueryFrequency().getDailyCount() > 800) {
                result.setRecommendedCacheExpire(3600); // 1 hour
                result.setRecommendedCacheSize(1000);
                result.setRecommendedCacheStrategy("LRU");
            } else if (analysis.getQueryFrequency().getDailyCount() > 100) {
                result.setRecommendedCacheExpire(1800); // 30 minutes
                result.setRecommendedCacheSize(500);
                result.setRecommendedCacheStrategy("LFU");
            } else {
                result.setRecommendedCacheExpire(600); // 10 minutes
                result.setRecommendedCacheSize(100);
                result.setRecommendedCacheStrategy("FIFO");
            }

            if (analysis.getDataVolume().getDataSizeKB() > 100) {
                result.setRecommendedCompression(true);
                result.setRecommendedCompressionLevel(6);
            } else {
                result.setRecommendedCompression(false);
            }

            return result;

        } catch (Exception e) {
            log.error("优化MongoDB缓存策略失败", e);
            throw new MongoCrossDatabaseQueryException("优化MongoDB缓存策略失败", e);
        }
    }

    /** Writes one strategy recommendation (strategy, parallelism, timeout). */
    private static void recommendStrategy(MongoQueryStrategyOptimizationResult result, MongoQueryStrategy strategy,
            int parallelism, int timeout) {
        result.setRecommendedStrategy(strategy);
        result.setRecommendedParallelism(parallelism);
        result.setRecommendedTimeout(timeout);
    }

    /**
     * Recommends strategy, parallelism and timeout from the complexity level, and
     * async mode and batch size from the p95 response time.
     *
     * @throws MongoCrossDatabaseQueryException if the recommendation cannot be built
     */
    private MongoQueryStrategyOptimizationResult optimizeMongoQueryStrategy(MongoCrossDatabaseQueryRequest request,
            MongoQueryPatternAnalysis analysis) {
        try {
            MongoQueryStrategyOptimizationResult result = new MongoQueryStrategyOptimizationResult();
            result.setRequestId(request.getRequestId());

            // An if-chain rather than a switch: an unset (null) level must fall
            // through to the last branch instead of throwing.
            MongoComplexityLevel level = analysis.getQueryComplexity().getComplexityLevel();
            if (level == MongoComplexityLevel.VERY_HIGH) {
                recommendStrategy(result, MongoQueryStrategy.AGGREGATION_PIPELINE, 4, 5000);
            } else if (level == MongoComplexityLevel.HIGH) {
                recommendStrategy(result, MongoQueryStrategy.CROSS_DATABASE_QUERY, 3, 3000);
            } else if (level == MongoComplexityLevel.MEDIUM) {
                recommendStrategy(result, MongoQueryStrategy.AGGREGATION_PIPELINE, 2, 2000);
            } else {
                recommendStrategy(result, MongoQueryStrategy.CROSS_DATABASE_QUERY, 1, 1000);
            }

            if (analysis.getResponseTime().getP95Time() > 500) {
                result.setRecommendedAsync(true);
                result.setRecommendedBatchSize(10);
            } else {
                result.setRecommendedAsync(false);
                result.setRecommendedBatchSize(50);
            }

            return result;

        } catch (Exception e) {
            log.error("优化MongoDB查询策略失败", e);
            throw new MongoCrossDatabaseQueryException("优化MongoDB查询策略失败", e);
        }
    }

    /** Writes one aggregation recommendation (type, batch size, parallelism). */
    private static void recommendAggregation(MongoAggregationOptimizationResult result, MongoAggregationType type,
            int batchSize, int parallelism) {
        result.setRecommendedAggregationType(type);
        result.setRecommendedBatchSize(batchSize);
        result.setRecommendedParallelism(parallelism);
    }

    /**
     * Recommends aggregation type, batch size and parallelism from the estimated
     * document count, and pre-aggregation from the daily query count.
     *
     * @throws MongoCrossDatabaseQueryException if the recommendation cannot be built
     */
    private MongoAggregationOptimizationResult optimizeMongoAggregationStrategy(MongoCrossDatabaseQueryRequest request,
            MongoQueryPatternAnalysis analysis) {
        try {
            MongoAggregationOptimizationResult result = new MongoAggregationOptimizationResult();
            result.setRequestId(request.getRequestId());

            if (analysis.getDataVolume().getEstimatedDocuments() > 1000) {
                recommendAggregation(result, MongoAggregationType.BATCH, 100, 4);
            } else if (analysis.getDataVolume().getEstimatedDocuments() > 100) {
                recommendAggregation(result, MongoAggregationType.STREAM, 50, 2);
            } else {
                recommendAggregation(result, MongoAggregationType.MEMORY, 20, 1);
            }

            if (analysis.getQueryFrequency().getDailyCount() > 800) {
                result.setRecommendedPreAggregation(true);
                result.setRecommendedPreAggregationInterval(300); // 5 minutes
            } else {
                result.setRecommendedPreAggregation(false);
            }

            return result;

        } catch (Exception e) {
            log.error("优化MongoDB聚合策略失败", e);
            throw new MongoCrossDatabaseQueryException("优化MongoDB聚合策略失败", e);
        }
    }

    /**
     * At start-up, writes a one-hour placeholder entry to Redis for each commonly
     * used query type. Failures are logged per type and never propagated.
     */
    @PostConstruct
    public void warmupMongoQueryCache() {
        try {
            List<MongoQueryType> commonQueryTypes = Arrays.asList(
                    MongoQueryType.USER_ORDER_AGGREGATION,
                    MongoQueryType.ORDER_DETAIL_INFO,
                    MongoQueryType.SEARCH_RESULTS);

            for (MongoQueryType queryType : commonQueryTypes) {
                try {
                    String cacheKey = MONGO_OPTIMIZATION_CACHE_PREFIX + "warmup:" + queryType.name();
                    // NOTE(review): a bare Object is only a placeholder and may not be
                    // accepted by the configured value serializer — confirm.
                    Object warmupData = new Object();
                    redisTemplate.opsForValue().set(cacheKey, warmupData, Duration.ofHours(1));
                } catch (Exception e) {
                    log.error("预热MongoDB查询缓存失败: {}", queryType, e);
                }
            }

        } catch (Exception e) {
            log.error("预热MongoDB查询缓存失败", e);
        }
    }

    /**
     * Every five minutes, triggers clean-up of the local cache and then of this
     * service's Redis keys. Failures are logged and swallowed.
     */
    @Scheduled(fixedRate = 300000)
    public void cleanupExpiredCache() {
        try {
            localCache.cleanUp();
            cleanupRedisExpiredCache();

        } catch (Exception e) {
            log.error("清理过期缓存失败", e);
        }
    }

    /**
     * Deletes every key under this service's prefix whose reported TTL is zero or
     * negative. Failures are logged and swallowed.
     */
    private void cleanupRedisExpiredCache() {
        try {
            Set<String> cacheKeys = redisTemplate.keys(MONGO_OPTIMIZATION_CACHE_PREFIX + "*");

            // keys(...) may yield null, which the other cache-clearing code in this
            // file also guards against.
            if (cacheKeys == null || cacheKeys.isEmpty()) {
                return;
            }

            for (String key : cacheKeys) {
                // NOTE(review): a negative TTL presumably means "no expiry" or "key
                // gone" rather than "expired" — confirm deleting such keys is intended.
                Long ttl = redisTemplate.getExpire(key);
                if (ttl != null && ttl <= 0) {
                    redisTemplate.delete(key);
                }
            }

        } catch (Exception e) {
            log.error("清理Redis过期缓存失败", e);
        }
    }
}

四、性能优化与监控

4.1 性能优化

4.1.1 MongoDB跨库查询性能优化

示例代码如下:
/**
* MongoDB跨库查询性能优化服务
*/
@Service
public class MongoCrossDatabaseQueryPerformanceOptimizationService {

@Autowired
private RedisTemplate<String, Object> redisTemplate;

@Autowired
private CaffeineCache localCache;

private final String MONGO_PERFORMANCE_CACHE_PREFIX = "mongo_performance_cache:";

/**
 * Stores performance data for a request in the local cache and, for one hour,
 * in Redis. The Redis key is the local key prefixed with {@code "redis_cache:"}.
 * Failures are logged and swallowed.
 *
 * @param requestId id appended to {@code MONGO_PERFORMANCE_CACHE_PREFIX} to form the local key
 * @param data      value to store
 */
public void cacheMongoQueryPerformanceData(String requestId, Object data) {
    final String localKey = MONGO_PERFORMANCE_CACHE_PREFIX + requestId;

    try {
        // Local tier first, then the shared tier.
        localCache.put(localKey, data);
        redisTemplate.opsForValue().set("redis_cache:" + localKey, data, Duration.ofHours(1));
    } catch (Exception ex) {
        log.error("缓存MongoDB查询性能数据失败", ex);
    }
}

/**
 * Reads performance data for a request, trying the local cache first and Redis
 * second; a Redis hit is copied back into the local cache.
 *
 * @param requestId id appended to {@code MONGO_PERFORMANCE_CACHE_PREFIX} to form the local key
 * @return the cached value, or {@code null} when neither tier has it or any
 *         step fails (the failure is logged)
 */
public Object getCachedMongoQueryPerformanceData(String requestId) {
    final String localKey = MONGO_PERFORMANCE_CACHE_PREFIX + requestId;
    Object found;

    try {
        found = localCache.getIfPresent(localKey);
        if (found == null) {
            found = redisTemplate.opsForValue().get("redis_cache:" + localKey);
            if (found != null) {
                localCache.put(localKey, found);
            }
        }
    } catch (Exception ex) {
        log.error("获取缓存的MongoDB查询性能数据失败", ex);
        // Any failure, even after a successful read, yields null.
        found = null;
    }

    return found;
}

/**
 * Processes a batch of query requests, grouped by query type, with the groups
 * handled in parallel.
 *
 * <p>Results are gathered by the stream collector instead of being appended to a
 * shared list from worker threads. A group whose processing throws is logged and
 * contributes no results. If grouping itself fails (for example a request without
 * a query type), the failure is logged and an empty list is returned.
 *
 * @param requests the requests to process; {@code null} or empty yields an empty list
 * @return a new mutable list with the results of every group that was processed
 */
public List<MongoCrossDatabaseQueryResult> batchProcessMongoQueryRequests(
        List<MongoCrossDatabaseQueryRequest> requests) {
    if (requests == null || requests.isEmpty()) {
        return new ArrayList<>();
    }

    try {
        Map<MongoQueryType, List<MongoCrossDatabaseQueryRequest>> typeGroups = requests.stream()
                .collect(Collectors.groupingBy(MongoCrossDatabaseQueryRequest::getQueryType));

        return typeGroups.entrySet().parallelStream()
                .map(entry -> processTypeGroupQuietly(entry.getKey(), entry.getValue()))
                .flatMap(List::stream)
                .collect(Collectors.toCollection(ArrayList::new));

    } catch (Exception e) {
        log.error("批量处理MongoDB查询请求失败", e);
        return new ArrayList<>();
    }
}

/** Processes one type group; a failure is logged and reported as an empty result list. */
private List<MongoCrossDatabaseQueryResult> processTypeGroupQuietly(MongoQueryType queryType,
        List<MongoCrossDatabaseQueryRequest> typeRequests) {
    try {
        return processMongoTypeRequests(queryType, typeRequests);
    } catch (Exception e) {
        log.error("处理MongoDB查询类型失败: {}", queryType, e);
        return new ArrayList<>();
    }
}

/**
 * Processes the requests of one query type sequentially. A request that throws
 * is logged and represented by a {@code FAILED} result carrying the exception
 * message, so the returned list has one entry per request.
 *
 * @param queryType the shared query type of the requests (not read here)
 * @param requests  the requests to process
 * @return one result per request, in request order
 */
private List<MongoCrossDatabaseQueryResult> processMongoTypeRequests(MongoQueryType queryType,
        List<MongoCrossDatabaseQueryRequest> requests) {
    List<MongoCrossDatabaseQueryResult> outcomes = new ArrayList<>();

    for (MongoCrossDatabaseQueryRequest current : requests) {
        MongoCrossDatabaseQueryResult outcome;
        try {
            outcome = processMongoQueryRequest(current);
        } catch (Exception ex) {
            log.error("处理MongoDB查询请求失败: {}", current.getRequestId(), ex);
            outcome = new MongoCrossDatabaseQueryResult();
            outcome.setRequestId(current.getRequestId());
            outcome.setStatus(MongoQueryStatus.FAILED);
            outcome.setErrorMessage(ex.getMessage());
        }
        outcomes.add(outcome);
    }

    return outcomes;
}

/**
 * Handles a single MongoDB query request.
 *
 * <p>Placeholder implementation: no query is executed here. The returned result echoes the
 * request id, is marked {@code SUCCESS} and carries a fixed message as its data.
 *
 * @param request the request to handle
 * @return the result for {@code request}
 */
private MongoCrossDatabaseQueryResult processMongoQueryRequest(MongoCrossDatabaseQueryRequest request) {
    // The real MongoDB query handling logic belongs here.
    final MongoCrossDatabaseQueryResult outcome = new MongoCrossDatabaseQueryResult();

    outcome.setRequestId(request.getRequestId());
    outcome.setStatus(MongoQueryStatus.SUCCESS);
    outcome.setData("MongoDB查询成功");

    return outcome;
}

/**
 * Pre-populates the MongoDB query performance cache after bean construction.
 *
 * <p>Writes one entry per hard-coded request id through
 * {@code cacheMongoQueryPerformanceData}. A failure for one id is logged and does not stop
 * the remaining ids; nothing is rethrown.
 */
@PostConstruct
public void warmupMongoQueryPerformanceCache() {
    try {
        // Request ids whose performance data is warmed up.
        List<String> commonRequestIds = Arrays.asList("mongo_req_1", "mongo_req_2", "mongo_req_3");

        for (String requestId : commonRequestIds) {
            try {
                // NOTE(review): a bare Object is a placeholder payload; depending on the
                // RedisTemplate value serializer it may not be serializable - confirm.
                Object performanceData = new Object();
                cacheMongoQueryPerformanceData(requestId, performanceData);
            } catch (Exception e) {
                log.error("预热MongoDB查询性能缓存失败: {}", requestId, e);
            }
        }

    } catch (Exception e) {
        log.error("预热MongoDB查询性能缓存失败", e);
    }
}

/**
 * Scheduled cache maintenance, run at a fixed rate of 300000 ms.
 *
 * <p>Invokes {@code cleanUp()} on the local cache and then the Redis cleanup routine.
 * Any exception is logged and not rethrown.
 */
@Scheduled(fixedRate = 300000) // every 5 minutes
public void cleanupExpiredCache() {
    try {
        localCache.cleanUp();          // local tier
        cleanupRedisExpiredCache();    // Redis tier
    } catch (Exception e) {
        log.error("清理过期缓存失败", e);
    }
}

/**
 * Deletes Redis performance-cache entries whose remaining time to live is not positive.
 *
 * <p>All keys matching the performance-cache prefix are listed, and every key whose
 * {@code getExpire} value is {@code <= 0} is deleted. Any exception is logged and not
 * rethrown.
 *
 * <p>NOTE(review): {@code keys(pattern)} walks the whole keyspace; a SCAN-based iteration
 * is usually preferred on large production instances.
 */
private void cleanupRedisExpiredCache() {
    try {
        Set<String> cacheKeys = redisTemplate.keys("redis_cache:" + MONGO_PERFORMANCE_CACHE_PREFIX + "*");

        // keys() is nullable (e.g. inside a pipeline or transaction); nothing to do then.
        if (cacheKeys == null || cacheKeys.isEmpty()) {
            return;
        }

        for (String key : cacheKeys) {
            Long ttl = redisTemplate.getExpire(key);
            // NOTE(review): in Redis a TTL of -1 denotes a key without expiry, so this
            // condition presumably also removes entries stored without a TTL - confirm
            // that this is intended.
            if (ttl != null && ttl <= 0) {
                redisTemplate.delete(key);
            }
        }

    } catch (Exception e) {
        log.error("清理Redis过期缓存失败", e);
    }
}
}

4.2 监控告警

4.2.1 MongoDB监控指标

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
/**
 * Micrometer metrics for MongoDB cross-database queries.
 *
 * <p>Counters and the timer are looked up (or created on first use) in the registry on every
 * call. Gauges are different: a Micrometer gauge samples a state object or function when it
 * is scraped, so this class keeps the latest value of each gauge in {@link #gaugeValues} and
 * registers each gauge once, bound to that map.
 */
@Component
public class MongoCrossDatabaseQueryMetrics {

    private final MeterRegistry meterRegistry;

    /**
     * Latest value of every gauge, keyed by {@code "<metric name>|<query type>"}. The map is
     * held strongly by this component, so the gauges that read from it keep reporting.
     * Fully qualified names are used so that no additional imports are required.
     */
    private final java.util.Map<String, Double> gaugeValues =
            new java.util.concurrent.ConcurrentHashMap<>();

    public MongoCrossDatabaseQueryMetrics(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }

    /**
     * Increments the number of cross-database queries.
     *
     * @param queryType value of the {@code query_type} tag
     * @param strategy  value of the {@code strategy} tag
     */
    public void recordMongoCrossDatabaseQueryCount(String queryType, String strategy) {
        Counter.builder("mongo_cross_db_query.count")
                .description("MongoDB跨库查询次数")
                .tag("query_type", queryType)
                .tag("strategy", strategy)
                .register(meterRegistry)
                .increment();
    }

    /**
     * Records the duration of one cross-database query.
     *
     * @param queryType value of the {@code query_type} tag
     * @param strategy  value of the {@code strategy} tag
     * @param duration  elapsed time in milliseconds
     */
    public void recordMongoCrossDatabaseQueryTime(String queryType, String strategy, long duration) {
        Timer.builder("mongo_cross_db_query.time")
                .description("MongoDB跨库查询时间")
                .tag("query_type", queryType)
                .tag("strategy", strategy)
                // Publish the 95th percentile so a P95 alert has a series to evaluate.
                .publishPercentiles(0.95)
                .register(meterRegistry)
                .record(duration, TimeUnit.MILLISECONDS);
    }

    /**
     * Adds to the amount of data returned by cross-database queries.
     *
     * @param queryType value of the {@code query_type} tag
     * @param count     amount to add to the counter
     */
    public void recordMongoCrossDatabaseQueryDataCount(String queryType, long count) {
        Counter.builder("mongo_cross_db_query.data.count")
                .description("MongoDB跨库查询数据量")
                .tag("query_type", queryType)
                .register(meterRegistry)
                .increment(count);
    }

    /**
     * Sets the current success rate for a query type.
     *
     * @param queryType   value of the {@code query_type} tag
     * @param successRate latest success rate
     */
    public void recordMongoCrossDatabaseQuerySuccessRate(String queryType, double successRate) {
        setGauge("mongo_cross_db_query.success.rate", "MongoDB跨库查询成功率", queryType, successRate);
    }

    /**
     * Sets the current failure rate for a query type.
     *
     * @param queryType   value of the {@code query_type} tag
     * @param failureRate latest failure rate
     */
    public void recordMongoCrossDatabaseQueryFailureRate(String queryType, double failureRate) {
        setGauge("mongo_cross_db_query.failure.rate", "MongoDB跨库查询失败率", queryType, failureRate);
    }

    /**
     * Sets the current throughput for a query type.
     *
     * @param queryType  value of the {@code query_type} tag
     * @param throughput latest throughput
     */
    public void recordMongoCrossDatabaseQueryThroughput(String queryType, double throughput) {
        setGauge("mongo_cross_db_query.throughput", "MongoDB跨库查询吞吐量", queryType, throughput);
    }

    /**
     * Increments the number of exceptions raised by cross-database queries.
     *
     * @param queryType     value of the {@code query_type} tag
     * @param exceptionType value of the {@code exception_type} tag
     */
    public void recordMongoCrossDatabaseQueryExceptionCount(String queryType, String exceptionType) {
        Counter.builder("mongo_cross_db_query.exception.count")
                .description("MongoDB跨库查询异常次数")
                .tag("query_type", queryType)
                .tag("exception_type", exceptionType)
                .register(meterRegistry)
                .increment();
    }

    /**
     * Stores the latest value of a gauge and registers the gauge the first time its
     * name/query-type combination is seen.
     */
    private void setGauge(String name, String description, String queryType, double value) {
        String key = name + "|" + queryType;
        // put() returns null only to the first writer of a key, so exactly one caller
        // performs the registration.
        if (gaugeValues.put(key, value) == null) {
            Gauge.builder(name, gaugeValues, values -> values.getOrDefault(key, Double.NaN))
                    .description(description)
                    .tag("query_type", queryType)
                    .register(meterRegistry);
        }
    }
}

4.2.2 MongoDB告警规则

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# prometheus-rules.yml
# Metric names below assume the meters are exported through Micrometer's Prometheus
# registry: timers are exposed in seconds with a `_seconds` suffix and counters get a
# `_total` suffix.
groups:
  - name: mongo_cross_db_query_alerts
    rules:
      - alert: HighMongoCrossDatabaseQueryTime
        # The timer is reported in seconds, so the 3 s threshold is written as 3.
        expr: mongo_cross_db_query_time_seconds{quantile="0.95"} > 3
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "MongoDB跨库查询时间过长"
          description: "MongoDB跨库查询时间P95超过3秒,当前值: {{ $value }}s"

      - alert: HighMongoCrossDatabaseQueryFailureRate
        expr: mongo_cross_db_query_failure_rate > 0.05
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "MongoDB跨库查询失败率过高"
          description: "MongoDB跨库查询失败率超过5%,当前值: {{ $value }}"

      - alert: LowMongoCrossDatabaseQueryThroughput
        expr: mongo_cross_db_query_throughput < 20
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "MongoDB跨库查询吞吐量过低"
          description: "MongoDB跨库查询吞吐量低于20次/秒,当前值: {{ $value }}"

      - alert: HighMongoCrossDatabaseQueryExceptionCount
        # rate() yields a per-second value; multiply by 60 to compare against a per-minute limit.
        expr: rate(mongo_cross_db_query_exception_count_total[5m]) * 60 > 3
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "MongoDB跨库查询异常次数过多"
          description: "MongoDB跨库查询异常频率超过3次/分钟,当前值: {{ $value }}"

      - alert: MongoCrossDatabaseQueryServiceDown
        expr: up{job="mongo-cross-db-query-service"} == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "MongoDB跨库查询服务宕机"
          description: "MongoDB跨库查询服务已宕机超过1分钟"

五、总结

MongoDB微服务跨库查询作为NoSQL分布式系统中的重要挑战,通过合理的MongoDB跨库查询策略和数据聚合方案,能够在不破坏微服务边界的前提下实现复杂的数据查询需求。本文从MongoDB跨库查询策略到数据聚合方案,从基础实现到企业级应用,系统梳理了MongoDB微服务跨库查询的完整解决方案。

5.1 关键要点

  1. MongoDB聚合管道:通过聚合管道实现复杂的数据聚合和统计
  2. 跨库查询:通过并行查询多个MongoDB数据库实现数据整合
  3. 事件驱动:通过事件机制实现数据的异步更新和同步
  4. 性能优化:通过缓存、索引优化等手段优化查询性能
  5. 监控告警:建立完善的监控体系,及时发现和处理问题

5.2 最佳实践

  1. 策略选择:根据查询复杂度、数据量、响应时间要求选择合适的MongoDB查询策略
  2. 聚合优化:合理使用MongoDB聚合管道,提高查询效率
  3. 缓存策略:合理使用缓存,减少数据库查询压力
  4. 事件驱动:通过事件机制保持数据一致性
  5. 监控告警:建立完善的监控体系,确保MongoDB查询服务稳定运行

通过以上措施,可以构建一个高效、稳定、可扩展的MongoDB微服务跨库查询系统,为企业的各种业务场景提供数据查询支持。