Course access: 666it.top/13971/
# Essential for Advancing to DBA Expert: Old Boy (老男孩) 10-Billion-PV Hands-On Class, Season 2: Solving Data Skew + Designing Disaster Recovery
In today's data-driven era, database administrators (DBAs) face unprecedented challenges. As business scale keeps growing, tens of billions of PV (page views) place extreme demands on database systems. The "Old Boy 10-Billion-PV Hands-On Class (Season 2)" focuses on two core production problems: data skew and disaster recovery (DR) design. It aims to help working DBAs make the leap from competent practitioner to expert.
## Data Skew: The "Silent Killer" of Distributed Databases
### Causes and Impact of Data Skew
Data skew is a common problem in distributed systems: when data is distributed unevenly, a few nodes carry a disproportionate load and become the system bottleneck. Typical skew scenarios include (a toy illustration follows the list, and the SQL after it simulates a hot-user scenario):
- Hot data accessed far more frequently than the rest
- A poorly chosen hash shard key
- The natural skew of time-series data
- Uneven distribution driven by business characteristics
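As a toy illustration (not from the course materials, numbers made up): hashing a low-cardinality, traffic-skewed column such as `city` piles rows onto a few shards, while a high-cardinality key such as `user_id` spreads them out evenly. The helper `shard_of` below is hypothetical.
```python
# Toy sketch: compare shard distribution for a skewed low-cardinality key
# (city) versus a high-cardinality key (user_id). Numbers are made up.
from collections import Counter
import hashlib

def shard_of(key: str, shards: int = 10) -> int:
    """Map a key to a shard via MD5, mimicking hash sharding."""
    return int(hashlib.md5(key.encode()).hexdigest(), 16) % shards

cities = ["Beijing"] * 70 + ["Shanghai"] * 20 + ["Chengdu"] * 10   # skewed traffic
user_ids = [f"user_{i}" for i in range(100)]

print("by city:   ", Counter(shard_of(c) for c in cities))     # a few shards get everything
print("by user_id:", Counter(shard_of(u) for u in user_ids))   # roughly even
```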
```sql
-- Create a test table and data to simulate a data-skew scenario
CREATE TABLE user_behavior (
    user_id BIGINT,
    item_id BIGINT,
    behavior_type INT,
    city VARCHAR(50),
    province VARCHAR(50),
    timestamp DATETIME,
    INDEX idx_user (user_id),
    INDEX idx_item (item_id),
    INDEX idx_city (city)
) ENGINE=InnoDB PARTITION BY HASH(user_id % 10) PARTITIONS 10;

-- Insert simulated data and manufacture hot users
DELIMITER $$
CREATE PROCEDURE GenerateUserBehaviorData()
BEGIN
    DECLARE i INT DEFAULT 0;
    DECLARE j INT DEFAULT 0;
    DECLARE current_user BIGINT;
    DECLARE hot_user_count INT DEFAULT 100; -- number of hot users

    -- Hot-user data (about 1% of users, but ~90% of the traffic)
    WHILE i < hot_user_count DO
        SET current_user = 1000000 + i; -- hot-user ID range
        SET j = 0;
        WHILE j < 10000 DO -- 10,000 rows per hot user
            INSERT INTO user_behavior VALUES (
                current_user,
                FLOOR(RAND() * 1000000),
                FLOOR(RAND() * 5) + 1,
                ELT(FLOOR(RAND() * 10) + 1, 'Beijing', 'Shanghai', 'Guangzhou', 'Shenzhen', 'Hangzhou', 'Chengdu', 'Wuhan', 'Nanjing', 'Xi''an', 'Chongqing'),
                ELT(FLOOR(RAND() * 5) + 1, 'North China', 'East China', 'South China', 'Central China', 'West China'),
                DATE_ADD('2024-01-01', INTERVAL FLOOR(RAND() * 365) DAY)
            );
            SET j = j + 1;
        END WHILE;
        SET i = i + 1;
    END WHILE;

    -- Ordinary-user data
    SET i = 0;
    WHILE i < 100000 DO -- 100,000 ordinary users, one row each
        SET current_user = 2000000 + i;
        INSERT INTO user_behavior VALUES (
            current_user,
            FLOOR(RAND() * 1000000),
            FLOOR(RAND() * 5) + 1,
            ELT(FLOOR(RAND() * 10) + 1, 'Beijing', 'Shanghai', 'Guangzhou', 'Shenzhen', 'Hangzhou', 'Chengdu', 'Wuhan', 'Nanjing', 'Xi''an', 'Chongqing'),
            ELT(FLOOR(RAND() * 5) + 1, 'North China', 'East China', 'South China', 'Central China', 'West China'),
            DATE_ADD('2024-01-01', INTERVAL FLOOR(RAND() * 365) DAY)
        );
        SET i = i + 1;
    END WHILE;
END$$
DELIMITER ;

CALL GenerateUserBehaviorData();

-- Inspect the skew across partitions
-- (TABLE_ROWS is an estimate; run ANALYZE TABLE user_behavior first for fresher statistics)
SELECT
    PARTITION_NAME,
    TABLE_ROWS AS row_count,
    ROUND(TABLE_ROWS * 100.0 / (SELECT COUNT(*) FROM user_behavior), 2) AS percentage
FROM information_schema.PARTITIONS
WHERE TABLE_NAME = 'user_behavior'
  AND TABLE_SCHEMA = DATABASE()
ORDER BY TABLE_ROWS DESC;
```
### Detecting and Diagnosing Data Skew
```python
#!/usr/bin/env python3
"""
Data skew detection tool
Applicable to distributed databases such as MySQL / Redis / ClickHouse
"""
import json
import logging
from datetime import datetime
from typing import Dict, List, Tuple

import numpy as np
import pymysql
import redis


class DataSkewDetector:
    """Data skew detector"""

    def __init__(self, db_config: Dict):
        self.db_config = db_config
        self.logger = self._setup_logger()

    def _setup_logger(self):
        """Configure logging"""
        logger = logging.getLogger('DataSkewDetector')
        logger.setLevel(logging.INFO)
        handler = logging.StreamHandler()
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        handler.setFormatter(formatter)
        logger.addHandler(handler)
        return logger

    def detect_mysql_table_skew(self, table_name: str, shard_key: str = None) -> Dict:
        """Detect data skew for a MySQL table"""
        conn = None
        try:
            conn = pymysql.connect(**self.db_config)
            # Table-level size statistics
            size_query = f"""
                SELECT
                    TABLE_NAME,
                    TABLE_ROWS,
                    DATA_LENGTH,
                    INDEX_LENGTH,
                    ROUND(TABLE_ROWS * 100.0 / (SELECT SUM(TABLE_ROWS)
                          FROM information_schema.TABLES
                          WHERE TABLE_SCHEMA = DATABASE()), 2) AS row_percentage,
                    ROUND(DATA_LENGTH * 100.0 / (SELECT SUM(DATA_LENGTH)
                          FROM information_schema.TABLES
                          WHERE TABLE_SCHEMA = DATABASE()), 2) AS size_percentage
                FROM information_schema.TABLES
                WHERE TABLE_SCHEMA = DATABASE()
                  AND TABLE_NAME = '{table_name}'
            """
            # Per-partition row distribution (if the table is partitioned)
            partition_query = f"""
                SELECT
                    PARTITION_NAME,
                    TABLE_ROWS,
                    ROUND(TABLE_ROWS * 100.0 / (SELECT SUM(TABLE_ROWS)
                          FROM information_schema.PARTITIONS
                          WHERE TABLE_NAME = '{table_name}'
                            AND TABLE_SCHEMA = DATABASE()), 2) AS percentage
                FROM information_schema.PARTITIONS
                WHERE TABLE_NAME = '{table_name}'
                  AND TABLE_SCHEMA = DATABASE()
                ORDER BY TABLE_ROWS DESC
            """
            with conn.cursor() as cursor:
                cursor.execute(size_query)
                table_stats = cursor.fetchall()
                cursor.execute(partition_query)
                partition_stats = cursor.fetchall()

            # Compute skew metrics
            skew_metrics = self._calculate_skew_metrics(partition_stats)
            result = {
                'table_name': table_name,
                'table_statistics': table_stats,
                'partition_distribution': partition_stats,
                'skew_metrics': skew_metrics,
                'detection_time': datetime.now().isoformat(),
                'recommendations': self._generate_recommendations(skew_metrics)
            }
            self.logger.info(
                f"Skew detection finished for table {table_name}, "
                f"skew ratio: {skew_metrics.get('skew_ratio', 0):.2f}"
            )
            return result
        except Exception as e:
            self.logger.error(f"Error while detecting skew for table {table_name}: {str(e)}")
            raise
        finally:
            if conn:
                conn.close()

    def _calculate_skew_metrics(self, partition_data: List[Tuple]) -> Dict:
        """Compute skew metrics from (name, row_count, ...) tuples"""
        if not partition_data:
            return {}
        rows = [row[1] for row in partition_data if row[1] is not None]
        total_rows = sum(rows)
        if total_rows == 0:
            return {}
        # Standard deviation and coefficient of variation
        mean = total_rows / len(rows)
        std_dev = np.std(rows)
        cv = std_dev / mean if mean > 0 else 0
        # Max/min ratio
        max_rows = max(rows)
        min_rows = min(rows)
        max_min_ratio = max_rows / min_rows if min_rows > 0 else float('inf')
        # 80/20 ratio: rows held by the top 20% of partitions vs the rest
        sorted_rows = sorted(rows, reverse=True)
        top_20_percent = sum(sorted_rows[:int(len(rows) * 0.2)])
        bottom_80_percent = sum(sorted_rows[int(len(rows) * 0.2):])
        eighty_twenty_ratio = top_20_percent / bottom_80_percent if bottom_80_percent > 0 else float('inf')
        return {
            'total_partitions': len(rows),
            'total_rows': total_rows,
            'mean_rows_per_partition': mean,
            'std_deviation': std_dev,
            'coefficient_of_variation': cv,
            'max_min_ratio': max_min_ratio,
            'eighty_twenty_ratio': eighty_twenty_ratio,
            'skew_ratio': cv * 100,  # skew expressed as a percentage
            'max_partition_rows': max_rows,
            'min_partition_rows': min_rows
        }

    def _generate_recommendations(self, skew_metrics: Dict) -> List[str]:
        """Generate tuning recommendations"""
        recommendations = []
        skew_ratio = skew_metrics.get('skew_ratio', 0)
        if skew_ratio > 50:
            recommendations.append("⚠️ Severe data skew: redesign the sharding strategy")
            recommendations.append("🔧 Consider a composite shard key or range sharding")
            recommendations.append("📊 Analyze the hot-data pattern and rebalance the distribution")
        elif skew_ratio > 20:
            recommendations.append("⚠️ Moderate data skew: monitor and optimize")
            recommendations.append("🔧 Consider data redistribution or dynamic sharding")
        else:
            recommendations.append("✅ Data distribution is reasonably balanced")
        if skew_metrics.get('max_min_ratio', 0) > 10:
            recommendations.append("📈 Largest and smallest partitions differ too much: rebalance the data")
        return recommendations

    def detect_redis_cluster_skew(self, redis_config: Dict) -> Dict:
        """Detect key-count skew across Redis keyspaces"""
        try:
            redis_client = redis.Redis(**redis_config)
            keyspace_info = redis_client.info('keyspace')

            # Key distribution per database
            key_distribution = {}
            total_keys = 0
            for db, info in keyspace_info.items():
                if db.startswith('db'):
                    keys = int(info['keys'])
                    key_distribution[db] = keys
                    total_keys += keys

            # Compute skew metrics (reuse the (name, count) tuple shape)
            skew_metrics = {}
            if key_distribution:
                skew_metrics = self._calculate_skew_metrics(
                    [(k, v) for k, v in key_distribution.items()]
                )
            return {
                'key_distribution': key_distribution,
                'total_keys': total_keys,
                'skew_metrics': skew_metrics,
                'detection_time': datetime.now().isoformat()
            }
        except Exception as e:
            self.logger.error(f"Error while detecting Redis cluster skew: {str(e)}")
            raise


# Usage example
if __name__ == "__main__":
    # MySQL configuration
    mysql_config = {
        'host': 'localhost',
        'user': 'root',
        'password': 'password',
        'database': 'test_db',
        'charset': 'utf8mb4'
    }
    detector = DataSkewDetector(mysql_config)
    # Detect table skew (default=str handles Decimal values returned by MySQL)
    result = detector.detect_mysql_table_skew('user_behavior')
    print(json.dumps(result, indent=2, ensure_ascii=False, default=str))
```
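The class above implements only the MySQL and Redis paths; for ClickHouse, which the docstring also mentions, a minimal sketch could read per-partition row counts from `system.parts` and feed them to the same metric calculation. This assumes the `clickhouse-driver` package and a reachable server; the host, database, and table names are placeholders.
```python
# Hedged sketch: per-partition row counts for a ClickHouse table, in the same
# (name, row_count) shape that DataSkewDetector._calculate_skew_metrics expects.
from clickhouse_driver import Client

def detect_clickhouse_partition_skew(host: str, database: str, table: str):
    client = Client(host=host)
    rows = client.execute(
        """
        SELECT partition, sum(rows) AS row_count
        FROM system.parts
        WHERE database = %(db)s AND table = %(tbl)s AND active
        GROUP BY partition
        ORDER BY row_count DESC
        """,
        {"db": database, "tbl": table},
    )
    # rows is a list of (partition, row_count) tuples
    return rows

# Example (placeholder names):
# parts = detect_clickhouse_partition_skew("localhost", "default", "user_behavior")
# print(DataSkewDetector(mysql_config)._calculate_skew_metrics(parts))
```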
### Solutions to Data Skew
#### 1. Optimizing the Sharding Strategy
```sql
-- Better sharding: use a composite shard key
CREATE TABLE user_behavior_optimized (
    user_id BIGINT,
    item_id BIGINT,
    behavior_type INT,
    city VARCHAR(50),
    province VARCHAR(50),
    timestamp DATETIME,
    -- Composite shard key; must be STORED so it can be used for partitioning
    shard_key VARCHAR(100) GENERATED ALWAYS AS (CONCAT(user_id, '_', city)) STORED,
    INDEX idx_user (user_id),
    INDEX idx_item (item_id),
    INDEX idx_city (city),
    INDEX idx_shard (shard_key)
) ENGINE=InnoDB
PARTITION BY KEY(shard_key)
PARTITIONS 20;

-- Range sharding: partition by month
CREATE TABLE user_behavior_range (
    user_id BIGINT,
    item_id BIGINT,
    behavior_type INT,
    city VARCHAR(50),
    province VARCHAR(50),
    timestamp DATETIME,
    INDEX idx_user (user_id),
    INDEX idx_timestamp (timestamp)
) ENGINE=InnoDB
PARTITION BY RANGE (YEAR(timestamp) * 100 + MONTH(timestamp)) (
    PARTITION p202401 VALUES LESS THAN (202402),
    PARTITION p202402 VALUES LESS THAN (202403),
    PARTITION p202403 VALUES LESS THAN (202404),
    PARTITION p202404 VALUES LESS THAN (202405),
    PARTITION p202405 VALUES LESS THAN (202406),
    PARTITION p_future VALUES LESS THAN MAXVALUE
);
```
#### 2. Dispersing Hot Data
```python
#!/usr/bin/env python3
"""
Hot-data dispersion solutions
"""
import hashlib
import threading
from typing import Dict, List, Tuple


class HotspotDistributor:
    """Spreads hot keys across nodes"""

    def __init__(self, virtual_nodes: int = 1000):
        self.virtual_nodes = virtual_nodes
        self.node_weights = {}

    def consistent_hash(self, key: str, nodes: List[str]) -> str:
        """Simple modulo hash placement"""
        if not nodes:
            raise ValueError("The node list must not be empty")
        hash_value = self._hash_function(key)
        node_index = hash_value % len(nodes)
        return nodes[node_index]

    def virtual_node_hash(self, key: str, physical_nodes: List[str]) -> str:
        """Consistent hashing with virtual nodes"""
        virtual_to_physical = {}
        # Create virtual nodes for every physical node
        for physical_node in physical_nodes:
            for i in range(self.virtual_nodes):
                virtual_node = f"{physical_node}_vnode_{i}"
                virtual_hash = self._hash_function(virtual_node)
                virtual_to_physical[virtual_hash] = physical_node
        # Walk the ring to the first virtual node at or after the key's hash
        key_hash = self._hash_function(key)
        sorted_hashes = sorted(virtual_to_physical.keys())
        for node_hash in sorted_hashes:
            if node_hash >= key_hash:
                return virtual_to_physical[node_hash]
        # Wrap around to the first node on the ring
        return virtual_to_physical[sorted_hashes[0]]

    def _hash_function(self, key: str) -> int:
        """Hash function (first 8 hex characters of MD5)"""
        return int(hashlib.md5(key.encode()).hexdigest()[:8], 16)

    def distribute_hot_keys(self, hot_keys: List[str], nodes: List[str]) -> Dict[str, str]:
        """Disperse hot keys by salting them before placement"""
        distribution = {}
        for key in hot_keys:
            # Append a salt derived from the key, then place via the virtual-node ring
            salted_key = f"{key}_{self._hash_function(key) % 1000}"
            target_node = self.virtual_node_hash(salted_key, nodes)
            distribution[key] = target_node
        return distribution


class ReadWriteSplitting:
    """Read/write splitting"""

    def __init__(self, master_config: Dict, slave_configs: List[Dict]):
        self.master = self._create_connection(master_config)
        self.slaves = [self._create_connection(config) for config in slave_configs]
        self.slave_round_robin = 0
        self.lock = threading.Lock()

    def _create_connection(self, config: Dict):
        """Create a database connection (a real project would open a real connection here)"""
        return config

    def get_read_connection(self):
        """Pick a read replica (round-robin load balancing)"""
        with self.lock:
            slave = self.slaves[self.slave_round_robin]
            self.slave_round_robin = (self.slave_round_robin + 1) % len(self.slaves)
            return slave

    def execute_write(self, query: str, params: Tuple = None):
        """Execute a write (a real project would run it against the master)"""
        print(f"Write to master: {query}")
        return True

    def execute_read(self, query: str, params: Tuple = None):
        """Execute a read (a real project would run it against a replica)"""
        slave = self.get_read_connection()
        print(f"Read from slave {slave['host']}: {query}")
        return []


# Usage example
if __name__ == "__main__":
    # Hot-key dispersion
    distributor = HotspotDistributor()
    nodes = ["node1", "node2", "node3", "node4", "node5"]
    hot_keys = ["user_12345", "item_67890", "order_11111"]
    distribution = distributor.distribute_hot_keys(hot_keys, nodes)
    print("Hot key placement:", distribution)
```
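The distributor above only covers the write path of key salting; on reads the application has to fan out to every sub-key and merge the partial results. Below is a minimal sketch, assuming Redis counters, a locally reachable Redis instance, and a fixed split factor (all assumptions, not from the course).
```python
# Hedged sketch: split a hot counter key into N sub-keys on write and
# merge them on read. Key naming scheme and split factor are assumptions.
import random
import redis

N_SPLITS = 16                                    # assumed sub-keys per hot key
r = redis.Redis(host="localhost", port=6379)     # placeholder connection

def incr_hot_counter(key: str, amount: int = 1) -> None:
    # Writes go to a randomly chosen sub-key, spreading load across slots.
    r.incrby(f"{key}:{random.randrange(N_SPLITS)}", amount)

def read_hot_counter(key: str) -> int:
    # Reads fan out to every sub-key and sum the parts.
    return sum(int(r.get(f"{key}:{i}") or 0) for i in range(N_SPLITS))

# Example:
# incr_hot_counter("item_67890:views")
# print(read_hot_counter("item_67890:views"))
```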
## Disaster Recovery Design: Guaranteeing Business Continuity
### Multi-Active Data Center Architecture
```python
#!/usr/bin/env python3
"""
Multi-active data center disaster recovery
"""
import json
import logging
import threading
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from typing import Dict, List


class DataCenterStatus(Enum):
    ACTIVE = "active"
    STANDBY = "standby"
    FAILOVER = "failover"
    DEGRADED = "degraded"
    OFFLINE = "offline"


@dataclass
class DataCenter:
    id: str
    name: str
    region: str
    status: DataCenterStatus
    priority: int
    endpoints: Dict[str, str]
    health_check_url: str


class DisasterRecoveryManager:
    """Disaster recovery manager"""

    def __init__(self, data_centers: List[DataCenter]):
        self.data_centers = {dc.id: dc for dc in data_centers}
        self.active_dc = None
        self.standby_dcs = []
        self.health_check_interval = 30   # seconds
        self.failover_threshold = 3       # consecutive failures before failover
        self.health_stats = {}
        self.logger = self._setup_logger()
        self._initialize_dc_roles()

    def _setup_logger(self):
        """Configure logging"""
        logger = logging.getLogger('DisasterRecoveryManager')
        logger.setLevel(logging.INFO)
        return logger

    def _initialize_dc_roles(self):
        """Assign initial roles to the data centers"""
        # Sort by priority: the highest-priority (lowest number) DC becomes active
        sorted_dcs = sorted(self.data_centers.values(), key=lambda x: x.priority)
        if sorted_dcs:
            self.active_dc = sorted_dcs[0]
            self.standby_dcs = sorted_dcs[1:]
        # Initialize health statistics
        for dc in self.data_centers.values():
            self.health_stats[dc.id] = {
                'consecutive_failures': 0,
                'last_check': None,
                'response_time': None,
                'overall_health': 100
            }

    def start_health_monitoring(self):
        """Start the background health-monitoring loop"""
        def monitor_loop():
            while True:
                self._perform_health_checks()
                time.sleep(self.health_check_interval)

        monitor_thread = threading.Thread(target=monitor_loop, daemon=True)
        monitor_thread.start()
        self.logger.info("Health monitoring started")

    def _perform_health_checks(self):
        """Run a health check against every data center"""
        for dc in self.data_centers.values():
            health = self._check_dc_health(dc)
            if not health['healthy']:
                self.health_stats[dc.id]['consecutive_failures'] += 1
                self.logger.warning(f"Health check failed for data center {dc.name}: {health['error']}")
                # Trigger failover only when the active DC keeps failing
                if (self.health_stats[dc.id]['consecutive_failures'] >= self.failover_threshold and
                        dc.id == self.active_dc.id):
                    self._initiate_failover(dc)
            else:
                self.health_stats[dc.id]['consecutive_failures'] = 0
                self.health_stats[dc.id]['response_time'] = health['response_time']
                self.health_stats[dc.id]['overall_health'] = health['health_score']

    def _check_dc_health(self, dc: DataCenter) -> Dict:
        """Check one data center's health (simulated; a real check would probe
        database connectivity, network latency, service status, etc.)"""
        try:
            start_time = time.time()
            time.sleep(0.1)  # simulate network latency
            response_time = (time.time() - start_time) * 1000  # milliseconds
            # Simulated health score
            health_score = max(0, 100 - response_time / 10)
            return {
                'healthy': health_score > 80,
                'response_time': response_time,
                'health_score': health_score,
                'error': None if health_score > 80 else "Response time too high"
            }
        except Exception as e:
            return {
                'healthy': False,
                'response_time': None,
                'health_score': 0,
                'error': str(e)
            }

    def _initiate_failover(self, failed_dc: DataCenter):
        """Start a failover away from the failed data center"""
        self.logger.info(f"Initiating failover, failed data center: {failed_dc.name}")
        if not self.standby_dcs:
            self.logger.error("No standby data center available!")
            return
        # Pick the highest-priority standby
        new_active = self.standby_dcs[0]
        # Perform the failover
        self._execute_failover(failed_dc, new_active)
        # Update roles
        self.active_dc = new_active
        self.standby_dcs = [dc for dc in self.standby_dcs if dc.id != new_active.id]
        self.standby_dcs.append(failed_dc)
        self.logger.info(f"Failover complete, new active data center: {new_active.name}")

    def _execute_failover(self, from_dc: DataCenter, to_dc: DataCenter):
        """Execute the failover steps. A real project would:
        1. stop routing traffic to the failed center,
        2. verify data synchronization,
        3. switch to the new center,
        4. update DNS / load-balancer configuration."""
        self.logger.info(f"Switching from {from_dc.name} to {to_dc.name}")
        # Simulated switch-over
        steps = [
            "Stop traffic to the failed center",
            "Verify data consistency",
            "Switch database connections",
            "Update routing configuration",
            "Verify service availability"
        ]
        for step in steps:
            self.logger.info(f"Executing step: {step}")
            time.sleep(0.5)  # simulate the time each step takes

    def get_current_topology(self) -> Dict:
        """Return the current DR topology"""
        return {
            'active_dc': {
                'id': self.active_dc.id,
                'name': self.active_dc.name,
                'status': self.active_dc.status.value
            },
            'standby_dcs': [
                {
                    'id': dc.id,
                    'name': dc.name,
                    'status': dc.status.value,
                    'priority': dc.priority
                } for dc in self.standby_dcs
            ],
            'health_stats': self.health_stats,
            'last_updated': time.time()
        }


# Backup and restore strategies
class BackupStrategy(ABC):
    """Base class for backup strategies"""

    def __init__(self, retention_days: int = 30):
        self.retention_days = retention_days

    @abstractmethod
    def perform_backup(self, database: str, backup_path: str) -> bool:
        """Run a backup"""
        ...

    @abstractmethod
    def perform_restore(self, backup_file: str, target_database: str) -> bool:
        """Run a restore"""
        ...


class MySQLBackupStrategy(BackupStrategy):
    """MySQL backup strategy"""

    def perform_backup(self, database: str, backup_path: str) -> bool:
        """Run a MySQL backup (a real project would use mysqldump or XtraBackup)"""
        try:
            backup_file = f"{backup_path}/{database}_{time.strftime('%Y%m%d_%H%M%S')}.sql"
            # Simulated backup flow
            self._execute_mysqldump(database, backup_file)
            self._verify_backup(backup_file)
            self._cleanup_old_backups(backup_path)
            return True
        except Exception as e:
            logging.error(f"Backup failed: {str(e)}")
            return False

    def perform_restore(self, backup_file: str, target_database: str) -> bool:
        """Run a MySQL restore"""
        try:
            # Simulated restore flow
            self._create_database_if_not_exists(target_database)
            self._execute_mysql_restore(backup_file, target_database)
            self._verify_restore(target_database)
            return True
        except Exception as e:
            logging.error(f"Restore failed: {str(e)}")
            return False

    def _execute_mysqldump(self, database: str, output_file: str):
        """Run mysqldump (real command: mysqldump -u user -p database > output_file)"""
        logging.info(f"Running mysqldump: {database} -> {output_file}")
        time.sleep(1)  # simulate backup time

    def _execute_mysql_restore(self, backup_file: str, database: str):
        """Run the restore (real command: mysql -u user -p database < backup_file)"""
        logging.info(f"Running mysql restore: {backup_file} -> {database}")
        time.sleep(2)  # simulate restore time

    def _verify_backup(self, backup_file: str):
        """Verify the backup file"""
        logging.info(f"Verifying backup file: {backup_file}")

    def _verify_restore(self, database: str):
        """Verify the restored database"""
        logging.info(f"Verifying restored database: {database}")

    def _create_database_if_not_exists(self, database: str):
        """Create the database if it does not exist"""
        logging.info(f"Checking/creating database: {database}")

    def _cleanup_old_backups(self, backup_path: str):
        """Remove backups older than the retention window"""
        logging.info(f"Cleaning up old backups under {backup_path}")


# Usage example
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO,
                        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    # Data center configuration
    data_centers = [
        DataCenter(
            id="dc1",
            name="Beijing primary data center",
            region="cn-north-1",
            status=DataCenterStatus.ACTIVE,
            priority=1,
            endpoints={
                "mysql": "mysql-dc1.internal.com:3306",
                "redis": "redis-dc1.internal.com:6379",
                "api": "api-dc1.internal.com:8080"
            },
            health_check_url="http://health-dc1.internal.com/status"
        ),
        DataCenter(
            id="dc2",
            name="Shanghai standby data center",
            region="cn-east-1",
            status=DataCenterStatus.STANDBY,
            priority=2,
            endpoints={
                "mysql": "mysql-dc2.internal.com:3306",
                "redis": "redis-dc2.internal.com:6379",
                "api": "api-dc2.internal.com:8080"
            },
            health_check_url="http://health-dc2.internal.com/status"
        ),
        DataCenter(
            id="dc3",
            name="Guangzhou disaster recovery data center",
            region="cn-south-1",
            status=DataCenterStatus.STANDBY,
            priority=3,
            endpoints={
                "mysql": "mysql-dc3.internal.com:3306",
                "redis": "redis-dc3.internal.com:6379",
                "api": "api-dc3.internal.com:8080"
            },
            health_check_url="http://health-dc3.internal.com/status"
        )
    ]

    # Initialize the DR manager
    dr_manager = DisasterRecoveryManager(data_centers)
    dr_manager.start_health_monitoring()

    # Show the current topology
    topology = dr_manager.get_current_topology()
    print("Current DR topology:")
    print(json.dumps(topology, indent=2, ensure_ascii=False))

    # Backup example
    backup_strategy = MySQLBackupStrategy()
    backup_strategy.perform_backup("user_db", "/backup/mysql")
```
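The backup methods above only log what would happen. A real implementation of the logical-backup path would shell out to `mysqldump` (or use Percona XtraBackup for physical backups). Below is a minimal sketch under the assumptions that mysqldump is on PATH and that credentials come from a client config file such as `~/.my.cnf`, so no password appears on the command line; paths and names are placeholders.
```python
# Hedged sketch: wrap mysqldump via subprocess for the logical backup path.
import subprocess
import time

def mysqldump_backup(database: str, backup_path: str) -> str:
    backup_file = f"{backup_path}/{database}_{time.strftime('%Y%m%d_%H%M%S')}.sql"
    cmd = [
        "mysqldump",
        "--single-transaction",   # consistent InnoDB snapshot without table locks
        "--routines",             # include stored procedures/functions
        "--triggers",
        database,
    ]
    with open(backup_file, "w") as out:
        subprocess.run(cmd, stdout=out, check=True)   # raises on non-zero exit
    return backup_file

# Example (placeholder paths):
# mysqldump_backup("user_db", "/backup/mysql")
```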
### Database Monitoring and Alerting
```sql
-- Monitoring metrics table
CREATE TABLE database_monitoring (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    metric_name VARCHAR(100) NOT NULL,
    metric_value DECIMAL(15,4),
    metric_unit VARCHAR(50),
    instance_host VARCHAR(100),
    instance_port INT,
    database_name VARCHAR(100),
    collect_time DATETIME DEFAULT CURRENT_TIMESTAMP,
    severity ENUM('INFO', 'WARNING', 'CRITICAL') DEFAULT 'INFO',
    tags JSON,
    INDEX idx_metric_time (metric_name, collect_time),
    INDEX idx_severity_time (severity, collect_time)
);

-- Stored procedure that collects monitoring metrics
-- (MySQL 5.7+/8.0: global status counters live in performance_schema.global_status)
DELIMITER $$
CREATE PROCEDURE CollectDatabaseMetrics()
BEGIN
    -- Connection count
    INSERT INTO database_monitoring
        (metric_name, metric_value, metric_unit, instance_host, database_name)
    SELECT
        'threads_connected' AS metric_name,
        VARIABLE_VALUE AS metric_value,
        'connections' AS metric_unit,
        @@hostname AS instance_host,
        DATABASE() AS database_name
    FROM performance_schema.global_status
    WHERE VARIABLE_NAME = 'Threads_connected';

    -- Query counter (note: 'Queries' is cumulative; QPS = delta between samples / interval)
    INSERT INTO database_monitoring
        (metric_name, metric_value, metric_unit, instance_host, database_name)
    SELECT
        'queries_total' AS metric_name,
        VARIABLE_VALUE AS metric_value,
        'queries' AS metric_unit,
        @@hostname AS instance_host,
        DATABASE() AS database_name
    FROM performance_schema.global_status
    WHERE VARIABLE_NAME = 'Queries';

    -- Slow queries (check the higher threshold first so CRITICAL is reachable)
    INSERT INTO database_monitoring
        (metric_name, metric_value, metric_unit, instance_host, database_name, severity)
    SELECT
        'slow_queries' AS metric_name,
        VARIABLE_VALUE AS metric_value,
        'queries' AS metric_unit,
        @@hostname AS instance_host,
        DATABASE() AS database_name,
        CASE WHEN VARIABLE_VALUE > 50 THEN 'CRITICAL'
             WHEN VARIABLE_VALUE > 10 THEN 'WARNING'
             ELSE 'INFO' END AS severity
    FROM performance_schema.global_status
    WHERE VARIABLE_NAME = 'Slow_queries';

    -- InnoDB row-lock waits
    INSERT INTO database_monitoring
        (metric_name, metric_value, metric_unit, instance_host, database_name, severity)
    SELECT
        'innodb_row_lock_waits' AS metric_name,
        VARIABLE_VALUE AS metric_value,
        'waits' AS metric_unit,
        @@hostname AS instance_host,
        DATABASE() AS database_name,
        CASE WHEN VARIABLE_VALUE > 500 THEN 'CRITICAL'
             WHEN VARIABLE_VALUE > 100 THEN 'WARNING'
             ELSE 'INFO' END AS severity
    FROM performance_schema.global_status
    WHERE VARIABLE_NAME = 'Innodb_row_lock_waits';
END$$
DELIMITER ;

-- Alert rules table
CREATE TABLE alert_rules (
    id INT AUTO_INCREMENT PRIMARY KEY,
    rule_name VARCHAR(200) NOT NULL,
    metric_name VARCHAR(100) NOT NULL,
    condition_operator ENUM('>', '>=', '<', '<=', '=', '!='),
    condition_value DECIMAL(15,4),
    severity ENUM('INFO', 'WARNING', 'CRITICAL'),
    enabled BOOLEAN DEFAULT TRUE,
    check_interval INT DEFAULT 60, -- seconds
    last_check_time DATETIME,
    created_time DATETIME DEFAULT CURRENT_TIMESTAMP,
    UNIQUE KEY uk_rule_metric (rule_name, metric_name)
);

-- Default alert rules
INSERT INTO alert_rules
    (rule_name, metric_name, condition_operator, condition_value, severity, check_interval)
VALUES
    ('High connection count', 'threads_connected', '>', 100, 'WARNING', 60),
    ('Critical connection count', 'threads_connected', '>', 200, 'CRITICAL', 60),
    ('Slow query growth', 'slow_queries', '>', 50, 'WARNING', 300),
    ('Critical row-lock waits', 'innodb_row_lock_waits', '>', 500, 'CRITICAL', 60);
```
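The tables above define metrics and rules, but nothing evaluates them. A minimal evaluation loop, sketched here under the assumption of the schema above and `pymysql` (connection parameters are placeholders), could join each enabled rule against the latest sample of its metric and report breaches:
```python
# Hedged sketch: evaluate alert_rules against the newest row of each metric
# in database_monitoring.
import operator
import pymysql

OPS = {'>': operator.gt, '>=': operator.ge, '<': operator.lt,
       '<=': operator.le, '=': operator.eq, '!=': operator.ne}

def evaluate_alert_rules(conn) -> list:
    alerts = []
    with conn.cursor(pymysql.cursors.DictCursor) as cur:
        cur.execute("SELECT * FROM alert_rules WHERE enabled = TRUE")
        for rule in cur.fetchall():
            cur.execute(
                "SELECT metric_value FROM database_monitoring "
                "WHERE metric_name = %s ORDER BY collect_time DESC LIMIT 1",
                (rule['metric_name'],))
            row = cur.fetchone()
            if row and OPS[rule['condition_operator']](
                    float(row['metric_value']), float(rule['condition_value'])):
                alerts.append((rule['rule_name'], rule['severity'],
                               float(row['metric_value'])))
    return alerts

# Example (placeholder credentials):
# conn = pymysql.connect(host='localhost', user='monitor', password='***', database='test_db')
# for name, severity, value in evaluate_alert_rules(conn):
#     print(f"[{severity}] {name}: current value {value}")
```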
## Case Study: Optimizing a 10-Billion-PV E-Commerce System
### Scenario
An e-commerce platform faces the following challenges (a rough capacity estimate follows the list):
- Daily PV above 10 billion
- Peak QPS of 500,000+
- PB-scale data volume
- 99.99% availability requirement
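As a quick sanity check on these numbers (a back-of-the-envelope sketch; the peak-to-average factor is an assumed value, not from the course): 10 billion PV per day averages roughly 116K requests per second, so a peak of 500K+ QPS corresponds to a peak factor of about 4 to 5.
```python
# Back-of-the-envelope capacity check (peak factor is an assumed value).
daily_pv = 10_000_000_000            # 10 billion page views per day
avg_qps = daily_pv / 86_400          # ~115,741 requests/second on average
peak_factor = 4.5                    # assumption: typical e-commerce peak/average ratio
print(f"average QPS ~ {avg_qps:,.0f}, estimated peak ~ {avg_qps * peak_factor:,.0f}")
```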
### Solution Architecture
```python
#!/usr/bin/env python3
"""
Database architecture for a 10-billion-PV e-commerce system
"""


class ECommerceDatabaseArchitecture:
    """E-commerce database architecture"""

    def __init__(self):
        self.sharding_strategy = {}
        self.cache_strategy = {}
        self.backup_strategy = {}

    def design_sharding_architecture(self):
        """Design the sharding architecture"""
        architecture = {
            'user_data': {
                'shard_key': 'user_id',
                'shard_count': 1024,
                'strategy': 'range_based',
                'backup_interval': 'hourly'
            },
            'order_data': {
                'shard_key': 'order_id',
                'shard_count': 512,
                'strategy': 'hash_based',
                'backup_interval': 'real_time'
            },
            'product_data': {
                'shard_key': 'product_id',
                'shard_count': 256,
                'strategy': 'consistent_hash',
                'backup_interval': 'daily'
            },
            'inventory_data': {
                'shard_key': 'sku_id',
                'shard_count': 128,
                'strategy': 'hotspot_aware',
                'backup_interval': 'real_time'
            }
        }
        return architecture

    def implement_cache_strategy(self):
        """Define the cache hierarchy"""
        cache_layers = {
            'L1': {
                'type': 'local_cache',
                'max_size': '10GB',
                'ttl': '5 minutes',
                'eviction_policy': 'LRU'
            },
            'L2': {
                'type': 'redis_cluster',
                'nodes': 12,
                'memory_per_node': '64GB',
                'strategy': 'read_through'
            },
            'L3': {
                'type': 'CDN_cache',
                'coverage': 'global',
                'strategy': 'edge_caching'
            }
        }
        return cache_layers

    def design_disaster_recovery(self):
        """Design the disaster recovery plan"""
        dr_plan = {
            'rto': '5 minutes',   # Recovery Time Objective
            'rpo': '30 seconds',  # Recovery Point Objective
            'backup_strategy': {
                'full_backup': 'daily at 02:00',
                'incremental_backup': 'hourly',
                'binlog_backup': 'real time'
            },
            'replication_strategy': {
                'sync_replication': 'same-city (metro) synchronous replication'
            }
        }
        return dr_plan
```