mysql数据库迁移

Sabthever

将 MySQL 数据库从一个库迁移到另一个库:目标库中已存在的表会被跳过(不会重建),只有目标库中不存在的表才会新建并迁移数据。目标库若不存在,脚本会尝试自动创建(需要当前用户具备 CREATE 权限);若没有该权限,则需要提前手动建好目标库。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import pymysql
import sys
import time
from tqdm import tqdm

class MySQLMigrator:
    """Copy base tables (structure and data) from a source MySQL schema to a target schema.

    Tables that already exist in the target schema are skipped; only tables
    missing from the target are created and filled. If the target schema
    itself does not exist, an attempt is made to create it with the source
    schema's default character set and collation.

    Args:
        source_config: keyword arguments for ``pymysql.connect`` (source
            server); must include ``'database'``.
        target_config: keyword arguments for ``pymysql.connect`` (target
            server); must include ``'database'`` and ``'user'``.
    """

    def __init__(self, source_config, target_config):
        self.source_config = source_config
        self.target_config = target_config
        self.source_conn = None  # opened by connect_databases()
        self.target_conn = None  # opened by connect_databases() or _create_target_database_via_mysql()

    # ------------------------------------------------------------------
    # Small pure helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _quote_ident(name):
        """Return *name* as a backtick-quoted MySQL identifier.

        Embedded backticks are doubled so a name containing one cannot break
        out of the quoted identifier.
        """
        return "`" + str(name).replace("`", "``") + "`"

    @staticmethod
    def _is_generated_column(extra):
        """Return True if an ``information_schema.COLUMNS.EXTRA`` value marks a generated column.

        Generated columns (``VIRTUAL GENERATED`` / ``STORED GENERATED``) refuse
        explicit values on INSERT, so they must be left out of the data copy.
        The test is token-based so that ``DEFAULT_GENERATED`` (an ordinary
        column whose default is an expression, e.g. ``DEFAULT CURRENT_TIMESTAMP``)
        is NOT treated as generated.
        """
        return "GENERATED" in (extra or "").upper().split()

    @staticmethod
    def _topological_order(tables, dependencies):
        """Order *tables* so that each table comes after the tables it references.

        Args:
            tables: table names, visited in the given order.
            dependencies: mapping ``table -> iterable of referenced table names``.

        Returns:
            A list of table names, dependencies first.

        Self-references (e.g. a ``parent_id`` FK pointing at the same table)
        are ignored, and reference cycles are broken instead of raising:
        ``migrate_database`` creates all tables with FOREIGN_KEY_CHECKS = 0,
        so creation order only needs to be best-effort. Recursion depth is
        bounded by the length of the longest FK chain.
        """
        seen = set()  # tables already placed or on the current DFS path
        order = []

        def visit(name):
            if name in seen:
                return  # already placed, or a cycle back-edge: stop here
            seen.add(name)
            # sorted() keeps the result deterministic regardless of set ordering
            for referenced in sorted(dependencies.get(name, ())):
                if referenced != name:
                    visit(referenced)
            order.append(name)

        for table in tables:
            visit(table)
        return order

    def _set_foreign_key_checks(self, enabled):
        """Set FOREIGN_KEY_CHECKS on the target connection's session and commit."""
        with self.target_conn.cursor() as cursor:
            cursor.execute(f"SET FOREIGN_KEY_CHECKS = {1 if enabled else 0}")
        self.target_conn.commit()

    # ------------------------------------------------------------------
    # Connections
    # ------------------------------------------------------------------
    def connect_databases(self):
        """Open ``self.source_conn`` and ``self.target_conn``.

        If the target schema does not exist (MySQL error 1049), delegate to
        ``_create_target_database_via_mysql`` to create it and connect.

        Raises:
            Exception: wrapping any connection or creation failure.
        """
        try:
            # 1. Source schema
            self.source_conn = pymysql.connect(**self.source_config)
            print("源数据库连接成功")

            # 2. Target schema; fall back to creating it only when it is missing
            try:
                self.target_conn = pymysql.connect(**self.target_config)
                print(f"成功连接到目标业务数据库 `{self.target_config['database']}`")
            except pymysql.err.OperationalError as e:
                error_code = e.args[0] if e.args else None
                if error_code != 1049:  # not "Unknown database" (e.g. auth/network): propagate
                    raise
                print(f"目标数据库 `{self.target_config['database']}` 不存在,尝试创建...")
                self._create_target_database_via_mysql()
        except Exception as e:
            raise Exception(f"数据库连接失败: {str(e)}") from e

    def _create_target_database_via_mysql(self):
        """Create the target schema, then connect ``self.target_conn`` to it.

        The temporary connection selects no default schema (instead of the
        ``mysql`` system schema), because ordinary accounts frequently have
        no privileges on ``mysql`` at all. The schema is created with the
        source schema's default charset and collation.

        Raises:
            Exception: with a privilege hint on MySQL error 1044, or wrapping
                any other non-OperationalError failure.
            pymysql.err.OperationalError: for other operational errors.
        """
        db_name = self.target_config['database']
        try:
            charset, collation = self.get_database_charset()

            server_config = dict(self.target_config)
            server_config.pop('database', None)  # connect to the server only
            temp_conn = pymysql.connect(**server_config)
            try:
                with temp_conn.cursor() as cursor:
                    cursor.execute(
                        f"CREATE DATABASE IF NOT EXISTS {self._quote_ident(db_name)} "
                        f"CHARACTER SET {charset} COLLATE {collation}"
                    )
                temp_conn.commit()
            finally:
                temp_conn.close()  # never leak the temporary connection
            print(f"目标数据库 `{db_name}` 创建成功。")

            # Reconnect, this time selecting the freshly created schema
            self.target_conn = pymysql.connect(**self.target_config)
            print(f"成功连接到新创建的目标数据库 `{db_name}`")

        except pymysql.err.OperationalError as e:
            if e.args and e.args[0] == 1044:  # access denied to database
                raise Exception(
                    f"无法创建目标数据库。当前用户 `{self.target_config['user']}` 没有创建数据库的权限。\n"
                    f"请先使用管理员账户(如root)在目标MySQL服务器上手动创建数据库 `{db_name}`,\n"
                    f"或者授予当前用户创建数据库的权限(如 GRANT CREATE ON *.* TO ...)。"
                ) from e
            raise
        except Exception as e:
            raise Exception(f"创建目标数据库失败: {str(e)}") from e

    # ------------------------------------------------------------------
    # Metadata queries
    # ------------------------------------------------------------------
    def get_database_charset(self):
        """Return ``(charset, collation)`` of the source schema.

        Raises:
            Exception: if the query fails or the source schema does not exist.
        """
        try:
            with self.source_conn.cursor() as cursor:
                cursor.execute(
                    "SELECT DEFAULT_CHARACTER_SET_NAME, DEFAULT_COLLATION_NAME "
                    "FROM information_schema.SCHEMATA WHERE SCHEMA_NAME = %s",
                    (self.source_config['database'],)
                )
                result = cursor.fetchone()
        except Exception as e:
            raise Exception(f"获取数据库字符集失败: {str(e)}") from e
        if result is None:
            raise Exception(
                f"获取数据库字符集失败: 源数据库 `{self.source_config['database']}` 不存在"
            )
        return result[0], result[1]

    def get_all_tables(self):
        """Return the names of all base tables (no views) in the source schema, sorted by name."""
        try:
            with self.source_conn.cursor() as cursor:
                cursor.execute(
                    "SELECT TABLE_NAME FROM information_schema.TABLES "
                    "WHERE TABLE_SCHEMA = %s AND TABLE_TYPE = 'BASE TABLE' "
                    "ORDER BY TABLE_NAME",
                    (self.source_config['database'],)
                )
                return [row[0] for row in cursor.fetchall()]
        except Exception as e:
            raise Exception(f"获取表列表失败: {str(e)}") from e

    def table_exists_in_target(self, table_name):
        """Return True if *table_name* exists in the target schema."""
        try:
            with self.target_conn.cursor() as cursor:
                cursor.execute(
                    "SELECT COUNT(*) FROM information_schema.TABLES "
                    "WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s",
                    (self.target_config['database'], table_name)
                )
                return cursor.fetchone()[0] > 0
        except Exception as e:
            raise Exception(f"检查表存在性失败: {str(e)}") from e

    def get_table_creation_order(self):
        """Return all source base tables ordered so referenced tables come first.

        Only foreign keys whose referenced table lives in the same source
        schema are considered. See ``_topological_order`` for how
        self-references and cycles are handled.
        """
        db_name = self.source_config['database']
        try:
            with self.source_conn.cursor() as cursor:
                cursor.execute("""
                    SELECT TABLE_NAME, REFERENCED_TABLE_NAME
                    FROM information_schema.KEY_COLUMN_USAGE
                    WHERE CONSTRAINT_SCHEMA = %s
                      AND REFERENCED_TABLE_SCHEMA = %s
                      AND REFERENCED_TABLE_NAME IS NOT NULL
                """, (db_name, db_name))
                dependencies = {}
                for table, referenced_table in cursor.fetchall():
                    # a set collapses composite (multi-column) FKs to one edge
                    dependencies.setdefault(table, set()).add(referenced_table)

            return self._topological_order(self.get_all_tables(), dependencies)
        except Exception as e:
            raise Exception(f"分析表依赖关系失败: {str(e)}") from e

    # ------------------------------------------------------------------
    # Copying
    # ------------------------------------------------------------------
    def migrate_table_structure(self, table_name):
        """Recreate *table_name* in the target from the source's ``SHOW CREATE TABLE``.

        Any existing target table with the same name is dropped first.
        """
        quoted = self._quote_ident(table_name)
        try:
            with self.source_conn.cursor() as source_cursor:
                source_cursor.execute(f"SHOW CREATE TABLE {quoted}")
                create_table_sql = source_cursor.fetchone()[1]

            with self.target_conn.cursor() as target_cursor:
                target_cursor.execute(f"DROP TABLE IF EXISTS {quoted}")
                target_cursor.execute(create_table_sql)

            self.target_conn.commit()
        except Exception as e:
            raise Exception(f"迁移表结构失败: {str(e)}") from e

    def migrate_table_data(self, table_name, batch_size=1000, progress_callback=None):
        """Copy every row of *table_name* from source to target in batches.

        Rows are streamed with a single unbuffered (server-side) SELECT rather
        than ``LIMIT/OFFSET`` pages. Without ``ORDER BY``, paging gives no
        consistent row order between queries (rows could be skipped or
        duplicated) and costs O(n^2) on large tables. Generated columns are
        skipped because MySQL rejects explicit values for them.

        Args:
            table_name: table to copy; it must already exist in the target.
            batch_size: rows fetched and inserted per batch (must be >= 1).
            progress_callback: optional callable invoked with the number of
                rows inserted after each committed batch.

        Returns:
            The number of rows actually inserted.
        """
        try:
            if batch_size < 1:
                raise ValueError("batch_size 必须大于 0")

            with self.source_conn.cursor() as meta_cursor:
                meta_cursor.execute(
                    "SELECT COLUMN_NAME, EXTRA FROM information_schema.COLUMNS "
                    "WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s "
                    "ORDER BY ORDINAL_POSITION",
                    (self.source_config['database'], table_name)
                )
                columns = [name for name, extra in meta_cursor.fetchall()
                           if not self._is_generated_column(extra)]
            if not columns:
                return 0

            quoted_table = self._quote_ident(table_name)
            columns_sql = ', '.join(self._quote_ident(col) for col in columns)
            placeholders = ', '.join(['%s'] * len(columns))
            insert_sql = f"INSERT INTO {quoted_table} ({columns_sql}) VALUES ({placeholders})"

            rows_migrated = 0
            # SSCursor streams rows instead of buffering the whole table in memory.
            # NOTE(review): a long pause between fetches may hit the source
            # server's net_write_timeout on very slow targets -- confirm if needed.
            with self.source_conn.cursor(pymysql.cursors.SSCursor) as source_cursor, \
                    self.target_conn.cursor() as target_cursor:
                source_cursor.execute(f"SELECT {columns_sql} FROM {quoted_table}")
                while True:
                    batch = source_cursor.fetchmany(batch_size)
                    if not batch:
                        break
                    target_cursor.executemany(insert_sql, batch)
                    self.target_conn.commit()
                    rows_migrated += len(batch)
                    if progress_callback is not None:
                        progress_callback(len(batch))

            return rows_migrated
        except Exception as e:
            raise Exception(f"迁移表数据失败: {str(e)}") from e

    def migrate_database(self):
        """Run the whole migration; on any error, print it and exit with status 1.

        Steps: connect; select the source base tables missing from the target;
        order them by FK dependencies; count rows for the progress bar; create
        all table structures; copy data table by table. FOREIGN_KEY_CHECKS is
        set to 0 on the target session for the duration and set back to 1
        afterwards (best-effort on error). Both connections are closed at the end.

        Note: a table whose structure was created but whose data copy failed
        exists in the target afterwards, so a re-run will skip it.
        """
        try:
            self.connect_databases()

            all_tables = self.get_all_tables()
            tables_to_migrate = {t for t in all_tables if not self.table_exists_in_target(t)}

            if not tables_to_migrate:
                print("没有需要迁移的表")
                return

            creation_order = self.get_table_creation_order()
            tables_to_migrate_ordered = [t for t in creation_order if t in tables_to_migrate]

            print(f"需要迁移的表数量: {len(tables_to_migrate_ordered)}")
            print("正在计算总数据量...")

            # Pre-count rows so the progress bar can be sized in rows, not tables
            table_row_counts = {}
            with self.source_conn.cursor() as count_cursor:
                for table_name in tables_to_migrate_ordered:
                    count_cursor.execute(f"SELECT COUNT(*) FROM {self._quote_ident(table_name)}")
                    table_row_count = count_cursor.fetchone()[0]
                    table_row_counts[table_name] = table_row_count
                    print(f" 表 `{table_name}`: {table_row_count} 行")
            total_rows_to_migrate = sum(table_row_counts.values())

            print(f"预估总数据行数: {total_rows_to_migrate}")
            print("-" * 50)

            # With FK checks off, tables can be created and filled in any order
            self._set_foreign_key_checks(False)

            print("正在迁移表结构...", end="", flush=True)
            for table_name in tables_to_migrate_ordered:
                self.migrate_table_structure(table_name)
            print(" 完成")

            migrated_rows_so_far = 0
            start_time = time.time()

            with tqdm(total=total_rows_to_migrate, desc="总体进度", unit='row', unit_scale=True, ncols=100) as pbar:
                for table_name in tables_to_migrate_ordered:
                    expected_rows = table_row_counts[table_name]
                    pbar.set_description(f"正在迁移: {table_name} ({expected_rows}行)")
                    try:
                        # The bar advances after every batch, so large tables show progress too
                        rows_migrated_this_table = self.migrate_table_data(
                            table_name, progress_callback=pbar.update
                        )
                    except Exception as e:
                        tqdm.write(f"❌ 迁移表 `{table_name}` 时出错: {str(e)}")
                        raise
                    migrated_rows_so_far += rows_migrated_this_table
                    tqdm.write(f"✅ 表 `{table_name}` 迁移完成 ({rows_migrated_this_table}/{expected_rows} 行)")

            self._set_foreign_key_checks(True)

            print("-" * 50)
            total_time = time.time() - start_time
            print("🎉 数据库迁移完成!")
            if migrated_rows_so_far > 0:
                # Report rows actually copied, not the pre-count estimate
                print(f"📊 统计信息: 共迁移 {len(tables_to_migrate_ordered)} 张表,{migrated_rows_so_far} 行数据,耗时 {total_time:.2f} 秒")
                if total_time > 0:
                    print(f"📈 平均速率: {migrated_rows_so_far / total_time:.2f} 行/秒")

        except Exception as e:
            # Clear any partially drawn status line before printing the error
            print("\r" + " " * 100 + "\r", end="", flush=True)
            print(f"❌ 迁移过程中出错: {str(e)}")
            if self.target_conn is not None:
                try:
                    self._set_foreign_key_checks(True)
                except Exception as cleanup_error:
                    # best-effort only; report instead of silently swallowing
                    print(f"⚠️ 恢复外键检查失败: {cleanup_error}")
            sys.exit(1)
        finally:
            if self.source_conn:
                self.source_conn.close()
            if self.target_conn:
                self.target_conn.close()

# Example configuration
if __name__ == "__main__":
    # Tip: look up a schema's default charset/collation with:
    #   SELECT default_character_set_name AS charset,
    #          default_collation_name AS collation
    #   FROM information_schema.schemata
    #   WHERE schema_name = 'ry_vue';

    # Source (production) database settings -- fill in for your environment.
    source_config = {
        'host': 'ip',
        'port': 3306,
        'user': '',
        'password': '',
        'database': '',  # was an unterminated string literal (',) -- a syntax error
        'charset': 'utf8mb4',
    }

    # Target database settings -- fill in for your environment.
    target_config = {
        'host': 'ip',
        'port': 3306,
        'user': '',
        'password': '',
        'database': '',
        'charset': 'utf8mb4',
    }

    # Run the migration (exits with status 1 on failure)
    migrator = MySQLMigrator(source_config, target_config)
    migrator.migrate_database()
  • 标题: mysql数据库迁移
  • 作者: Sabthever
  • 创建于 : 2025-11-17 14:30:09
  • 更新于 : 2025-11-17 14:33:56
  • 链接: https://sabthever.cn/2025/11/17/technology/operation/mysql数据库迁移/
  • 版权声明: 本文章采用 CC BY-NC-SA 4.0 进行许可。
目录
mysql数据库迁移