后台回复:1024,获取500G视频教程 推荐阅读 转行成为数据分析师 牛逼,OpenAI新模型 o1 国内直接连! 《SQL145题第2版》正式发布!
大家好,我是岳哥。
这是《最强总结》系列第三篇,之前分享的还没阅读的可以看这篇:
最强总结!SQL Server/MySQL/Oracle函数完全指南!!
数据库优化是提升应用性能的关键环节。本文将从多个维度系统地介绍数据库优化的方法和实践经验。
这些总结都是此前整理好后保存的,最近集中发布,觉得有帮助,记得三连(点赞+转发+在看),岳哥才会更有动力继续发布。此外,大家也可以留言需要哪方面的总结。
一、SQL查询优化
二、数据库配置优化
三、表结构优化
四、分区和分表优化
五、缓存优化
六、监控和维护
七、安全优化
需要PDF版本的同学,可以在公众号后台回复:数据库优化
一、SQL查询优化
1. 索引优化
-- 1. Create indexes that match the query predicates
CREATE INDEX idx_user_email ON users(email);
CREATE INDEX idx_order_status_date ON orders(status, created_at);

-- 2. Composite indexes
-- Not recommended: one single-column index per searched column
CREATE INDEX idx_name ON users(name);
CREATE INDEX idx_email ON users(email);
CREATE INDEX idx_phone ON users(phone);
-- Recommended: one composite index for queries that filter on these columns together.
-- Only a leftmost prefix is usable: (name), (name, email), (name, email, phone).
-- A filter on email or phone alone cannot use this index.
CREATE INDEX idx_users_search ON users(name, email, phone);

-- 3. Sort-aware index
-- Declare the sort direction of each key part so a mixed ASC/DESC ORDER BY
-- can be read straight from the index.
CREATE INDEX idx_products_category_price ON products(category ASC, price DESC);

-- 4. Prefix index
-- For long string columns, index only the leading characters
CREATE INDEX idx_description ON products(description(50));

-- 5. Covering index
-- The index holds every column the query reads. Key order matters:
-- the equality column (status) comes first, then the sort column (created_at),
-- then the remaining selected columns. Leading with order_id would still
-- "cover" the query but would force a full index scan plus a sort.
CREATE INDEX idx_orders_covering ON orders(status, created_at, order_id, total_amount);
-- Query served by the covering index
EXPLAIN SELECT order_id, status, created_at, total_amount
FROM orders
WHERE status = 'pending'
ORDER BY created_at DESC;

-- 6. Avoid predicates that disable index use
-- Not recommended: wrapping the indexed column in a function
SELECT * FROM users WHERE YEAR(created_at) = 2024;
-- Recommended: rewrite as a half-open range on the bare column
SELECT * FROM users
WHERE created_at >= '2024-01-01'
  AND created_at < '2025-01-01';
2. 查询重写
-- 1. Avoid SELECT *
-- Not recommended: returns every column of both tables
SELECT *
FROM orders
JOIN users ON orders.user_id = users.id;
-- Recommended: name only the columns that are needed
SELECT
    o.order_id,
    o.status,
    u.name,
    u.email
FROM orders AS o
INNER JOIN users AS u
    ON o.user_id = u.id;

-- 2. Use EXISTS instead of IN
-- Not recommended
SELECT *
FROM orders
WHERE user_id IN (
    SELECT id
    FROM users
    WHERE status = 'active'
);
-- Recommended
SELECT *
FROM orders AS o
WHERE EXISTS (
    SELECT 1
    FROM users AS u
    WHERE u.id = o.user_id
      AND u.status = 'active'
);

-- 3. Use UNION ALL instead of UNION
-- Not recommended: UNION de-duplicates the combined result
SELECT order_id FROM completed_orders
UNION
SELECT order_id FROM pending_orders;
-- Recommended: UNION ALL keeps every row and skips the de-duplication step
SELECT order_id FROM completed_orders
UNION ALL
SELECT order_id FROM pending_orders;

-- 4. Use a JOIN instead of a correlated subquery
-- Not recommended: scalar subquery in the select list
SELECT
    *,
    (SELECT name FROM users WHERE users.id = orders.user_id) AS user_name
FROM orders;
-- Recommended: LEFT JOIN keeps orders that have no matching user
SELECT
    o.*,
    u.name AS user_name
FROM orders AS o
LEFT JOIN users AS u
    ON o.user_id = u.id;
-- 5. Pagination
-- Not recommended: the server reads and discards 1,000,000 full rows
SELECT * FROM orders ORDER BY created_at DESC LIMIT 1000000, 10;
-- Recommended: deferred join. The inner query walks only the key columns to
-- locate the 10 wanted rows; the outer query then fetches those rows in full.
-- order_id is added as a tie-breaker so the page is deterministic when several
-- rows share the same created_at (a bare "created_at < boundary" filter would
-- skip or repeat rows in that case).
-- Assumes order_id uniquely identifies a row in orders — TODO confirm.
SELECT o.*
FROM orders AS o
INNER JOIN (
    SELECT order_id
    FROM orders
    ORDER BY created_at DESC, order_id DESC
    LIMIT 10 OFFSET 1000000
) AS page
    ON page.order_id = o.order_id
ORDER BY o.created_at DESC, o.order_id DESC;
-- 6. Break a complex query apart with a temporary table
-- Step 1: materialize the per-user aggregates once
CREATE TEMPORARY TABLE tmp_order_stats AS
SELECT
    user_id,
    COUNT(*) AS order_count,
    SUM(amount) AS total_amount
FROM orders
GROUP BY user_id;

-- Step 2: join the aggregates back to users.
-- Users without orders keep NULL in order_count / total_amount.
SELECT
    u.*,
    t.order_count,
    t.total_amount
FROM users AS u
LEFT JOIN tmp_order_stats AS t
    ON u.id = t.user_id;
二、数据库配置优化
1. 内存配置
# MySQL option file (my.cnf) — memory settings
# 1. InnoDB buffer pool
innodb_buffer_pool_size = 12G # 50-75% of total memory
innodb_buffer_pool_instances = 8 # roughly the number of CPU cores
innodb_buffer_pool_chunk_size = 128M # default value, usually left unchanged
# 2. Query cache (removed in MySQL 8.0)
# The "loose-" prefix turns an unknown option into a warning, so the same file
# starts on 8.0 (where these variables no longer exist) and still disables the
# query cache on 5.7 and earlier.
loose-query_cache_type = 0 # disable the query cache
loose-query_cache_size = 0 # allocate nothing for it
# 3. Sort and join buffers
sort_buffer_size = 2M # per-session sort buffer
join_buffer_size = 2M # per-session join buffer
# 4. Temporary tables
tmp_table_size = 64M # in-memory temporary table size
max_heap_table_size = 64M # size limit for user-created MEMORY tables
# 5. Connections
max_connections = 1000 # maximum number of connections
thread_cache_size = 128 # thread cache size
# 6. InnoDB redo log
innodb_log_file_size = 1G # redo log file size
innodb_log_files_in_group = 2 # number of redo log files in the group
innodb_log_buffer_size = 16M # log buffer size
# 7. Performance monitoring
performance_schema = ON
performance_schema_max_table_instances = 1000
# Current memory usage per instrument.
# memory_summary_global_by_event_name exposes byte counters as
# CURRENT_NUMBER_OF_BYTES_USED / HIGH_NUMBER_OF_BYTES_USED; the short names
# current_alloc / high_alloc belong to the sys schema views, not to this table.
SELECT
    event_name AS memory_type,
    current_number_of_bytes_used AS current_bytes,
    high_number_of_bytes_used AS max_bytes
FROM performance_schema.memory_summary_global_by_event_name
WHERE event_name LIKE '%memory%'
ORDER BY current_number_of_bytes_used DESC;

# InnoDB status report (includes the buffer pool section)
SHOW ENGINE INNODB STATUS;

# Configured buffer pool size
SHOW VARIABLES LIKE 'innodb_buffer_pool_size';

# Buffer pool hit ratio (percent):
#   (1 - reads served from disk / logical read requests) * 100
SELECT
    (1 - (
        SELECT variable_value
        FROM performance_schema.global_status
        WHERE variable_name = 'Innodb_buffer_pool_reads'
    ) / (
        SELECT variable_value
        FROM performance_schema.global_status
        WHERE variable_name = 'Innodb_buffer_pool_read_requests'
    )) * 100 AS buffer_pool_hit_ratio;
2. 磁盘I/O优化
# MySQL option file (my.cnf) — disk I/O settings
# 1. InnoDB file I/O
innodb_file_per_table = ON # store each table in its own tablespace file
innodb_flush_method = O_DIRECT # use direct I/O
innodb_io_capacity = 2000 # I/O capacity figure chosen for SSD storage
innodb_io_capacity_max = 4000 # upper limit for I/O capacity
# 2. Log flushing
innodb_flush_log_at_trx_commit = 1 # write to disk at every transaction commit
sync_binlog = 1 # sync the binary log on every transaction
# 3. Doublewrite buffer
innodb_doublewrite = ON # enable the doublewrite buffer
innodb_doublewrite_batch_size = 32 # batch size
# 4. I/O thread counts (the original heading called this "read/write splitting")
innodb_read_io_threads = 8 # number of read threads
innodb_write_io_threads = 8 # number of write threads
# 5. Read-ahead
innodb_read_ahead_threshold = 56 # read-ahead threshold
innodb_random_read_ahead = OFF # random read-ahead
# I/O wait events, heaviest first.
# Timer columns are divided by 1000000000 to report milliseconds.
# The filter anchors on the 'wait/io/' prefix: a bare '%io%' pattern also
# matches unrelated instruments whose names merely contain "io"
# (e.g. "...connection...", "...transaction...").
SELECT
    event_name,
    count_star AS count,
    sum_timer_wait / 1000000000 AS total_ms,
    avg_timer_wait / 1000000000 AS avg_ms,
    max_timer_wait / 1000000000 AS max_ms
FROM performance_schema.events_waits_summary_global_by_event_name
WHERE event_name LIKE 'wait/io/%'
ORDER BY sum_timer_wait DESC;

# InnoDB status report (includes the I/O sections)
SHOW ENGINE INNODB STATUS;

# Per-file I/O for InnoDB files, busiest files first
SELECT
    file_name,
    event_name,
    count_read,
    count_write,
    sum_number_of_bytes_read,
    sum_number_of_bytes_write
FROM performance_schema.file_summary_by_instance
WHERE event_name LIKE '%innodb%'
ORDER BY sum_number_of_bytes_read + sum_number_of_bytes_write DESC;

# Slow statements as reported by the sys schema 95th-percentile view
SELECT
    query,
    exec_count,
    total_latency,
    avg_latency,
    rows_sent_avg,
    rows_examined_avg
FROM sys.statements_with_runtimes_in_95th_percentile;
3. 并发配置
# MySQL concurrency settings (my.cnf)
# 1. Connection management
max_connections = 1000 # maximum number of connections
max_user_connections = 800 # maximum connections per user
thread_cache_size = 128 # thread cache size
wait_timeout = 600 # timeout for non-interactive connections
interactive_timeout = 1800 # timeout for interactive connections
# 2. Locking
innodb_lock_wait_timeout = 50 # lock wait timeout
innodb_deadlock_detect = ON # deadlock detection
innodb_autoinc_lock_mode = 2 # auto-increment lock mode
# 3. Transactions
transaction_isolation = REPEATABLE-READ # transaction isolation level
innodb_rollback_segments = 128 # number of rollback segments
# SQL for monitoring concurrency
# Connection count per user and host
SELECT
    COUNT(*) AS connections,
    USER AS user,
    HOST AS host
FROM INFORMATION_SCHEMA.PROCESSLIST
GROUP BY user, host;

# Lock waits: which transaction is waiting and which one blocks it.
# MySQL 8.0 removed information_schema.innodb_lock_waits; the wait graph is
# now in performance_schema.data_lock_waits. On 5.7, use
# information_schema.innodb_lock_waits with requesting_trx_id / blocking_trx_id.
SELECT
    r.trx_id AS waiting_trx_id,
    r.trx_state AS waiting_trx_state,
    b.trx_id AS blocking_trx_id,
    b.trx_state AS blocking_trx_state,
    TIMESTAMPDIFF(SECOND, r.trx_wait_started, NOW()) AS wait_time
FROM performance_schema.data_lock_waits AS w
INNER JOIN information_schema.innodb_trx AS b
    ON b.trx_id = w.blocking_engine_transaction_id
INNER JOIN information_schema.innodb_trx AS r
    ON r.trx_id = w.requesting_engine_transaction_id;

# Transactions that are not in the RUNNING state
SELECT *
FROM information_schema.innodb_trx
WHERE trx_state != 'RUNNING';

# Tables currently in use
SHOW OPEN TABLES WHERE in_use > 0;
三、表结构优化
1. 数据类型优化
-- 1. Integer types
-- Not recommended
CREATE TABLE products (
    id INT,           -- range larger than needed
    status TINYINT,   -- signedness not declared
    quantity INT,     -- range larger than needed
    category_id INT   -- range larger than needed
);
-- Recommended
-- An AUTO_INCREMENT column must be part of a key, so id is declared as the
-- primary key; without a key the CREATE TABLE is rejected.
CREATE TABLE products (
    id MEDIUMINT UNSIGNED AUTO_INCREMENT PRIMARY KEY, -- moderate range
    status TINYINT UNSIGNED,                          -- explicitly unsigned
    quantity SMALLINT UNSIGNED,                       -- range fits the data
    category_id SMALLINT UNSIGNED                     -- range fits the data
);

-- 2. String types
-- Not recommended
CREATE TABLE users (
    id INT,
    name VARCHAR(255),   -- longer than needed
    email VARCHAR(255),  -- longer than needed
    phone CHAR(20),      -- fixed length
    description TEXT     -- no length limit stated
);
-- Recommended
CREATE TABLE users (
    id INT,
    name VARCHAR(50),        -- length fits the data
    email VARCHAR(100),      -- length fits the data
    phone VARCHAR(20),       -- variable length
    description TEXT(16383)  -- TEXT(M) makes MySQL pick the smallest TEXT type able to hold M characters
);

-- 3. Date and time types
-- Not recommended
CREATE TABLE orders (
    id INT,
    created_at VARCHAR(30),  -- time stored as a string
    updated_at VARCHAR(30),  -- time stored as a string
    delivery_date DATETIME   -- time-of-day precision not needed
);
-- Recommended
-- The automatic behaviour is spelled out: a bare TIMESTAMP column is not
-- initialized or updated automatically unless DEFAULT / ON UPDATE is declared
-- (or the server runs with explicit_defaults_for_timestamp disabled).
CREATE TABLE orders (
    id INT,
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,  -- set once on insert
    updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
        ON UPDATE CURRENT_TIMESTAMP,                          -- refreshed on every update
    delivery_date DATE                                        -- date only
);
-- 4. Decimal types
-- Not recommended
CREATE TABLE financial_records (
    id INT,
    amount FLOAT,            -- floating point: precision problems
    price DOUBLE,            -- floating point: precision problems
    tax_rate DECIMAL(10,2)   -- range larger than needed
);
-- Recommended
CREATE TABLE financial_records (
    id INT,
    amount DECIMAL(8,2),     -- exact fixed-point
    price DECIMAL(8,2),      -- exact fixed-point
    tax_rate DECIMAL(5,2)    -- range fits the data
);

-- 5. Enumerated values
-- Not recommended
CREATE TABLE tickets (
    id INT,
    status VARCHAR(20),      -- fixed set of options stored as free text
    priority VARCHAR(20)     -- fixed set of options stored as free text
);
-- Recommended
CREATE TABLE tickets (
    id INT,
    status ENUM('open', 'closed', 'pending'),   -- enumerated
    priority ENUM('low', 'medium', 'high')      -- enumerated
);

-- 6. JSON type (the JSON column type needs MySQL 5.7+; the expression index
--    below uses functional key parts, which need MySQL 8.0.13+)
-- Not recommended
CREATE TABLE product_attributes (
    id INT,
    attributes VARCHAR(4000)   -- JSON stored as a plain string
);
-- Recommended
CREATE TABLE product_attributes (
    id INT,
    attributes JSON,           -- native JSON type
    INDEX idx_attr ((CAST(attributes->>'$.color' AS CHAR(20))))  -- index on the extracted $.color value
);
2. 范式和反范式优化
-- 1. Normalized design (third normal form)
CREATE TABLE orders (
    order_id INT PRIMARY KEY,
    user_id INT,
    order_date DATE,
    status VARCHAR(20),
    FOREIGN KEY (user_id) REFERENCES users(id)
);

CREATE TABLE order_items (
    order_item_id INT PRIMARY KEY,
    order_id INT,
    product_id INT,
    quantity INT,
    unit_price DECIMAL(10,2),
    FOREIGN KEY (order_id) REFERENCES orders(order_id),
    FOREIGN KEY (product_id) REFERENCES products(id)
);

-- 2. Denormalized design
-- Frequently read attributes are copied into the order row to save joins
CREATE TABLE orders_denormalized (
    order_id INT PRIMARY KEY,
    user_id INT,
    user_name VARCHAR(50),        -- redundant copy of the user name
    user_email VARCHAR(100),      -- redundant copy of the user email
    order_date DATE,
    status VARCHAR(20),
    total_amount DECIMAL(10,2),   -- redundant order total
    total_items INT,              -- redundant item count
    FOREIGN KEY (user_id) REFERENCES users(id)
);

-- 3. Hybrid design
-- Base tables stay normalized; totals live in a separate summary table
CREATE TABLE order_summaries (
    order_id INT PRIMARY KEY,
    total_amount DECIMAL(10,2),
    total_items INT,
    last_updated TIMESTAMP,
    FOREIGN KEY (order_id) REFERENCES orders(order_id)
);
-- Trigger that keeps order_summaries in step with order_items.
-- After each inserted item it recomputes the order's totals from all of its
-- items and upserts the summary row. Only INSERT is covered by this trigger.
DELIMITER $$
CREATE TRIGGER after_order_item_insert
AFTER INSERT ON order_items
FOR EACH ROW
BEGIN
    INSERT INTO order_summaries (order_id, total_amount, total_items)
    SELECT
        NEW.order_id,
        SUM(quantity * unit_price),
        SUM(quantity)
    FROM order_items
    WHERE order_id = NEW.order_id
    -- A summary row already exists for this order: overwrite its totals
    ON DUPLICATE KEY UPDATE
        total_amount = VALUES(total_amount),
        total_items = VALUES(total_items),
        last_updated = CURRENT_TIMESTAMP;
END $$
DELIMITER ;
-- 4. Vertical split
-- Large columns move to a companion table keyed by the same product id
CREATE TABLE products (
    id INT PRIMARY KEY,
    name VARCHAR(100),
    price DECIMAL(10,2),
    category_id INT
);

CREATE TABLE product_details (
    product_id INT PRIMARY KEY,
    description TEXT,
    specifications JSON,
    images JSON,
    FOREIGN KEY (product_id) REFERENCES products(id)
);

-- 5. Horizontal split
-- One table per year; the CHECK constraint pins each table to its year
CREATE TABLE orders_2023 (
    order_id INT PRIMARY KEY,
    user_id INT,
    order_date DATE,
    CHECK (YEAR(order_date) = 2023)
);

CREATE TABLE orders_2024 (
    order_id INT PRIMARY KEY,
    user_id INT,
    order_date DATE,
    CHECK (YEAR(order_date) = 2024)
);
四、分区和分表优化
1. 分区策略
-- 1. RANGE partitioning: one partition per order year plus a catch-all
CREATE TABLE orders (
    order_id INT,
    order_date DATE,
    amount DECIMAL(10,2),
    customer_id INT
)
PARTITION BY RANGE (YEAR(order_date)) (
    PARTITION p2022 VALUES LESS THAN (2023),
    PARTITION p2023 VALUES LESS THAN (2024),
    PARTITION p2024 VALUES LESS THAN (2025),
    PARTITION pmax VALUES LESS THAN MAXVALUE
);

-- 2. LIST partitioning: rows are placed by region name
CREATE TABLE sales (
    id INT,
    amount DECIMAL(10,2),
    region VARCHAR(20)
)
PARTITION BY LIST COLUMNS(region) (
    PARTITION p_east VALUES IN ('east', 'northeast'),
    PARTITION p_west VALUES IN ('west', 'northwest'),
    PARTITION p_cent VALUES IN ('central'),
    PARTITION p_south VALUES IN ('south', 'southeast')
);

-- 3. HASH partitioning: rows are spread over 4 partitions by id
CREATE TABLE products (
    id INT,
    name VARCHAR(100),
    created_at DATE
)
PARTITION BY HASH(id)
PARTITIONS 4;

-- 4. Composite partitioning: RANGE by year, each range split into
--    4 HASH subpartitions by customer
CREATE TABLE customer_orders (
    order_id INT,
    customer_id INT,
    order_date DATE,
    amount DECIMAL(10,2)
)
PARTITION BY RANGE(YEAR(order_date))
SUBPARTITION BY HASH(customer_id)
SUBPARTITIONS 4 (
    PARTITION p2023 VALUES LESS THAN (2024),
    PARTITION p2024 VALUES LESS THAN (2025),
    PARTITION pmax VALUES LESS THAN MAXVALUE
);
-- 5. Partition maintenance
-- Add a partition.
-- The orders table ends with a MAXVALUE catch-all (pmax), and ADD PARTITION can
-- only append a range above the current highest bound, so a new year is carved
-- out of pmax with REORGANIZE instead.
ALTER TABLE orders REORGANIZE PARTITION pmax INTO (
    PARTITION p2025 VALUES LESS THAN (2026),
    PARTITION pmax VALUES LESS THAN MAXVALUE
);

-- Drop a partition (this also deletes the rows stored in it)
ALTER TABLE orders DROP PARTITION p2022;

-- Reorganize partitions.
-- The table is partitioned on YEAR(order_date), so every boundary is an integer
-- year: a single year cannot be split into halves with this expression, and an
-- unquoted 2023-07-01 is just the arithmetic result 2015. The example therefore
-- merges two adjacent years into one partition.
ALTER TABLE orders REORGANIZE PARTITION p2023, p2024 INTO (
    PARTITION p2023_2024 VALUES LESS THAN (2025)
);
-- 6. Querying partitioned tables
-- Filter on the partitioning column and inspect the plan
EXPLAIN
SELECT *
FROM orders
WHERE order_date BETWEEN '2023-01-01' AND '2023-12-31';

-- List the partitions of orders with their position and row estimate
SELECT
    PARTITION_NAME,
    PARTITION_ORDINAL_POSITION,
    TABLE_ROWS
FROM information_schema.PARTITIONS
WHERE TABLE_NAME = 'orders';

-- 7. Per-partition upkeep
-- Check
ALTER TABLE orders CHECK PARTITION p2023, p2024;
-- Optimize
ALTER TABLE orders OPTIMIZE PARTITION p2023, p2024;
-- Analyze
ALTER TABLE orders ANALYZE PARTITION p2023, p2024;
-- Rebuild
ALTER TABLE orders REBUILD PARTITION p2023, p2024;
2. 分表策略
-- 1. Horizontal sharding by user id range
CREATE TABLE orders_0000_0999 (
    order_id BIGINT PRIMARY KEY,
    user_id INT CHECK (user_id BETWEEN 0 AND 999),
    order_date TIMESTAMP,
    amount DECIMAL(10,2)
);

CREATE TABLE orders_1000_1999 (
    order_id BIGINT PRIMARY KEY,
    user_id INT CHECK (user_id BETWEEN 1000 AND 1999),
    order_date TIMESTAMP,
    amount DECIMAL(10,2)
);

-- Routing view that presents both shards as one relation
CREATE VIEW orders_all AS
SELECT * FROM orders_0000_0999
UNION ALL
SELECT * FROM orders_1000_1999;

-- 2. Vertical sharding
-- Core attributes
CREATE TABLE products_base (
    product_id INT PRIMARY KEY,
    name VARCHAR(100),
    price DECIMAL(10,2),
    category_id INT
);

-- Detail attributes, one row per base product
CREATE TABLE products_details (
    product_id INT PRIMARY KEY,
    description TEXT,
    specifications JSON,
    created_at TIMESTAMP,
    updated_at TIMESTAMP,
    FOREIGN KEY (product_id) REFERENCES products_base(product_id)
);

-- 3. Time-based sharding: one table per month
CREATE TABLE orders_202401 (
    order_id BIGINT PRIMARY KEY,
    user_id INT,
    order_date TIMESTAMP CHECK (order_date >= '2024-01-01' AND order_date < '2024-02-01'),
    amount DECIMAL(10,2)
);

CREATE TABLE orders_202402 (
    order_id BIGINT PRIMARY KEY,
    user_id INT,
    order_date TIMESTAMP CHECK (order_date >= '2024-02-01' AND order_date < '2024-03-01'),
    amount DECIMAL(10,2)
);
-- 4. Stored procedure that creates a monthly shard
-- Creates table orders_YYYYMM with a CHECK constraint that limits order_date
-- to the half-open range [first day of the month, first day of the next month).
--   p_year  : four-digit year
--   p_month : month number, 1-12
-- Raises SQLSTATE '45000' when an argument is NULL or out of range; without the
-- guard a bad month yields NULL dates and an obscure PREPARE error.
-- Uses the session variables @table_name, @start_date, @end_date and @sql.
DELIMITER $$
CREATE PROCEDURE create_monthly_order_table(IN p_year INT, IN p_month INT)
BEGIN
    IF p_year IS NULL OR p_month IS NULL
       OR p_year NOT BETWEEN 1000 AND 9999
       OR p_month NOT BETWEEN 1 AND 12 THEN
        SIGNAL SQLSTATE '45000'
            SET MESSAGE_TEXT = 'create_monthly_order_table: p_year must be 1000-9999 and p_month 1-12';
    END IF;

    SET @table_name = CONCAT('orders_', p_year, LPAD(p_month, 2, '0'));
    SET @start_date = DATE_FORMAT(CONCAT(p_year, '-', p_month, '-01'), '%Y-%m-%d');
    SET @end_date = DATE_FORMAT(DATE_ADD(@start_date, INTERVAL 1 MONTH), '%Y-%m-%d');

    -- PREPARE only accepts a literal or a user variable, hence @sql
    SET @sql = CONCAT(
        'CREATE TABLE ', @table_name, ' (
            order_id BIGINT PRIMARY KEY,
            user_id INT,
            order_date TIMESTAMP CHECK (order_date >= ''', @start_date, ''' AND order_date < ''', @end_date, '''),
            amount DECIMAL(10,2),
            INDEX idx_user_id (user_id),
            INDEX idx_order_date (order_date)
        )'
    );

    PREPARE stmt FROM @sql;
    EXECUTE stmt;
    DEALLOCATE PREPARE stmt;
END $$
DELIMITER ;
-- 5. Querying across shards
-- Summary view: one row per monthly shard with its row count and date span
CREATE VIEW orders_summary AS
SELECT
    'orders_202401' AS table_name,
    COUNT(*) AS record_count,
    MIN(order_date) AS min_date,
    MAX(order_date) AS max_date
FROM orders_202401
UNION ALL
SELECT
    'orders_202402' AS table_name,
    COUNT(*) AS record_count,
    MIN(order_date) AS min_date,
    MAX(order_date) AS max_date
FROM orders_202402;
-- 6. Shard data migration
-- Copies all rows from one orders table to another in order_id windows of
-- batch_size, committing and pausing 0.1 s after every window.
--   source_table : name of the table to read from
--   target_table : name of the table to insert into
--   batch_size   : width of each order_id window; must be > 0
-- A procedure parameter cannot stand in for a table name in static SQL (the
-- statement would look for tables literally called source_table/target_table),
-- so the statements are built dynamically with the names quoted as identifiers.
-- Raises SQLSTATE '45000' on NULL names or a non-positive batch_size (which
-- would otherwise never advance the window).
-- Uses session variables prefixed @migrate_.
DELIMITER $$
CREATE PROCEDURE migrate_orders_data(
    IN source_table VARCHAR(50),
    IN target_table VARCHAR(50),
    IN batch_size INT
)
BEGIN
    DECLARE src_ident VARCHAR(110);
    DECLARE dst_ident VARCHAR(110);

    IF source_table IS NULL OR target_table IS NULL THEN
        SIGNAL SQLSTATE '45000'
            SET MESSAGE_TEXT = 'migrate_orders_data: source_table and target_table are required';
    END IF;
    IF batch_size IS NULL OR batch_size <= 0 THEN
        SIGNAL SQLSTATE '45000'
            SET MESSAGE_TEXT = 'migrate_orders_data: batch_size must be greater than 0';
    END IF;

    -- Backtick-quote the names (doubling embedded backticks) so they can only
    -- ever be parsed as a single identifier
    SET src_ident = CONCAT('`', REPLACE(source_table, '`', '``'), '`');
    SET dst_ident = CONCAT('`', REPLACE(target_table, '`', '``'), '`');

    -- Lowest order_id in the source; NULL when the source is empty
    SET @migrate_sql = CONCAT(
        'SELECT MIN(order_id) INTO @migrate_batch_start FROM ', src_ident
    );
    PREPARE migrate_min_stmt FROM @migrate_sql;
    EXECUTE migrate_min_stmt;
    DEALLOCATE PREPARE migrate_min_stmt;

    -- Copy one window [start, end)
    SET @migrate_sql = CONCAT(
        'INSERT INTO ', dst_ident,
        ' SELECT * FROM ', src_ident,
        ' WHERE order_id >= ? AND order_id < ?'
    );
    PREPARE migrate_copy_stmt FROM @migrate_sql;

    -- Are there rows at or beyond the next window start?
    SET @migrate_sql = CONCAT(
        'SELECT EXISTS(SELECT 1 FROM ', src_ident,
        ' WHERE order_id >= ?) INTO @migrate_has_more'
    );
    PREPARE migrate_check_stmt FROM @migrate_sql;

    SET @migrate_has_more = (@migrate_batch_start IS NOT NULL);
    WHILE @migrate_has_more DO
        SET @migrate_batch_end = @migrate_batch_start + batch_size;
        EXECUTE migrate_copy_stmt USING @migrate_batch_start, @migrate_batch_end;

        -- Advance the window, then re-check so rows added meanwhile are seen
        SET @migrate_batch_start = @migrate_batch_end;
        EXECUTE migrate_check_stmt USING @migrate_batch_start;

        -- Commit the batch and pause briefly to limit load
        COMMIT;
        DO SLEEP(0.1);
    END WHILE;

    DEALLOCATE PREPARE migrate_copy_stmt;
    DEALLOCATE PREPARE migrate_check_stmt;
END $$
DELIMITER ;
五、缓存优化
1. 查询缓存
-- 1. Server query cache (MySQL 5.7 and earlier only; removed in 8.0)
SET GLOBAL query_cache_type = 1;
SET GLOBAL query_cache_size = 134217728; -- 128MB

-- 2. Application-level query cache
-- Cache table: one row per cache key.
--   cache_level : cache tier the entry belongs to. Added because other
--                 statements in this file filter and join query_cache on
--                 cache_level. The default 1 is presumably the 'memory' tier
--                 of cache_levels — TODO confirm.
--   expires_at  : declared NULL DEFAULT NULL so that it never picks up the
--                 implicit DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
--                 that the first TIMESTAMP column receives when
--                 explicit_defaults_for_timestamp is disabled; an expiry time
--                 that moves on every update would be wrong.
CREATE TABLE query_cache (
    cache_key VARCHAR(255) PRIMARY KEY,
    cache_value TEXT,
    cache_level INT NOT NULL DEFAULT 1,
    expires_at TIMESTAMP NULL DEFAULT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
-- Stored procedure that manages the cache table.
-- Looks up p_cache_key in query_cache; on a miss or an expired entry it runs
-- p_query, stores a JSON rendering of the result with a new expiry, otherwise it
-- returns the cached JSON.
--   p_cache_key      : key of the cache entry
--   p_query          : SQL text to run on a miss
--   p_expire_minutes : lifetime of a refreshed entry, in minutes
-- NOTE(review): this body is a sketch, not runnable as written:
--   * tmp_result is read below but nothing visible in this procedure creates it;
--   * column1 / column2 are placeholders for the real column list;
--   * p_query is prepared and executed verbatim, so it must never carry
--     untrusted input.
DELIMITER $$
CREATE PROCEDURE cache_query(
IN p_cache_key VARCHAR(255),
IN p_query TEXT,
IN p_expire_minutes INT
)
BEGIN
DECLARE v_cache_value TEXT;
DECLARE v_is_expired BOOLEAN;
-- Fetch the cached value and whether it has already expired
SELECT
cache_value,
expires_at < NOW() INTO v_cache_value, v_is_expired
FROM query_cache
WHERE cache_key = p_cache_key;
-- No cached value, or it has expired: run the query and refresh the cache
IF v_cache_value IS NULL OR v_is_expired THEN
SET @sql = p_query;
PREPARE stmt FROM @sql;
EXECUTE stmt;
-- Store the result in the cache
-- Note: this assumes the query result can be converted to JSON
SET @result = (SELECT JSON_ARRAYAGG(JSON_OBJECT(
'column1', column1,
'column2', column2
-- add more columns here
)) FROM (SELECT * FROM tmp_result) t);
-- Upsert the cache entry
INSERT INTO query_cache (cache_key, cache_value, expires_at)
VALUES (
p_cache_key,
@result,
DATE_ADD(NOW(), INTERVAL p_expire_minutes MINUTE)
)
ON DUPLICATE KEY UPDATE
cache_value = VALUES(cache_value),
expires_at = VALUES(expires_at);
DEALLOCATE PREPARE stmt;
ELSE
-- Return the cached result
SELECT JSON_EXTRACT(v_cache_value, '$[*]');
END IF;
END $$
DELIMITER ;
-- 3. Cache warm-up
-- Populates two cache entries by calling cache_query for each of them.
DELIMITER $$
CREATE PROCEDURE warm_up_cache()
BEGIN
    -- Hot products, cached for 60 minutes
    CALL cache_query(
        'hot_products',
        'SELECT * FROM products WHERE sales_count > 1000 ORDER BY sales_count DESC LIMIT 100',
        60
    );

    -- Per-category statistics, cached for 120 minutes
    CALL cache_query(
        'category_stats',
        'SELECT category_id, COUNT(*) as product_count, AVG(price) as avg_price FROM products GROUP BY category_id',
        120
    );
END $$
DELIMITER ;
-- 4. Cache cleanup
-- Removes cache entries whose expiry time has passed, then runs
-- OPTIMIZE TABLE on the cache table.
DELIMITER $$
CREATE PROCEDURE clean_expired_cache()
BEGIN
    -- Purge expired entries
    DELETE FROM query_cache
    WHERE expires_at < NOW();

    -- Compact the cache table
    OPTIMIZE TABLE query_cache;
END $$
DELIMITER ;

-- Scheduled job: run the cleanup once an hour
CREATE EVENT clean_cache_event
    ON SCHEDULE EVERY 1 HOUR
    DO CALL clean_expired_cache();
-- 5. Cache monitoring
-- Single-row overview of query_cache: entry count, expired entry count,
-- average lifetime in minutes (created_at to expires_at) and latest update.
CREATE VIEW cache_statistics AS
SELECT
    COUNT(*) AS total_cache_entries,
    COUNT(CASE WHEN expires_at < NOW() THEN 1 END) AS expired_entries,
    AVG(TIMESTAMPDIFF(MINUTE, created_at, expires_at)) AS avg_cache_duration,
    MAX(updated_at) AS last_cache_update
FROM query_cache;
2. 缓存策略
-- 1. Multi-level cache
-- One row per cache tier
CREATE TABLE cache_levels (
    id INT PRIMARY KEY,
    level_name VARCHAR(50),
    priority INT,
    max_size BIGINT,
    expire_time INT   -- seconds
);

INSERT INTO cache_levels (id, level_name, priority, max_size, expire_time)
VALUES
    (1, 'memory', 1, 1073741824, 300),      -- 1GB, 5 minutes
    (2, 'redis', 2, 10737418240, 3600),     -- 10GB, 1 hour
    (3, 'disk', 3, 107374182400, 86400);    -- 100GB, 1 day

-- 2. Cache work queue
-- Each row is a request to (re)build one cache key in a given tier
CREATE TABLE cache_queue (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    cache_key VARCHAR(255),
    cache_level INT,
    priority INT,
    status ENUM('pending', 'processing', 'completed', 'failed'),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    FOREIGN KEY (cache_level) REFERENCES cache_levels(id)
);
-- 3. Cache eviction
-- When the entries of tier p_level_id together exceed that tier's max_size
-- (measured as the summed LENGTH of cache_value), deletes the 100 entries with
-- the oldest updated_at in that tier.
--   p_level_id : id of the tier in cache_levels
-- Rows are identified by cache_key, the primary key of query_cache; the table
-- has no id column.
-- Assumes query_cache has a cache_level column — verify against the table
-- definition before deploying.
-- Ordering is by last update, not last read, so this approximates LRU only.
DELIMITER $$
CREATE PROCEDURE evict_cache(IN p_level_id INT)
BEGIN
    DECLARE v_max_size BIGINT;
    DECLARE v_current_size BIGINT;

    -- Size limit of the tier
    SELECT max_size INTO v_max_size
    FROM cache_levels
    WHERE id = p_level_id;

    -- Bytes currently cached in the tier (0 when the tier is empty)
    SELECT COALESCE(SUM(LENGTH(cache_value)), 0) INTO v_current_size
    FROM query_cache
    WHERE cache_level = p_level_id;

    -- Over the limit: drop the 100 least recently updated entries.
    -- The extra derived table is required because MySQL neither allows LIMIT
    -- directly inside an IN subquery nor selecting from the DELETE target there.
    IF v_current_size > v_max_size THEN
        DELETE FROM query_cache
        WHERE cache_level = p_level_id
          AND cache_key IN (
              SELECT cache_key
              FROM (
                  SELECT cache_key
                  FROM query_cache
                  WHERE cache_level = p_level_id
                  ORDER BY updated_at ASC
                  LIMIT 100
              ) AS oldest_entries
          );
    END IF;
END $$
DELIMITER ;
-- 4. Cache preloading
-- Queues cache build requests: the 100 most viewed products (views_count
-- above 1000) for tier 1, and one statistics entry per category for tier 2.
DELIMITER $$
CREATE PROCEDURE preload_cache()
BEGIN
    -- Hot products: tier 1 (memory), priority 10 (high)
    INSERT INTO cache_queue (cache_key, cache_level, priority, status)
    SELECT
        CONCAT('product:', id),
        1,
        10,
        'pending'
    FROM products
    WHERE views_count > 1000
    ORDER BY views_count DESC
    LIMIT 100;

    -- Category statistics: tier 2 (redis), priority 5 (medium)
    INSERT INTO cache_queue (cache_key, cache_level, priority, status)
    SELECT
        CONCAT('category_stats:', id),
        2,
        5,
        'pending'
    FROM categories;
END $$
DELIMITER ;
-- 5. Cache consistency
-- After a product row is updated, drops that product's cache entries and
-- queues a rebuild request.
-- The delete matches the exact key 'product:<id>' and keys that extend it with
-- a ':' separator. A bare prefix pattern 'product:<id>%' would also purge other
-- products whose id merely starts with the same digits (id 1 vs 10, 11, 100).
-- NOTE(review): assumes derived keys use ':' after the id
-- (e.g. 'product:1:details') — confirm the key naming scheme.
DELIMITER $$
CREATE TRIGGER after_product_update
AFTER UPDATE ON products
FOR EACH ROW
BEGIN
    -- Invalidate this product's cache entries
    DELETE FROM query_cache
    WHERE cache_key = CONCAT('product:', NEW.id)
       OR cache_key LIKE CONCAT('product:', NEW.id, ':%');

    -- Queue a rebuild: tier 1 (memory), priority 10 (high)
    INSERT INTO cache_queue (cache_key, cache_level, priority, status)
    VALUES (
        CONCAT('product:', NEW.id),
        1,
        10,
        'pending'
    );
END $$
DELIMITER ;
-- 6. Cache monitoring and statistics per tier
-- One row per cache tier: entry count, total cached bytes, average TTL in
-- seconds (created_at to expires_at) and number of expired entries.
-- Named cache_statistics_by_level because this file already defines a view
-- called cache_statistics with a different shape; a second CREATE VIEW under
-- the same name would fail.
-- Assumes query_cache has a cache_level column — verify against the table
-- definition.
CREATE VIEW cache_statistics_by_level AS
SELECT
    cl.level_name,
    COUNT(*) AS entry_count,
    SUM(LENGTH(qc.cache_value)) AS total_size,
    AVG(TIMESTAMPDIFF(SECOND, qc.created_at, qc.expires_at)) AS avg_ttl,
    COUNT(CASE WHEN qc.expires_at < NOW() THEN 1 END) AS expired_count
FROM query_cache AS qc
INNER JOIN cache_levels AS cl
    ON qc.cache_level = cl.id
GROUP BY cl.level_name;
六、监控和维护
1. 性能监控
-- 1. Slow query monitoring
-- Enable the slow query log
SET GLOBAL slow_query_log = 'ON';
SET GLOBAL slow_query_log_file = '/var/log/mysql/slow-query.log';
SET GLOBAL long_query_time = 2; -- slow query threshold: 2 seconds
-- mysql.slow_log only receives rows when the log destination includes TABLE;
-- keep FILE as well so the log file above is still written.
SET GLOBAL log_output = 'FILE,TABLE';

-- Slow query analysis view.
-- Groups logged statements by schema and by the first word of the statement
-- text (SELECT, UPDATE, ...). Column names follow mysql.slow_log: the schema is
-- in db and the statement text in sql_text. query_time is a TIME value, so it
-- is converted to seconds (including the fractional part) before aggregating.
CREATE VIEW slow_query_analysis AS
SELECT
    db AS db_name,
    SUBSTRING_INDEX(
        SUBSTRING_INDEX(CONVERT(sql_text USING utf8mb4), ' ', 1),
        '\n',
        1
    ) AS query_pattern,
    COUNT(*) AS execution_count,
    AVG(TIME_TO_SEC(query_time) + MICROSECOND(query_time) / 1000000) AS avg_time,
    MAX(TIME_TO_SEC(query_time) + MICROSECOND(query_time) / 1000000) AS max_time,
    SUM(rows_examined) AS total_rows_examined,
    SUM(rows_sent) AS total_rows_sent
FROM mysql.slow_log
GROUP BY db_name, query_pattern
ORDER BY avg_time DESC;
-- 2. Connection monitoring
-- Time series of connection counts, one row per sample
CREATE TABLE connection_log (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    total_connections INT,
    active_connections INT,
    waiting_connections INT
);

-- Takes one sample from the process list:
--   total   = all connections
--   active  = connections whose command is not 'Sleep'
--   waiting = connections blocked on a table-level or table metadata lock
-- The state strings are the ones the server reports for those waits; the
-- literal 'Waiting for table lock' is not among the documented thread states,
-- so matching on it would always count 0.
DELIMITER $$
CREATE PROCEDURE monitor_connections()
BEGIN
    INSERT INTO connection_log (total_connections, active_connections, waiting_connections)
    SELECT
        COUNT(*) AS total_connections,
        COUNT(CASE WHEN command != 'Sleep' THEN 1 END) AS active_connections,
        COUNT(
            CASE
                WHEN state IN ('Waiting for table level lock',
                               'Waiting for table metadata lock') THEN 1
            END
        ) AS waiting_connections
    FROM information_schema.processlist;
END $$
DELIMITER ;

-- Scheduled job: sample once a minute
CREATE EVENT monitor_connections_event
    ON SCHEDULE EVERY 1 MINUTE
    DO CALL monitor_connections();
-- 3. Resource usage monitoring
-- One row per resource sample
CREATE TABLE resource_usage_log (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    cpu_usage FLOAT,
    memory_usage BIGINT,
    disk_usage BIGINT,
    iops INT
);

-- 4. Query performance monitoring
-- One row per recorded query execution
CREATE TABLE query_performance_log (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    query_id VARCHAR(32),
    query_text TEXT,
    execution_time DECIMAL(10,4),
    rows_affected INT,
    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    explain_plan JSON
);
-- Trigger that fills explain_plan on every row inserted into
-- query_performance_log, from a JSON array built by the subquery below.
-- NOTE(review): the subquery reads select_type, table_name, access_type and
-- rows_examined_per_scan from information_schema.processlist. The documented
-- columns of that table are ID, USER, HOST, DB, COMMAND, TIME, STATE and INFO,
-- so this looks like it cannot run as written — confirm the intended source of
-- the execution plan before relying on this trigger.
DELIMITER $$
CREATE TRIGGER before_query_execution
BEFORE INSERT ON query_performance_log
FOR EACH ROW
BEGIN
-- Fetch the execution plan
SET @explain_json = (
SELECT JSON_ARRAYAGG(
JSON_OBJECT(
'select_type', select_type,
'table', table_name,
'type', access_type,
'rows', rows_examined_per_scan
)
)
FROM information_schema.processlist
WHERE info = NEW.query_text
);
SET NEW.explain_plan = @explain_json;
END $$
DELIMITER ;
-- 5. Performance report
-- Daily report that pairs query statistics with resource statistics.
-- A date appears only when both logs have rows for it (inner join).
CREATE VIEW performance_summary AS
WITH
-- Per-day query counts and timings; a query counts as slow above 1 second
query_stats AS (
    SELECT
        DATE(timestamp) AS stat_date,
        COUNT(*) AS total_queries,
        AVG(execution_time) AS avg_execution_time,
        MAX(execution_time) AS max_execution_time,
        SUM(CASE WHEN execution_time > 1 THEN 1 ELSE 0 END) AS slow_queries
    FROM query_performance_log
    GROUP BY DATE(timestamp)
),
-- Per-day CPU and memory averages and peaks
resource_stats AS (
    SELECT
        DATE(timestamp) AS stat_date,
        AVG(cpu_usage) AS avg_cpu_usage,
        MAX(cpu_usage) AS max_cpu_usage,
        AVG(memory_usage) AS avg_memory_usage,
        MAX(memory_usage) AS max_memory_usage
    FROM resource_usage_log
    GROUP BY DATE(timestamp)
)
SELECT
    qs.stat_date,
    qs.total_queries,
    qs.avg_execution_time,
    qs.slow_queries,
    rs.avg_cpu_usage,
    rs.avg_memory_usage
FROM query_stats AS qs
INNER JOIN resource_stats AS rs
    ON qs.stat_date = rs.stat_date;
-- 6. Performance alerting
-- Alert records
CREATE TABLE performance_alerts (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    alert_type ENUM('slow_query', 'high_cpu', 'high_memory', 'high_connections'),
    alert_message TEXT,
    severity ENUM('info', 'warning', 'critical'),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Scans the last 5 minutes of the performance logs and records alerts:
--   slow_query : execution_time above 2 s (warning above 5 s, critical above 10 s)
--   high_cpu   : cpu_usage above 70 (critical above 90)
DELIMITER $$
CREATE PROCEDURE check_performance_alerts()
BEGIN
    -- Slow queries
    INSERT INTO performance_alerts (alert_type, alert_message, severity)
    SELECT
        'slow_query',
        CONCAT('Query took ', execution_time, ' seconds: ', LEFT(query_text, 100)),
        CASE
            WHEN execution_time > 10 THEN 'critical'
            WHEN execution_time > 5 THEN 'warning'
            ELSE 'info'
        END
    FROM query_performance_log
    WHERE execution_time > 2
      AND timestamp > DATE_SUB(NOW(), INTERVAL 5 MINUTE);

    -- CPU usage
    INSERT INTO performance_alerts (alert_type, alert_message, severity)
    SELECT
        'high_cpu',
        CONCAT('CPU usage at ', ROUND(cpu_usage, 2), '%'),
        CASE
            WHEN cpu_usage > 90 THEN 'critical'
            WHEN cpu_usage > 70 THEN 'warning'
            ELSE 'info'
        END
    FROM resource_usage_log
    WHERE timestamp > DATE_SUB(NOW(), INTERVAL 5 MINUTE)
      AND cpu_usage > 70;
END $$
DELIMITER ;

-- Scheduled job: run the checks every 5 minutes
CREATE EVENT check_performance_alerts_event
    ON SCHEDULE EVERY 5 MINUTE
    DO CALL check_performance_alerts();
2. 定期维护
-- 1. Table maintenance
-- Runs OPTIMIZE TABLE on every base table of the current schema and records
-- each run in maintenance_log (created on first use).
-- Local variables carry a v_ prefix on purpose: inside a stored program a local
-- variable takes precedence over a column of the same name, so a variable
-- called table_name would make the cursor return the variable instead of the
-- column and turn "WHERE table_name = table_name" into a filter that matches
-- every row.
-- Each log row is updated through its own id, so concurrent or leftover
-- 'RUNNING' rows are never touched.
-- Uses the session variable @sql.
DELIMITER $$
CREATE PROCEDURE maintain_tables()
BEGIN
    DECLARE done INT DEFAULT FALSE;
    DECLARE v_table_name VARCHAR(255);
    DECLARE v_log_id BIGINT;
    DECLARE v_failed INT DEFAULT FALSE;
    -- Views are skipped: OPTIMIZE TABLE applies to base tables only
    DECLARE cur_tables CURSOR FOR
        SELECT t.TABLE_NAME
        FROM information_schema.TABLES AS t
        WHERE t.TABLE_SCHEMA = DATABASE()
          AND t.TABLE_TYPE = 'BASE TABLE';
    DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = TRUE;

    -- Maintenance log
    CREATE TABLE IF NOT EXISTS maintenance_log (
        id BIGINT AUTO_INCREMENT PRIMARY KEY,
        table_name VARCHAR(255),
        operation VARCHAR(50),
        start_time TIMESTAMP,
        end_time TIMESTAMP,
        status VARCHAR(20),
        error_message TEXT
    );

    OPEN cur_tables;
    read_loop: LOOP
        FETCH cur_tables INTO v_table_name;
        IF done THEN
            LEAVE read_loop;
        END IF;

        -- Record the start of the run and remember its log row
        INSERT INTO maintenance_log (table_name, operation, start_time, status)
        VALUES (v_table_name, 'OPTIMIZE', NOW(), 'RUNNING');
        SET v_log_id = LAST_INSERT_ID();
        SET v_failed = FALSE;

        -- Quote the table name so unusual names cannot break the statement
        SET @sql = CONCAT('OPTIMIZE TABLE `', REPLACE(v_table_name, '`', '``'), '`');

        -- Errors raised while optimizing one table must not stop the loop
        BEGIN
            DECLARE CONTINUE HANDLER FOR SQLEXCEPTION SET v_failed = TRUE;
            PREPARE stmt FROM @sql;
            EXECUTE stmt;
            DEALLOCATE PREPARE stmt;
        END;

        -- Close the log row with the outcome
        UPDATE maintenance_log
        SET status = CASE WHEN v_failed THEN 'ERROR' ELSE 'COMPLETED' END,
            error_message = CASE WHEN v_failed THEN 'Error during optimization' ELSE NULL END,
            end_time = NOW()
        WHERE id = v_log_id;
    END LOOP;
    CLOSE cur_tables;
END $$
DELIMITER ;
-- 2. Statistics refresh
-- Refreshes table and index statistics for users, orders and products.
-- The body holds ';'-terminated statements, so the definition is wrapped in a
-- DELIMITER switch; without it the mysql client ends the CREATE PROCEDURE at
-- the first ';'.
-- The former "ANALYZE TABLE ... PERSISTENT FOR COLUMNS ... INDEXES ..." form is
-- MariaDB syntax that MySQL rejects. ANALYZE TABLE already refreshes the index
-- statistics of users (including its email index), so no extra statement is
-- needed.
DELIMITER $$
CREATE PROCEDURE update_statistics()
BEGIN
    ANALYZE TABLE users, orders, products;
END $$
DELIMITER ;
-- 3. Data cleanup
-- In one transaction: deletes system_logs and user_sessions rows older than one
-- year, copies orders older than one year into orders_archive and then deletes
-- them from orders.
-- On any SQL error the transaction is rolled back and the error is re-raised,
-- so a failure can neither leave the transaction open nor delete orders that
-- were not archived.
-- Sets the session variable @cleanup_date to the cutoff date.
-- NOTE(review): INSERT ... SELECT * assumes orders_archive has the same column
-- layout as orders — confirm.
DELIMITER $$
CREATE PROCEDURE cleanup_old_data()
BEGIN
    DECLARE EXIT HANDLER FOR SQLEXCEPTION
    BEGIN
        ROLLBACK;
        RESIGNAL;
    END;

    -- Cutoff: everything older than one year
    SET @cleanup_date = DATE_SUB(CURRENT_DATE, INTERVAL 1 YEAR);

    START TRANSACTION;

    -- Old logs
    DELETE FROM system_logs
    WHERE created_at < @cleanup_date;

    -- Old sessions
    DELETE FROM user_sessions
    WHERE last_activity < @cleanup_date;

    -- Archive old orders, then remove them from the live table
    INSERT INTO orders_archive
    SELECT * FROM orders
    WHERE order_date < @cleanup_date;

    DELETE FROM orders
    WHERE order_date < @cleanup_date;

    COMMIT;
END $$
DELIMITER ;
-- 4. Backup management
-- Records a FULL backup run in backup_log, builds a mysqldump command line,
-- hands it to sys_exec, stores the outcome, then deletes backup_log rows older
-- than 30 days.
-- NOTE(review): sys_exec is not a built-in MySQL function; presumably it is a
-- user-defined function installed separately — verify it exists and who may
-- call it.
-- NOTE(review): the command passes "-p" without a password, which normally
-- makes mysqldump prompt interactively — confirm how credentials are supplied.
-- NOTE(review): the final DELETE removes log rows only; nothing in this
-- procedure removes the dump files themselves.
DELIMITER $$
CREATE PROCEDURE manage_backups()
BEGIN
-- Record the start of the backup
INSERT INTO backup_log (backup_type, start_time, status)
VALUES ('FULL', NOW(), 'RUNNING');
SET @backup_id = LAST_INSERT_ID();
-- Build the backup command; the file name carries the current timestamp
SET @backup_file = CONCAT('/backups/full_', DATE_FORMAT(NOW(), '%Y%m%d_%H%i%s'), '.sql');
SET @backup_command = CONCAT(
'mysqldump -u root -p --all-databases --events --routines ',
'> ', @backup_file
);
-- Run the system command (requires appropriate privileges)
SET @result = sys_exec(@backup_command);
-- Store the outcome: a result of 0 is recorded as COMPLETED, anything else as FAILED
UPDATE backup_log
SET end_time = NOW(),
status = IF(@result = 0, 'COMPLETED', 'FAILED'),
file_path = @backup_file
WHERE id = @backup_id;
-- Remove backup log rows older than 30 days
SET @cleanup_date = DATE_SUB(CURRENT_DATE, INTERVAL 30 DAY);
DELETE FROM backup_log
WHERE start_time < @cleanup_date;
END $$
DELIMITER ;
-- 5. Maintenance schedule
-- Daily job: first run tomorrow at 01:00, then every day.
-- The body is a BEGIN ... END block containing ';', so the statement is wrapped
-- in a DELIMITER switch; otherwise the mysql client cuts the CREATE EVENT at
-- the first ';'.
DELIMITER $$
CREATE EVENT daily_maintenance
    ON SCHEDULE EVERY 1 DAY
    STARTS CURRENT_DATE + INTERVAL 1 DAY + INTERVAL 1 HOUR
    DO
    BEGIN
        CALL maintain_tables();
        CALL update_statistics();
        CALL cleanup_old_data();
    END $$
DELIMITER ;

-- Weekly job: first run one week from today, then every week
CREATE EVENT weekly_backup
    ON SCHEDULE EVERY 1 WEEK
    STARTS CURRENT_DATE + INTERVAL 1 WEEK
    DO
        CALL manage_backups();
-- 6. Maintenance monitoring
-- Per day and operation: number of runs, how many completed, how many failed,
-- and the average duration in seconds.
CREATE VIEW maintenance_status AS
SELECT
    DATE(start_time) AS maintenance_date,
    operation,
    COUNT(*) AS total_operations,
    COUNT(CASE WHEN status = 'COMPLETED' THEN 1 END) AS successful_operations,
    COUNT(CASE WHEN status = 'ERROR' THEN 1 END) AS failed_operations,
    AVG(TIMESTAMPDIFF(SECOND, start_time, end_time)) AS avg_duration_seconds
FROM maintenance_log
GROUP BY DATE(start_time), operation;
3. 故障恢复
-- 1. Failure detection and logging
-- One row per detected failure, tracked from detection through resolution
CREATE TABLE system_failures (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    failure_type VARCHAR(50),
    failure_description TEXT,
    detection_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    resolution_time TIMESTAMP,
    resolution_description TEXT,
    affected_tables TEXT,
    severity ENUM('low', 'medium', 'high', 'critical'),
    status ENUM('detected', 'investigating', 'resolving', 'resolved')
);
-- 2. Automatic failure detection
-- Records rows in system_failures for three conditions: tables that look
-- unreadable, more than 1000 sleeping connections, and low free space.
DELIMITER $$
CREATE PROCEDURE detect_database_issues()
BEGIN
    -- Possibly corrupted tables.
    -- information_schema.TABLES has no CHECK_TABLE column, so the previous
    -- filter could not run. NOTE(review): the filter below is a heuristic — a
    -- base table whose storage engine cannot be reported (ENGINE IS NULL); it
    -- may never match on servers that always report an engine. CHECK TABLE is
    -- the authoritative test — confirm the detection strategy.
    INSERT INTO system_failures (
        failure_type,
        failure_description,
        affected_tables,
        severity,
        status
    )
    SELECT
        'TABLE_CORRUPTION',
        CONCAT('Table ', t.TABLE_NAME, ' may be corrupted'),
        t.TABLE_NAME,
        'high',
        'detected'
    FROM information_schema.TABLES AS t
    WHERE t.TABLE_SCHEMA = DATABASE()
      AND t.TABLE_TYPE = 'BASE TABLE'
      AND t.ENGINE IS NULL;

    -- Connection overload: more than 1000 sleeping connections
    IF (
        SELECT COUNT(*)
        FROM information_schema.PROCESSLIST
        WHERE command = 'Sleep'
    ) > 1000 THEN
        INSERT INTO system_failures (
            failure_type,
            failure_description,
            severity,
            status
        )
        VALUES (
            'CONNECTION_OVERLOAD',
            'Too many sleeping connections',
            'medium',
            'detected'
        );
    END IF;

    -- Low space: less than 1GB of DATA_FREE across the current schema.
    -- The values are summed because a scalar subquery that returns one row per
    -- table fails as soon as the schema holds more than one table.
    -- NOTE(review): DATA_FREE reports free space inside the tablespaces, not
    -- free space on the disk — confirm this is the intended signal.
    IF (
        SELECT SUM(DATA_FREE)
        FROM information_schema.TABLES
        WHERE TABLE_SCHEMA = DATABASE()
    ) < 1073741824 THEN -- 1GB
        INSERT INTO system_failures (
            failure_type,
            failure_description,
            severity,
            status
        )
        VALUES (
            'LOW_DISK_SPACE',
            'Database is running low on disk space',
            'critical',
            'detected'
        );
    END IF;
END $$
DELIMITER ;
-- 3. Automatic recovery
-- Runs the recovery action that matches the failure type of the given
-- system_failures row and records the outcome.
--   p_failure_id : id of the system_failures row to handle
-- TABLE_CORRUPTION    : REPAIR TABLE followed by CHECK TABLE on affected_tables
-- CONNECTION_OVERLOAD : kills connections that have been sleeping for over an hour
-- LOW_DISK_SPACE      : calls cleanup_old_data(), then optimizes the main tables
-- Any other type is left in status 'investigating' with an explanatory note
-- instead of aborting with "Case not found for CASE statement".
-- NOTE(review): affected_tables is concatenated into SQL as stored; only
-- trusted code should write that column.
-- Uses the session variables @repair_sql, @check_sql and @kill_sql.
DELIMITER $$
CREATE PROCEDURE attempt_recovery(IN p_failure_id BIGINT)
BEGIN
    DECLARE v_failure_type VARCHAR(50);
    DECLARE v_affected_tables TEXT;
    DECLARE v_handled BOOLEAN DEFAULT TRUE;

    -- Load the failure record
    SELECT failure_type, affected_tables
    INTO v_failure_type, v_affected_tables
    FROM system_failures
    WHERE id = p_failure_id;

    -- Mark it as being worked on
    UPDATE system_failures
    SET status = 'investigating'
    WHERE id = p_failure_id;

    -- Dispatch on the failure type
    CASE v_failure_type
        WHEN 'TABLE_CORRUPTION' THEN
            BEGIN
                -- Try to repair the table(s)
                SET @repair_sql = CONCAT('REPAIR TABLE ', v_affected_tables);
                PREPARE stmt FROM @repair_sql;
                EXECUTE stmt;
                DEALLOCATE PREPARE stmt;

                -- Verify the result
                SET @check_sql = CONCAT('CHECK TABLE ', v_affected_tables);
                PREPARE stmt FROM @check_sql;
                EXECUTE stmt;
                DEALLOCATE PREPARE stmt;
            END;
        WHEN 'CONNECTION_OVERLOAD' THEN
            BEGIN
                -- KILL takes a single connection id, so idle connections are
                -- walked with a cursor and killed one by one.
                DECLARE v_conn_id BIGINT;
                DECLARE v_no_more BOOLEAN DEFAULT FALSE;
                DECLARE idle_conns CURSOR FOR
                    SELECT p.id
                    FROM information_schema.PROCESSLIST AS p
                    WHERE p.command = 'Sleep'
                      AND p.time > 3600;
                DECLARE CONTINUE HANDLER FOR NOT FOUND SET v_no_more = TRUE;
                -- 1094 = unknown thread id: the connection ended on its own
                -- between the listing and the KILL; nothing left to do for it
                DECLARE CONTINUE HANDLER FOR 1094 BEGIN END;

                OPEN idle_conns;
                kill_loop: LOOP
                    FETCH idle_conns INTO v_conn_id;
                    IF v_no_more THEN
                        LEAVE kill_loop;
                    END IF;
                    SET @kill_sql = CONCAT('KILL CONNECTION ', v_conn_id);
                    PREPARE kill_stmt FROM @kill_sql;
                    EXECUTE kill_stmt;
                    DEALLOCATE PREPARE kill_stmt;
                END LOOP;
                CLOSE idle_conns;
            END;
        WHEN 'LOW_DISK_SPACE' THEN
            BEGIN
                -- Emergency cleanup
                CALL cleanup_old_data();
                -- Reclaim table space
                OPTIMIZE TABLE users, orders, products;
            END;
        ELSE
            -- Unknown type, or no row found for p_failure_id
            SET v_handled = FALSE;
    END CASE;

    -- Record the outcome
    IF v_handled THEN
        UPDATE system_failures
        SET status = 'resolved',
            resolution_time = NOW(),
            resolution_description = CONCAT('Automatic recovery for ', v_failure_type, ' completed')
        WHERE id = p_failure_id;
    ELSE
        UPDATE system_failures
        SET resolution_description = 'No automatic recovery available for this failure type'
        WHERE id = p_failure_id;
    END IF;
END $$
DELIMITER ;
-- 4. Data recovery
-- Restores rows of a table from its "<table>_backup" companion: the matching
-- backup rows are staged in a scratch table and then merged into the live table
-- with REPLACE INTO. The run is recorded in recovery_log.
--   p_table_name   : table to restore into; the backup is read from
--                    <p_table_name>_backup
--   p_backup_date  : backup_date value selecting the backup generation
--   p_where_clause : extra filter on the backup rows; NULL or blank = no filter
-- rows_recovered is the number of rows staged from the backup. It is captured
-- right after that INSERT, because ROW_COUNT() always refers to the most recent
-- statement and would otherwise report on the final DROP TABLE.
-- NOTE(review): p_table_name and p_where_clause are concatenated into dynamic
-- SQL as given. Only trusted callers may supply them; untrusted input must be
-- validated against a whitelist first.
-- NOTE(review): the staging INSERT uses SELECT *, which assumes the backup
-- table has the same column layout as the live table — confirm.
-- Uses session variables @recovery_id, @create_tmp, @restore_data, @merge_data
-- and @drop_tmp.
DELIMITER $$
CREATE PROCEDURE recover_data(
    IN p_table_name VARCHAR(255),
    IN p_backup_date DATETIME,
    IN p_where_clause TEXT
)
BEGIN
    DECLARE v_rows_recovered BIGINT DEFAULT 0;
    DECLARE v_filter TEXT;

    IF p_table_name IS NULL OR p_table_name = '' THEN
        SIGNAL SQLSTATE '45000'
            SET MESSAGE_TEXT = 'recover_data: p_table_name is required';
    END IF;

    -- A NULL filter would turn the whole CONCAT below into NULL
    SET v_filter = COALESCE(NULLIF(TRIM(p_where_clause), ''), '1 = 1');

    -- Record the recovery run
    INSERT INTO recovery_log (
        table_name,
        backup_date,
        where_clause,
        start_time,
        status
    )
    VALUES (
        p_table_name,
        p_backup_date,
        p_where_clause,
        NOW(),
        'STARTED'
    );
    SET @recovery_id = LAST_INSERT_ID();

    -- Scratch table with the structure of the live table
    SET @create_tmp = CONCAT(
        'CREATE TABLE tmp_recovery_', @recovery_id,
        ' LIKE ', p_table_name
    );
    PREPARE stmt FROM @create_tmp;
    EXECUTE stmt;
    DEALLOCATE PREPARE stmt;

    -- Stage the selected rows from the backup
    SET @restore_data = CONCAT(
        'INSERT INTO tmp_recovery_', @recovery_id,
        ' SELECT * FROM ', p_table_name, '_backup',
        ' WHERE backup_date = ', QUOTE(p_backup_date),
        ' AND (', v_filter, ')'
    );
    PREPARE stmt FROM @restore_data;
    EXECUTE stmt;
    SET v_rows_recovered = ROW_COUNT();
    DEALLOCATE PREPARE stmt;

    -- Merge the staged rows into the live table
    SET @merge_data = CONCAT(
        'REPLACE INTO ', p_table_name,
        ' SELECT * FROM tmp_recovery_', @recovery_id
    );
    PREPARE stmt FROM @merge_data;
    EXECUTE stmt;
    DEALLOCATE PREPARE stmt;

    -- Drop the scratch table
    SET @drop_tmp = CONCAT('DROP TABLE tmp_recovery_', @recovery_id);
    PREPARE stmt FROM @drop_tmp;
    EXECUTE stmt;
    DEALLOCATE PREPARE stmt;

    -- Close the log entry
    UPDATE recovery_log
    SET end_time = NOW(),
        status = 'COMPLETED',
        rows_recovered = v_rows_recovered
    WHERE id = @recovery_id;
END $$
DELIMITER ;
-- 5. Recovery points
-- Registers a recovery point and stores a snapshot of the users and orders
-- data under it.
--   p_description : free-text label of the recovery point
-- The definition is wrapped in a DELIMITER switch because the body contains
-- ';'. The snapshot table is created with IF NOT EXISTS so that the second and
-- later calls do not fail on the already existing table.
-- NOTE(review): the snapshot reads users.user_data and orders.order_data;
-- confirm these columns exist, they are not part of the table definitions
-- visible alongside this procedure.
-- Sets the session variable @recovery_point_id.
DELIMITER $$
CREATE PROCEDURE create_recovery_point(
    IN p_description VARCHAR(255)
)
BEGIN
    -- Register the recovery point
    INSERT INTO recovery_points (
        description,
        created_at,
        status
    )
    VALUES (
        p_description,
        NOW(),
        'CREATING'
    );
    SET @recovery_point_id = LAST_INSERT_ID();

    -- Snapshot storage, shared by all recovery points
    CREATE TABLE IF NOT EXISTS recovery_point_data (
        recovery_point_id BIGINT,
        table_name VARCHAR(255),
        data_snapshot LONGBLOB,
        created_at TIMESTAMP
    );

    -- Save the state of the main tables
    INSERT INTO recovery_point_data (
        recovery_point_id,
        table_name,
        data_snapshot,
        created_at
    )
    SELECT
        @recovery_point_id,
        snapshots.table_name,
        snapshots.snapshot,
        NOW()
    FROM (
        SELECT
            'users' AS table_name,
            user_data AS snapshot
        FROM users
        UNION ALL
        SELECT
            'orders' AS table_name,
            order_data AS snapshot
        FROM orders
    ) AS snapshots;

    -- Mark the recovery point as ready
    UPDATE recovery_points
    SET status = 'CREATED'
    WHERE id = @recovery_point_id;
END $$
DELIMITER ;
七、安全优化
1. 权限管理
-- 1. 创建角色和权限系统
-- Named roles that can be assigned to users.
CREATE TABLE roles (
    role_id     INT AUTO_INCREMENT PRIMARY KEY,
    role_name   VARCHAR(50) UNIQUE,
    description TEXT,
    created_at  TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at  TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);

-- Individual permissions, each tied to a resource type and an access level.
CREATE TABLE permissions (
    permission_id   INT AUTO_INCREMENT PRIMARY KEY,
    permission_name VARCHAR(100) UNIQUE,
    description     TEXT,
    resource_type   VARCHAR(50),
    access_level    ENUM('READ', 'WRITE', 'ADMIN')
);

-- Many-to-many link: which permissions each role carries.
CREATE TABLE role_permissions (
    role_id       INT,
    permission_id INT,
    granted_at    TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    granted_by    VARCHAR(100),
    PRIMARY KEY (role_id, permission_id),
    FOREIGN KEY (role_id) REFERENCES roles (role_id),
    FOREIGN KEY (permission_id) REFERENCES permissions (permission_id)
);

-- Many-to-many link: which roles each user holds.
-- expires_at is nullable; a NULL value is treated as "never expires" by the
-- user_permissions view.
CREATE TABLE user_roles (
    user_id    INT,
    role_id    INT,
    granted_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    expires_at TIMESTAMP NULL,
    granted_by VARCHAR(100),
    PRIMARY KEY (user_id, role_id),
    FOREIGN KEY (role_id) REFERENCES roles (role_id)
);
-- 2. 创建权限管理存储过程
DELIMITER $$
-- grant_user_role: give a user a role (or refresh an existing grant) and
-- write an audit record.
--
-- Parameters:
--   p_user_id     User receiving the role.
--   p_role_name   Name of the role, looked up in roles.role_name.
--   p_granted_by  Who performed the grant; stored on the grant and in the audit row.
--
-- Raises SQLSTATE '45000' when p_role_name does not match any role.
CREATE PROCEDURE grant_user_role(
    IN p_user_id INT,
    IN p_role_name VARCHAR(50),
    IN p_granted_by VARCHAR(100)
)
BEGIN
    DECLARE v_role_id INT;

    -- Resolve the role name (role_name is UNIQUE, so at most one row).
    SET v_role_id = (
        SELECT role_id
        FROM roles
        WHERE role_name = p_role_name
    );

    -- Fail with a clear message instead of letting a NULL role_id hit the
    -- primary key of user_roles.
    IF v_role_id IS NULL THEN
        SIGNAL SQLSTATE '45000'
            SET MESSAGE_TEXT = 'grant_user_role: unknown role name';
    END IF;

    -- Grant the role; an existing grant just gets its metadata refreshed.
    INSERT INTO user_roles (user_id, role_id, granted_by)
    VALUES (p_user_id, v_role_id, p_granted_by)
    ON DUPLICATE KEY UPDATE
        granted_at = CURRENT_TIMESTAMP,
        granted_by = p_granted_by;

    -- Write the audit record.
    INSERT INTO security_audit_log (
        action_type,
        user_id,
        role_id,
        performed_by,
        action_description
    )
    VALUES (
        'GRANT_ROLE',
        p_user_id,
        v_role_id,
        p_granted_by,
        CONCAT('Granted role ', p_role_name, ' to user ', p_user_id)
    );
END $$
-- revoke_user_role: remove a role from a user and write an audit record.
--
-- Parameters:
--   p_user_id     User losing the role.
--   p_role_name   Name of the role, looked up in roles.role_name.
--   p_revoked_by  Who performed the revocation; stored in the audit row.
--
-- Raises SQLSTATE '45000' when p_role_name does not match any role, matching
-- grant_user_role. Without this check an unknown role name would silently
-- delete nothing and still log a REVOKE_ROLE entry with a NULL role.
CREATE PROCEDURE revoke_user_role(
    IN p_user_id INT,
    IN p_role_name VARCHAR(50),
    IN p_revoked_by VARCHAR(100)
)
BEGIN
    DECLARE v_role_id INT;

    -- Resolve the role name (role_name is UNIQUE, so at most one row).
    SET v_role_id = (
        SELECT role_id
        FROM roles
        WHERE role_name = p_role_name
    );

    IF v_role_id IS NULL THEN
        SIGNAL SQLSTATE '45000'
            SET MESSAGE_TEXT = 'revoke_user_role: unknown role name';
    END IF;

    -- Revoke the role.
    DELETE FROM user_roles
    WHERE user_id = p_user_id
      AND role_id = v_role_id;

    -- Write the audit record.
    INSERT INTO security_audit_log (
        action_type,
        user_id,
        role_id,
        performed_by,
        action_description
    )
    VALUES (
        'REVOKE_ROLE',
        p_user_id,
        v_role_id,
        p_revoked_by,
        CONCAT('Revoked role ', p_role_name, ' from user ', p_user_id)
    );
END $$
DELIMITER ;
-- 3. 创建安全视图
-- user_permissions: one row per (user, role, permission) for every role grant
-- that has not expired.
--
-- user_id is taken from user_roles: the users table is joined on u.id, so it
-- exposes its key as `id`, not `user_id`. The join to users is kept so that
-- grants pointing at a non-existent user are not listed.
CREATE VIEW user_permissions AS
SELECT
    ur.user_id,
    r.role_name,
    p.permission_name,
    p.resource_type,
    p.access_level
FROM user_roles AS ur
INNER JOIN roles AS r
    ON ur.role_id = r.role_id
INNER JOIN role_permissions AS rp
    ON r.role_id = rp.role_id
INNER JOIN permissions AS p
    ON rp.permission_id = p.permission_id
INNER JOIN users AS u
    ON ur.user_id = u.id
-- A NULL expires_at means the grant never expires.
WHERE (ur.expires_at IS NULL OR ur.expires_at > NOW());
-- 4. 创建权限检查函数
DELIMITER $$
-- check_user_permission: TRUE when the user currently holds the named
-- permission according to the user_permissions view, FALSE otherwise.
--
-- Parameters:
--   p_user_id          User to check.
--   p_permission_name  Permission name to look for.
--
-- Declared NOT DETERMINISTIC / READS SQL DATA: the result depends on table
-- contents and on the current time (the view filters on NOW()), so it must
-- not be treated as a pure function of its arguments.
CREATE FUNCTION check_user_permission(
    p_user_id INT,
    p_permission_name VARCHAR(100)
) RETURNS BOOLEAN
NOT DETERMINISTIC
READS SQL DATA
BEGIN
    DECLARE has_permission BOOLEAN;

    SELECT EXISTS (
        SELECT 1
        FROM user_permissions
        WHERE user_id = p_user_id
          AND permission_name = p_permission_name
    ) INTO has_permission;

    RETURN has_permission;
END $$
DELIMITER ;
-- 5. 设置默认安全策略
-- 创建基本角色
-- Seed the built-in roles.
INSERT INTO roles (role_name, description)
VALUES
    ('admin', 'Full system access'),
    ('developer', 'Development and testing access'),
    ('analyst', 'Read-only access to analytics'),
    ('user', 'Basic user access');

-- Seed the built-in permissions.
INSERT INTO permissions (permission_name, resource_type, access_level)
VALUES
    ('READ_USER_DATA', 'USER', 'READ'),
    ('WRITE_USER_DATA', 'USER', 'WRITE'),
    ('MANAGE_USERS', 'USER', 'ADMIN'),
    ('READ_ANALYTICS', 'ANALYTICS', 'READ'),
    ('GENERATE_REPORTS', 'REPORTS', 'WRITE');

-- Give the admin role every permission that exists at this point.
INSERT INTO role_permissions (role_id, permission_id)
SELECT
    admin_role.role_id,
    perm.permission_id
FROM permissions AS perm
CROSS JOIN roles AS admin_role
WHERE admin_role.role_name = 'admin';
-- 6. 审计日志
-- Audit trail for security actions; grant_user_role and revoke_user_role
-- write to it.
CREATE TABLE security_audit_log (
    id                 BIGINT PRIMARY KEY AUTO_INCREMENT,
    action_type        VARCHAR(50),
    user_id            INT,
    role_id            INT,
    permission_id      INT,
    performed_by       VARCHAR(100),
    action_description TEXT,
    action_time        TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    ip_address         VARCHAR(45),
    success            BOOLEAN
);
2. 加密和脱敏
-- 1. 创建加密函数
DELIMITER $$
-- encrypt_data: AES-encrypt p_data with p_key via AES_ENCRYPT and return the
-- ciphertext as VARBINARY(1000).
CREATE FUNCTION encrypt_data(p_data TEXT, p_key VARCHAR(32))
RETURNS VARBINARY(1000)
DETERMINISTIC
BEGIN
    DECLARE v_ciphertext VARBINARY(1000);

    SET v_ciphertext = AES_ENCRYPT(p_data, p_key);

    RETURN v_ciphertext;
END $$
-- decrypt_data: AES-decrypt p_data with p_key via AES_DECRYPT and return the
-- plaintext as TEXT. AES_DECRYPT yields NULL when decryption fails.
CREATE FUNCTION decrypt_data(p_data VARBINARY(1000), p_key VARCHAR(32))
RETURNS TEXT
DETERMINISTIC
BEGIN
    DECLARE v_plaintext TEXT;

    SET v_plaintext = AES_DECRYPT(p_data, p_key);

    RETURN v_plaintext;
END $$
-- 2. 创建脱敏函数
-- mask_email: keep the first two characters of the local part and the full
-- domain, replacing the rest of the local part with '*'.
--   'abcdef@example.com' -> 'ab****@example.com'
--
-- Returns NULL for NULL input. A value without '@' is not split: it is masked
-- as a whole (first two characters kept) so the raw value is never echoed
-- back as a "domain".
CREATE FUNCTION mask_email(p_email VARCHAR(255))
RETURNS VARCHAR(255)
DETERMINISTIC
BEGIN
    DECLARE v_local_part VARCHAR(255);
    DECLARE v_domain VARCHAR(255);

    IF p_email IS NULL THEN
        RETURN NULL;
    END IF;

    IF LOCATE('@', p_email) = 0 THEN
        RETURN CONCAT(
            LEFT(p_email, 2),
            REPEAT('*', GREATEST(CHAR_LENGTH(p_email) - 2, 0))
        );
    END IF;

    SET v_local_part = SUBSTRING_INDEX(p_email, '@', 1);
    SET v_domain = SUBSTRING_INDEX(p_email, '@', -1);

    -- CHAR_LENGTH counts characters; LENGTH counts bytes and would emit too
    -- many '*' for multi-byte local parts.
    RETURN CONCAT(
        LEFT(v_local_part, 2),
        REPEAT('*', GREATEST(CHAR_LENGTH(v_local_part) - 2, 0)),
        '@',
        v_domain
    );
END $$
-- mask_phone: keep the first 3 and last 4 characters and replace everything
-- in between with '*'.
--   '13812345678' -> '138****5678'
--
-- Returns NULL for NULL input. Values of 7 characters or fewer are masked
-- completely: the kept prefix and suffix would otherwise overlap and reveal
-- (or even duplicate) the whole number.
CREATE FUNCTION mask_phone(p_phone VARCHAR(20))
RETURNS VARCHAR(20)
DETERMINISTIC
BEGIN
    DECLARE v_len INT;

    IF p_phone IS NULL THEN
        RETURN NULL;
    END IF;

    SET v_len = CHAR_LENGTH(p_phone);

    IF v_len <= 7 THEN
        RETURN REPEAT('*', v_len);
    END IF;

    RETURN CONCAT(
        LEFT(p_phone, 3),
        REPEAT('*', v_len - 7),
        RIGHT(p_phone, 4)
    );
END $$
-- mask_card: return a fixed 'XXXX-XXXX-XXXX-' prefix followed by the last four
-- characters of p_card.
CREATE FUNCTION mask_card(p_card VARCHAR(20))
RETURNS VARCHAR(20)
DETERMINISTIC
BEGIN
    DECLARE v_last_four VARCHAR(4);

    SET v_last_four = RIGHT(p_card, 4);

    RETURN CONCAT('XXXX-XXXX-XXXX-', v_last_four);
END $$
DELIMITER ;
-- 3. 创建敏感数据表
-- Encrypted payloads. raw_data holds ciphertext; encryption_key_id identifies
-- the encryption_keys row used to produce it.
CREATE TABLE sensitive_data (
    id                INT AUTO_INCREMENT PRIMARY KEY,
    data_type         VARCHAR(50),
    raw_data          VARBINARY(1000),
    encryption_key_id INT,
    created_at        TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at        TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    KEY idx_data_type (data_type)
);
-- 4. 创建加密密钥管理
-- Key store for sensitive_data and for backup keys.
--
-- expires_at is declared NULL explicitly, like user_roles.expires_at: keys
-- created by rotate_encryption_keys carry no expiry, and a bare TIMESTAMP
-- column is implicitly NOT NULL with a zero default when
-- explicit_defaults_for_timestamp is disabled.
CREATE TABLE encryption_keys (
    id INT PRIMARY KEY AUTO_INCREMENT,
    key_name VARCHAR(100),
    key_value VARBINARY(1000),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    expires_at TIMESTAMP NULL,
    status ENUM('active', 'expired', 'revoked'),
    INDEX idx_status (status)
);
-- 5. 创建数据访问控制
DELIMITER $$
-- access_sensitive_data: return one decrypted sensitive_data row to a user who
-- holds the matching READ_<data_type> permission, and log the access.
--
-- Parameters:
--   p_user_id    User requesting the data.
--   p_data_type  Data type of the row; also selects the required permission
--                name ('READ_' + p_data_type).
--   p_data_id    sensitive_data.id of the row to read.
--
-- Result set: a single column `decrypted_data`.
-- Raises SQLSTATE '45000':
--   'Permission denied'          - the user lacks the permission.
--   'Sensitive data not found'   - no row matches id/type with an active key.
CREATE PROCEDURE access_sensitive_data(
    IN p_user_id INT,
    IN p_data_type VARCHAR(50),
    IN p_data_id INT
)
BEGIN
    DECLARE v_has_permission BOOLEAN;
    DECLARE v_encrypted_data VARBINARY(1000);
    DECLARE v_key_value VARCHAR(32);
    DECLARE v_row_missing BOOLEAN DEFAULT FALSE;
    -- Set when the SELECT ... INTO below matches no row.
    DECLARE CONTINUE HANDLER FOR NOT FOUND SET v_row_missing = TRUE;

    -- Check the permission.
    SELECT EXISTS (
        SELECT 1
        FROM user_permissions
        WHERE user_id = p_user_id
          AND permission_name = CONCAT('READ_', p_data_type)
    ) INTO v_has_permission;

    IF v_has_permission THEN
        -- Fetch the ciphertext together with its (active) key.
        SELECT
            sd.raw_data,
            ek.key_value
        INTO
            v_encrypted_data,
            v_key_value
        FROM sensitive_data AS sd
        INNER JOIN encryption_keys AS ek
            ON sd.encryption_key_id = ek.id
        WHERE sd.id = p_data_id
          AND sd.data_type = p_data_type
          AND ek.status = 'active';

        -- Do not return NULL and log a successful read when nothing matched.
        IF v_row_missing THEN
            SIGNAL SQLSTATE '45000'
                SET MESSAGE_TEXT = 'Sensitive data not found';
        END IF;

        -- Decrypt and return the data.
        SELECT decrypt_data(v_encrypted_data, v_key_value) AS decrypted_data;

        -- Record the access.
        INSERT INTO data_access_log (
            user_id,
            data_type,
            data_id,
            access_time,
            access_type
        )
        VALUES (
            p_user_id,
            p_data_type,
            p_data_id,
            NOW(),
            'READ'
        );
    ELSE
        SIGNAL SQLSTATE '45000'
            SET MESSAGE_TEXT = 'Permission denied';
    END IF;
END $$
DELIMITER ;
-- 6. 创建数据脱敏视图
-- masked_user_data: users with email and phone passed through mask_email /
-- mask_phone; the remaining columns are exposed unchanged.
CREATE VIEW masked_user_data AS
SELECT
    u.id,
    u.username,
    mask_email(u.email) AS email,
    mask_phone(u.phone) AS phone,
    u.city,
    u.country,
    u.created_at
FROM users AS u;
-- 7. 创建加密数据备份程序
DELIMITER $$
-- backup_sensitive_data: copy every sensitive_data row into
-- sensitive_data_backup, encrypted with a freshly generated backup key, and
-- store that key in encryption_keys with a 30-day expiry.
--
-- The procedure can be run repeatedly: the backup table is created on first
-- use and later runs append rows. All rows of one run share the same
-- backup_time.
CREATE PROCEDURE backup_sensitive_data()
BEGIN
    DECLARE v_backup_key VARCHAR(32);

    -- SHA2(..., 256) yields 64 hex characters; keep the first 32 so the value
    -- fits this variable and encrypt_data's VARCHAR(32) key parameter (the
    -- unshortened assignment is rejected under strict SQL mode).
    -- NOTE(review): UUID()/RAND() are not a cryptographic random source;
    -- consider RANDOM_BYTES() where available.
    SET v_backup_key = LEFT(SHA2(CONCAT(UUID(), RAND()), 256), 32);

    -- Create the backup table once; a plain CREATE TABLE ... AS SELECT fails
    -- on every run after the first.
    CREATE TABLE IF NOT EXISTS sensitive_data_backup (
        id INT,
        data_type VARCHAR(50),
        encrypted_backup VARBINARY(1000),
        backup_time DATETIME
    );

    -- Write the encrypted backup rows.
    INSERT INTO sensitive_data_backup (
        id,
        data_type,
        encrypted_backup,
        backup_time
    )
    SELECT
        id,
        data_type,
        encrypt_data(raw_data, v_backup_key),
        NOW()
    FROM sensitive_data;

    -- Store the backup key.
    INSERT INTO encryption_keys (
        key_name,
        key_value,
        expires_at,
        status
    )
    VALUES (
        CONCAT('BACKUP_KEY_', DATE_FORMAT(NOW(), '%Y%m%d')),
        v_backup_key,
        DATE_ADD(NOW(), INTERVAL 30 DAY),
        'active'
    );
END $$
DELIMITER ;
-- 8. 创建密钥轮换程序
DELIMITER $$
-- rotate_encryption_keys: create a new active data key, re-encrypt every
-- sensitive_data row that used the previous active key, and expire that key.
--
-- The previous key is resolved BEFORE the new key is inserted; looking it up
-- afterwards would select the new key itself (it is the most recent 'active'
-- row), re-encrypt nothing, and then expire the new key.
--
-- The whole rotation runs in one transaction and is rolled back on any error.
CREATE PROCEDURE rotate_encryption_keys()
BEGIN
    DECLARE v_new_key VARCHAR(32);
    DECLARE v_new_key_id INT;
    DECLARE v_old_key_id INT;

    DECLARE EXIT HANDLER FOR SQLEXCEPTION
    BEGIN
        ROLLBACK;
        RESIGNAL;
    END;

    START TRANSACTION;

    -- Most recent active data key. Keys named 'BACKUP_KEY_...' are written by
    -- backup_sensitive_data and never referenced by sensitive_data, so they
    -- are not rotation candidates. id breaks ties on created_at.
    SET v_old_key_id = (
        SELECT id
        FROM encryption_keys
        WHERE status = 'active'
          AND COALESCE(LEFT(key_name, 11), '') <> 'BACKUP_KEY_'
        ORDER BY created_at DESC, id DESC
        LIMIT 1
    );

    -- Generate the new key. SHA2(..., 256) yields 64 hex characters; keep 32
    -- so it fits the variable and encrypt_data's key parameter.
    -- NOTE(review): UUID()/RAND() are not a cryptographic random source;
    -- consider RANDOM_BYTES() where available.
    SET v_new_key = LEFT(SHA2(CONCAT(UUID(), RAND()), 256), 32);

    -- Store the new key.
    INSERT INTO encryption_keys (
        key_name,
        key_value,
        status
    )
    VALUES (
        CONCAT('KEY_', DATE_FORMAT(NOW(), '%Y%m%d')),
        v_new_key,
        'active'
    );
    SET v_new_key_id = LAST_INSERT_ID();

    -- Re-encrypt the rows that still use the old key. When there was no old
    -- key, v_old_key_id is NULL and no rows match.
    UPDATE sensitive_data AS sd
    INNER JOIN encryption_keys AS ek
        ON ek.id = sd.encryption_key_id
    SET sd.raw_data = encrypt_data(
            decrypt_data(sd.raw_data, ek.key_value),
            v_new_key
        ),
        sd.encryption_key_id = v_new_key_id
    WHERE sd.encryption_key_id = v_old_key_id;

    -- Retire the old key.
    UPDATE encryption_keys
    SET status = 'expired'
    WHERE id = v_old_key_id;

    COMMIT;
END $$
DELIMITER ;
3. 审计跟踪
-- 1. 创建审计日志表
-- Row-level audit trail. old_value / new_value hold JSON snapshots of the
-- affected record as written by the audit triggers.
CREATE TABLE audit_log (
    id               BIGINT PRIMARY KEY AUTO_INCREMENT,
    action_type      ENUM('INSERT', 'UPDATE', 'DELETE', 'SELECT', 'LOGIN', 'LOGOUT'),
    table_name       VARCHAR(100),
    record_id        BIGINT,
    old_value        JSON,
    new_value        JSON,
    user_id          INT,
    application_user VARCHAR(100),
    action_time      TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    client_ip        VARCHAR(45),
    user_agent       VARCHAR(255)
);
-- 2. 创建审计触发器
DELIMITER $$
-- 为用户表创建审计触发器
-- users_audit_insert: log every row inserted into users.
--
-- user_id comes from the session variable @current_user_id, which the
-- application is expected to set (NULL otherwise).
-- application_user records USER(), the account the client connected as.
-- CURRENT_USER() is not usable here: inside a trigger it returns the
-- trigger's DEFINER, so every audit row would name the same account.
CREATE TRIGGER users_audit_insert
AFTER INSERT ON users
FOR EACH ROW
BEGIN
    INSERT INTO audit_log (
        action_type,
        table_name,
        record_id,
        new_value,
        user_id,
        application_user
    )
    VALUES (
        'INSERT',
        'users',
        NEW.id,
        JSON_OBJECT(
            'username', NEW.username,
            'email', NEW.email,
            'status', NEW.status
        ),
        @current_user_id,
        -- Truncated to the column width so a long host name cannot make the
        -- audited INSERT itself fail.
        LEFT(USER(), 100)
    );
END $$
-- users_audit_update: log every update of a users row with before/after
-- snapshots of username, email and status.
--
-- user_id comes from the session variable @current_user_id, which the
-- application is expected to set (NULL otherwise).
-- application_user records USER(), the account the client connected as.
-- CURRENT_USER() is not usable here: inside a trigger it returns the
-- trigger's DEFINER, so every audit row would name the same account.
CREATE TRIGGER users_audit_update
AFTER UPDATE ON users
FOR EACH ROW
BEGIN
    INSERT INTO audit_log (
        action_type,
        table_name,
        record_id,
        old_value,
        new_value,
        user_id,
        application_user
    )
    VALUES (
        'UPDATE',
        'users',
        NEW.id,
        JSON_OBJECT(
            'username', OLD.username,
            'email', OLD.email,
            'status', OLD.status
        ),
        JSON_OBJECT(
            'username', NEW.username,
            'email', NEW.email,
            'status', NEW.status
        ),
        @current_user_id,
        -- Truncated to the column width so a long host name cannot make the
        -- audited UPDATE itself fail.
        LEFT(USER(), 100)
    );
END $$
-- users_audit_delete: log every row deleted from users, keeping a snapshot of
-- its username, email and status.
--
-- user_id comes from the session variable @current_user_id, which the
-- application is expected to set (NULL otherwise).
-- application_user records USER(), the account the client connected as.
-- CURRENT_USER() is not usable here: inside a trigger it returns the
-- trigger's DEFINER, so every audit row would name the same account.
CREATE TRIGGER users_audit_delete
AFTER DELETE ON users
FOR EACH ROW
BEGIN
    INSERT INTO audit_log (
        action_type,
        table_name,
        record_id,
        old_value,
        user_id,
        application_user
    )
    VALUES (
        'DELETE',
        'users',
        OLD.id,
        JSON_OBJECT(
            'username', OLD.username,
            'email', OLD.email,
            'status', OLD.status
        ),
        @current_user_id,
        -- Truncated to the column width so a long host name cannot make the
        -- audited DELETE itself fail.
        LEFT(USER(), 100)
    );
END $$
DELIMITER ;
-- 3. 创建审计查询视图
-- audit_summary: per day, action type and table, the number of audit_log
-- entries and the number of distinct user_id values behind them.
CREATE VIEW audit_summary AS
SELECT
    DATE(al.action_time)       AS audit_date,
    al.action_type,
    al.table_name,
    COUNT(*)                   AS action_count,
    COUNT(DISTINCT al.user_id) AS unique_users
FROM audit_log AS al
GROUP BY
    DATE(al.action_time),
    al.action_type,
    al.table_name;
-- 4. 创建审计报告程序
DELIMITER $$
-- generate_audit_report: produce three result sets over audit_log for the
-- inclusive date range [p_start_date, p_end_date]:
--   1. totals per action type,
--   2. activity per user,
--   3. entries flagged as suspicious.
--
-- The date range is applied as a half-open interval on action_time
-- (>= start, < end + 1 day). It selects the same rows as
-- DATE(action_time) BETWEEN start AND end, but leaves the column bare so an
-- index on action_time can be used.
CREATE PROCEDURE generate_audit_report(
    IN p_start_date DATE,
    IN p_end_date DATE
)
BEGIN
    DECLARE v_range_start DATETIME;
    DECLARE v_range_end DATETIME;

    SET v_range_start = CAST(p_start_date AS DATETIME);
    SET v_range_end = DATE_ADD(CAST(p_end_date AS DATETIME), INTERVAL 1 DAY);

    -- Overall statistics.
    SELECT
        action_type,
        COUNT(*) AS total_actions,
        COUNT(DISTINCT user_id) AS unique_users,
        MIN(action_time) AS first_action,
        MAX(action_time) AS last_action
    FROM audit_log
    WHERE action_time >= v_range_start
      AND action_time < v_range_end
    GROUP BY action_type;

    -- Per-user activity.
    SELECT
        user_id,
        application_user,
        COUNT(*) AS total_actions,
        GROUP_CONCAT(DISTINCT action_type) AS action_types,
        COUNT(DISTINCT table_name) AS affected_tables
    FROM audit_log
    WHERE action_time >= v_range_start
      AND action_time < v_range_end
    GROUP BY user_id, application_user
    ORDER BY total_actions DESC;

    -- Suspicious activity.
    SELECT
        action_time,
        action_type,
        table_name,
        user_id,
        application_user,
        client_ip
    FROM audit_log
    WHERE action_time >= v_range_start
      AND action_time < v_range_end
      AND (
          -- Any delete operation.
          action_type = 'DELETE'
          -- Activity outside hours 9 through 18.
          OR HOUR(action_time) NOT BETWEEN 9 AND 18
          -- Client address outside 192.168.*; rows with a NULL client_ip
          -- are not matched by this test.
          OR client_ip NOT LIKE '192.168.%'
      );
END $$
-- 5. 创建审计清理程序
-- clean_audit_logs: move audit_log rows older than p_days_to_keep days into
-- audit_log_archive.
--
-- Parameters:
--   p_days_to_keep  Retention in days, counted back from CURRENT_DATE.
--                   A NULL value yields a NULL cutoff and moves nothing.
--
-- Raises SQLSTATE '45000' for a negative retention, which would put the
-- cutoff in the future and remove the entire log.
-- The archive insert and the delete run in one transaction with a single
-- cutoff, so a failure cannot leave rows both archived and still present.
-- NOTE(review): atomicity assumes both tables use a transactional engine.
CREATE PROCEDURE clean_audit_logs(IN p_days_to_keep INT)
BEGIN
    DECLARE v_cutoff DATE;

    IF p_days_to_keep < 0 THEN
        SIGNAL SQLSTATE '45000'
            SET MESSAGE_TEXT = 'clean_audit_logs: p_days_to_keep must not be negative';
    END IF;

    -- Compute the cutoff once so both statements see the same boundary.
    SET v_cutoff = DATE_SUB(CURRENT_DATE, INTERVAL p_days_to_keep DAY);

    BEGIN
        DECLARE EXIT HANDLER FOR SQLEXCEPTION
        BEGIN
            ROLLBACK;
            RESIGNAL;
        END;

        START TRANSACTION;

        -- Archive the old entries. SELECT * is kept because the column layout
        -- of audit_log_archive is not visible here; presumably it mirrors
        -- audit_log.
        INSERT INTO audit_log_archive
        SELECT *
        FROM audit_log
        WHERE action_time < v_cutoff;

        -- Delete the archived entries.
        DELETE FROM audit_log
        WHERE action_time < v_cutoff;

        COMMIT;
    END;
END $$
DELIMITER ;
-- 6. 创建审计警报程序
DELIMITER $$
-- check_audit_alerts: scan the last hour of audit_log and insert HIGH severity
-- rows into security_alerts for
--   * more than 100 DELETE actions by one user_id, and
--   * more than 5 LOGIN actions with new_value.success = false from one
--     client_ip.
--
-- user_id and client_ip may be NULL in audit_log (the audit triggers do not
-- set client_ip, and user_id comes from a session variable). CONCAT returns
-- NULL if any argument is NULL, so both are replaced with 'unknown' to keep
-- the alert description populated.
CREATE PROCEDURE check_audit_alerts()
BEGIN
    -- Mass deletes.
    INSERT INTO security_alerts (
        alert_type,
        description,
        severity,
        created_at
    )
    SELECT
        'SUSPICIOUS_ACTIVITY',
        CONCAT(
            'Multiple delete operations (',
            COUNT(*),
            ') by user ',
            COALESCE(CAST(user_id AS CHAR), 'unknown'),
            ' in the last hour'
        ),
        'HIGH',
        NOW()
    FROM audit_log
    WHERE action_type = 'DELETE'
      AND action_time > DATE_SUB(NOW(), INTERVAL 1 HOUR)
    GROUP BY user_id
    HAVING COUNT(*) > 100;

    -- Repeated failed logins.
    INSERT INTO security_alerts (
        alert_type,
        description,
        severity,
        created_at
    )
    SELECT
        'ABNORMAL_LOGIN',
        CONCAT(
            'Multiple failed login attempts (',
            COUNT(*),
            ') from IP ',
            COALESCE(client_ip, 'unknown')
        ),
        'HIGH',
        NOW()
    FROM audit_log
    WHERE action_type = 'LOGIN'
      AND action_time > DATE_SUB(NOW(), INTERVAL 1 HOUR)
      AND new_value->>'$.success' = 'false'
    GROUP BY client_ip
    HAVING COUNT(*) > 5;
END $$
DELIMITER ;