1 Star 0 Fork 2

翟成龙/powersigner_import

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
public.py 27.77 KB
一键复制 编辑 原始数据 按行查看 历史
magic1314 提交于 2024-05-24 10:33 . 初始化
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828
# encoding:utf-8
"""
-------------------------- 脚本说明 --------------------------
@ author: magic
@ version: v2.1
@ create date: long time ago
@ usage: windows10:在系统上添加环境变量PYTHONPATH,值为本文件的目录
linux:用户:当前用户修改【 ~/.bash_profile 或 ~/.bashrc 】,所有用户修改【/etc/profile】文件
在文件最后面添加 【export PYTHONPATH=当前文件的目录】
@ purpose: 方便调用统一的方法
*********************** 更新记录 ***********************
@2021-04-27: 脚本放到linux系统上运行
@2021-04-28: 更新run_sql相关的函数内容
@2021-07-07: 增加对SQL中,# --和/**/3种备注以及连续换行符的删除
@2021-09-01: run_sql函数中,数据库配置文件没有加.txt,会在变量后面添加
@2021-10-21: write_log函数增加设置默认字符集,linux用gb2312,其他系统用utf-8
@2021-12-07: python代码报错,会通知到钉钉群,代码会继续执行(未实现)
@2021-12-27: 增加删除临时数据文件的函数(delete_his_file)
@2022-05-11: 预警默认放到58群,即[BI-ETL数据异常提醒群]群
@2022-09-19: run_sql函数中添加group_concat_max_len自定义参数
@2023-03-07: 剔除掉dingding模块
@2023-06-13: PATHONPATH 设置为【E:\project\share_function】,不生效的话,需要重启电脑;dingding模块一个遗漏的内容删除
"""
import os
import sys
import time
import shutil
import random
import re
import filecmp
import pymysql
import datetime
import traceback
from pymysql.constants import CLIENT
from properties import Properties
# if sys.platform == 'linux':
# sys.path.append("/home/dev_admin/script/share_function")
# import dingding
# 基本变量定义
# 调用脚本自身的一些参数,并本文件的目录
prog_dir = os.path.abspath(os.path.dirname(sys.argv[0]))
work_dir = prog_dir + os.path.sep + 'work' + os.path.sep
log_dir = prog_dir + os.path.sep + 'logs' + os.path.sep
log_file = os.path.split(sys.argv[0])[-1].split('.')[0]
# 写日志的基础方法
# 输入参数: info->日志内容, tip->日志内容头部(默认为[info]),write_flag->是否写日志(默认为写),
# log_obj_dir->日志路径(最好自己定义), display->是否显示日志内容(默认显示)
# 输出为日志文件和终端显示
def write_log(info, tip='INFO', write_flag=True, log_obj_dir=log_dir, end='\n', display=True):
now_date = time.strftime("%Y%m%d", time.localtime())
now_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
hour = time.strftime("%H", time.localtime())
write_info = '[%s] [%s] ' % (now_time, tip) + str(info)
abs_log_file = log_obj_dir + log_file + '_' + str(now_date) + '_' + str(hour) + '.log'
# 为qt窗口程序提供输出数据(暂时置为无效状态)
# abs_log_file_new = log_obj_dir + log_file + '_' + str(now_date) + '_new' + '.log'
# 字符集设置
if sys.platform == 'linux':
default_encoding = 'gb18030'
else:
default_encoding = 'utf-8'
# 是否写日志,默认为写
if write_flag:
if not os.path.exists(log_obj_dir):
print('日志目录不存在,创建: 【%s】' % log_obj_dir)
os.mkdir(log_obj_dir)
with open(abs_log_file, 'a', encoding=default_encoding) as logs:
logs.write(write_info + '\n')
# with open(abs_log_file_new, 'a') as logs_new:
# logs_new.write(write_info + '\n')
# 打印日志内容到终端
if display:
print(write_info, end=end)
# max_num = 100000
# if random.randint(1, max_num+1) == 1:
# write_log('################## 十万分之一概率删除日志文件 ##################')
# ret = DeleteFile(r'E:\pyCharm\WEIBO\logs').delete() # 当前目录
# print(ret)
# 如果是错误,退出程序
if tip[:3].upper() == 'ERR':
if sys.platform == 'linux':
pass
sys.exit(88)
# 获得参数
# prog_dir:当前程序的目录 E:\Anaconda3
# get_parameter('prog_dir')
def get_parameter(para):
now_date = time.strftime("%Y%m%d", time.localtime())
abs_log_file_new = log_dir + log_file + '_' + str(now_date) + '_new' + '.log'
if para == 'prog_dir':
return prog_dir
elif para == 'now_date':
return now_date
elif para == 'now_date2':
now_date2 = time.strftime("%Y-%m-%d", time.localtime())
return now_date2
elif para == 'now_time':
now_time = time.strftime("%Y%m%d%H%M%S", time.localtime())
return now_time
elif para == 'beatiful_now_time':
now_time = time.strftime("%Y/%m/%d %H:%M:%S", time.localtime())
return now_time
elif para == 'now_time2':
now_time2 = time.strftime("%H:%M:%S", time.localtime())
return now_time2
elif para == 'log_dir':
return log_dir
elif para == 'abs_log_file_new':
return abs_log_file_new
elif para == 'YYYYMMDD':
return now_date
elif para == 'YYYYMM':
return now_date[0:6]
elif para == 'YYYY':
return now_date[0:4]
else:
return ''
# 获得传入参数的类型
def get_type(input_data, display=False):
input_data = str(type(input_data))
if display:
print(input_data)
if re.match('<class.*>', input_data):
result = input_data[8:-2]
return result
# print(result)
else:
return None
# nvl函数
# 返回第一个不为假的值
def nvl(str_content, default_value=''):
if str_content:
return str_content
else:
return default_value
# 正则表达式替换字符串内容
def replace_plus(str_content, replace_grep, replace_content):
pattern = re.compile(replace_grep)
# print(pattern.findall(str_content))
out = re.sub(pattern, replace_content, str_content)
return out
# 用户帮助
def get_help(func, command, attention=0, other=0):
print("**********************************************")
print("*** 【帮助内容如下:】")
print("*** 核心功能: " + func)
print("*** 使用方法: " + command)
if attention:
print("*** 注意事项: " + str(attention))
if other:
print("*** 其他: " + str(other))
print("**********************************************")
exit(-99)
# 删除单个文件
def del_file(file_name, display=False):
if os.path.isfile(file_name):
os.remove(file_name)
if display:
print(file_name, 'is deleted')
# 单个文件复制
# is_force_delete: 默认不强制删除
def copy_file(source_file, target_file, is_force_delete=False, display=False):
if display:
write_log("开始执行:将文件【{0}】覆盖到文件【{1}】".format(source_file, target_file))
if os.path.exists(target_file):
# 比较两个文件是否相同
if filecmp.cmp(source_file, target_file):
if display:
write_log("文件相同,无需覆盖(跳过)")
return True
elif is_force_delete:
os.remove(target_file)
else:
write_log("目标文件【%s】已存在,且sync_cfg配置中设置不能强制删除, 终止" % target_file, tip='ERR')
else:
shutil.copy(source_file, target_file)
if display:
write_log("执行顺利完成!")
# 删除目录(包括其所有的文件)
# 输入参数: 绝对路径名称, 是否显示标志(默认打印)
# 输出终端显示,删除对应的目录
def del_catalog(absolute_path, display=True):
if not os.path.isabs(absolute_path):
print('该路径不是绝对路径,不删除!')
return 0
if os.path.isdir(absolute_path):
if display:
print("清理<%s>目录以及所有文件和目录!!" % absolute_path)
shutil.rmtree(absolute_path)
# 文件夹的拷贝
# is_force_delete: 默认不强制删除
def copy_catalog(source_catalog, target_catalog, is_force_delete=False, display=False):
if display:
write_log("开始执行:将文件夹【{0}】覆盖到文件夹【{1}】".format(source_catalog, target_catalog))
if os.path.exists(source_catalog):
# 如果目标路径存在原文件夹的话就先删除
if os.path.exists(target_catalog):
if is_force_delete:
shutil.rmtree(target_catalog)
else:
write_log("目标文件夹【%s】已存在,且sync_cfg配置中设置不能强制删除, 终止" % target_catalog, tip='ERR')
shutil.copytree(source_catalog, target_catalog)
if display:
write_log("执行顺利完成!")
else:
write_log("源目录【%s】不存在, 终止!" % source_catalog, tip='ERR')
# 写数据文件
# 输入: 数据内容, 文件名(默认当目录下的tmp.txt文件), 写入方式(默认为w,可以改为a追加), 打印方法(默认不打印)
# 输出为一个数据文件
def write_str(info, file_name='tmp.txt', write_type='w', encoding='gb18030', display=False):
if file_name == 'tmp.txt':
file_name = work_dir + file_name
if display:
print('正在往文件中写入数据: ' + file_name)
with open(file_name, write_type, encoding=encoding) as file:
file.write(str(info))
if display:
print('写入数据完成,文件名为:' + file_name)
# 读数据文件
# 输入: 文件名(默认当目录下的tmp.txt文件)
# 输出最后一行的内容????需要改进
def read_str(file_name='tmp.txt'):
file_name = work_dir + file_name
with open(file_name) as file:
for f in file:
return f
# 删除历史数据文件
# 输入: del_file_path需要文件名(绝对路径), delete_days需要保留的天数
# 输出: 删除掉临时数据文件
def delete_his_file(del_file_path, retention_days):
f = list(os.listdir(del_file_path))
write_log("目录【%s】下,开始清理%s天前的数据!" % (del_file_path, retention_days))
# 已删除文件个数
delete_file_count = 0
for i in range(len(f)):
filedate = os.path.getmtime(del_file_path + f[i])
time1 = datetime.datetime.fromtimestamp(filedate).strftime('%Y-%m-%d')
date1 = time.time()
num1 =(date1 - filedate)/60/60/24
if num1 >= retention_days:
try:
#os.remove(del_file_path + f[i])
del_file(del_file_path + f[i], display=True)
delete_file_count += 1
except Exception as e:
error(e)
write_log("@@@@@ 本次共删除文件个数:%s" % delete_file_count)
# 打乱字典内容
def shuffle_dict(dict_name):
keys = list(dict_name.keys())
random.shuffle(keys)
new_dict = dict()
for key in keys:
for ele in dict_name:
if key == ele:
new_dict[key] = dict_name[ele]
return new_dict
# 生成随机数
# 如果长度为32位,默认字符以当前日期开头,比如:202005171050334xDIoM0fMROU9fVncI
def generate_random_str(randomlength=16):
"""
  生成一个指定长度的随机字符串
  """
random_str = ''
base_str = 'ABCDEFGHIGKLMNOPQRSTUVWXYZabcdefghigklmnopqrstuvwxyz0123456789'
length = len(base_str) - 1
if randomlength == 32:
for i in range(randomlength - 14):
random_str += base_str[random.randint(0, length)]
random_str = get_parameter('now_time') + random_str
else:
for i in range(randomlength):
random_str += base_str[random.randint(0, length)]
return random_str
# 显示颜色
def print_color(str_content, color='0'):
if color == 'red':
color = 31
elif color == 'green':
color = 32
elif color == 'yellow':
color = 33
elif color == 'blue':
color == 34
else:
color = 0
result_color = '\033[0;{0};0m'.format(color)+str(str_content)+'\033[0m'
print(result_color)
# print('\033[0;32;0m欢迎使用学生选课系统\033[0m')
return True
# 显示错误信息, 放在except中
def display_err_info():
exType, exValue, exTrace = sys.exc_info()
# print(traceback.print_tb(exTrace))
print_color("Traceback (most recent call last):", color='red')
for trace in traceback.extract_tb(exTrace):
print_color("\t" + str(trace), color='red')
print_color(str(exType) + " " + str(exValue), color='red')
# 显示下载进度情况
def show_progress_bar(str_content, time_sleep=0.5):
if str_content:
sys.stdout.write('\r')
sys.stdout.write(str(str_content))
sys.stdout.flush()
if time_sleep:
time.sleep(time_sleep)
# 用于测试展示
def test(content):
# 使用globals()函数获取全局变量字典
# variables = globals()
# 使用locals()函数获取局部变量字典
variables = locals()
param_name = str()
# print(666666, content)
# 遍历变量字典并打印变量名和值
for var_name, var_value in variables.items():
# print(88888888, var_name, var_value)
if content == var_value:
param_name = var_name
print("------------------------ 分隔符开始 ------------------------")
print("【变量】:{}".format(param_name))
print("【内容】:{}".format(content))
print("【数据类型】:{}".format(type(content)))
print("------------------------ over ------------------------------")
# 程序计时
"""
timer = timekeeper()
timer.start()
# 程序内容
timer.end()
"""
class timekeeper(object):
def __init__(self, dispalay=True):
self.display = dispalay
def set_program_name(self, input_value):
self.program_name = input_value
# 开始计时
def start(self, program_name=''):
if not program_name:
program_name = '程序'
self.start_time = datetime.datetime.now()
self.program_name = program_name
# 计时结束
def end(self):
if not self.start_time:
write_log("未输入开始时间参数", tip='ERR')
self.end_time = datetime.datetime.now()
used_time = (self.end_time - self.start_time).seconds
write_log("%s耗时: %s秒" % (self.program_name, used_time), display=self.display)
# 程序暂停
# 如果存在time2, 随机暂停time1-time2
def sleep(time1, time2=0, display=False):
try:
if display:
write_log("time1: " + str(time1) + ' ' + get_type(time1), display=display)
write_log("time2: " + str(time2) + ' ' + get_type(time2), display=display)
time1 = int(time1)
time2 = int(time2)
if time2:
sleep_time = random.randint(time1, time2+1)
else:
sleep_time = time1
if display:
write_log('暂停【%s秒钟】,请稍等…' % sleep_time)
except Exception:
# 显示错误信息,不退出
display_err_info()
sleep_time = 3
time.sleep(sleep_time)
return None
# # ################# mysql数据库相关操作 START ##################
# 连接数据库
def getdbconnect(dbprop: dict):
try:
global database_connect
database_connect = pymysql.connect(
host=dbprop.get("host"),
user=dbprop.get("user"),
password=dbprop.get("password"),
database=dbprop.get("database"),
charset=dbprop.get("charset"),
autocommit=bool(dbprop.get("autocommit", True)),
port=int(dbprop.get("port")),
client_flag=CLIENT.MULTI_STATEMENTS # 值为:65536
)
except Exception as err:
write_log(err, tip='ERROR')
return database_connect
# 获取配置信息
def config_file(dbFile):
try:
# 当前文件的配置路径
current_catalog = os.path.split(os.path.realpath(__file__))[0]
# print("当前目录:", current_catalog)
# parent_catalog = os.path.dirname(current_catalog)
config_catalog = current_catalog + os.path.sep + 'config' + os.path.sep + dbFile
# print(config_catalog)
prop = Properties(config_catalog)
dbconfig = prop.getpropeties()
return dbconfig
except Exception as err:
write_log(err, tip='ERROR')
"""
#### 多次连接使用下面的方式 ####
# 获得配置信息
db = None
cur = None
try:
dbconfig = public.config_file(dbFile='227_oms_center.txt')
db = public.getdbconnect(dbconfig)
cur = db.cursor()
except Exception as err:
public.write_log(err, tip='ERROR')
----- 需要处理的内容 -----
cur.execute(sql内容)
# 关闭数据库连接、游标
cur.close()
db.close()
"""
# 获得告警模板
def get_warn_template(template_name):
sql = """
select template_content from etl_dingding_warn_template
where template_name = '%s'
""" % template_name
# 20210901: 数据库配置文件后,没有加.txt,会自动添加
db = None
dbFile = 'etl_config_95.txt'
dbconfig = config_file(dbFile)
try:
db = getdbconnect(dbconfig)
except Exception as err:
# db.cursor()
write_log(err, tip='ERROR')
# 获得游标
cur = db.cursor()
cur.execute(sql)
sql_result = cur.fetchall()
cur.close()
db.close()
return sql_result[0][0]
# 剔除字符串中的所有不可见字符
def remove_invisible_characters(text):
pattern = r'\s+' # 匹配任意不可见字符的正则表达式模式
cleaned_text = re.sub(pattern, '', text)
return cleaned_text
# 执行sql语句
def run_sql(sql_txt,dbFile='localhost_hrjx.txt',mul_sql=False,needResult=False,executemany=False,parameter_list=None,group_concat_max_len=0,display=False):
"""
参数说明:
sql_txt:sql语句,支持多个sql
dbFile:数据库连接的文件名,默认放在当前脚本的config目录里面
mul_sql:多个sql语句一起执行,不用分号隔开执行
needResult: 是否需要结果
+ True:需要
+ False:不需要
+ 'oneValue': 返回一个值,比如【select count(1) cnt from xxxx】【select sum(amt) amt from xxxx】等输入
executemany: insert语句,一次插入多行数据,必须要是元组,元组每一项必须是元组或列表
parameter_list: executemany为True时,insert的元组数据
group_concat_max_len:group_concat函数长度最大限制
+ 假:默认
+ 真:设置为group_concat_max_len的长度,建议设为102400
display:后台内容输出
"""
sql_result = None
db = None
# 20210901: 数据库配置文件后,没有加.txt,会自动添加
dbFile = dbFile.strip()
if not dbFile.lower().endswith('.txt'):
dbFile = dbFile + '.txt'
dbconfig = config_file(dbFile)
try:
db = getdbconnect(dbconfig)
except Exception as err:
# db.cursor()
write_log(err, tip='ERROR')
if display:
write_log("connect to db successfully!")
write_log("执行sql 【{0}】".format(sql_txt))
# 获得游标
cur = db.cursor()
try:
# group_concat函数长度最大限制
if group_concat_max_len:
cur.execute("set session group_concat_max_len={};".format(group_concat_max_len))
# 多个sql语句一起执行,不用分号隔开执行
if mul_sql:
cur.execute(sql_txt)
if display:
write_log("该SQL执行完成!")
# 有无sql执行的参数
elif parameter_list:
if executemany:
# 插入多行记录,传入的是二维元组
# args: 模板字符串的参数,是一个列表,列表中的每一个元素必须是元组!!!
# print('插入多行记录')
# 例如:(('测试的告警内容-001', '0', '58', '2'),('测试的告警内容-002', '0', '58', '2'),('测试的告警内容-003', 0, 58, 2))
cur.executemany(sql_txt, parameter_list)
if display:
write_log("SQL执行完成!共插入{}行数据!".format(len(parameter_list)))
else:
# 只插入一条记录,传入的是一维元组
# 例如:('测试的告警内容-001', '0', '58', '2')
cur.execute(sql_txt, parameter_list)
if display:
write_log("SQL执行完成!已插入1条记录!")
else:
# 通过;号分割sql,多个sql会报错
sql_list = sql_txt.split(';')
count = 0
for sql_singe in sql_list:
sql_singe_lower = sql_singe.lower()
# # 存储过程不执行
# if sql_singe_lower.startswith('drop procedure') or sql_singe_lower.startswith('create procedure'):
# write_log("存储过程不执行!", tip='WARN')
# break
# 只执行特定的语句
if sql_singe_lower.find('insert') > -1 or sql_singe_lower.find('select') > -1 or sql_singe_lower.find('update') > -1 \
or sql_singe_lower.find('delete') > -1 or sql_singe_lower.find('truncate') > -1 or sql_singe_lower.find('create') > -1 \
or sql_singe_lower.find('drop') > -1 or sql_singe_lower.find('call') > -1:
cur.execute(sql_singe)
count += 1
elif remove_invisible_characters(sql_singe):
write_log("sql:【{}】未执行".format(sql_singe), tip='WARN')
else:
pass
if display:
write_log("已执行完成【{0}】个SQL语句".format(str(count)))
if count == 0:
write_log("未找到有效的可执行SQL内容,请检查!", tip='WARN')
except Exception as err:
cur.close()
db.close()
write_log("sql执行失败:【{0}】, sql_txt:【{1}】".format(str(err), sql_txt) )
# 获得模板
# message = get_warn_template('python_error').format(str(err), sql_txt)
# dingding.dingtalk_warn_text(message=message, robot_id=58, warn_object=None, at_all=False)
sys.exit(8)
if needResult:
sql_result = cur.fetchall()
if needResult == 'oneValue':
sql_result = sql_result[0][0]
cur.close()
# 目前用完连接就断掉
db.close()
if needResult:
return sql_result
class SQL:
# 结果
result = None
# 初始化函数
def __init__(self, sql):
self.sql = sql
# 重构sql的内容
def remove_comment(self):
# 将符号改为英文字母
# 【/ 斜杠 Slash】:S,【* 星号 Asterisk】:A,【- 横杠 Horizontal bar】: H,【# 井号 Well number】:W
# 注释
comment = {
'W': [0] # 表示【#】
,'HH': [0, 0] # 表示【--】
,'SAAS': [0, 0, 0, 0] # 表示【/**/】,0表示未找到,1表示已匹配
}
sql_txt = self.sql
no_cmt_sql = list()
h_pre_del = False
delete_ending = False
# left slash's delete flag
ls_pre_del = False
# start to delete content of slash
delete_slash = False
# delete end flag FOR /**/
delete_pre_end_flag = False
delete_end_flag = False
# 判断/**/中,后面的*/是否存在,如果不存在该参数为True,会报出异常
# SAAS_flag = 1
is_error = False
# 逐字遍历sql的内容
for a_char in sql_txt:
# 针对#的备注
if a_char == '#':
delete_ending = True
# 针对--的备注
elif a_char == '-':
# 前面已经是H【-】了
if h_pre_del:
# 删掉从这到/n的数据(前面的-也需要删掉)
delete_ending = True
if no_cmt_sql:
no_cmt_sql.pop()
else:
# 首次出现
h_pre_del = True
elif a_char == '/':
ls_pre_del = True
elif a_char == '*':
if ls_pre_del:
# print(111111, a_char)
delete_slash = True
is_error = True
else:
ls_pre_del = False
############## 删除注释内容的地方 #############
# 删除备注信息/**/
if delete_slash:
# 删掉从这到/*的数据(前面的/*也需要删掉)
# print('需要删除: ', a_char)
if no_cmt_sql:
no_cmt_sql.pop()
no_cmt_sql.append('\n')
# 没有找到【*/】
if not delete_end_flag:
if a_char == '*':
# print(444, a_char)
delete_pre_end_flag = True
elif a_char == '/':
if delete_pre_end_flag:
# print(2222, a_char)
# 已经找到了
delete_end_flag = True
is_error = False
else:
delete_pre_end_flag = False
else:
# print("已删除完毕")
delete_slash = False
delete_end_flag = False
# 不是在/**/模式下
else:
if a_char == '\n':
# 【-- 和 #】需要重置参数
no_cmt_sql.append(a_char)
delete_ending = False
h_pre_del = False
else:
# 说明这部分是不删除的内容
if not delete_ending:
no_cmt_sql.append(a_char)
if is_error:
write_log("/**/格式的备注信息中,后面的【*/】找不到,请检查!", tip='ERROR')
sql_result = ''.join(no_cmt_sql)
# 删除多个换行符
tmp_result = re.sub('(\r\n)+', '\r\n', sql_result)
tmp_result= re.sub('(\n)+', '\n', tmp_result)
# 删除空白行
self.result = re.sub('$\s', '发现', tmp_result)
# 获得结果
def get_result(self, display=False):
if display:
write_log("格式化后的SQL:【{}】".format(self.result))
return self.result
# # ################# mysql数据库相关操作 END ##################
# ################# 测试专用 START ##################
# 好玩的分割线
def separator(tip=None):
if not tip:
tip = "I'm a Dividing Line"
tip = "-" * 20 + tip + "-" * 20
print(tip)
# ################# 测试专用 END ##################
# 初始化目录信息
def initialize(display=False):
if True:
print('开始初始化基本设置...')
print('程序目录:', prog_dir)
# print('工作路径:', work_dir)
print('日志目录:', log_dir)
print('日志文件名:', log_file)
# 初始化目录
# if not os.path.exists(work_dir):
# print('< %s > is not exist, has created it.' % work_dir)
os.mkdir(work_dir)
if not os.path.exists(log_dir):
print('< %s > is not exist, has created it.' % log_dir)
os.mkdir(log_dir)
if __name__ == '__main__':
# tupple_list = [['1', '小明', '1', ''],['2', '小红', '1', '']]
# user_name = run_sql("insert into sync.sync_table_cfg", type=2,tupple_list = tupple_list, display=True)
# print("公共方法")
# sql = """
# select etl_id from Test.etl_mapping_cfg
# """
#
# result = run_sql(sql_txt=sql, dbFile='etl_config.txt', needResult=True, display=True)
# result_list = [str(i[0]) for i in result]
# etl_id_str = ','.join(result_list)
# print("结果:", etl_id_str)
# insert_data = ((chk_id, chk_rule_name, chk_sql, chk_result, chk_result_code),)
# insert_record_sql = """
# insert into check_result2(chk_id, chk_rule_name, chk_sql, chk_result, chk_result_code)
# values(%s, %s, %s, %s, %s)
print("当前文件目录:", prog_dir)
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/zcl19901222/powersigner_import.git
git@gitee.com:zcl19901222/powersigner_import.git
zcl19901222
powersigner_import
powersigner_import
master

搜索帮助