代码拉取完成,页面将自动刷新
# !/usr/bin/python3
# -*- coding: utf-8 -*-
"""
给数据库里面的表统一添加version字段
"""
import datetime
import logging
import os
import sys
import pymysql
from sshtunnel import SSHTunnelForwarder
# 如果需要ssh方式连接数据库,则需要下面这个
# server = SSHTunnelForwarder(("192.168.0.2", 22),ssh_password="Pg123456!@#$%^ping",ssh_username='root',remote_bind_address=("pc-2zeyi062q7v17kb07.rwlb.rds.aliyuncs.com", 3306))
class AddTableFiled():
def __init__(self):
# server.start()
# 设置日志
self.set_logging()
# 数据库名称
self.database_name = 'edu_zlpg'
# 数据库用户名
user = 'admin268xue'
# 数据库密码
passwd = 'admin268xue'
# 数据库连接,如果是ssh方式则用这个
# self.conn = pymysql.connect(host='127.0.0.1', user=user, passwd=passwd,
# db=self.database_name,
# port=server.local_bind_port, charset='utf8',
# cursorclass=pymysql.cursors.DictCursor,
# connect_timeout=7200)
# self.cursor = self.conn.cursor()
self.conn = pymysql.connect(host='192.168.0.3', user=user, passwd=passwd,
db=self.database_name,
port=3306, charset='utf8',
cursorclass=pymysql.cursors.DictCursor,
connect_timeout=7200)
self.conn.autocommit(False)
self.cursor_default = self.conn.cursor()
self.table_names = []
# 设置日志
def set_logging(self) -> None:
today = datetime.date.today().strftime("%Y-%m-%d")
logging.basicConfig(
# 控制台打印的日志级别
level=logging.INFO,
# 日志格式
format='%(asctime)s %(funcName)s [line:%(lineno)d] %(levelname)s %(message)s',
# 日志文件名
filename=os.path.join(os.getcwd(), 'add_table_field' + '-' + today + '.txt'),
# 模式,有w和a,w就是写模式,每次都会重新写日志,覆盖之前的日志, a是追加模式,默认如果不写的话,就是追加模式
filemode='a'
)
def get_table_names(self):
# 先设置为空
self.table_names = []
select_all_table_names_sql = f"""
select TABLE_NAME from information_schema.tables where table_schema='{self.database_name}' and table_type='BASE TABLE'
"""
self.cursor_default.execute(select_all_table_names_sql)
all_table_names = self.cursor_default.fetchall()
if all_table_names:
for table_name_dict in all_table_names:
table_name = table_name_dict["TABLE_NAME"]
if 'act_' not in table_name:
self.table_names.append(table_name)
def add_fields(self):
for table_name in self.table_names:
self.add_field(table_name)
def add_field(self, table_name: str):
check_column_sql = f"""
SELECT * FROM information_schema.columns WHERE TABLE_SCHEMA = '{self.database_name}'
and TABLE_NAME = '{table_name}' AND COLUMN_NAME = 'version'
"""
self.cursor_default.execute(check_column_sql)
column_list = self.cursor_default.fetchall()
if not column_list or len(column_list) == 0:
add_column_sql = f"""
ALTER TABLE {table_name} ADD version INTEGER UNSIGNED DEFAULT 0 NULL;
"""
self.cursor_default.execute(add_column_sql)
if __name__ == "__main__":
add_table_field = AddTableFiled()
add_table_field.get_table_names()
add_table_field.add_fields()
# 数据库连接提交
add_table_field.conn.commit()
# 关闭游标
add_table_field.cursor_default.close()
# 关闭数据库连接
add_table_field.conn.close()
# server.close()
# 退出程序
sys.exit(0)
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。