1. 数据库写入限流概述

数据库写入限流是保护数据库系统稳定性的重要手段,通过限制每秒允许写入数据库的请求数量,可以有效防止数据库过载、连接池耗尽、锁竞争等问题。在Java应用中,合理使用数据库写入限流可以实现数据库保护、性能优化、系统稳定性提升等功能。本文将详细介绍数据库写入限流的原理、实现方法、性能优化技巧以及在Java实战中的应用。

1.1 数据库写入限流核心价值

  1. 数据库保护: 防止数据库过载和连接池耗尽
  2. 性能优化: 通过限流提升数据库性能
  3. 系统稳定性: 保证系统稳定运行
  4. 资源管理: 合理管理数据库资源
  5. 监控告警: 提供丰富的监控指标

1.2 数据库写入限流场景

  • 高并发写入: 大量并发写入请求
  • 批量数据导入: 批量数据导入操作
  • 实时数据同步: 实时数据同步写入
  • 数据迁移: 数据迁移过程中的写入
  • 系统保护: 防止系统过载

1.3 限流算法对比

  • 令牌桶算法: 支持突发流量,平滑限流
  • 漏桶算法: 固定速率输出,严格限流
  • 滑动窗口: 基于时间窗口的限流
  • 计数器算法: 简单计数限流

2. 数据库写入限流基础实现

2.1 数据库写入限流配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/**
* 数据库写入限流配置类
* @author 运维实战
*/
@Configuration
@EnableConfigurationProperties(DatabaseWriteLimitProperties.class)
public class DatabaseWriteLimitConfig {

@Autowired
private DatabaseWriteLimitProperties properties;

/**
* 数据库写入限流器
* @return 限流器
*/
@Bean("databaseWriteLimiter")
public RateLimiter databaseWriteLimiter() {
RateLimiter limiter = RateLimiter.create(properties.getQpsLimit());
logger.info("数据库写入限流器初始化完成,QPS限制: {}", properties.getQpsLimit());
return limiter;
}

/**
* 数据库写入限流服务
* @return 限流服务
*/
@Bean
public DatabaseWriteLimitService databaseWriteLimitService() {
return new DatabaseWriteLimitService();
}

/**
* 数据库写入监控服务
* @return 监控服务
*/
@Bean
public DatabaseWriteMonitorService databaseWriteMonitorService() {
return new DatabaseWriteMonitorService();
}

private static final Logger logger = LoggerFactory.getLogger(DatabaseWriteLimitConfig.class);
}

2.2 数据库写入限流属性配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
/**
* 数据库写入限流属性配置
* @author 运维实战
*/
@Data
@ConfigurationProperties(prefix = "database.write.limit")
public class DatabaseWriteLimitProperties {

/**
* QPS限制
*/
private double qpsLimit = 100.0;

/**
* 是否启用限流
*/
private boolean enabled = true;

/**
* 限流模式
*/
private String mode = "TOKEN_BUCKET"; // TOKEN_BUCKET, LEAKY_BUCKET, SLIDING_WINDOW

/**
* 突发流量支持
*/
private boolean burstEnabled = true;

/**
* 突发流量大小
*/
private int burstSize = 50;

/**
* 预热时间(秒)
*/
private long warmupPeriod = 10;

/**
* 是否启用预热
*/
private boolean warmupEnabled = false;

/**
* 超时时间(毫秒)
*/
private long timeoutMs = 1000;

/**
* 是否启用超时
*/
private boolean timeoutEnabled = true;

/**
* 监控间隔(毫秒)
*/
private long monitorInterval = 30000;

/**
* 是否启用监控
*/
private boolean monitorEnabled = true;
}

2.3 基础数据库写入限流服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
/**
* 基础数据库写入限流服务
* @author 运维实战
*/
@Service
public class DatabaseWriteLimitService {

@Autowired
@Qualifier("databaseWriteLimiter")
private RateLimiter rateLimiter;

@Autowired
private DatabaseWriteLimitProperties properties;

@Autowired
private DatabaseWriteMonitorService monitorService;

private static final Logger logger = LoggerFactory.getLogger(DatabaseWriteLimitService.class);

/**
* 限流写入单条记录
* @param data 数据
* @return 写入结果
*/
public DatabaseWriteResult writeWithLimit(Map<String, Object> data) {
if (!properties.isEnabled()) {
return executeWrite(data);
}

try {
// 尝试获取令牌
boolean acquired = acquireToken();

if (acquired) {
// 执行写入
DatabaseWriteResult result = executeWrite(data);

// 记录成功指标
monitorService.recordSuccess();

logger.debug("数据库写入成功,数据: {}", data);
return result;
} else {
// 记录限流指标
monitorService.recordLimited();

logger.warn("数据库写入被限流,数据: {}", data);
return DatabaseWriteResult.limited("写入请求被限流");
}

} catch (Exception e) {
// 记录异常指标
monitorService.recordError();

logger.error("数据库写入异常,数据: {}", data, e);
return DatabaseWriteResult.error("写入异常: " + e.getMessage());
}
}

/**
* 限流写入多条记录
* @param dataList 数据列表
* @return 写入结果
*/
public BatchWriteResult writeBatchWithLimit(List<Map<String, Object>> dataList) {
if (!properties.isEnabled()) {
return executeBatchWrite(dataList);
}

BatchWriteResult result = new BatchWriteResult();
result.setTotalCount(dataList.size());
result.setStartTime(System.currentTimeMillis());

int successCount = 0;
int limitedCount = 0;
int errorCount = 0;

for (Map<String, Object> data : dataList) {
try {
// 尝试获取令牌
boolean acquired = acquireToken();

if (acquired) {
// 执行写入
DatabaseWriteResult writeResult = executeWrite(data);

if (writeResult.isSuccess()) {
successCount++;
monitorService.recordSuccess();
} else {
errorCount++;
monitorService.recordError();
}
} else {
limitedCount++;
monitorService.recordLimited();
}

} catch (Exception e) {
errorCount++;
monitorService.recordError();
logger.error("批量写入异常,数据: {}", data, e);
}
}

result.setSuccessCount(successCount);
result.setLimitedCount(limitedCount);
result.setErrorCount(errorCount);
result.setEndTime(System.currentTimeMillis());

logger.info("批量写入完成,成功: {}, 限流: {}, 错误: {}, 总耗时: {}ms",
successCount, limitedCount, errorCount, result.getDuration());

return result;
}

/**
* 获取令牌
* @return 是否获取成功
*/
private boolean acquireToken() {
if (properties.isTimeoutEnabled()) {
return rateLimiter.tryAcquire(properties.getTimeoutMs(), TimeUnit.MILLISECONDS);
} else {
return rateLimiter.tryAcquire();
}
}

/**
* 执行写入操作
* @param data 数据
* @return 写入结果
*/
private DatabaseWriteResult executeWrite(Map<String, Object> data) {
// 模拟数据库写入操作
try {
Thread.sleep(50); // 模拟写入耗时

DatabaseWriteResult result = new DatabaseWriteResult();
result.setSuccess(true);
result.setRowsAffected(1);
result.setExecutionTime(System.currentTimeMillis());
result.setData(data);

return result;

} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return DatabaseWriteResult.error("写入被中断");
} catch (Exception e) {
return DatabaseWriteResult.error("写入失败: " + e.getMessage());
}
}

/**
* 执行批量写入操作
* @param dataList 数据列表
* @return 批量写入结果
*/
private BatchWriteResult executeBatchWrite(List<Map<String, Object>> dataList) {
BatchWriteResult result = new BatchWriteResult();
result.setTotalCount(dataList.size());
result.setStartTime(System.currentTimeMillis());

try {
// 模拟批量写入操作
Thread.sleep(dataList.size() * 10); // 模拟批量写入耗时

result.setSuccessCount(dataList.size());
result.setLimitedCount(0);
result.setErrorCount(0);
result.setEndTime(System.currentTimeMillis());

return result;

} catch (InterruptedException e) {
Thread.currentThread().interrupt();
result.setErrorCount(dataList.size());
result.setEndTime(System.currentTimeMillis());
return result;
} catch (Exception e) {
result.setErrorCount(dataList.size());
result.setEndTime(System.currentTimeMillis());
return result;
}
}

/**
* 获取限流器状态
* @return 限流器状态
*/
public RateLimiterStatus getRateLimiterStatus() {
RateLimiterStatus status = new RateLimiterStatus();
status.setQpsLimit(properties.getQpsLimit());
status.setAvailablePermits(rateLimiter.availablePermits());
status.setCurrentTime(System.currentTimeMillis());

return status;
}

/**
* 动态调整QPS限制
* @param newQpsLimit 新的QPS限制
*/
public void adjustQpsLimit(double newQpsLimit) {
rateLimiter.setRate(newQpsLimit);
properties.setQpsLimit(newQpsLimit);

logger.info("数据库写入QPS限制调整: {} -> {}", properties.getQpsLimit(), newQpsLimit);
}
}

2.4 数据库写入结果类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
/**
* 数据库写入结果类
* @author 运维实战
*/
@Data
public class DatabaseWriteResult {

private boolean success;
private int rowsAffected;
private String error;
private long executionTime;
private Map<String, Object> data;

public DatabaseWriteResult() {
this.success = false;
this.rowsAffected = 0;
this.executionTime = System.currentTimeMillis();
}

/**
* 创建成功结果
* @param rowsAffected 影响行数
* @return 成功结果
*/
public static DatabaseWriteResult success(int rowsAffected) {
DatabaseWriteResult result = new DatabaseWriteResult();
result.setSuccess(true);
result.setRowsAffected(rowsAffected);
return result;
}

/**
* 创建限流结果
* @param message 限流消息
* @return 限流结果
*/
public static DatabaseWriteResult limited(String message) {
DatabaseWriteResult result = new DatabaseWriteResult();
result.setSuccess(false);
result.setError(message);
return result;
}

/**
* 创建错误结果
* @param error 错误信息
* @return 错误结果
*/
public static DatabaseWriteResult error(String error) {
DatabaseWriteResult result = new DatabaseWriteResult();
result.setSuccess(false);
result.setError(error);
return result;
}

/**
* 是否被限流
* @return 是否被限流
*/
public boolean isLimited() {
return !success && error != null && error.contains("限流");
}
}

2.5 批量写入结果类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
/**
* 批量写入结果类
* @author 运维实战
*/
@Data
public class BatchWriteResult {

private int totalCount;
private int successCount;
private int limitedCount;
private int errorCount;
private long startTime;
private long endTime;

public BatchWriteResult() {
this.successCount = 0;
this.limitedCount = 0;
this.errorCount = 0;
}

/**
* 获取处理耗时
* @return 处理耗时(毫秒)
*/
public long getDuration() {
return endTime - startTime;
}

/**
* 获取成功率
* @return 成功率
*/
public double getSuccessRate() {
if (totalCount == 0) return 0.0;
return (double) successCount / totalCount * 100;
}

/**
* 获取限流率
* @return 限流率
*/
public double getLimitedRate() {
if (totalCount == 0) return 0.0;
return (double) limitedCount / totalCount * 100;
}

/**
* 获取错误率
* @return 错误率
*/
public double getErrorRate() {
if (totalCount == 0) return 0.0;
return (double) errorCount / totalCount * 100;
}

/**
* 是否全部成功
* @return 是否全部成功
*/
public boolean isAllSuccess() {
return limitedCount == 0 && errorCount == 0;
}
}

3. 高级限流功能

3.1 滑动窗口限流服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
/**
* 滑动窗口限流服务
* @author 运维实战
*/
@Service
public class SlidingWindowLimitService {

private final Map<String, SlidingWindow> windows = new ConcurrentHashMap<>();

private static final Logger logger = LoggerFactory.getLogger(SlidingWindowLimitService.class);

/**
* 滑动窗口限流写入
* @param key 限流键
* @param qpsLimit QPS限制
* @param data 数据
* @return 写入结果
*/
public DatabaseWriteResult writeWithSlidingWindow(String key, int qpsLimit, Map<String, Object> data) {
try {
// 获取或创建滑动窗口
SlidingWindow window = windows.computeIfAbsent(key, k -> new SlidingWindow(qpsLimit, 1000));

// 检查是否允许写入
if (window.tryAcquire()) {
// 执行写入
DatabaseWriteResult result = executeWrite(data);

logger.debug("滑动窗口限流写入成功,key: {}, data: {}", key, data);
return result;
} else {
logger.warn("滑动窗口限流写入被限流,key: {}, qpsLimit: {}", key, qpsLimit);
return DatabaseWriteResult.limited("滑动窗口限流:请求频率过高");
}

} catch (Exception e) {
logger.error("滑动窗口限流写入异常,key: {}, data: {}", key, data, e);
return DatabaseWriteResult.error("写入异常: " + e.getMessage());
}
}

/**
* 获取滑动窗口状态
* @param key 限流键
* @return 滑动窗口状态
*/
public SlidingWindowStatus getSlidingWindowStatus(String key) {
SlidingWindow window = windows.get(key);
if (window == null) {
return null;
}

SlidingWindowStatus status = new SlidingWindowStatus();
status.setKey(key);
status.setQpsLimit(window.getQpsLimit());
status.setCurrentQps(window.getCurrentQps());
status.setWindowSize(window.getWindowSize());
status.setCurrentTime(System.currentTimeMillis());

return status;
}

/**
* 执行写入操作
* @param data 数据
* @return 写入结果
*/
private DatabaseWriteResult executeWrite(Map<String, Object> data) {
// 模拟数据库写入操作
try {
Thread.sleep(50); // 模拟写入耗时

DatabaseWriteResult result = new DatabaseWriteResult();
result.setSuccess(true);
result.setRowsAffected(1);
result.setExecutionTime(System.currentTimeMillis());
result.setData(data);

return result;

} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return DatabaseWriteResult.error("写入被中断");
} catch (Exception e) {
return DatabaseWriteResult.error("写入失败: " + e.getMessage());
}
}
}

3.2 滑动窗口实现类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
/**
* 滑动窗口实现类
* @author 运维实战
*/
public class SlidingWindow {

private final int qpsLimit;
private final long windowSize;
private final Queue<Long> requests;
private final Object lock = new Object();

public SlidingWindow(int qpsLimit, long windowSize) {
this.qpsLimit = qpsLimit;
this.windowSize = windowSize;
this.requests = new LinkedList<>();
}

/**
* 尝试获取请求许可
* @return 是否获取成功
*/
public boolean tryAcquire() {
synchronized (lock) {
long currentTime = System.currentTimeMillis();

// 清理过期请求
cleanExpiredRequests(currentTime);

// 检查是否超过限制
if (requests.size() >= qpsLimit) {
return false;
}

// 添加当前请求
requests.offer(currentTime);
return true;
}
}

/**
* 清理过期请求
* @param currentTime 当前时间
*/
private void cleanExpiredRequests(long currentTime) {
long expireTime = currentTime - windowSize;

while (!requests.isEmpty() && requests.peek() < expireTime) {
requests.poll();
}
}

/**
* 获取当前QPS
* @return 当前QPS
*/
public int getCurrentQps() {
synchronized (lock) {
long currentTime = System.currentTimeMillis();
cleanExpiredRequests(currentTime);
return requests.size();
}
}

/**
* 获取QPS限制
* @return QPS限制
*/
public int getQpsLimit() {
return qpsLimit;
}

/**
* 获取窗口大小
* @return 窗口大小
*/
public long getWindowSize() {
return windowSize;
}
}

3.3 滑动窗口状态类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/**
* 滑动窗口状态类
* @author 运维实战
*/
@Data
public class SlidingWindowStatus {

private String key;
private int qpsLimit;
private int currentQps;
private long windowSize;
private long currentTime;

public SlidingWindowStatus() {
this.currentTime = System.currentTimeMillis();
}

/**
* 获取使用率
* @return 使用率
*/
public double getUsageRate() {
if (qpsLimit == 0) return 0.0;
return (double) currentQps / qpsLimit * 100;
}

/**
* 是否接近限制
* @return 是否接近限制
*/
public boolean isNearLimit() {
return getUsageRate() > 80;
}

/**
* 是否已限制
* @return 是否已限制
*/
public boolean isLimited() {
return currentQps >= qpsLimit;
}
}

3.4 数据库写入监控服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
/**
* 数据库写入监控服务
* @author 运维实战
*/
@Service
public class DatabaseWriteMonitorService {

private final AtomicLong successCount = new AtomicLong(0);
private final AtomicLong limitedCount = new AtomicLong(0);
private final AtomicLong errorCount = new AtomicLong(0);
private final AtomicLong totalCount = new AtomicLong(0);

private long lastResetTime = System.currentTimeMillis();
private final long resetInterval = 60000; // 1分钟重置一次

private static final Logger logger = LoggerFactory.getLogger(DatabaseWriteMonitorService.class);

/**
* 记录成功
*/
public void recordSuccess() {
successCount.incrementAndGet();
totalCount.incrementAndGet();
}

/**
* 记录限流
*/
public void recordLimited() {
limitedCount.incrementAndGet();
totalCount.incrementAndGet();
}

/**
* 记录错误
*/
public void recordError() {
errorCount.incrementAndGet();
totalCount.incrementAndGet();
}

/**
* 获取监控指标
* @return 监控指标
*/
public DatabaseWriteMetrics getMetrics() {
// 检查是否需要重置
if (System.currentTimeMillis() - lastResetTime > resetInterval) {
resetMetrics();
}

DatabaseWriteMetrics metrics = new DatabaseWriteMetrics();
metrics.setSuccessCount(successCount.get());
metrics.setLimitedCount(limitedCount.get());
metrics.setErrorCount(errorCount.get());
metrics.setTotalCount(totalCount.get());
metrics.setTimestamp(System.currentTimeMillis());

return metrics;
}

/**
* 重置指标
*/
private void resetMetrics() {
successCount.set(0);
limitedCount.set(0);
errorCount.set(0);
totalCount.set(0);
lastResetTime = System.currentTimeMillis();

logger.info("数据库写入监控指标重置");
}

/**
* 定期监控数据库写入状态
*/
@Scheduled(fixedRate = 30000) // 每30秒监控一次
public void monitorDatabaseWriteStatus() {
try {
DatabaseWriteMetrics metrics = getMetrics();

logger.info("数据库写入监控: 总请求数={}, 成功数={}, 限流数={}, 错误数={}, 成功率={}%, 限流率={}%",
metrics.getTotalCount(), metrics.getSuccessCount(), metrics.getLimitedCount(),
metrics.getErrorCount(), String.format("%.2f", metrics.getSuccessRate()),
String.format("%.2f", metrics.getLimitedRate()));

// 检查异常情况
if (metrics.getLimitedRate() > 20) {
logger.warn("数据库写入限流率过高: {}%", String.format("%.2f", metrics.getLimitedRate()));
}

if (metrics.getErrorRate() > 10) {
logger.warn("数据库写入错误率过高: {}%", String.format("%.2f", metrics.getErrorRate()));
}

} catch (Exception e) {
logger.error("数据库写入状态监控失败", e);
}
}
}

3.5 数据库写入指标类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
/**
* 数据库写入指标类
* @author 运维实战
*/
@Data
public class DatabaseWriteMetrics {

private long successCount;
private long limitedCount;
private long errorCount;
private long totalCount;
private long timestamp;

public DatabaseWriteMetrics() {
this.timestamp = System.currentTimeMillis();
}

/**
* 获取成功率
* @return 成功率
*/
public double getSuccessRate() {
if (totalCount == 0) return 0.0;
return (double) successCount / totalCount * 100;
}

/**
* 获取限流率
* @return 限流率
*/
public double getLimitedRate() {
if (totalCount == 0) return 0.0;
return (double) limitedCount / totalCount * 100;
}

/**
* 获取错误率
* @return 错误率
*/
public double getErrorRate() {
if (totalCount == 0) return 0.0;
return (double) errorCount / totalCount * 100;
}

/**
* 是否健康
* @return 是否健康
*/
public boolean isHealthy() {
return getLimitedRate() < 20 && getErrorRate() < 10;
}
}

3.6 限流器状态类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
* 限流器状态类
* @author 运维实战
*/
@Data
public class RateLimiterStatus {

private double qpsLimit;
private double availablePermits;
private long currentTime;

public RateLimiterStatus() {
this.currentTime = System.currentTimeMillis();
}

/**
* 获取使用率
* @return 使用率
*/
public double getUsageRate() {
if (qpsLimit == 0) return 0.0;
return (qpsLimit - availablePermits) / qpsLimit * 100;
}

/**
* 是否接近限制
* @return 是否接近限制
*/
public boolean isNearLimit() {
return availablePermits < qpsLimit * 0.1;
}

/**
* 是否已限制
* @return 是否已限制
*/
public boolean isLimited() {
return availablePermits <= 0;
}
}

4. 数据库写入限流控制器

4.1 数据库写入限流REST控制器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
/**
* 数据库写入限流REST控制器
* @author 运维实战
*/
@RestController
@RequestMapping("/api/database/write/limit")
public class DatabaseWriteLimitController {

@Autowired
private DatabaseWriteLimitService databaseWriteLimitService;

@Autowired
private SlidingWindowLimitService slidingWindowLimitService;

@Autowired
private DatabaseWriteMonitorService monitorService;

private static final Logger logger = LoggerFactory.getLogger(DatabaseWriteLimitController.class);

/**
* 限流写入单条记录
* @param request 写入请求
* @return 写入结果
*/
@PostMapping("/single")
public ResponseEntity<DatabaseWriteResult> writeSingle(@RequestBody DatabaseWriteRequest request) {
try {
logger.info("接收到限流写入单条记录请求");

DatabaseWriteResult result = databaseWriteLimitService.writeWithLimit(request.getData());

return ResponseEntity.ok(result);

} catch (Exception e) {
logger.error("限流写入单条记录失败", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
}

/**
* 限流写入多条记录
* @param request 批量写入请求
* @return 写入结果
*/
@PostMapping("/batch")
public ResponseEntity<BatchWriteResult> writeBatch(@RequestBody BatchWriteRequest request) {
try {
logger.info("接收到限流写入多条记录请求,数量: {}", request.getDataList().size());

BatchWriteResult result = databaseWriteLimitService.writeBatchWithLimit(request.getDataList());

return ResponseEntity.ok(result);

} catch (Exception e) {
logger.error("限流写入多条记录失败", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
}

/**
* 滑动窗口限流写入
* @param request 滑动窗口写入请求
* @return 写入结果
*/
@PostMapping("/sliding-window")
public ResponseEntity<DatabaseWriteResult> writeWithSlidingWindow(@RequestBody SlidingWindowWriteRequest request) {
try {
logger.info("接收到滑动窗口限流写入请求,key: {}, qpsLimit: {}",
request.getKey(), request.getQpsLimit());

DatabaseWriteResult result = slidingWindowLimitService.writeWithSlidingWindow(
request.getKey(), request.getQpsLimit(), request.getData());

return ResponseEntity.ok(result);

} catch (Exception e) {
logger.error("滑动窗口限流写入失败", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
}

/**
* 获取限流器状态
* @return 限流器状态
*/
@GetMapping("/status")
public ResponseEntity<RateLimiterStatus> getRateLimiterStatus() {
try {
RateLimiterStatus status = databaseWriteLimitService.getRateLimiterStatus();
return ResponseEntity.ok(status);
} catch (Exception e) {
logger.error("获取限流器状态失败", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
}

/**
* 获取滑动窗口状态
* @param key 限流键
* @return 滑动窗口状态
*/
@GetMapping("/sliding-window/status/{key}")
public ResponseEntity<SlidingWindowStatus> getSlidingWindowStatus(@PathVariable String key) {
try {
SlidingWindowStatus status = slidingWindowLimitService.getSlidingWindowStatus(key);
if (status != null) {
return ResponseEntity.ok(status);
} else {
return ResponseEntity.notFound().build();
}
} catch (Exception e) {
logger.error("获取滑动窗口状态失败", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
}

/**
* 获取监控指标
* @return 监控指标
*/
@GetMapping("/metrics")
public ResponseEntity<DatabaseWriteMetrics> getMetrics() {
try {
DatabaseWriteMetrics metrics = monitorService.getMetrics();
return ResponseEntity.ok(metrics);
} catch (Exception e) {
logger.error("获取监控指标失败", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
}

/**
* 调整QPS限制
* @param request 调整请求
* @return 调整结果
*/
@PostMapping("/adjust")
public ResponseEntity<Map<String, String>> adjustQpsLimit(@RequestBody QpsAdjustRequest request) {
try {
logger.info("接收到QPS限制调整请求: {}", request.getQpsLimit());

databaseWriteLimitService.adjustQpsLimit(request.getQpsLimit());

Map<String, String> response = new HashMap<>();
response.put("status", "SUCCESS");
response.put("message", "QPS限制调整成功");
response.put("newQpsLimit", String.valueOf(request.getQpsLimit()));

return ResponseEntity.ok(response);

} catch (Exception e) {
logger.error("调整QPS限制失败", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
}
}

4.2 请求类定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* 数据库写入请求类
* @author 运维实战
*/
@Data
public class DatabaseWriteRequest {

private Map<String, Object> data;

public DatabaseWriteRequest() {}

public DatabaseWriteRequest(Map<String, Object> data) {
this.data = data;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* 批量写入请求类
* @author 运维实战
*/
@Data
public class BatchWriteRequest {

private List<Map<String, Object>> dataList;

public BatchWriteRequest() {}

public BatchWriteRequest(List<Map<String, Object>> dataList) {
this.dataList = dataList;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* 滑动窗口写入请求类
* @author 运维实战
*/
@Data
public class SlidingWindowWriteRequest {

private String key;
private int qpsLimit;
private Map<String, Object> data;

public SlidingWindowWriteRequest() {}

public SlidingWindowWriteRequest(String key, int qpsLimit, Map<String, Object> data) {
this.key = key;
this.qpsLimit = qpsLimit;
this.data = data;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* QPS调整请求类
* @author 运维实战
*/
@Data
public class QpsAdjustRequest {

private double qpsLimit;

public QpsAdjustRequest() {}

public QpsAdjustRequest(double qpsLimit) {
this.qpsLimit = qpsLimit;
}
}

5. 数据库写入限流注解和AOP

5.1 数据库写入限流注解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/**
* 数据库写入限流注解
* @author 运维实战
*/
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface DatabaseWriteLimit {

/**
* QPS限制
*/
double qpsLimit() default 100.0;

/**
* 限流键
*/
String key() default "";

/**
* 限流模式
*/
String mode() default "TOKEN_BUCKET"; // TOKEN_BUCKET, SLIDING_WINDOW

/**
* 超时时间(毫秒)
*/
long timeoutMs() default 1000;

/**
* 是否启用超时
*/
boolean timeoutEnabled() default true;

/**
* 限流失败时的消息
*/
String message() default "数据库写入请求被限流,请稍后重试";

/**
* 限流失败时的HTTP状态码
*/
int statusCode() default 429;
}

5.2 数据库写入限流AOP切面

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
/**
* 数据库写入限流AOP切面
* @author 运维实战
*/
@Aspect
@Component
public class DatabaseWriteLimitAspect {

@Autowired
private DatabaseWriteLimitService databaseWriteLimitService;

@Autowired
private SlidingWindowLimitService slidingWindowLimitService;

@Autowired
private DatabaseWriteMonitorService monitorService;

private static final Logger logger = LoggerFactory.getLogger(DatabaseWriteLimitAspect.class);

/**
* 数据库写入限流切点
*/
@Pointcut("@annotation(databaseWriteLimit)")
public void databaseWriteLimitPointcut(DatabaseWriteLimit databaseWriteLimit) {}

/**
* 数据库写入限流环绕通知
* @param joinPoint 连接点
* @param databaseWriteLimit 限流注解
* @return 执行结果
* @throws Throwable 异常
*/
@Around("databaseWriteLimitPointcut(databaseWriteLimit)")
public Object around(ProceedingJoinPoint joinPoint, DatabaseWriteLimit databaseWriteLimit) throws Throwable {
String key = generateKey(joinPoint, databaseWriteLimit);

try {
// 根据限流模式选择限流方式
DatabaseWriteResult result;

if ("SLIDING_WINDOW".equals(databaseWriteLimit.mode())) {
// 滑动窗口限流
result = slidingWindowLimitService.writeWithSlidingWindow(
key, (int) databaseWriteLimit.qpsLimit(), null);
} else {
// 令牌桶限流
result = databaseWriteLimitService.writeWithLimit(null);
}

if (result.isSuccess()) {
logger.debug("数据库写入限流检查通过: key={}", key);
return joinPoint.proceed();
} else {
logger.warn("数据库写入限流检查失败: key={}, message={}", key, result.getError());
throw new DatabaseWriteLimitException(databaseWriteLimit.message(), databaseWriteLimit.statusCode());
}

} catch (DatabaseWriteLimitException e) {
throw e;
} catch (Exception e) {
logger.error("数据库写入限流处理异常: key={}", key, e);
throw new DatabaseWriteLimitException("数据库写入限流处理异常", 500);
}
}

/**
* 生成限流键
* @param joinPoint 连接点
* @param databaseWriteLimit 限流注解
* @return 限流键
*/
private String generateKey(ProceedingJoinPoint joinPoint, DatabaseWriteLimit databaseWriteLimit) {
if (!databaseWriteLimit.key().isEmpty()) {
return databaseWriteLimit.key();
}

// 生成默认键
MethodSignature signature = (MethodSignature) joinPoint.getSignature();
String className = signature.getDeclaringType().getSimpleName();
String methodName = signature.getName();

return className + "." + methodName;
}
}

5.3 数据库写入限流异常类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* 数据库写入限流异常类
* @author 运维实战
*/
public class DatabaseWriteLimitException extends RuntimeException {

private final int statusCode;

public DatabaseWriteLimitException(String message) {
super(message);
this.statusCode = 429;
}

public DatabaseWriteLimitException(String message, int statusCode) {
super(message);
this.statusCode = statusCode;
}

public DatabaseWriteLimitException(String message, Throwable cause) {
super(message, cause);
this.statusCode = 429;
}

public DatabaseWriteLimitException(String message, Throwable cause, int statusCode) {
super(message, cause);
this.statusCode = statusCode;
}

public int getStatusCode() {
return statusCode;
}
}

5.4 数据库写入限流异常处理器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* 数据库写入限流异常处理器
* @author 运维实战
*/
@ControllerAdvice
public class DatabaseWriteLimitExceptionHandler {

private static final Logger logger = LoggerFactory.getLogger(DatabaseWriteLimitExceptionHandler.class);

/**
* 处理数据库写入限流异常
* @param e 限流异常
* @return 错误响应
*/
@ExceptionHandler(DatabaseWriteLimitException.class)
public ResponseEntity<Map<String, Object>> handleDatabaseWriteLimitException(DatabaseWriteLimitException e) {
logger.warn("数据库写入限流异常: {}", e.getMessage());

Map<String, Object> response = new HashMap<>();
response.put("error", "DATABASE_WRITE_LIMIT_EXCEEDED");
response.put("message", e.getMessage());
response.put("timestamp", System.currentTimeMillis());

return ResponseEntity.status(e.getStatusCode()).body(response);
}
}

6. 实际应用示例

6.1 使用数据库写入限流注解的服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
/**
* 使用数据库写入限流注解的服务
* @author 运维实战
*/
@Service
public class DatabaseWriteLimitExampleService {

private static final Logger logger = LoggerFactory.getLogger(DatabaseWriteLimitExampleService.class);

/**
* 基础限流写入示例
* @param data 数据
* @return 处理结果
*/
@DatabaseWriteLimit(qpsLimit = 50.0, key = "example.basic", message = "基础限流:数据库写入请求被限流")
public String basicWriteLimit(String data) {
logger.info("执行基础限流写入示例: {}", data);

// 模拟数据库写入
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}

return "基础限流写入完成: " + data;
}

/**
* 滑动窗口限流写入示例
* @param data 数据
* @return 处理结果
*/
@DatabaseWriteLimit(qpsLimit = 30.0, key = "example.sliding", mode = "SLIDING_WINDOW",
message = "滑动窗口限流:数据库写入请求被限流")
public String slidingWindowWriteLimit(String data) {
logger.info("执行滑动窗口限流写入示例: {}", data);

// 模拟数据库写入
try {
Thread.sleep(120);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}

return "滑动窗口限流写入完成: " + data;
}

/**
* 高QPS限流写入示例
* @param data 数据
* @return 处理结果
*/
@DatabaseWriteLimit(qpsLimit = 200.0, key = "example.highQps", timeoutMs = 500,
message = "高QPS限流:数据库写入请求被限流")
public String highQpsWriteLimit(String data) {
logger.info("执行高QPS限流写入示例: {}", data);

// 模拟数据库写入
try {
Thread.sleep(80);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}

return "高QPS限流写入完成: " + data;
}

/**
* 低QPS限流写入示例
* @param data 数据
* @return 处理结果
*/
@DatabaseWriteLimit(qpsLimit = 10.0, key = "example.lowQps", timeoutMs = 2000,
message = "低QPS限流:数据库写入请求被限流")
public String lowQpsWriteLimit(String data) {
logger.info("执行低QPS限流写入示例: {}", data);

// 模拟数据库写入
try {
Thread.sleep(150);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}

return "低QPS限流写入完成: " + data;
}
}

6.2 数据库写入限流测试控制器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
/**
* 数据库写入限流测试控制器
* @author 运维实战
*/
@RestController
@RequestMapping("/api/database/write/limit/test")
public class DatabaseWriteLimitTestController {

@Autowired
private DatabaseWriteLimitExampleService exampleService;

@Autowired
private DatabaseWriteLimitService databaseWriteLimitService;

@Autowired
private DatabaseWriteMonitorService monitorService;

private static final Logger logger = LoggerFactory.getLogger(DatabaseWriteLimitTestController.class);

/**
* 基础限流写入测试
* @param data 测试数据
* @return 测试结果
*/
@GetMapping("/basic")
public ResponseEntity<Map<String, String>> testBasicWriteLimit(@RequestParam String data) {
try {
String result = exampleService.basicWriteLimit(data);

Map<String, String> response = new HashMap<>();
response.put("status", "SUCCESS");
response.put("result", result);
response.put("timestamp", String.valueOf(System.currentTimeMillis()));

return ResponseEntity.ok(response);

} catch (DatabaseWriteLimitException e) {
logger.warn("基础限流写入测试被限流: {}", e.getMessage());
return ResponseEntity.status(e.getStatusCode()).build();
} catch (Exception e) {
logger.error("基础限流写入测试失败", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
}

/**
* 滑动窗口限流写入测试
* @param data 测试数据
* @return 测试结果
*/
@GetMapping("/sliding-window")
public ResponseEntity<Map<String, String>> testSlidingWindowWriteLimit(@RequestParam String data) {
try {
String result = exampleService.slidingWindowWriteLimit(data);

Map<String, String> response = new HashMap<>();
response.put("status", "SUCCESS");
response.put("result", result);
response.put("timestamp", String.valueOf(System.currentTimeMillis()));

return ResponseEntity.ok(response);

} catch (DatabaseWriteLimitException e) {
logger.warn("滑动窗口限流写入测试被限流: {}", e.getMessage());
return ResponseEntity.status(e.getStatusCode()).build();
} catch (Exception e) {
logger.error("滑动窗口限流写入测试失败", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
}

/**
* 高QPS限流写入测试
* @param data 测试数据
* @return 测试结果
*/
@GetMapping("/high-qps")
public ResponseEntity<Map<String, String>> testHighQpsWriteLimit(@RequestParam String data) {
try {
String result = exampleService.highQpsWriteLimit(data);

Map<String, String> response = new HashMap<>();
response.put("status", "SUCCESS");
response.put("result", result);
response.put("timestamp", String.valueOf(System.currentTimeMillis()));

return ResponseEntity.ok(response);

} catch (DatabaseWriteLimitException e) {
logger.warn("高QPS限流写入测试被限流: {}", e.getMessage());
return ResponseEntity.status(e.getStatusCode()).build();
} catch (Exception e) {
logger.error("高QPS限流写入测试失败", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
}

/**
* 低QPS限流写入测试
* @param data 测试数据
* @return 测试结果
*/
@GetMapping("/low-qps")
public ResponseEntity<Map<String, String>> testLowQpsWriteLimit(@RequestParam String data) {
try {
String result = exampleService.lowQpsWriteLimit(data);

Map<String, String> response = new HashMap<>();
response.put("status", "SUCCESS");
response.put("result", result);
response.put("timestamp", String.valueOf(System.currentTimeMillis()));

return ResponseEntity.ok(response);

} catch (DatabaseWriteLimitException e) {
logger.warn("低QPS限流写入测试被限流: {}", e.getMessage());
return ResponseEntity.status(e.getStatusCode()).build();
} catch (Exception e) {
logger.error("低QPS限流写入测试失败", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
}

/**
* 获取限流器状态
* @return 限流器状态
*/
@GetMapping("/status")
public ResponseEntity<RateLimiterStatus> getStatus() {
try {
RateLimiterStatus status = databaseWriteLimitService.getRateLimiterStatus();
return ResponseEntity.ok(status);
} catch (Exception e) {
logger.error("获取限流器状态失败", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
}

/**
* 获取监控指标
* @return 监控指标
*/
@GetMapping("/metrics")
public ResponseEntity<DatabaseWriteMetrics> getMetrics() {
try {
DatabaseWriteMetrics metrics = monitorService.getMetrics();
return ResponseEntity.ok(metrics);
} catch (Exception e) {
logger.error("获取监控指标失败", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
}
}

7. 总结

7.1 数据库写入限流最佳实践

  1. 合理设置QPS限制: 根据数据库性能和业务需求设置QPS限制
  2. 选择合适的限流算法: 根据场景选择令牌桶、滑动窗口等算法
  3. 监控限流状态: 实时监控限流器状态和性能指标
  4. 动态调整参数: 根据负载情况动态调整限流参数
  5. 异常处理: 实现完善的异常处理和用户友好提示

7.2 性能优化建议

  • 预热机制: 使用预热机制平滑启动限流器
  • 滑动窗口: 使用滑动窗口实现精确限流
  • 监控告警: 建立完善的监控和告警机制
  • 异步处理: 使用异步处理提升系统响应性能
  • 缓存优化: 合理使用缓存减少限流检查开销

7.3 运维管理要点

  • 实时监控: 监控限流器状态和性能指标
  • 动态调整: 根据负载情况动态调整限流参数
  • 异常处理: 建立异常处理和告警机制
  • 日志管理: 完善日志记录和分析
  • 性能调优: 根据监控数据优化限流参数

通过本文的限制每秒允许写入数据库的请求数量Java实战指南,您可以掌握数据库写入限流的原理、实现方法、性能优化技巧以及在企业级应用中的最佳实践,构建高效、稳定的数据库写入限流系统!