# Advanced DBA Essentials: Data Skew and Disaster Recovery in Practice for 10-Billion-PV Systems
## Data Skew: In-Depth Analysis and Solutions
In a large system serving tens of billions of page views, data skew is one of the thorniest problems a DBA faces. Skew not only causes query performance to degrade sharply, it can also create single points of failure that threaten the stability of the entire system.
### Analyzing Common Skew Scenarios
```sql
-- Create the test table
CREATE TABLE user_behavior (
    user_id BIGINT,
    item_id BIGINT,
    category_id INT,
    behavior_type TINYINT,
    timestamp BIGINT,
    province VARCHAR(50),
    city VARCHAR(50),
    device_type VARCHAR(20),
    os_type VARCHAR(20),
    network_type VARCHAR(20),
    app_version VARCHAR(10),
    `date` DATE COMMENT 'partition key'
) PARTITION BY RANGE (TO_DAYS(`date`)) (
    PARTITION p202401 VALUES LESS THAN (TO_DAYS('2024-02-01')),
    PARTITION p202402 VALUES LESS THAN (TO_DAYS('2024-03-01')),
    PARTITION p202403 VALUES LESS THAN (TO_DAYS('2024-04-01'))
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- Create secondary indexes
ALTER TABLE user_behavior ADD INDEX idx_user_date (user_id, `date`);
ALTER TABLE user_behavior ADD INDEX idx_category_date (category_id, `date`);
ALTER TABLE user_behavior ADD INDEX idx_timestamp (timestamp);
ALTER TABLE user_behavior ADD INDEX idx_province_date (province, `date`);

-- Skew-analysis queries
-- 1. Distribution of user behavior types (denominator is restricted to the same day)
SELECT
    behavior_type,
    COUNT(*) AS total_count,
    ROUND(COUNT(*) * 100.0 / (SELECT COUNT(*) FROM user_behavior WHERE `date` = '2024-01-15'), 2) AS percentage
FROM user_behavior
WHERE `date` = '2024-01-15'
GROUP BY behavior_type
ORDER BY total_count DESC;

-- 2. Regional (province-level) skew
SELECT
    province,
    COUNT(*) AS pv_count,
    COUNT(DISTINCT user_id) AS uv_count,
    ROUND(COUNT(*) * 100.0 / (SELECT COUNT(*) FROM user_behavior WHERE `date` = '2024-01-15'), 2) AS pv_percentage
FROM user_behavior
WHERE `date` = '2024-01-15'
GROUP BY province
ORDER BY pv_count DESC
LIMIT 20;

-- 3. Skew caused by hot items
SELECT
    item_id,
    COUNT(*) AS behavior_count,
    COUNT(DISTINCT user_id) AS unique_users
FROM user_behavior
WHERE `date` = '2024-01-15' AND behavior_type = 1 -- 1 = purchase
GROUP BY item_id
HAVING behavior_count > 1000
ORDER BY behavior_count DESC
LIMIT 50;
```
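Each of the three queries above yields a share-of-total column. To turn such a distribution into a single alertable number, one common trick is to compare the largest group's share against the share a perfectly uniform distribution would give (100/N). The helper below is a minimal sketch of that idea; the class and method names are illustrative, not part of the original code.

```java
import java.util.List;

public class SkewMetrics {

    /**
     * Skew factor = largest group's share divided by the uniform share (100/N).
     * A value near 1.0 means the distribution is roughly even; values far
     * above 1 mean one group dominates and is a skew candidate.
     */
    public static double skewFactor(List<Double> sharePercentages) {
        if (sharePercentages.isEmpty()) {
            return 0.0;
        }
        double top = sharePercentages.stream().mapToDouble(Double::doubleValue).max().orElse(0.0);
        double uniform = 100.0 / sharePercentages.size();
        return top / uniform;
    }

    public static void main(String[] args) {
        // e.g. the pv_percentage values returned by the province query above
        System.out.println(skewFactor(List.of(40.0, 25.0, 20.0, 10.0, 5.0))); // 2.0
    }
}
```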
### Real-Time Skew Detection and Monitoring
```java
// Data-skew monitoring service
@Component
@Slf4j
public class DataSkewMonitorService {

    @Autowired
    private JdbcTemplate jdbcTemplate;
    @Autowired
    private AlertService alertService;

    // Monitoring thresholds
    @Value("${data.skew.threshold.pv:0.8}")
    private double pvSkewThreshold;
    @Value("${data.skew.threshold.storage:0.75}")
    private double storageSkewThreshold;
    @Value("${data.skew.threshold.query:5.0}")
    private double querySkewThreshold;

    /**
     * Run the data-skew checks.
     */
    @Scheduled(fixedRate = 300000) // every 5 minutes
    public void monitorDataSkew() {
        log.info("Starting data-skew monitoring pass...");
        try {
            // 1. Partition-level skew
            detectPartitionSkew();
            // 2. Hotspot users
            detectHotspotUsers();
            // 3. Regional skew
            detectRegionSkew();
            // 4. Query-performance skew
            detectQuerySkew();
        } catch (Exception e) {
            log.error("Data-skew monitoring pass failed", e);
        }
    }

    /**
     * Detect partition-level skew (window functions require MySQL 8.0+).
     */
    private void detectPartitionSkew() {
        String sql = """
            SELECT
                table_schema,
                table_name,
                partition_name,
                table_rows,
                data_length,
                index_length,
                ROUND(table_rows * 100.0 / SUM(table_rows) OVER (PARTITION BY table_name), 2) AS row_percentage,
                ROUND(data_length * 100.0 / SUM(data_length) OVER (PARTITION BY table_name), 2) AS data_percentage
            FROM information_schema.partitions
            WHERE table_schema = DATABASE()
              AND partition_name IS NOT NULL
              AND table_rows > 1000000
            ORDER BY table_name, table_rows DESC
            """;
        jdbcTemplate.query(sql, (rs, rowNum) -> {
            String tableName = rs.getString("table_name");
            String partitionName = rs.getString("partition_name");
            long tableRows = rs.getLong("table_rows");
            double rowPercentage = rs.getDouble("row_percentage");
            double dataPercentage = rs.getDouble("data_percentage");
            // Row-count skew
            if (rowPercentage > pvSkewThreshold * 100) {
                String alertMsg = String.format(
                    "Table %s partition %s row skew: %.2f%%, rows: %d",
                    tableName, partitionName, rowPercentage, tableRows
                );
                alertService.sendAlert("DATA_SKEW_PARTITION", alertMsg);
            }
            // Storage skew
            if (dataPercentage > storageSkewThreshold * 100) {
                String alertMsg = String.format(
                    "Table %s partition %s storage skew: %.2f%%",
                    tableName, partitionName, dataPercentage
                );
                alertService.sendAlert("DATA_SKEW_STORAGE", alertMsg);
            }
            return null;
        });
    }

    /**
     * Detect hotspot users.
     */
    private void detectHotspotUsers() {
        String sql = """
            SELECT
                user_id,
                COUNT(*) AS request_count,
                COUNT(DISTINCT `date`) AS active_days
            FROM user_behavior
            WHERE `date` >= DATE_SUB(CURRENT_DATE, INTERVAL 7 DAY)
            GROUP BY user_id
            HAVING request_count > 10000
            ORDER BY request_count DESC
            LIMIT 100
            """;
        List<HotspotUser> hotspotUsers = jdbcTemplate.query(sql, (rs, rowNum) -> {
            HotspotUser user = new HotspotUser();
            user.setUserId(rs.getLong("user_id"));
            user.setRequestCount(rs.getLong("request_count"));
            user.setActiveDays(rs.getInt("active_days"));
            return user;
        });
        if (!hotspotUsers.isEmpty()) {
            HotspotUser topUser = hotspotUsers.get(0);
            if (topUser.getRequestCount() > 50000) {
                String alertMsg = String.format(
                    "Hotspot user detected: %d, requests in last 7 days: %d",
                    topUser.getUserId(), topUser.getRequestCount()
                );
                alertService.sendAlert("HOTSPOT_USER", alertMsg);
            }
        }
    }

    /**
     * Detect regional skew (placeholder; mirrors the province analysis shown earlier).
     */
    private void detectRegionSkew() {
        // Run the province-level PV/UV distribution query and alert on dominant regions
    }

    /**
     * Detect query-performance skew.
     */
    private void detectQuerySkew() {
        String slowQuerySql = """
            SELECT
                DIGEST_TEXT AS query_pattern,
                COUNT_STAR AS execution_count,
                AVG_TIMER_WAIT / 1000000000000 AS avg_execution_time_sec,
                MAX_TIMER_WAIT / 1000000000000 AS max_execution_time_sec,
                SUM_ROWS_EXAMINED AS total_rows_examined,
                SUM_ROWS_SENT AS total_rows_sent
            FROM performance_schema.events_statements_summary_by_digest
            WHERE COUNT_STAR > 10
              AND AVG_TIMER_WAIT > 1000000000000 -- timer waits are in picoseconds; 1e12 ps = 1 s
            ORDER BY (MAX_TIMER_WAIT - AVG_TIMER_WAIT) DESC
            LIMIT 20
            """;
        jdbcTemplate.query(slowQuerySql, (rs, rowNum) -> {
            String queryPattern = rs.getString("query_pattern");
            long executionCount = rs.getLong("execution_count");
            double avgTime = rs.getDouble("avg_execution_time_sec");
            double maxTime = rs.getDouble("max_execution_time_sec");
            long rowsExamined = rs.getLong("total_rows_examined");
            double skewRatio = maxTime / avgTime;
            if (skewRatio > querySkewThreshold) {
                String alertMsg = String.format(
                    "Query-performance skew: %s, executions: %d, avg: %.2fs, max: %.2fs, rows examined: %d, skew ratio: %.2f",
                    queryPattern, executionCount, avgTime, maxTime, rowsExamined, skewRatio
                );
                alertService.sendAlert("QUERY_SKEW", alertMsg);
            }
            return null;
        });
    }

    @Data
    public static class HotspotUser {
        private Long userId;
        private Long requestCount;
        private Integer activeDays;
    }
}
// Data-skew remediation service
@Service
@Slf4j
public class DataSkewProcessorService {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    /**
     * Handle user-dimension skew with a bucketing strategy.
     */
    public void processUserSkewByBucketing() {
        log.info("Starting user-dimension skew remediation...");
        // Create the bucketed table
        String createBucketTableSql = """
            CREATE TABLE IF NOT EXISTS user_behavior_bucketed (
                bucket_id INT,
                user_id BIGINT,
                item_id BIGINT,
                category_id INT,
                behavior_type TINYINT,
                timestamp BIGINT,
                province VARCHAR(50),
                city VARCHAR(50),
                device_type VARCHAR(20),
                os_type VARCHAR(20),
                network_type VARCHAR(20),
                app_version VARCHAR(10),
                `date` DATE,
                PRIMARY KEY (bucket_id, user_id, timestamp),
                INDEX idx_bucket_date (bucket_id, `date`),
                INDEX idx_user_bucket (user_id, bucket_id)
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
            PARTITION BY KEY (bucket_id)
            PARTITIONS 32
            """;
        jdbcTemplate.execute(createBucketTableSql);
        // Migrate data into buckets
        String migrateDataSql = """
            INSERT INTO user_behavior_bucketed
            SELECT
                ABS(CRC32(user_id)) % 32 AS bucket_id,
                user_id, item_id, category_id, behavior_type,
                timestamp, province, city, device_type, os_type,
                network_type, app_version, `date`
            FROM user_behavior
            WHERE `date` >= DATE_SUB(CURRENT_DATE, INTERVAL 30 DAY)
            """;
        long startTime = System.currentTimeMillis();
        int affectedRows = jdbcTemplate.update(migrateDataSql);
        long costTime = System.currentTimeMillis() - startTime;
        log.info("Bucketing finished, rows migrated: {}, elapsed: {}ms", affectedRows, costTime);
    }

    /**
     * Handle hotspot data with a caching strategy.
     */
    public void processHotspotDataWithCache() {
        log.info("Starting hotspot-data caching...");
        // Identify hot items (single select column so queryForList(..., Long.class) works)
        String hotspotItemsSql = """
            SELECT item_id
            FROM user_behavior
            WHERE `date` >= DATE_SUB(CURRENT_DATE, INTERVAL 1 DAY)
              AND behavior_type IN (1, 2) -- purchase and view
            GROUP BY item_id
            HAVING COUNT(*) > 1000
            ORDER BY COUNT(*) DESC
            LIMIT 1000
            """;
        List<Long> hotspotItems = jdbcTemplate.queryForList(hotspotItemsSql, Long.class);
        // Load hot items into the cache
        cacheHotspotItems(hotspotItems);
        log.info("Hotspot-item caching finished, count: {}", hotspotItems.size());
    }

    /**
     * Handle regional skew with read/write separation.
     */
    public void processRegionSkewWithReadWriteSeparation() {
        log.info("Starting regional-skew remediation...");
        // Analyze regional access patterns
        String regionAnalysisSql = """
            SELECT
                province,
                COUNT(*) AS total_requests,
                COUNT(DISTINCT user_id) AS unique_users,
                AVG(CASE WHEN behavior_type = 1 THEN 1 ELSE 0 END) AS purchase_rate
            FROM user_behavior
            WHERE `date` >= DATE_SUB(CURRENT_DATE, INTERVAL 7 DAY)
            GROUP BY province
            HAVING total_requests > 100000
            ORDER BY total_requests DESC
            """;
        jdbcTemplate.query(regionAnalysisSql, (rs, rowNum) -> {
            String province = rs.getString("province");
            long totalRequests = rs.getLong("total_requests");
            // High-traffic regions get a dedicated read/write instance
            if (totalRequests > 500000) {
                configureRegionRouting(province, "read_write_separate");
            }
            // Mid-traffic regions get a read replica
            else if (totalRequests > 100000) {
                configureRegionRouting(province, "read_replica");
            }
            return null;
        });
    }

    private void cacheHotspotItems(List<Long> hotspotItems) {
        // Cache the hot items, e.g. in Redis
    }

    private void configureRegionRouting(String province, String strategy) {
        // Configure region-level routing
        log.info("Region {} routed with strategy: {}", province, strategy);
    }
}
```
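Once writes land in `user_behavior_bucketed`, readers must derive the same `bucket_id` the migration computed, so every point query can prune to a single partition. The sketch below assumes MySQL's `CRC32()` hashes the decimal string form of `user_id` (how MySQL casts numeric arguments to strings); the class name and the query in `main` are illustrative, not part of the original code.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

public class BucketRouter {

    private static final int BUCKETS = 32; // must match "PARTITIONS 32" above

    /**
     * Java-side equivalent of MySQL's ABS(CRC32(user_id)) % 32.
     * MySQL converts the BIGINT to its decimal string before hashing,
     * and CRC32 is already unsigned, so no ABS is needed here.
     */
    public static int bucketOf(long userId) {
        CRC32 crc = new CRC32();
        crc.update(Long.toString(userId).getBytes(StandardCharsets.UTF_8));
        return (int) (crc.getValue() % BUCKETS);
    }

    public static void main(String[] args) {
        long userId = 123456789L;
        // A point read can now hit exactly one partition:
        String sql = "SELECT * FROM user_behavior_bucketed "
                   + "WHERE bucket_id = " + bucketOf(userId)
                   + " AND user_id = " + userId
                   + " AND `date` = CURRENT_DATE";
        System.out.println(sql);
    }
}
```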
## Disaster Recovery Design and Implementation
### Multi-Active Data Center Architecture
```java
// Multi-active data synchronization service
@Service
@Slf4j
public class MultiActiveDataSyncService {

    @Autowired
    private DataSource primaryDataSource;
    @Autowired
    private List<DataSource> standbyDataSources;

    @Value("${datacenter.id:dc1}")
    private String dataCenterId;

    private final ExecutorService syncExecutor = Executors.newFixedThreadPool(10);

    /**
     * Fan out changed records to every standby data center.
     */
    @Async
    public void syncToMultipleDataCenters(String tableName, List<Long> recordIds) {
        if (CollectionUtils.isEmpty(recordIds)) {
            return;
        }
        CompletableFuture<?>[] futures = standbyDataSources.stream()
            .map(dataSource -> CompletableFuture.runAsync(() -> {
                syncToDataCenter(dataSource, tableName, recordIds);
            }, syncExecutor))
            .toArray(CompletableFuture[]::new);
        CompletableFuture.allOf(futures)
            .exceptionally(ex -> {
                log.error("Multi-active sync failed", ex);
                return null;
            });
    }

    /**
     * Sync the given records to one data center.
     */
    private void syncToDataCenter(DataSource dataSource, String tableName, List<Long> recordIds) {
        JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
        try {
            // Read the records in one batch (tableName comes from trusted code, not user input)
            String querySql = String.format(
                "SELECT * FROM %s WHERE id IN (%s)",
                tableName,
                recordIds.stream().map(String::valueOf).collect(Collectors.joining(","))
            );
            List<Map<String, Object>> records = jdbcTemplate.queryForList(querySql);
            // Insert or update each record in the target data center
            for (Map<String, Object> record : records) {
                upsertRecord(jdbcTemplate, tableName, record);
            }
            log.info("Synced {} records to data center", records.size());
        } catch (Exception e) {
            log.error("Sync to data center failed", e);
            // Record the failure so it can be retried later
            recordSyncFailure(dataSource, tableName, recordIds, e.getMessage());
        }
    }

    /**
     * Insert or update a record (relies on the table having a primary/unique key).
     */
    private void upsertRecord(JdbcTemplate jdbcTemplate, String tableName, Map<String, Object> record) {
        // Build the UPSERT statement
        String columns = String.join(", ", record.keySet());
        String placeholders = record.keySet().stream()
            .map(k -> "?")
            .collect(Collectors.joining(", "));
        String updates = record.keySet().stream()
            .map(k -> k + " = VALUES(" + k + ")")
            .collect(Collectors.joining(", "));
        String upsertSql = String.format(
            "INSERT INTO %s (%s) VALUES (%s) ON DUPLICATE KEY UPDATE %s",
            tableName, columns, placeholders, updates
        );
        jdbcTemplate.update(upsertSql, record.values().toArray());
    }

    private void recordSyncFailure(DataSource dataSource, String tableName, List<Long> recordIds, String reason) {
        // Persist the failed batch so a retry job can replay it
    }
}
// Database failover service
@Service
@Slf4j
public class DatabaseFailoverService {

    @Autowired
    private DataSourceRouter dataSourceRouter;
    @Autowired
    private HealthCheckService healthCheckService;

    @Value("${failover.timeout:30000}")
    private long failoverTimeout;

    /**
     * Automatic failover.
     */
    public boolean autoFailover(String failedDataSourceId) {
        log.warn("Starting automatic failover, failed data source: {}", failedDataSourceId);
        long startTime = System.currentTimeMillis();
        try {
            // 1. Check the health of the standby data sources
            List<String> availableDataSources = healthCheckService.getHealthyDataSources();
            if (availableDataSources.isEmpty()) {
                log.error("No healthy standby data source available");
                return false;
            }
            // 2. Pick the best standby
            String bestStandby = selectBestStandbyDataSource(availableDataSources);
            if (bestStandby == null) {
                log.error("Could not select a standby data source");
                return false;
            }
            // 3. Switch
            boolean switchSuccess = switchToStandby(bestStandby);
            if (switchSuccess) {
                log.info("Failover succeeded, switched to data source: {}", bestStandby);
                // 4. Notify the monitoring system
                notifyMonitoringSystem(failedDataSourceId, bestStandby);
                // 5. Kick off data-sync recovery for the failed node
                startDataSyncRecovery(failedDataSourceId);
                return true;
            }
        } catch (Exception e) {
            log.error("Automatic failover failed", e);
        } finally {
            long costTime = System.currentTimeMillis() - startTime;
            log.info("Total failover time: {}ms", costTime);
        }
        return false;
    }

    /**
     * Manual failover.
     */
    public boolean manualFailover(String targetDataSourceId) {
        log.info("Starting manual failover to: {}", targetDataSourceId);
        try {
            // Check the target's health first
            if (!healthCheckService.isDataSourceHealthy(targetDataSourceId)) {
                log.error("Target data source is unhealthy: {}", targetDataSourceId);
                return false;
            }
            // Switch
            boolean switchSuccess = switchToStandby(targetDataSourceId);
            if (switchSuccess) {
                log.info("Manual failover succeeded");
                return true;
            }
        } catch (Exception e) {
            log.error("Manual failover failed", e);
        }
        return false;
    }

    /**
     * Pick the best standby data source.
     */
    private String selectBestStandbyDataSource(List<String> availableDataSources) {
        // Rank candidates by replication lag, load, and network distance; lowest score wins
        return availableDataSources.stream()
            .min((ds1, ds2) -> {
                int score1 = calculateDataSourceScore(ds1);
                int score2 = calculateDataSourceScore(ds2);
                return Integer.compare(score1, score2);
            })
            .orElse(null);
    }

    /**
     * Compute a penalty score for a data source (lower is better).
     */
    private int calculateDataSourceScore(String dataSourceId) {
        int score = 0;
        try {
            // 1. Replication lag: +1 point per second of lag, capped at 100
            long dataLag = healthCheckService.getDataLag(dataSourceId);
            score += Math.min(100, (int) (dataLag / 1000));
            // 2. Load: a 0-1 load fraction contributes up to 50 points
            double load = healthCheckService.getDataSourceLoad(dataSourceId);
            score += (int) (load * 50);
            // 3. Network latency: +1 point per 10ms
            long networkLatency = healthCheckService.getNetworkLatency(dataSourceId);
            score += (int) (networkLatency / 10);
        } catch (Exception e) {
            log.warn("Failed to score data source {}", dataSourceId, e);
            score += 1000; // penalize heavily so it is unlikely to be chosen
        }
        return score;
    }

    private boolean switchToStandby(String standbyDataSourceId) {
        // Perform the actual data source switch
        return dataSourceRouter.switchDataSource(standbyDataSourceId);
    }

    private void notifyMonitoringSystem(String failedDataSource, String newDataSource) {
        // Notify the monitoring system of the failover event
    }

    private void startDataSyncRecovery(String failedDataSourceId) {
        // Start the data-sync recovery workflow for the failed node
    }
}
// Backup and recovery service
@Service
@Slf4j
public class BackupRecoveryService {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Value("${backup.base.dir:/data/backup}")
    private String backupBaseDir;
    @Value("${backup.retention.days:30}")
    private int backupRetentionDays;
    @Value("${backup.database.name}")
    private String databaseName; // schema passed to mysqldump (property key is an assumption)

    /**
     * Perform a full backup.
     */
    public boolean performFullBackup() {
        log.info("Starting full backup...");
        String timestamp = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMdd_HHmmss"));
        String backupDir = backupBaseDir + "/full_backup_" + timestamp;
        try {
            // 1. Create the backup directory
            Files.createDirectories(Paths.get(backupDir));
            // 2. Collect the tables to back up
            List<String> tables = getAllUserTables();
            // 3. Back up each table in parallel
            List<CompletableFuture<Boolean>> futures = tables.stream()
                .map(table -> backupTableAsync(table, backupDir))
                .collect(Collectors.toList());
            // 4. Wait for all backups to finish (2-hour timeout)
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .get(2, TimeUnit.HOURS);
            // 5. Back up metadata
            backupMetadata(backupDir);
            // 6. Purge expired backups
            cleanupExpiredBackups();
            log.info("Full backup finished: {}", backupDir);
            return true;
        } catch (Exception e) {
            log.error("Full backup failed", e);
            return false;
        }
    }

    /**
     * Perform an incremental backup.
     */
    public boolean performIncrementalBackup() {
        log.info("Starting incremental backup...");
        String timestamp = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMdd_HHmmss"));
        String backupDir = backupBaseDir + "/inc_backup_" + timestamp;
        try {
            // 1. Create the backup directory
            Files.createDirectories(Paths.get(backupDir));
            // 2. Read the LSN recorded by the previous backup
            String lastBackupLsn = getLastBackupLsn();
            // 3. Back up everything after that LSN
            backupIncrementalData(backupDir, lastBackupLsn);
            // 4. Record the new LSN for the next run
            recordBackupLsn(backupDir, getCurrentLsn());
            log.info("Incremental backup finished: {}", backupDir);
            return true;
        } catch (Exception e) {
            log.error("Incremental backup failed", e);
            return false;
        }
    }

    /**
     * Restore from backup, optionally to a point in time.
     */
    public boolean recoverFromBackup(String backupPath, LocalDateTime targetTime) {
        log.info("Starting recovery, backup path: {}, target time: {}", backupPath, targetTime);
        try {
            // 1. Stop application writes
            stopApplicationWrites();
            // 2. Restore the full backup
            if (!restoreFullBackup(backupPath)) {
                throw new RuntimeException("Full backup restore failed");
            }
            // 3. Apply incremental backups for point-in-time recovery
            if (targetTime != null) {
                applyIncrementalBackups(backupPath, targetTime);
            }
            // 4. Verify data consistency
            if (!verifyDataConsistency()) {
                throw new RuntimeException("Data consistency check failed");
            }
            // 5. Resume application writes
            resumeApplicationWrites();
            log.info("Recovery finished");
            return true;
        } catch (Exception e) {
            log.error("Recovery failed", e);
            // Fall back to emergency recovery
            emergencyRecovery();
            return false;
        }
    }

    private List<String> getAllUserTables() {
        String sql = "SELECT table_name FROM information_schema.tables " +
                     "WHERE table_schema = DATABASE() AND table_type = 'BASE TABLE'";
        return jdbcTemplate.queryForList(sql, String.class);
    }

    private CompletableFuture<Boolean> backupTableAsync(String tableName, String backupDir) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                String backupFile = backupDir + "/" + tableName + ".sql";
                // Dump a single table; credentials are expected to come from
                // the environment (e.g. ~/.my.cnf), not the command line
                ProcessBuilder pb = new ProcessBuilder(
                    "mysqldump",
                    "--single-transaction",
                    "--quick",
                    databaseName,
                    tableName
                );
                Process process = pb.start();
                try (InputStream input = process.getInputStream();
                     FileOutputStream output = new FileOutputStream(backupFile)) {
                    IOUtils.copy(input, output);
                }
                int exitCode = process.waitFor();
                return exitCode == 0;
            } catch (Exception e) {
                log.error("Backing up table {} failed", tableName, e);
                return false;
            }
        });
    }

    private void backupMetadata(String backupDir) {
        // Back up database metadata
    }

    private void cleanupExpiredBackups() {
        // Delete backups older than backupRetentionDays
    }

    private void stopApplicationWrites() {
        // Block application writes
    }

    private void resumeApplicationWrites() {
        // Re-enable application writes
    }

    private boolean restoreFullBackup(String backupPath) {
        // Restore the full backup
        return true;
    }

    private void applyIncrementalBackups(String backupPath, LocalDateTime targetTime) {
        // Replay incremental backups up to targetTime
    }

    private boolean verifyDataConsistency() {
        // Verify data consistency
        return true;
    }

    private void emergencyRecovery() {
        // Emergency recovery steps
    }

    private String getLastBackupLsn() {
        // Return the LSN recorded by the previous backup
        return null;
    }

    private String getCurrentLsn() {
        // Return the current LSN
        return null;
    }

    private void recordBackupLsn(String backupDir, String lsn) {
        // Persist the backup LSN
    }

    private void backupIncrementalData(String backupDir, String lastBackupLsn) {
        // Perform the incremental backup
    }
}
```
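The original does not show how these backups are triggered. A minimal scheduling sketch using Spring's `@Scheduled` might pair a weekly full backup with hourly incrementals; the cron expressions and class name below are assumptions to tune against your RPO/RTO, not part of the original code.

```java
@Component
@Slf4j
public class BackupScheduler {

    @Autowired
    private BackupRecoveryService backupRecoveryService;

    // Full backup every Sunday at 03:00 (assumed off-peak window)
    @Scheduled(cron = "0 0 3 * * SUN")
    public void weeklyFullBackup() {
        if (!backupRecoveryService.performFullBackup()) {
            log.error("Scheduled full backup failed");
        }
    }

    // Incremental backup at the top of every hour
    @Scheduled(cron = "0 0 * * * *")
    public void hourlyIncrementalBackup() {
        if (!backupRecoveryService.performIncrementalBackup()) {
            log.error("Scheduled incremental backup failed");
        }
    }
}
```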
### Monitoring and Alerting
```java
// Database monitoring service
@Service
@Slf4j
public class DatabaseMonitorService {

    @Autowired
    private JdbcTemplate jdbcTemplate;
    @Autowired
    private AlertService alertService;

    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5);

    /**
     * Start the monitoring loops.
     */
    @PostConstruct
    public void startMonitoring() {
        // Connection count
        scheduler.scheduleAtFixedRate(this::monitorConnections, 0, 30, TimeUnit.SECONDS);
        // Performance metrics
        scheduler.scheduleAtFixedRate(this::monitorPerformance, 0, 60, TimeUnit.SECONDS);
        // Replication status
        scheduler.scheduleAtFixedRate(this::monitorReplication, 0, 30, TimeUnit.SECONDS);
        // Disk usage
        scheduler.scheduleAtFixedRate(this::monitorDiskSpace, 0, 300, TimeUnit.SECONDS);
        log.info("Database monitoring service started");
    }

    /**
     * Monitor the connection count.
     */
    private void monitorConnections() {
        try {
            String sql = "SHOW STATUS LIKE 'Threads_connected'";
            Map<String, Object> result = jdbcTemplate.queryForMap(sql);
            int connections = Integer.parseInt(result.get("Value").toString());
            if (connections > 500) { // threshold should be configurable
                String alertMsg = String.format("Connection count too high: %d", connections);
                alertService.sendAlert("DB_CONNECTIONS_HIGH", alertMsg);
            }
            // Also watch the connection pool
            monitorConnectionPool();
        } catch (Exception e) {
            log.error("Connection monitoring failed", e);
        }
    }

    /**
     * Monitor performance metrics.
     */
    private void monitorPerformance() {
        try {
            // Slow queries
            monitorSlowQueries();
            // Lock waits
            monitorLockWaits();
            // Buffer pool hit rate
            monitorBufferPoolHitRate();
            // Temporary table creation
            monitorTempTables();
        } catch (Exception e) {
            log.error("Performance monitoring failed", e);
        }
    }

    /**
     * Monitor slow queries (requires log_output=TABLE so mysql.slow_log is populated).
     */
    private void monitorSlowQueries() {
        String sql = """
            SELECT
                db,
                user_host,
                query_time,
                lock_time,
                rows_sent,
                rows_examined,
                sql_text
            FROM mysql.slow_log
            WHERE start_time > DATE_SUB(NOW(), INTERVAL 5 MINUTE)
            ORDER BY query_time DESC
            LIMIT 10
            """;
        jdbcTemplate.query(sql, (rs, rowNum) -> {
            double queryTime = rs.getDouble("query_time");
            long rowsExamined = rs.getLong("rows_examined");
            String sqlText = rs.getString("sql_text");
            if (queryTime > 10.0) { // flag queries slower than 10 seconds
                String alertMsg = String.format(
                    "Severe slow query: %.2fs, rows examined: %d, SQL: %s",
                    queryTime, rowsExamined, sqlText
                );
                alertService.sendAlert("SLOW_QUERY", alertMsg);
            }
            return null;
        });
    }

    /**
     * Monitor replication status.
     */
    private void monitorReplication() {
        try {
            // queryForList avoids an exception when this node is not a replica
            List<Map<String, Object>> rows = jdbcTemplate.queryForList("SHOW SLAVE STATUS");
            if (!rows.isEmpty()) {
                Map<String, Object> slaveStatus = rows.get(0);
                String ioRunning = (String) slaveStatus.get("Slave_IO_Running");
                String sqlRunning = (String) slaveStatus.get("Slave_SQL_Running");
                Object lag = slaveStatus.get("Seconds_Behind_Master"); // NULL while replication is broken
                long secondsBehind = lag != null ? Long.parseLong(lag.toString()) : -1;
                // Replication thread state
                if (!"Yes".equals(ioRunning) || !"Yes".equals(sqlRunning)) {
                    String alertMsg = String.format(
                        "Replication threads abnormal: IO_Running=%s, SQL_Running=%s",
                        ioRunning, sqlRunning
                    );
                    alertService.sendAlert("REPLICATION_ERROR", alertMsg);
                }
                // Replication lag
                if (secondsBehind > 60) { // more than 60 seconds behind
                    String alertMsg = String.format("Replication lag: %d seconds", secondsBehind);
                    alertService.sendAlert("REPLICATION_LAG", alertMsg);
                }
            }
        } catch (Exception e) {
            log.error("Replication monitoring failed", e);
        }
    }

    /**
     * Monitor table sizes (large tables are the usual disk-space culprits).
     */
    private void monitorDiskSpace() {
        try {
            String sql = """
                SELECT
                    table_schema,
                    table_name,
                    data_length,
                    index_length,
                    data_free
                FROM information_schema.tables
                WHERE table_schema NOT IN ('information_schema', 'mysql', 'performance_schema')
                ORDER BY (data_length + index_length) DESC
                LIMIT 20
                """;
            jdbcTemplate.query(sql, (rs, rowNum) -> {
                String tableSchema = rs.getString("table_schema");
                String tableName = rs.getString("table_name");
                long dataLength = rs.getLong("data_length");
                long indexLength = rs.getLong("index_length");
                long totalSize = dataLength + indexLength;
                if (totalSize > 10L * 1024 * 1024 * 1024) { // tables over 10 GB
                    String alertMsg = String.format(
                        "Large table alert: %s.%s size: %.2f GB",
                        tableSchema, tableName, totalSize / (1024.0 * 1024 * 1024)
                    );
                    alertService.sendAlert("LARGE_TABLE", alertMsg);
                }
                return null;
            });
        } catch (Exception e) {
            log.error("Disk-space monitoring failed", e);
        }
    }

    private void monitorConnectionPool() {
        // Monitor connection pool usage
    }

    private void monitorLockWaits() {
        // Monitor lock waits
    }

    private void monitorBufferPoolHitRate() {
        // Monitor buffer pool hit rate
    }

    private void monitorTempTables() {
        // Monitor temporary table creation
    }
}
```
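`AlertService` is autowired throughout this article but never defined. A minimal assumed contract, plus a log-only implementation with naive per-type rate limiting, might look like the sketch below; the suppression window and delivery channel are illustrative choices, not part of the original.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public interface AlertService {
    void sendAlert(String alertType, String message);
}

@Service
@Slf4j
class LoggingAlertService implements AlertService {

    // Last send time per alert type, used for naive deduplication
    private final Map<String, Long> lastSent = new ConcurrentHashMap<>();
    private static final long SUPPRESS_MS = 5 * 60 * 1000L; // 5-minute window

    @Override
    public void sendAlert(String alertType, String message) {
        long now = System.currentTimeMillis();
        Long prev = lastSent.get(alertType);
        if (prev != null && now - prev < SUPPRESS_MS) {
            return; // same alert type fired recently; drop it to avoid alert storms
        }
        lastSent.put(alertType, now);
        log.warn("[ALERT][{}] {}", alertType, message);
        // A real implementation would push to a pager, IM webhook, or email here
    }
}
```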
## Summary
This article walked through data-skew analysis and remediation for systems at the 10-billion-PV scale, together with a complete disaster recovery design. The code demonstrated:
1. **Skew detection and monitoring**: real-time checks that surface skew before it becomes an outage
2. **Skew remediation strategies**: bucketing, caching, and read/write separation
3. **Multi-active data center architecture**: high availability with cross-site data consistency
4. **Automatic failover**: fast failure detection and switchover
5. **Backup and recovery**: a layered backup strategy with fast restore paths
6. **Monitoring and alerting**: continuous coverage of the key database metrics
These approaches have been proven at large internet companies and can stand up to traffic at the tens-of-billions-of-PV level while keeping the system stable and the data safe. A DBA who masters them will be well equipped for the challenges of real production environments.
