Preface

As business volume grows, a single database node becomes the system bottleneck. A read/write-splitting multi-node architecture spreads reads across several slave nodes while concentrating writes on the master, significantly improving concurrency and availability. This article walks through a workable multi-node architecture, from Spring Boot configuration to enterprise-grade operations.

1. Architecture Design Principles

  • Read/write splitting: route writes to the master and reads to the slaves to relieve the master.
  • Load balancing: distribute reads across slaves by round robin or weight to raise read throughput.
  • Failover: switch automatically when the master fails; evict failed slaves and restore them on recovery.
  • Data consistency: handle replication lag and guarantee eventual consistency.
  • Monitoring and alerting: watch every node in real time and alert promptly on anomalies.

2. Core Component Selection

2.1 Data Source Management

  • HikariCP: a high-performance connection pool with multi-data-source support.
  • Druid: Alibaba's open-source connection pool, with built-in monitoring and a SQL firewall.
  • ShardingSphere: sharding middleware with built-in read/write splitting.

2.2 Load Balancing Strategies

  • Round Robin: distributes requests evenly.
  • Weighted Round Robin: assigns weights according to node capacity.
  • Least Connections: picks the node with the fewest active connections.
  • Consistent Hash: routes identical requests to the same node.
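
Of these strategies, consistent hashing is the least obvious to implement. A minimal self-contained sketch follows; the node names, the FNV-1a hash, and the virtual-node count are illustrative assumptions, not part of any library API:

```java
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

public class ConsistentHashBalancer {

    // virtual nodes smooth the distribution across few physical nodes
    private static final int VIRTUAL_NODES = 100;
    private final SortedMap<Integer, String> ring = new TreeMap<>();

    public ConsistentHashBalancer(List<String> nodes) {
        for (String node : nodes) {
            for (int i = 0; i < VIRTUAL_NODES; i++) {
                ring.put(hash(node + "#" + i), node);
            }
        }
    }

    // the same requestKey always maps to the same node while membership is stable
    public String select(String requestKey) {
        if (ring.isEmpty()) throw new IllegalStateException("no nodes");
        SortedMap<Integer, String> tail = ring.tailMap(hash(requestKey));
        int h = tail.isEmpty() ? ring.firstKey() : tail.firstKey();
        return ring.get(h);
    }

    // FNV-1a, masked non-negative so keys order cleanly on the ring
    private int hash(String key) {
        int h = 0x811c9dc5;
        for (char c : key.toCharArray()) {
            h ^= c;
            h *= 0x01000193;
        }
        return h & 0x7fffffff;
    }
}
```

When a node is removed, only the keys that hashed to its ring segments move; the other strategies reshuffle everything.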

3. Basic Configuration

3.1 Multi-Data-Source Configuration

@Configuration
@EnableTransactionManagement
public class DataSourceConfig {

    @Bean
    @Primary
    @ConfigurationProperties("spring.datasource.master")
    public DataSource masterDataSource() {
        return DruidDataSourceBuilder.create().build();
    }

    @Bean
    @ConfigurationProperties("spring.datasource.slave1")
    public DataSource slave1DataSource() {
        return DruidDataSourceBuilder.create().build();
    }

    @Bean
    @ConfigurationProperties("spring.datasource.slave2")
    public DataSource slave2DataSource() {
        return DruidDataSourceBuilder.create().build();
    }

    @Bean
    public DataSource routingDataSource() {
        Map<Object, Object> dataSourceMap = new HashMap<>();
        dataSourceMap.put("master", masterDataSource());
        dataSourceMap.put("slave1", slave1DataSource());
        dataSourceMap.put("slave2", slave2DataSource());

        DynamicDataSource routingDataSource = new DynamicDataSource();
        routingDataSource.setDefaultTargetDataSource(masterDataSource());
        routingDataSource.setTargetDataSources(dataSourceMap);
        return routingDataSource;
    }
}

3.2 Dynamic Data Source Switching

public class DynamicDataSource extends AbstractRoutingDataSource {

    @Override
    protected Object determineCurrentLookupKey() {
        return DataSourceContextHolder.getDataSourceType();
    }
}

public class DataSourceContextHolder {

    private static final ThreadLocal<String> CONTEXT_HOLDER = new ThreadLocal<>();

    public static void setDataSourceType(String dataSourceType) {
        CONTEXT_HOLDER.set(dataSourceType);
    }

    public static String getDataSourceType() {
        return CONTEXT_HOLDER.get();
    }

    public static void clearDataSourceType() {
        CONTEXT_HOLDER.remove();
    }
}
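
Outside the AOP path shown in the next section, the holder can also be driven manually. The crucial detail is clearing the ThreadLocal in a finally block: servlet worker threads are pooled, and a leftover key would silently route the next request handled by the same thread. A standalone sketch (the holder is reproduced inline so it compiles on its own):

```java
public class ManualSwitchExample {

    // inline copy of the article's DataSourceContextHolder, so this compiles standalone
    static class DataSourceContextHolder {
        private static final ThreadLocal<String> CONTEXT_HOLDER = new ThreadLocal<>();
        static void setDataSourceType(String t) { CONTEXT_HOLDER.set(t); }
        static String getDataSourceType() { return CONTEXT_HOLDER.get(); }
        static void clearDataSourceType() { CONTEXT_HOLDER.remove(); }
    }

    public static String runReadOnSlave() {
        DataSourceContextHolder.setDataSourceType("slave1");
        try {
            // queries issued here would resolve to slave1 via determineCurrentLookupKey()
            return DataSourceContextHolder.getDataSourceType();
        } finally {
            // clear in finally, or the pooled worker thread keeps routing to slave1
            DataSourceContextHolder.clearDataSourceType();
        }
    }
}
```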

4. Read/Write Splitting Implementation

4.1 AOP Aspect

@Aspect
@Component
@Order(1) // run before the transaction interceptor so the lookup key is set first
public class DataSourceAspect {

    @Pointcut("execution(* com.example.service.*.save*(..)) || " +
              "execution(* com.example.service.*.insert*(..)) || " +
              "execution(* com.example.service.*.update*(..)) || " +
              "execution(* com.example.service.*.delete*(..))")
    public void writePointcut() {}

    @Pointcut("execution(* com.example.service.*.find*(..)) || " +
              "execution(* com.example.service.*.get*(..)) || " +
              "execution(* com.example.service.*.list*(..)) || " +
              "execution(* com.example.service.*.query*(..))")
    public void readPointcut() {}

    @Before("writePointcut()")
    public void setWriteDataSource() {
        DataSourceContextHolder.setDataSourceType("master");
    }

    @Before("readPointcut()")
    public void setReadDataSource() {
        DataSourceContextHolder.setDataSourceType(LoadBalancer.selectSlave());
    }

    // Clear after every routed method, not only @Transactional ones; otherwise
    // the ThreadLocal leaks into the next request served by the same pooled thread.
    @After("writePointcut() || readPointcut()")
    public void clearDataSource() {
        DataSourceContextHolder.clearDataSourceType();
    }
}

4.2 Load Balancer

@Component
public class LoadBalancer {

    private static final List<String> SLAVE_NODES = Arrays.asList("slave1", "slave2");
    private static final AtomicInteger COUNTER = new AtomicInteger(0);

    // Round robin; floorMod keeps the index non-negative after counter overflow
    public static String selectSlave() {
        int index = Math.floorMod(COUNTER.getAndIncrement(), SLAVE_NODES.size());
        return SLAVE_NODES.get(index);
    }

    // Weighted selection (a random pick proportional to weight, not strict rotation)
    public static String selectSlaveByWeight() {
        Map<String, Integer> weights = Map.of("slave1", 3, "slave2", 1);
        int totalWeight = weights.values().stream().mapToInt(Integer::intValue).sum();
        int random = ThreadLocalRandom.current().nextInt(totalWeight);

        int currentWeight = 0;
        for (Map.Entry<String, Integer> entry : weights.entrySet()) {
            currentWeight += entry.getValue();
            if (random < currentWeight) {
                return entry.getKey();
            }
        }
        return "slave1";
    }
}
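
A random weighted pick can land unevenly over short bursts of requests. If deterministic spreading matters, nginx-style smooth weighted round robin is a drop-in alternative; a self-contained sketch, reusing the same illustrative 3:1 weight split:

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class SmoothWeightedRoundRobin {

    private final Map<String, Integer> weights;                   // configured weights
    private final Map<String, Integer> current = new HashMap<>(); // running scores

    public SmoothWeightedRoundRobin(Map<String, Integer> weights) {
        this.weights = new LinkedHashMap<>(weights);
        weights.keySet().forEach(k -> current.put(k, 0));
    }

    // each call: every node gains its weight; the highest scorer is picked
    // and pays back the total weight, yielding an evenly interleaved sequence
    public synchronized String select() {
        int total = 0;
        String best = null;
        for (Map.Entry<String, Integer> e : weights.entrySet()) {
            total += e.getValue();
            int score = current.merge(e.getKey(), e.getValue(), Integer::sum);
            if (best == null || score > current.get(best)) {
                best = e.getKey();
            }
        }
        current.merge(best, -total, Integer::sum);
        return best;
    }
}
```

With weights slave1=3, slave2=1 the sequence interleaves as slave1, slave1, slave2, slave1, ... rather than clustering.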

5. Master/Slave Switchover and Failover

5.1 Health Checks

@Component
@Slf4j
public class DataSourceHealthChecker {

    // Inject only the physical master/slave DataSource beans here;
    // the routing DataSource itself must not be health-checked.
    @Autowired
    private Map<String, DataSource> dataSourceMap;

    private final Map<String, Boolean> healthStatus = new ConcurrentHashMap<>();

    @Scheduled(fixedDelay = 5000)
    public void checkHealth() {
        dataSourceMap.forEach((name, dataSource) -> {
            try (Connection conn = dataSource.getConnection()) {
                boolean isValid = conn.isValid(3);
                healthStatus.put(name, isValid);
                if (!isValid) {
                    log.warn("Data source {} failed its health check", name);
                }
            } catch (SQLException e) {
                healthStatus.put(name, false);
                log.error("Health check for data source {} threw an exception", name, e);
            }
        });
    }

    public boolean isHealthy(String dataSourceName) {
        return healthStatus.getOrDefault(dataSourceName, false);
    }

    public List<String> getHealthySlaves() {
        return healthStatus.entrySet().stream()
                .filter(entry -> entry.getKey().startsWith("slave") && entry.getValue())
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }
}

5.2 Failover Strategy

@Component
@Slf4j
public class FailoverStrategy {

    @Autowired
    private DataSourceHealthChecker healthChecker;

    public String selectDataSource(String operation) {
        if ("write".equals(operation)) {
            return selectMaster();
        }
        return selectSlave();
    }

    private String selectMaster() {
        if (healthChecker.isHealthy("master")) {
            return "master";
        }
        // Master is down: use a healthy slave as a temporary write target.
        // Note: writing to a replica is only safe after it has actually been
        // promoted; in production, promotion belongs to MHA/Orchestrator, not the app.
        List<String> healthySlaves = healthChecker.getHealthySlaves();
        if (!healthySlaves.isEmpty()) {
            log.warn("Master is down, failing over to slave: {}", healthySlaves.get(0));
            return healthySlaves.get(0);
        }
        throw new IllegalStateException("No data source is available");
    }

    private String selectSlave() {
        List<String> healthySlaves = healthChecker.getHealthySlaves();
        if (healthySlaves.isEmpty()) {
            // All slaves are down: degrade reads to the master.
            log.warn("All slaves are down, falling back to master");
            return "master";
        }
        // Pick only among the slaves that actually passed the health check.
        return healthySlaves.get(
                ThreadLocalRandom.current().nextInt(healthySlaves.size()));
    }
}

6. ShardingSphere Integration

6.1 Read/Write Splitting Configuration

spring:
  shardingsphere:
    datasource:
      names: master,slave1,slave2
      master:
        type: com.zaxxer.hikari.HikariDataSource
        driver-class-name: com.mysql.cj.jdbc.Driver
        jdbc-url: jdbc:mysql://master-host:3306/db?useUnicode=true&characterEncoding=utf8
        username: root
        password: password
      slave1:
        type: com.zaxxer.hikari.HikariDataSource
        driver-class-name: com.mysql.cj.jdbc.Driver
        jdbc-url: jdbc:mysql://slave1-host:3306/db?useUnicode=true&characterEncoding=utf8
        username: root
        password: password
      slave2:
        type: com.zaxxer.hikari.HikariDataSource
        driver-class-name: com.mysql.cj.jdbc.Driver
        jdbc-url: jdbc:mysql://slave2-host:3306/db?useUnicode=true&characterEncoding=utf8
        username: root
        password: password

    rules:
      readwrite-splitting:
        data-sources:
          readwrite_ds:
            static-strategy:
              write-data-source-name: master
              read-data-source-names: slave1,slave2
            load-balancer-name: round_robin
        load-balancers:
          round_robin:
            type: ROUND_ROBIN

6.2 Forced Routing

@Component
public class ShardingSphereRouter {

    // Force subsequent statements on this thread to the write data source.
    // ShardingSphere 5.x API; 4.x called this setMasterRouteOnly().
    // HintManager.getInstance() throws if a hint is already active on the
    // thread, so always pair this with clearHint() in a finally block.
    public void forceWrite() {
        HintManager.getInstance().setWriteRouteOnly();
    }

    // There is no "slave only" hint; clearing the hint restores the default
    // routing, under which reads go back to the read data sources.
    public void clearHint() {
        HintManager.clear();
    }
}

7. Connection Pool Tuning

7.1 HikariCP Configuration

spring:
  datasource:
    master:
      hikari:
        maximum-pool-size: 20
        minimum-idle: 5
        idle-timeout: 300000
        max-lifetime: 1200000
        connection-timeout: 30000
        validation-timeout: 5000
        leak-detection-threshold: 60000
    slave1:
      hikari:
        maximum-pool-size: 15
        minimum-idle: 3
        idle-timeout: 300000
        max-lifetime: 1200000
        connection-timeout: 30000
    slave2:
      hikari:
        maximum-pool-size: 15
        minimum-idle: 3
        idle-timeout: 300000
        max-lifetime: 1200000
        connection-timeout: 30000

7.2 Connection Pool Monitoring

@Component
@Slf4j
public class ConnectionPoolMonitor {

    @Autowired
    private Map<String, DataSource> dataSourceMap;

    @Scheduled(fixedDelay = 30000)
    public void monitorConnectionPools() {
        dataSourceMap.forEach((name, dataSource) -> {
            if (dataSource instanceof HikariDataSource) {
                HikariDataSource hikariDataSource = (HikariDataSource) dataSource;
                HikariPoolMXBean poolBean = hikariDataSource.getHikariPoolMXBean();

                log.info("Data source {} - active: {}, idle: {}, total: {}, threads waiting: {}",
                        name,
                        poolBean.getActiveConnections(),
                        poolBean.getIdleConnections(),
                        poolBean.getTotalConnections(),
                        poolBean.getThreadsAwaitingConnection());
            }
        });
    }
}

8. Transaction Management

8.1 Transaction Handling

@Service
@Slf4j
public class UserService {

    @Autowired
    private UserMapper userMapper;

    @Transactional(rollbackFor = Exception.class)
    public void createUser(User user) {
        // Write: routed to the master automatically
        userMapper.insert(user);

        // Read-your-own-write: force the follow-up read to the master.
        // (5.x API; inside a transaction ShardingSphere already routes every
        // statement to the write source, so the hint matters mainly outside one.)
        try {
            HintManager.getInstance().setWriteRouteOnly();
            User savedUser = userMapper.selectById(user.getId());
            log.info("User created: {}", savedUser);
        } finally {
            HintManager.clear();
        }
    }

    @Transactional(readOnly = true)
    public List<User> findUsers(String keyword) {
        // Read: routed to a slave automatically
        return userMapper.findByKeyword(keyword);
    }
}

8.2 Read/Write Transaction Boundaries

@Service
public class OrderService {

    @Autowired
    private OrderMapper orderMapper;

    @Autowired
    private UserMapper userMapper;

    public void processOrder(Order order) {
        // 1. Read - slave (no transaction here, so each statement routes independently)
        User user = userMapper.selectById(order.getUserId());

        // 2. Write - master
        orderMapper.insert(order);

        // 3. Update - master. The balance read above may come from a lagging
        // slave, so the update should be an atomic SQL expression
        // (UPDATE ... SET balance = balance - ? WHERE id = ?), never a value
        // computed from the earlier read.
        userMapper.updateBalance(user.getId(), order.getAmount());
    }
}

9. Monitoring and Alerting

9.1 Data Source Metrics

@Component
public class DataSourceMetrics {

    private final MeterRegistry meterRegistry;
    private final Map<String, DataSource> dataSourceMap;

    public DataSourceMetrics(MeterRegistry meterRegistry,
                             Map<String, DataSource> dataSourceMap) {
        this.meterRegistry = meterRegistry;
        this.dataSourceMap = dataSourceMap;
    }

    @EventListener
    @Async
    public void handleDataSourceEvent(DataSourceEvent event) {
        Counter.builder("datasource.operation.count")
                .tag("datasource", event.getDataSourceName())
                .tag("operation", event.getOperation())
                .register(meterRegistry)
                .increment();
    }

    // A gauge samples a live object on scrape, so registering once at startup
    // is enough; re-registering on a schedule is unnecessary.
    @PostConstruct
    public void registerConnectionGauges() {
        dataSourceMap.forEach((name, dataSource) -> {
            if (dataSource instanceof HikariDataSource) {
                HikariPoolMXBean poolBean =
                        ((HikariDataSource) dataSource).getHikariPoolMXBean();
                // the pool is created lazily, so it may not exist yet
                if (poolBean == null) {
                    return;
                }

                Gauge.builder("datasource.connections.active", poolBean,
                                HikariPoolMXBean::getActiveConnections)
                        .tag("datasource", name)
                        .register(meterRegistry);

                Gauge.builder("datasource.connections.idle", poolBean,
                                HikariPoolMXBean::getIdleConnections)
                        .tag("datasource", name)
                        .register(meterRegistry);
            }
        });
    }
}

9.2 Alert Rules

management:
  endpoints:
    web:
      exposure:
        include: health,metrics,prometheus
  metrics:
    export:
      prometheus:
        enabled: true

# Prometheus alert rules (a separate Prometheus rules file, not Spring config).
# The expressions assume gauges named datasource_health_status and
# datasource_connections_total are exported alongside the ones defined above.
groups:
  - name: datasource-alerts
    rules:
      - alert: DataSourceDown
        expr: datasource_health_status == 0
        for: 30s
        labels:
          severity: critical
        annotations:
          summary: "Data source {{ $labels.datasource }} is unavailable"

      - alert: HighConnectionUsage
        expr: datasource_connections_active / datasource_connections_total > 0.8
        for: 1m
        labels:
          severity: warning
        annotations:
          summary: "Connection usage on data source {{ $labels.datasource }} is above 80%"

10. Performance Optimization

10.1 Read/Write Splitting Optimizations

@Component
@Slf4j
public class ReadWriteOptimizer {

    @Autowired
    private UserMapper userMapper;

    // Read optimization: one batch query instead of N single lookups
    @Transactional(readOnly = true)
    public List<User> batchFindUsers(List<Long> userIds) {
        return userMapper.selectBatchIds(userIds);
    }

    // Write optimization: batch insert
    @Transactional
    public void batchInsertUsers(List<User> users) {
        userMapper.insertBatch(users);
    }

    // Delayed read: tolerate replication lag by reading after a short wait
    public void createUserWithDelayRead(User user) {
        userMapper.insert(user);

        // Note: the routing ThreadLocal does not propagate to the async thread,
        // so the data source must be re-selected inside the task. A fixed sleep
        // is only a heuristic; actual lag should be measured and alerted on.
        CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(100); // wait for master-slave replication
                User savedUser = userMapper.selectById(user.getId());
                log.info("Delayed read of user: {}", savedUser);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }
}

10.2 Cache Integration

@Service
public class CachedUserService {

    @Autowired
    private UserMapper userMapper;

    // A cache manager (e.g. Redis-backed) is assumed to be configured elsewhere.

    @Cacheable(value = "users", key = "#id")
    @Transactional(readOnly = true)
    public User findById(Long id) {
        return userMapper.selectById(id);
    }

    @CacheEvict(value = "users", key = "#user.id")
    @Transactional
    public void updateUser(User user) {
        userMapper.updateById(user);
    }

    @CacheEvict(value = "users", allEntries = true)
    public void clearUserCache() {
        // eviction is handled by the annotation; nothing to do in the body
    }
}

11. Best Practices

11.1 Configuration

  1. Connection pools: size pools to the workload and watch for connection leaks.
  2. Timeouts: set sensible connection and query timeouts to avoid long blocking.
  3. Health checks: probe data sources regularly to catch problems early.
  4. Monitoring and alerting: build a complete monitoring stack and alert promptly on anomalies.

11.2 Development

  1. Transaction boundaries: draw clear boundaries between reads and writes; avoid cross-database transactions.
  2. Forced routing: force reads to the master where consistency demands it.
  3. Exception handling: handle data source switch failures gracefully and provide a fallback.
  4. Performance testing: load-test regularly to validate the architecture.

11.3 Operations

  1. Backups: maintain a solid data backup and recovery strategy.
  2. Scaling: have plans for adding and removing data source nodes.
  3. Failure drills: rehearse failures regularly to validate the high-availability setup.
  4. Documentation: keep architecture documents and runbooks up to date.

12. Common Problems and Solutions

12.1 Replication Lag

Problem: master-slave replication lag causes stale reads.

Solutions

  • Force critical reads to the master.
  • Implement a delayed-read or sticky-master mechanism.
  • Monitor replication lag and alert above a threshold.
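
For the read-your-own-writes case, a lighter alternative to a fixed sleep is pinning each recently written key to the master for a short window. A self-contained sketch; the window length and key format are assumptions to tune against your measured lag, and a production version needs entry expiry (e.g. a Caffeine cache) so the map does not grow unboundedly:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class StickyMasterRouter {

    private final long stickyWindowMillis;
    // timestamp of the last write per entity key, e.g. "user:42"
    private final Map<String, Long> lastWrite = new ConcurrentHashMap<>();

    public StickyMasterRouter(long stickyWindowMillis) {
        this.stickyWindowMillis = stickyWindowMillis;
    }

    public void recordWrite(String key) {
        lastWrite.put(key, System.currentTimeMillis());
    }

    // reads of a recently written key go to the master; everything else may use a slave
    public String routeRead(String key) {
        Long t = lastWrite.get(key);
        if (t != null && System.currentTimeMillis() - t < stickyWindowMillis) {
            return "master";
        }
        return "slave";
    }
}
```

The window only needs to cover typical replication lag, so the bulk of read traffic still lands on the slaves.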

12.2 Connection Pool Exhaustion

Problem: pools run out of connections under high concurrency.

Solutions

  • Tune pool size settings.
  • Optimize slow SQL.
  • Monitor and alert on pool usage.

12.3 Data Source Switch Failures

Problem: exceptions occur while switching data sources.

Solutions

  • Degrade to an available data source.
  • Add a retry mechanism.
  • Improve exception handling and logging.

13. Conclusion

With sensible data source configuration, load balancing, failover, and monitoring, a Spring Boot read/write-splitting multi-node architecture substantially improves a system's concurrency and availability. In practice, choose middleware and parameters that match the workload, and back the architecture with solid monitoring and operations to keep it stable.

With the guidance above, readers can quickly stand up an enterprise-grade read/write-splitting multi-node architecture that supports business growth.