1 Star 0 Fork 0

fuwu360/fuwu360-tools

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
add_table_field.py 4.08 KB
一键复制 编辑 原始数据 按行查看 历史
# !/usr/bin/python3
# -*- coding: utf-8 -*-
"""
给数据库里面的表统一添加version字段
"""
import datetime
import logging
import os
import sys
import pymysql
from sshtunnel import SSHTunnelForwarder
# 如果需要ssh方式连接数据库,则需要下面这个
# server = SSHTunnelForwarder(("192.168.0.2", 22),ssh_password="Pg123456!@#$%^ping",ssh_username='root',remote_bind_address=("pc-2zeyi062q7v17kb07.rwlb.rds.aliyuncs.com", 3306))
class AddTableFiled():
def __init__(self):
# server.start()
# 设置日志
self.set_logging()
# 数据库名称
self.database_name = 'edu_zlpg'
# 数据库用户名
user = 'admin268xue'
# 数据库密码
passwd = 'admin268xue'
# 数据库连接,如果是ssh方式则用这个
# self.conn = pymysql.connect(host='127.0.0.1', user=user, passwd=passwd,
# db=self.database_name,
# port=server.local_bind_port, charset='utf8',
# cursorclass=pymysql.cursors.DictCursor,
# connect_timeout=7200)
# self.cursor = self.conn.cursor()
self.conn = pymysql.connect(host='192.168.0.3', user=user, passwd=passwd,
db=self.database_name,
port=3306, charset='utf8',
cursorclass=pymysql.cursors.DictCursor,
connect_timeout=7200)
self.conn.autocommit(False)
self.cursor_default = self.conn.cursor()
self.table_names = []
# 设置日志
def set_logging(self) -> None:
today = datetime.date.today().strftime("%Y-%m-%d")
logging.basicConfig(
# 控制台打印的日志级别
level=logging.INFO,
# 日志格式
format='%(asctime)s %(funcName)s [line:%(lineno)d] %(levelname)s %(message)s',
# 日志文件名
filename=os.path.join(os.getcwd(), 'add_table_field' + '-' + today + '.txt'),
# 模式,有w和a,w就是写模式,每次都会重新写日志,覆盖之前的日志, a是追加模式,默认如果不写的话,就是追加模式
filemode='a'
)
def get_table_names(self):
# 先设置为空
self.table_names = []
select_all_table_names_sql = f"""
select TABLE_NAME from information_schema.tables where table_schema='{self.database_name}' and table_type='BASE TABLE'
"""
self.cursor_default.execute(select_all_table_names_sql)
all_table_names = self.cursor_default.fetchall()
if all_table_names:
for table_name_dict in all_table_names:
table_name = table_name_dict["TABLE_NAME"]
if 'act_' not in table_name:
self.table_names.append(table_name)
def add_fields(self):
for table_name in self.table_names:
self.add_field(table_name)
def add_field(self, table_name: str):
check_column_sql = f"""
SELECT * FROM information_schema.columns WHERE TABLE_SCHEMA = '{self.database_name}'
and TABLE_NAME = '{table_name}' AND COLUMN_NAME = 'version'
"""
self.cursor_default.execute(check_column_sql)
column_list = self.cursor_default.fetchall()
if not column_list or len(column_list) == 0:
add_column_sql = f"""
ALTER TABLE {table_name} ADD version INTEGER UNSIGNED DEFAULT 0 NULL;
"""
self.cursor_default.execute(add_column_sql)
if __name__ == "__main__":
add_table_field = AddTableFiled()
add_table_field.get_table_names()
add_table_field.add_fields()
# 数据库连接提交
add_table_field.conn.commit()
# 关闭游标
add_table_field.cursor_default.close()
# 关闭数据库连接
add_table_field.conn.close()
# server.close()
# 退出程序
sys.exit(0)
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Python
1
https://gitee.com/fuwu360/fuwu360-tools.git
git@gitee.com:fuwu360/fuwu360-tools.git
fuwu360
fuwu360-tools
fuwu360-tools
master

搜索帮助