Background
I was handed a forensics-related task that required analyzing a database file. Once I actually received it, it turned out not to be a conventional physical data file but a snapshot backup of an Alibaba Cloud RDS MySQL instance. The first step was therefore not to analyze the data directly, but to restore the snapshot in full into a usable MySQL instance.
After reading the official Alibaba Cloud guide on restoring an RDS MySQL snapshot backup file to a self-managed database, and weighing it against the actual data volume and the performance bottlenecks I ran into, I gradually arrived at the restore and optimization approach documented in this article.
Analyzing the snapshot directory structure
After decompressing the snapshot, the directory layout looks roughly like this:
├── db1/
│   ├── table_a/
│   │   ├── structure.sql
│   │   └── data/
│   │       ├── data-001.sql
│   │       └── data-002.sql
│   └── table_b/
│       ├── structure.sql
│       └── data/
│           └── data-001.sql
├── db2/
│   ├── users/
│   │   ├── structure.sql
│   │   └── data/
│   │       └── data_0001.sql
│   └── orders/
│       ├── structure.sql
│       └── data/
│           ├── data_0001.sql
│           └── data_0002.sql
└── db3/
    └── empty_table/
        └── structure.sql          # structure only; restored as an empty table
A few things can be confirmed (a minimal pre-scan sketch follows this list):
- Each database corresponds to a top-level directory
- Each table corresponds to a subdirectory
- structure.sql: the structure definition of the table or database
- data/ directory: the table data (.sql or .csv files)
- If only structure.sql exists, the table is restored empty
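Before running any import, it is worth confirming that the backup actually follows these rules. The snippet below is a minimal sketch for that purpose, not part of the official tooling; the walk logic and counters are assumptions based purely on the layout shown above.
import os
import sys

def quick_scan(root):
    # Walk <root>/<db>/<table>/ and classify each table directory.
    stats = {"with_data": 0, "structure_only": 0, "no_structure": 0}
    for db in sorted(os.listdir(root)):
        db_path = os.path.join(root, db)
        if not os.path.isdir(db_path):
            continue
        for table in sorted(os.listdir(db_path)):
            t_path = os.path.join(db_path, table)
            if not os.path.isdir(t_path):
                continue
            has_struct = os.path.isfile(os.path.join(t_path, "structure.sql"))
            data_dir = os.path.join(t_path, "data")
            has_data = os.path.isdir(data_dir) and any(
                f.endswith((".sql", ".csv")) for f in os.listdir(data_dir))
            if not has_struct:
                stats["no_structure"] += 1    # needs manual attention
            elif has_data:
                stats["with_data"] += 1       # structure + data
            else:
                stats["structure_only"] += 1  # will become an empty table
    return stats

if __name__ == "__main__":
    print(quick_scan(sys.argv[1]))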
Initial restore attempt
The first restore test used the Python sample script provided in the Alibaba Cloud documentation. In practice, however, it ran into the following problems:
- Very large data volume: the .sql files totaled more than 500 GB
- Extremely slow import: no visible progress for long stretches of time
- Low resource utilization: CPU and I/O were far from saturated
The core problems with the official script are:
- Single-threaded, sequential import
- Complete reliance on the default MySQL configuration
- Each SQL file is executed as a separate import, so transaction and flush-to-disk overhead is enormous
The official sample script is reproduced below, unmodified:
Official Python sample script
#!/usr/bin/python
# ****************************************************************#
# ScriptName: restore_from_downloads
# Author: @alibaba-inc.com
# Function:
# ***************************************************************#
import os
import sys
STRUCTURE_FILE_NAME = "structure.sql"
DATA_PATH_PREFIX = "data"
def create_database(db_host, db_port, db_user, db_pass, create_stmt_file):
cmd = "mysql -h" + db_host + " -P" + db_port + " -u" + \
db_user + " -p" + db_pass + " <" + create_stmt_file
if os.system(cmd) != 0:
print("[ERROR]: execute SQL failed. command: " + cmd)
exit(1)
def create_table(db_host, db_port, db_user, db_pass, db_name, create_stmt_file):
cmd = "mysql -h" + db_host + " -P" + db_port + " -u" + db_user + \
" -p" + db_pass + " -D" + db_name + " <" + create_stmt_file
if os.system(cmd) != 0:
print("[ERROR]: execute SQL failed. command: " + cmd)
exit(1)
def import_file_csv_with_header(db_host, db_port, db_user, db_pass, csv_file, db, table, with_header=False):
if with_header:
in_file = open(csv_file)
schema = in_file.readline().strip('\n')
load_cmd = "load data local infile \"" + csv_file + "\" into table `" + db + "`.`" + table + "` character set utf8mb4 FIELDS TERMINATED BY \",\" enclosed by \"\\\"\" lines terminated by \"\\n\" ignore 1 lines" + \
" (" + schema + ")"
in_file.close()
else:
load_cmd = "load data local infile \"" + csv_file + "\" into table `" + db + "`.`" + table + "` character set utf8mb4 FIELDS TERMINATED BY \",\" enclosed by \"\\\"\""
cmd = "mysql --local_infile=1 -h" + db_host + " -P" + db_port + " -u" + \
db_user + " -p" + db_pass + " -e '" + load_cmd + "'"
print("[INFO]: trying to exec: " + cmd)
if os.system(cmd) != 0:
print("[ERROR]: execute SQL failed. command: " + cmd)
exit(1)
def import_file_sql(db_host, db_port, db_user, db_pass, sql_file):
cmd = "mysql -h" + db_host + " -P" + db_port + \
" -u" + db_user + " -p" + db_pass + " <" + sql_file
print("[INFO]: trying to exec: " + cmd)
if os.system(cmd) != 0:
print("[ERROR]: execute SQL failed. command: " + cmd)
exit(1)
def print_usage():
print(
"Usage: python ./restore_mysql.py [backupset_directory] [database_host] [database_port] [database_username] [database_password]")
enable_foreign_key_check = None
def read_db_foreign_key_enable():
global enable_foreign_key_check
cmd = "mysql -h" + db_host + " -P" + db_port + " -u" + db_user + " -p" + db_pass + " -e " + "'SELECT @@FOREIGN_KEY_CHECKS'" # + " | awk '{print $1}' | sed -n '2,1p' "
from subprocess import check_output
try:
cmd_output = check_output(cmd, shell=True)
cmd_output = cmd_output.decode()
foreign_check_enable = cmd_output.split("\n")[1]
if foreign_check_enable == "1":
print("[INFO]: foreign key check is on")
enable_foreign_key_check = True
elif foreign_check_enable == "0":
print("[INFO]: foreign key check is off")
enable_foreign_key_check = False
except Exception:
print("[WARN] try to get foreign key config failed. won't change foreign key config for MySQL")
# do nothing
def do_disable_foreign_key_check():
global enable_foreign_key_check
if enable_foreign_key_check is True:
print("[INFO]: try to disable foreign key check before importing data...")
cmd = "mysql -h" + db_host + " -P" + db_port + " -u" + db_user + " -p" + db_pass + " -e " + "'SET GLOBAL FOREIGN_KEY_CHECKS=0'"
if os.system(cmd) != 0:
print("[ERROR]: execute SQL failed. command: " + cmd)
exit(1)
print("[INFO]: success disable foreign key check")
else:
print("[INFO]: no need to disable foreign key check before importing data")
def do_enable_foreign_key_check():
global enable_foreign_key_check
if enable_foreign_key_check is True:
print("[INFO]: try to enable foreign key check after importing data...")
cmd = "mysql -h" + db_host + " -P" + db_port + " -u" + db_user + " -p" + db_pass + " -e " + "'SET GLOBAL FOREIGN_KEY_CHECKS=1'"
if os.system(cmd) != 0:
print("[ERROR]: execute SQL failed. command: " + cmd)
exit(1)
print("[INFO]: success enable foreign key check")
else:
print("[INFO]: no need to enable foreign key check after importing data")
if __name__ == '__main__':
if len(sys.argv) != 6:
print_usage()
exit()
root_dir = os.path.abspath(sys.argv[1])
db_host = sys.argv[2]
db_port = sys.argv[3]
db_user = sys.argv[4]
db_pass = sys.argv[5]
print("[INFO]: restore data from " + root_dir +
" to " + db_host + ":" + db_port)
read_db_foreign_key_enable()
do_disable_foreign_key_check()
try:
db_dirs = os.listdir(root_dir)
for db_dir in db_dirs:
dir_path = os.path.join(root_dir, db_dir)
if not os.path.isdir(dir_path):
continue
db_structure_file = os.path.join(dir_path, STRUCTURE_FILE_NAME)
create_database(db_host, db_port, db_user, db_pass, db_structure_file)
print("[INFO]: restore structure database: " + db_dir + " ends")
table_dirs = os.listdir(dir_path)
for table_dir in table_dirs:
table_dir_path = os.path.join(dir_path, table_dir)
if not os.path.isdir(table_dir_path):
continue
table_structure_file = os.path.join(table_dir_path, STRUCTURE_FILE_NAME)
create_table(db_host, db_port, db_user, db_pass, db_dir, table_structure_file)
print("[INFO]: restore structure table: " + table_dir + " ends")
table_data_dir_path = os.path.join(table_dir_path, DATA_PATH_PREFIX)
if not os.path.isdir(table_data_dir_path):
continue
filename_slices = os.listdir(table_data_dir_path)[0].split(".")
files_format = filename_slices[-1]
if files_format == "csv":
with_header = len(filename_slices) > 1 and filename_slices[-2]=="wh" # .wh.csv is csv with header, .csv is csv without header
csv_files = os.listdir(table_data_dir_path)
csv_count = 0
for csv_file in csv_files:
csv_file_path = os.path.join(table_data_dir_path, csv_file)
file_size = os.path.getsize(csv_file_path)
if file_size > 0:
import_file_csv_with_header(db_host, db_port, db_user,
db_pass, csv_file_path, db_dir, table_dir, with_header)
csv_count = csv_count + 1
print("[INFO]: restore data [" + str(csv_count) + "/" + str(
len(csv_files)) + "] of table " + db_dir + "." + table_dir)
elif files_format == "sql":
sql_files = os.listdir(table_data_dir_path)
sql_count = 0
for sql_file in sql_files:
sql_file_path = os.path.join(table_data_dir_path, sql_file)
file_size = os.path.getsize(sql_file_path)
if file_size > 0:
import_file_sql(db_host, db_port,
db_user, db_pass, sql_file_path)
sql_count = sql_count + 1
print("[INFO]: restore data [" + str(sql_count) + "/" + str(
len(sql_files)) + "] of table " + db_dir + "." + table_dir)
finally:
do_enable_foreign_key_check()
Performance bottleneck analysis
Taken together, the bottlenecks concentrate in two areas:
- The MySQL server parameters are not tuned for large-scale import
- The import logic is single-threaded and cannot exploit multi-core CPUs or the disks' capacity for concurrent I/O
The optimization plan therefore has two parts:
- Dedicated MySQL configuration tuning
- Rewriting the import script for parallel, high-throughput import
Dedicated MySQL configuration tuning
For a one-off, large-scale data import, my.cnf is adjusted specifically for that workload. The core principles are:
- Maximize memory and I/O utilization
- Temporarily disable safety and consistency features that hurt write performance
- Revert to a normal production configuration after the import (a revert sketch follows the configuration file below)
The main optimizations include, but are not limited to, the following (a quick runtime check is sketched after this list):
- Disable the binlog and the doublewrite buffer
- Reduce the transaction flush-to-disk frequency
- Increase innodb_buffer_pool_size
- Increase max_allowed_packet
- Disable performance and monitoring related modules
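Once the server has been restarted with the tuned configuration, it is worth confirming that the import-critical settings actually took effect. A minimal check, run from any MySQL client, might look like this (the variable list is an assumption based on the configuration below):
-- Confirm the import-critical settings before starting the import
SHOW VARIABLES LIKE 'innodb_buffer_pool_size';
SHOW VARIABLES LIKE 'innodb_flush_log_at_trx_commit';
SHOW VARIABLES LIKE 'innodb_doublewrite';
SHOW VARIABLES LIKE 'max_allowed_packet';
SHOW VARIABLES LIKE 'local_infile';
SELECT @@log_bin;   -- expected 0 while skip-log-bin is in effect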
Suggested my.cnf configuration for bulk file import
## MySQL 5.7 configuration; adjust for other versions as needed
[client]
port = 3306
socket = /tmp/mysql.sock
default-character-set = utf8mb4
[mysql]
no-auto-rehash
default-character-set = utf8mb4
[mysqld]
# ============================================
# Basic settings
# ============================================
user = mysql
port = 3306
basedir = /usr/local/mysql
datadir = /usr/local/mysql/data
socket = /tmp/mysql.sock
pid-file = /usr/local/mysql/data/mysql.pid
tmpdir = /tmp
character-set-server = utf8mb4
collation-server = utf8mb4_unicode_ci
default-storage-engine = InnoDB
# ============================================
# Network tuning (large file import)
# ============================================
max_connections = 200
max_allowed_packet = 1G
connect_timeout = 600
wait_timeout = 28800
interactive_timeout = 28800
net_read_timeout = 600
net_write_timeout = 600
net_buffer_length = 16K
back_log = 300
# ============================================
# Security settings
# ============================================
skip-name-resolve
local_infile = ON
explicit_defaults_for_timestamp = ON
lower_case_table_names = 0
sql_mode = NO_ENGINE_SUBSTITUTION
# ============================================
# Logging (minimized during import)
# ============================================
log-error = /usr/local/mysql/data/mysql-error.log
log_warnings = 2
slow_query_log = OFF
long_query_time = 2
slow_query_log_file = /usr/local/mysql/data/mysql-slow.log
general_log = OFF
# Binary log (disabled during import)
skip-log-bin
# log-bin = /usr/local/mysql/data/mysql-bin
# expire_logs_days = 7
# max_binlog_size = 1G
# binlog_format = ROW
# sync_binlog = 0
# ============================================
# Cache tuning (8-core CPU)
# ============================================
thread_cache_size = 100
table_open_cache = 4096
table_definition_cache = 2048
table_open_cache_instances = 8
# Query cache (disabled)
query_cache_type = 0
query_cache_size = 0
# Sort and join buffers
sort_buffer_size = 16M
join_buffer_size = 16M
read_buffer_size = 8M
read_rnd_buffer_size = 16M
# MyISAM key cache
key_buffer_size = 128M
# Temporary tables (for 16 GB RAM)
tmp_table_size = 1G
max_heap_table_size = 1G
# ============================================
# InnoDB core settings (tuned for 16 GB RAM + 50k IOPS)
# ============================================
# Buffer pool (70% of 16 GB ≈ 11 GB)
innodb_buffer_pool_size = 11G
innodb_buffer_pool_instances = 8
innodb_buffer_pool_chunk_size = 128M
# Redo log
innodb_log_buffer_size = 128M
innodb_log_file_size = 2G
innodb_log_files_in_group = 2
# Flush policy (0 is fastest for import; change back to 2 afterwards)
innodb_flush_log_at_trx_commit = 0
# Flush method (O_DIRECT recommended for SSDs)
innodb_flush_method = O_DIRECT
# Data files
innodb_data_file_path = ibdata1:1G:autoextend
innodb_autoextend_increment = 256
# I/O threads (8-core CPU)
innodb_read_io_threads = 8
innodb_write_io_threads = 8
innodb_purge_threads = 4
innodb_page_cleaners = 4
# I/O capacity (for a 50k IOPS disk)
innodb_io_capacity = 10000
innodb_io_capacity_max = 20000
# Dirty page limits (raised during import)
innodb_max_dirty_pages_pct = 90
innodb_max_dirty_pages_pct_lwm = 10
# SSD optimization
innodb_flush_neighbors = 0
# Doublewrite buffer (disabled during import, re-enable afterwards)
innodb_doublewrite = OFF
# Locking
innodb_lock_wait_timeout = 300
innodb_rollback_on_timeout = ON
innodb_autoinc_lock_mode = 2
# Adaptive hash index (disabled during import)
innodb_adaptive_hash_index = OFF
# Change buffering (maximized)
innodb_change_buffering = all
innodb_change_buffer_max_size = 50
# Undo logs
innodb_undo_tablespaces = 3
innodb_undo_logs = 128
innodb_undo_log_truncate = ON
innodb_max_undo_log_size = 1G
# Concurrency control (0 = automatic)
innodb_thread_concurrency = 0
innodb_concurrency_tickets = 5000
# Read-ahead (disabled during import)
innodb_read_ahead_threshold = 0
innodb_random_read_ahead = OFF
# File handling
innodb_file_per_table = ON
innodb_open_files = 4096
# Monitoring
innodb_print_all_deadlocks = OFF
# Sync tuning
innodb_sync_array_size = 8
innodb_sync_spin_loops = 30
# LRU scan depth
innodb_lru_scan_depth = 512
# Statistics
innodb_stats_persistent = ON
innodb_stats_auto_recalc = ON
innodb_stats_persistent_sample_pages = 20
innodb_stats_on_metadata = OFF
# Log compression
innodb_log_compressed_pages = OFF
# Sort buffer
innodb_sort_buffer_size = 64M
# Online DDL
innodb_online_alter_log_max_size = 1G
# Buffer pool dump/restore
innodb_buffer_pool_dump_at_shutdown = ON
innodb_buffer_pool_load_at_startup = ON
innodb_buffer_pool_dump_pct = 25
# Fast shutdown
innodb_fast_shutdown = 1
# Strict mode (off during import)
innodb_strict_mode = OFF
# ============================================
# Performance
# ============================================
performance_schema = OFF
symbolic-links = 0
skip-external-locking
open_files_limit = 65535
transaction_isolation = READ-COMMITTED
autocommit = ON
# Optimizer
eq_range_index_dive_limit = 200
optimizer_switch = 'index_merge=on,index_merge_union=on,index_merge_sort_union=on,index_merge_intersection=on,engine_condition_pushdown=on,index_condition_pushdown=on,mrr=on,mrr_cost_based=on,block_nested_loop=on,batched_key_access=on,materialization=on,semijoin=on,loosescan=on,firstmatch=on,duplicateweedout=on,subquery_materialization_cost_based=on,use_index_extensions=on,condition_fanout_filter=on,derived_merge=on'
# On-disk temporary table engine
internal_tmp_disk_storage_engine = InnoDB
# ============================================
# Import-specific settings
# ============================================
bulk_insert_buffer_size = 128M
delay_key_write = ALL
myisam_sort_buffer_size = 128M
myisam_max_sort_file_size = 10G
# ============================================
# Miscellaneous
# ============================================
max_connect_errors = 1000
max_prepared_stmt_count = 16382
thread_stack = 256K
max_sort_length = 1024
max_length_for_sort_data = 4096
lock_wait_timeout = 31536000
host_cache_size = 0
event_scheduler = OFF
secure_file_priv = ''
[mysqldump]
quick
quote-names
max_allowed_packet = 1G
default-character-set = utf8mb4
single-transaction
skip-lock-tables
[myisamchk]
key_buffer_size = 256M
sort_buffer_size = 256M
read_buffer = 128M
write_buffer = 128M
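Most of the import-specific settings above trade durability for speed, so they should be reverted once the import has finished. The fragment below is only a sketch of the values I would move back to; exact production values depend on the workload, and in MySQL 5.7 both innodb_doublewrite and re-enabling the binlog require editing my.cnf and restarting the server.
# Settings to revert after the import (restart required)
innodb_flush_log_at_trx_commit = 2   # or 1 for full durability
innodb_doublewrite = ON
innodb_adaptive_hash_index = ON
innodb_strict_mode = ON
# Re-enable binary logging if replication or point-in-time recovery is needed
log-bin = /usr/local/mysql/data/mysql-bin
binlog_format = ROW
expire_logs_days = 7
sync_binlog = 1                      # the commented template above uses 0; 1 is safer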
Rewriting the import script for multi-process import
Even with the MySQL layer tuned, single-threaded import still could not meet the efficiency requirement, so the official script was completely rewritten. The core goals were:
Design goals
- Multi-process parallel import
- Automatic checks for structure / data completeness
- Support for mixed SQL and CSV formats
- Automatic deletion of source files after import, to free disk space
- An import process that is visible, controllable, and interruptible
Core capabilities
- Pre-scan the backup directory and produce an import report
- Automatically classify tables into the following categories:
  - Structure + data complete (imported normally)
  - Structure only (empty table)
  - Data present but structure missing (serious error, requires manual handling)
- Concurrency scaled dynamically to the number of CPU cores
- Each table is the smallest scheduling unit, for load balancing
- Data files (.sql / .csv) are deleted automatically once a table has been imported
Concurrency strategy
- Structure import phase: all cores in parallel (this phase is very short)
- Data import phase: 2 cores are reserved for MySQL, the rest are used for import workers
- imap_unordered is used so that no worker process ever sits idle; a minimal sketch of this scheduling pattern follows
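Before the full script, here is a small, self-contained sketch of that scheduling pattern: a worker pool sized to cpu_count() - 2 and imap_unordered with chunksize=1, so each worker grabs the next table as soon as it finishes the previous one. The import_table function here is a placeholder, not the real worker.
from multiprocessing import Pool, cpu_count

def import_table(task):
    # Placeholder for the real per-table import worker.
    db, table = task
    return f"{db}.{table} done"

if __name__ == "__main__":
    tasks = [("db1", "table_a"), ("db1", "table_b"), ("db2", "users")]
    workers = max(1, cpu_count() - 2)          # leave 2 cores for mysqld
    with Pool(workers) as pool:
        # chunksize=1: each worker takes exactly one table at a time,
        # so one huge, slow table never blocks the rest of the queue.
        for result in pool.imap_unordered(import_table, tasks, chunksize=1):
            print(result)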
Multi-process import script (Python 3)
#!/usr/bin/python3
# ****************************************************************#
# ScriptName: auto_clean_restore.py (v6.0 CN + AutoClean)
# Function: pre-check -> confirm -> import -> auto-delete source files
# ***************************************************************#
import os
import sys
import subprocess
import shutil
from multiprocessing import Pool, cpu_count

STRUCTURE_FILE_NAME = "structure.sql"
DATA_PATH_PREFIX = "data"

# --- Helper: build the mysql client command ---
def get_cmd(db_host, db_port, db_user, db_pass, db_name=None):
    cmd = [
        'mysql',
        f'-h{db_host}', f'-P{db_port}', f'-u{db_user}', f'-p{db_pass}',
        '--default-character-set=utf8mb4',
        '--net_buffer_length=1M',
        '--max_allowed_packet=1G'
    ]
    if db_name:
        cmd.append(f'-D{db_name}')
    return cmd

# --- Scan logic ---
def scan_backup_directory(root_dir):
    """Scan the backup directory and return a classified task list."""
    valid_tasks = []
    report = {
        "total_tables": 0,
        "ready_full": [],      # structure + data (complete)
        "ready_struct": [],    # structure only (empty table)
        "missing_struct": [],  # data but no structure (serious error)
        "useless": []          # nothing at all (dead directory)
    }
    # Sort so the report reads nicely
    db_dirs = sorted([d for d in os.listdir(root_dir) if os.path.isdir(os.path.join(root_dir, d))])
    for db in db_dirs:
        db_path = os.path.join(root_dir, db)
        tables = sorted([t for t in os.listdir(db_path) if os.path.isdir(os.path.join(db_path, t))])
        for t in tables:
            t_path = os.path.join(db_path, t)
            report["total_tables"] += 1
            identifier = f"{db}.{t}"
            # Check the structure file
            struc_file = os.path.join(t_path, STRUCTURE_FILE_NAME)
            has_struct = os.path.exists(struc_file) and os.path.getsize(struc_file) > 0
            # Check the data files
            data_dir = os.path.join(t_path, DATA_PATH_PREFIX)
            has_data = False
            if os.path.isdir(data_dir):
                # Only count non-empty .sql or .csv files
                valid_files = [f for f in os.listdir(data_dir)
                               if (f.endswith('.sql') or f.endswith('.csv'))
                               and os.path.getsize(os.path.join(data_dir, f)) > 0]
                if valid_files:
                    has_data = True
            # Classify
            task_info = (db, t, t_path)
            if has_struct and has_data:
                report["ready_full"].append(identifier)
                valid_tasks.append(task_info)
            elif has_struct and not has_data:
                report["ready_struct"].append(identifier)
                valid_tasks.append(task_info)  # structure only
            elif not has_struct and has_data:
                report["missing_struct"].append(identifier)
                # Cannot be imported; not added to the task list
            else:
                report["useless"].append(identifier)
    return report, valid_tasks

# --- Worker 1: import structure (with auto-clean) ---
def restore_structure_worker(args):
    db_name, table_name, table_path, db_host, db_port, db_user, db_pass = args
    struc_file = os.path.join(table_path, STRUCTURE_FILE_NAME)
    if not os.path.exists(struc_file): return True
    try:
        cmd = get_cmd(db_host, db_port, db_user, db_pass, db_name)
        with open(struc_file, 'r', encoding='utf-8', errors='replace') as f:
            res = subprocess.run(cmd, stdin=f, capture_output=True, text=True)
        if res.returncode != 0:
            if "already exists" in res.stderr:
                # A table that already exists also counts as success; still try to delete the file
                pass
            else:
                raise Exception(res.stderr.strip())
        # [auto-clean] import succeeded, delete structure.sql
        try:
            os.remove(struc_file)
        except OSError:
            pass
        return True
    except Exception as e:
        print(f"[STRUCTURE FAILED] `{db_name}`.`{table_name}`: {e}", flush=True)
        return False

# --- Worker 2: import data (with auto-clean) ---
def restore_data_worker(args):
    db_name, table_name, table_path, db_host, db_port, db_user, db_pass = args
    data_dir = os.path.join(table_path, DATA_PATH_PREFIX)
    if not os.path.isdir(data_dir):
        # No data directory: try to remove the (empty) table directory to clean up
        try: os.rmdir(table_path)
        except: pass
        return True
    files = sorted([os.path.join(data_dir, f) for f in os.listdir(data_dir) if os.path.getsize(os.path.join(data_dir, f)) > 0])
    if not files:
        try: os.rmdir(data_dir)
        except: pass
        return True
    is_csv = files[0].endswith('.csv')
    process = None
    try:
        print(f"[DATA IMPORT-{os.getpid()}]: 🚀 starting `{db_name}`.`{table_name}` ({len(files)} files)", flush=True)
        cmd = get_cmd(db_host, db_port, db_user, db_pass, db_name)
        if is_csv: cmd.append('--local-infile=1')
        process = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1024*1024)
        # Quick connection check
        try:
            process.wait(timeout=0.1)
            out, err = process.communicate()
            raise Exception(f"database connection lost: {err.strip()}")
        except subprocess.TimeoutExpired:
            pass
        # --- Strategy A: stream SQL files ---
        if not is_csv:
            process.stdin.write("SET unique_checks=0; SET foreign_key_checks=0; SET sql_log_bin=0; SET autocommit=0;\n")
            for fpath in files:
                with open(fpath, 'r', encoding='utf-8', errors='replace', buffering=1024*1024) as f:
                    shutil.copyfileobj(f, process.stdin)
                # Commit once per file
                process.stdin.write("\nCOMMIT;\nSET autocommit=0;\n")
            process.stdin.write("COMMIT;\n")
            process.stdin.close()
        # --- Strategy B: CSV import ---
        else:
            csv_cmds = ["SET unique_checks=0; SET foreign_key_checks=0; SET sql_log_bin=0; SET autocommit=0;"]
            schema = ""
            parts = os.path.basename(files[0]).split('.')
            if len(parts) > 1 and parts[-2] == "wh":
                with open(files[0], 'r', encoding='utf-8') as f: schema = f.readline().strip()
            for fpath in files:
                path_str = os.path.abspath(fpath).replace('\\', '/')
                load = f"LOAD DATA LOCAL INFILE '{path_str}' INTO TABLE `{table_name}` CHARACTER SET utf8mb4 FIELDS TERMINATED BY ',' ENCLOSED BY '\"' LINES TERMINATED BY '\\n'"
                if schema: load += f" IGNORE 1 LINES ({schema})"
                csv_cmds.append(load + "; COMMIT; SET autocommit=0;")
            csv_cmds.append("COMMIT;")
            out, err = process.communicate(input="\n".join(csv_cmds))
            if process.returncode != 0: raise Exception(err)
        # Wait for completion
        if process.poll() is None: process.wait()
        if process.returncode != 0:
            _, err = process.communicate()
            raise Exception(err.strip())
        # [auto-clean] import succeeded, start deleting source files
        for fpath in files:
            try: os.remove(fpath)
            except OSError: pass
        # Try to remove the now-empty data directory
        try: os.rmdir(data_dir)
        except OSError: pass
        # Try to remove the table directory (once structure and data files are all gone)
        try: os.rmdir(table_path)
        except OSError: pass
        print(f"[DATA DONE-{os.getpid()}]: ✅ cleaned up `{db_name}`.`{table_name}`", flush=True)
        return True
    except Exception as e:
        print(f"[DATA FAILED] `{db_name}`.`{table_name}`: {e}", flush=True)
        if process and process.poll() is None: process.terminate()
        return False

# --- Main program ---
def main():
    if len(sys.argv) != 6:
        print("Usage: python3 auto_clean_restore.py [directory] [host] [port] [user] [password]")
        sys.exit(1)
    root, host, port, user, password = os.path.abspath(sys.argv[1]), sys.argv[2], sys.argv[3], sys.argv[4], sys.argv[5]
    print(f"--- 🔍 Phase 0: scanning backup directory: {root} ---")
    print("Scanning files... please wait...")
    report, valid_tasks = scan_backup_directory(root)
    # --- Print the pre-check report ---
    print("\n" + "="*60)
    print(f"📑 Import pre-check report (tables scanned: {report['total_tables']})")
    print("="*60)
    # 1. Serious problems
    if report['missing_struct']:
        print(f"\n🚨 [SERIOUS] Missing table structure (data present but no CREATE TABLE):")
        print(f"   count: {len(report['missing_struct'])}")
        print("-" * 30)
        for item in report['missing_struct']:
            print(f"   ❌ {item}")
        print("-" * 30)
        print("   👉 These tables cannot be restored automatically and require manual handling!")
    else:
        print("\n✅ No missing table structures found.")
    # 2. Empty tables
    if report['ready_struct']:
        print(f"\n⚠️ [NOTE] Structure only (empty tables):")
        print(f"   count: {len(report['ready_struct'])}")
        if len(report['ready_struct']) < 10:
            for item in report['ready_struct']: print(f"   - {item}")
        else:
            print(f"   - (list truncated, {len(report['ready_struct'])} empty tables in total)")
    # 3. Ready tasks
    print(f"\n📦 [READY] Full import (structure + data):")
    print(f"   count: {len(report['ready_full'])}")
    # 4. Dead directories
    if report['useless']:
        print(f"\n🗑️ [IGNORED] No structure and no data (dead directories):")
        print(f"   count: {len(report['useless'])}")
    print("="*60)
    print(f"\nTotal valid import tasks: {len(valid_tasks)}")
    print("⚠️ Warning: after a successful import, source files (.sql/.csv) are PERMANENTLY DELETED to free disk space!")
    # --- Confirm before running ---
    if not valid_tasks:
        print("❌ No tables to import. Exiting.")
        sys.exit(0)
    user_input = input("\n🚀 Start the import with automatic cleanup? (enter y to confirm): ").strip().lower()
    if user_input != 'y':
        print("⛔️ Cancelled by user.")
        sys.exit(0)
    # --- Run ---
    # Prepare worker arguments
    restore_args = [ (t[0], t[1], t[2], host, port, user, password) for t in valid_tasks ]
    # Phase 1: create the databases
    print(f"\n>>> [Phase 1] Creating databases...")
    unique_dbs = set([t[0] for t in valid_tasks])
    for db in unique_dbs:
        subprocess.run(get_cmd(host, port, user, password), input=f"CREATE DATABASE IF NOT EXISTS `{db}`;", text=True, stderr=subprocess.DEVNULL)
    # Phase 2: import table structures
    print(f"\n>>> [Phase 2] Importing table structures in parallel...")
    # Structure import is fast; use every core
    with Pool(cpu_count()) as pool:
        pool.map(restore_structure_worker, restore_args)
    # Phase 3: import data
    print(f"\n>>> [Phase 3] Importing data in parallel (source files deleted after each table)...")
    # Data import is heavy; keep 2 cores for the database
    workers = max(1, cpu_count() - 2)
    print(f"Starting {workers} worker processes (load-balanced mode)...")
    with Pool(workers) as pool:
        # chunksize=1: each worker takes exactly one table and grabs the next as soon as it finishes, so no worker sits idle
        list(pool.imap_unordered(restore_data_worker, restore_args, chunksize=1))
    print("\n--- 🎉 All done! Source files cleaned up ---")

if __name__ == '__main__':
    main()
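For reference, a typical invocation looks like the following; the backup path and credentials are placeholders, and the argument order matches the usage string in the script. The client host must be able to reach the MySQL server, and local_infile must be enabled (as in the configuration above) for CSV imports.
python3 auto_clean_restore.py /data/rds_backup 127.0.0.1 3306 root 'your_password'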
Results
On the same hardware, the comparison looks like this:
| Metric | Official script | Optimized |
|---|---|---|
| CPU utilization | < 20% | 80%+ |
| I/O throughput | Very low | Close to the disk limit |
| Import speed | A few GB / hour | Tens of GB / hour |
| Disk usage | Grows continuously | Freed in real time |
For very large snapshots (hundreds of GB and up), this approach significantly shortens the restore time and avoids jobs being interrupted by running out of disk space.
Summary
When restoring a very large database snapshot, the performance bottleneck usually lies not in the SQL itself but in the server configuration and the concurrency model.
