代码拉取完成,页面将自动刷新
同步操作将从 春秋/DBAS 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
#!/usr/bin/env python3
# coding:utf-8
import time
import random
import os
from mysql import connector
import loadconfig
__author__ = 'MC'
class BackupMyDb:
"""
MYSQL数据库备份
"""
def __init__(self, Host:'Host Address'='localhost', User:'UserName'='root', Password:'Password'='password', DbName:'DbName'='test', Encoding='utf8', Port=3306):
"""
建立MYSQL连接
实例化cursor至self.Cursor
:param Host:
:param User:
:param Password:
:param DbName:
:param Encoding:
:return:
"""
self.DbName = DbName
self.Host = Host
self.EnCoding = 'utf-8'
try:
self.DbRes = connector.connect(host=Host, user=User, password=Password, database=DbName, charset=Encoding, port=Port)
self.Cursor = self.DbRes.cursor(dictionary = True)
except Exception:
print("链接数据库失败,请检查参数完整性")
return
def Backup(self, FilePath: "Abs Path"='None'):
"""
执行备份主入口
:param FilePath:
:return:
"""
if FilePath == 'None':
FilePath = [loadconfig.BackupFilePath + time.strftime("%Y%m%d") + '/'
if loadconfig.BackupFilePath[-1] == '/'
else loadconfig.BackupFilePath + '/' + time.strftime("%Y%m%d") + '/']
self.FilePath = FilePath[0].strip()
self.Get_Table_List()
File = self.Touch_Sql_File(self.FilePath)
for Table_Dict in self.TableList:
TableName = Table_Dict['Tables_in_{0}'.format(self.DbName)]
CreateTable = self.Get_Table_Construct(TableName)
File.write("-- Table:{0}\r\n".format(TableName))
File.write("{0};\r\n".format(str(CreateTable[0]['Create Table'])))
QueryList = self.Get_Insert(TableName)
for Line in QueryList:
File.write("{0}\r\n".format(Line))
File.close()
self.Close()
print("备份数据库成功")
print("备份目录:{0}".format(FilePath))
print("文件名称:{0}".format(self.FileName))
return True
def Touch_Sql_File(self, FilePath: 'Relative path'):
self.FileName = "Backup_{0}_{1}_{2}.sql".format(self.DbName, time.strftime("%Y%m%d%H%M%S"), random.randrange(100, 1000))
if not os.path.exists(FilePath):
os.makedirs(FilePath)
File = open("{0}/{1}".format(FilePath, self.FileName), 'a', encoding=self.EnCoding)
File.write("-- Source Host: {0}\r\n".format(self.Host))
File.write("-- Source Database: {0}\r\n".format(self.DbName))
File.write("-- Time: {0}\r\n".format(time.strftime("%Y-%m-%d %H-%M-%S")))
File.write("-- User: DAS\r\n")
File.write("\r\n")
return File
def Get_Table_List(self):
"""
获取数据库中所有表,并添加进self.TableList
:return:
"""
self.Cursor.execute("show tables")
self.TableList = self.Cursor.fetchall()
# print(self.TableList)
def Get_Table_Construct(self, TableName):
"""
查询表的创建语句
:param TableName:
:return:
"""
self.Cursor.execute("show create table {0}".format(TableName))
return self.Cursor.fetchall()
def Get_Insert(self, TableName):
"""
查询并组装表中数据的插入语句
:param TableName:
:return:
"""
self.Cursor.execute("select * from {0}".format(TableName))
List = []
ByteArray = bytearray("字节数组", self.EnCoding)
ByteString = bytes("字节字符串", self.EnCoding)
for Line in self.Cursor.fetchall():
Query = "Insert Into `{0}`(".format(TableName)
for Name in Line.keys():
Query += "`" + Name + "`,"
Query = Query[:-1]
Query += ") value("
for Value in Line.values():
if type(Value) == type(ByteArray):
Val = str(Value.decode(self.EnCoding))
elif type(Value) == type(ByteString):
Val = str(Value.decode(self.EnCoding))
else:
Val = str(Value)
Query += "'" + Val + "',"
Query = Query[:-1]
Query += ");"
List.append(Query)
return List
def Close(self):
# 关闭数据库连接
self.Cursor.close()
self.DbRes.close()
def Test(self):
print("成功链接数据库")
self.Close()
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。