2 Star 1 Fork 0

勿幕/XApiTest

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
debugtalk.py 10.36 KB
一键复制 编辑 原始数据 按行查看 历史
勿幕 提交于 2023-03-20 14:21 . update:Importor
# -*- coding: utf-8 -*-
# debugtalk.py
import string
import random
import os
import hashlib
import subprocess
import time
import pymysql
from elasticsearch import Elasticsearch
from chardet.universaldetector import UniversalDetector
import traceback
import urllib
import re
import json
from django.conf import settings
import datetime
import logging
logger = logging.getLogger('DebugTalk')
def getEncodeType(filePath):
    """Detect the text encoding of a file and normalise its name.

    The file is fed line by line to a chardet ``UniversalDetector`` until
    ``detector.done`` is set or the file ends. The detected name is then
    mapped through ``getUnifiedEncodeType``.

    :param filePath: path of the file to inspect
    :return: the value returned by ``getUnifiedEncodeType`` for the detected
        encoding, or None when the detector reported no encoding
    """
    detector = UniversalDetector()
    # The context manager closes the handle even if feeding raises, and
    # iterating the handle avoids loading the whole file into memory.
    with open(filePath, 'rb') as bigdata:
        for line in bigdata:
            detector.feed(line)
            if detector.done:
                break
    detector.close()
    detected = detector.result['encoding']
    # The detector result may carry no encoding; skip normalisation then
    # instead of calling a string method on None.
    encode_type = getUnifiedEncodeType(detected) if detected else None
    logger.info("File encode type of {} : {}".format(filePath, encode_type))
    return encode_type
def getUnifiedEncodeType(sType):
    """Map a detected encoding name onto the codec name used to open files.

    :param sType: encoding name, compared case-insensitively; None or an
        empty string is accepted
    :return: 'utf-8' when the name contains 'utf', 'gbk' when it contains
        'gb', otherwise None
    """
    if not sType:
        # Nothing was detected: there is no name to normalise.
        return None
    sType = sType.lower()
    if 'utf' in sType:
        return 'utf-8'
    if 'gb' in sType:
        return 'gbk'
    return None
def execute_sql(database_type, database_host, database_user, database_password, database_name, database_port, *sqlCmd):
    """Run SQL commands against a database of the given type.

    This function is called by name from hooks, so it must not be renamed.
    Only MySQL is handled; any other type is logged as an error and nothing
    is executed.

    :param database_type: database type, compared case-insensitively ("mysql")
    :param database_host: host
    :param database_user: database user
    :param database_password: database password
    :param database_name: database name
    :param database_port: database port
    :param sqlCmd: SQL commands, forwarded unchanged to ``executeSql``
    :return: None
    """
    db_kind = database_type.lower()
    if db_kind != "mysql":
        logger.error(" Currently Do not support Database:{} ".format(db_kind))
        return
    executeSql(database_host, database_user, database_password, database_name, database_port, *sqlCmd)
def executeSql(host, dbUser, dbPw, dbName, dbPort, *sqlCmd):
    """Execute MySQL commands over a single autocommit connection.

    This function is called by name from hooks, so it must not be renamed.

    :param host: host
    :param dbUser: database user
    :param dbPw: database password
    :param dbName: database name
    :param dbPort: database port
    :param sqlCmd: items to run in order; a list item is treated as a SQL
        file whose path is its first element, anything else is passed to
        ``cursor.execute`` as a statement
    :return: None
    :raises Exception: any connection or execution error is logged and
        re-raised
    """
    logger.info("============Database Connect============")
    # Bound before the try so the finally clause can tell whether the
    # connection was ever opened (a failed connect used to raise
    # UnboundLocalError there and hide the real error).
    db = None
    try:
        # Keyword arguments: PyMySQL 1.0+ no longer accepts these positionally.
        db = pymysql.connect(host=host, user=dbUser, password=dbPw,
                             database=dbName, port=dbPort, autocommit=True)
        with db.cursor() as cursor:
            for ss in sqlCmd:
                logger.info(ss)
                if isinstance(ss, list):
                    logger.info("Execute SQL file")
                    _execute_sql_with_file(ss[0], cursor)
                else:
                    logger.info("Execute SQL command")
                    cursor.execute(ss)
    except Exception as e:
        print(e)
        logger.error("Execute sql failed", exc_info=True)
        # Bare raise keeps the original traceback intact.
        raise
    finally:
        logger.info("============Database Close!============")
        print("db close!")
        if db is not None:
            db.close()
def execute_bat(fStr):
    """Run the batch/command file named by the first element of a list.

    This function is called by name from hooks, so it must not be renamed.
    Anything that is not a list is ignored.

    :param fStr: list whose first element is handed to ``subprocess.call``
    :return: None (the exit status of the call is not returned)
    """
    if isinstance(fStr, list):
        subprocess.call(fStr[0])
def execute_es(es, *eslines):
    """Execute Elasticsearch operations described by text lines.

    This function is called by name from hooks, so it must not be renamed.

    Each line is evaluated to a dict that must provide the keys ``type``
    ("index", "delete" or "update", case-insensitive), ``index``,
    ``doc_type`` and ``doc``; ``id`` is optional and only used for "index".
    Every request is sent with ``refresh=true``.

    :param es: Elasticsearch host passed to the client constructor
    :param eslines: operation descriptions, one per argument
    :return: None
    :raises Exception: any error while parsing or executing a line is
        logged with its traceback and re-raised
    """
    logger.info("execute es")
    client = Elasticsearch([es])
    _params = {"refresh": "true"}
    for esline in eslines:
        logger.info("1==================")
        logger.info(esline)
        try:
            # Drop non-breaking and ordinary spaces, then turn every '@@'
            # into a single space.
            esline = esline.replace('\xa0', '')
            esline = esline.replace(' ', '')
            esline = esline.replace('@@', ' ')
            # SECURITY NOTE(review): eval() executes arbitrary Python. This is
            # only acceptable while the lines come from trusted test data;
            # consider ast.literal_eval if the inputs are plain literals.
            esDict = eval(esline)
            oType = esDict["type"]
            _index = esDict["index"]
            _type = esDict["doc_type"]
            _doc = esDict["doc"]
            logger.info(type(_doc))
            logger.info(_doc)
            logger.info(oType)
            operation = oType.lower()
            if operation == "index":
                # insert
                logger.info("2==================")
                _id = esDict.get("id", "")
                if _id:
                    ret = client.index(index=_index, doc_type=_type, body=_doc, id=_id, params=_params)
                else:
                    # This message used to be logged in the branch where an
                    # id WAS supplied.
                    logger.info("no id=============")
                    ret = client.index(index=_index, doc_type=_type, body=_doc, params=_params)
                logger.info(ret)
            elif operation == "delete":
                # delete
                time.sleep(1.5)
                ret = client.delete_by_query(index=_index, body=_doc, params=_params)
                logger.info(ret)
            elif operation == "update":
                # update
                ret = client.update_by_query(index=_index, body=_doc, params=_params)
                logger.info(ret)
        except Exception:
            # traceback.print_exc() returns None, so the previous code logged
            # the literal "None"; log the traceback through the logger instead.
            logger.error("Execute es failed", exc_info=True)
            raise
def _execute_sql_with_file(file_path, cur):
    """Execute the statements of a SQL file, skipping lines that start with '--'.

    The path is resolved against ``settings.BASE_DIR`` and the same resolved
    path is used both for encoding detection and for reading. Skipped
    comment lines are logged. The remaining text is split on ';'; the text
    after the last ';' is discarded, so every statement must be terminated
    with ';'. Newlines inside a statement are replaced by spaces before it
    is executed.

    :param file_path: SQL file path, joined onto ``settings.BASE_DIR``
    :param cur: database cursor used to execute each statement
    :return: None
    """
    full_path = os.path.join(settings.BASE_DIR, file_path)
    # Detect the encoding on the file that is actually read; the raw
    # file_path was used before, which only matched when the working
    # directory happened to be BASE_DIR.
    eType = getEncodeType(full_path)
    kept_lines = []
    # Read-only mode is enough; 'r+' demanded write permission for no reason.
    with open(full_path, 'r', encoding=eType) as file:
        for line in file:
            if line.startswith('--'):
                logger.info(line)
            else:
                kept_lines.append(line)
    script = ''.join(kept_lines)
    for fragment in script.split(';')[:-1]:
        sql_cmd = fragment.replace('\n', ' ')
        logger.info(sql_cmd)
        cur.execute(sql_cmd)
def execute_select_sql(host, db_usr, db_pwd, db_name, db_port, select_sql):
    """Run a SELECT expected to yield exactly one row and return its first column.

    Example: ``select name from test where ID = 'test'``

    :param host: host
    :param db_usr: database user
    :param db_pwd: database password
    :param db_name: database name
    :param db_port: database port
    :param select_sql: the SELECT statement to execute
    :return:
        None - no row matched
        first column of the row - exactly one row matched
        None - more than one row matched (a warning is logged)
    :raises Exception: any connection or execution error is logged and
        re-raised
    """
    logger.info("============dbConnect==selectSql==========")
    # Bound before the try so the except/finally clauses can tell whether
    # the connection was ever opened.
    db = None
    try:
        logger.info("=try+++selectSql==========")
        # Keyword arguments: PyMySQL 1.0+ no longer accepts these positionally.
        db = pymysql.connect(host=host, user=db_usr, password=db_pwd,
                             database=db_name, port=db_port)
        with db.cursor() as cursor:
            logger.info(select_sql)
            cursor.execute(select_sql)
            db.commit()
            data = cursor.fetchall()
        logger.info("+++++++++++++++++++++++")
        logger.info(data)
        if len(data) == 1:
            return data[0][0]
        if len(data) > 1:
            # This message used to sit after the return (and was misspelled),
            # so it was never emitted.
            logger.warning("result is too much")
        return None
    except Exception as e:
        print(e)
        logger.error("execute sql failed", exc_info=True)
        if db is not None:
            db.rollback()
        # Bare raise keeps the original traceback intact.
        raise
    finally:
        logger.info("db close!")
        print("db close!")
        if db is not None:
            db.close()
def gen_refid(cnt):
    """Return ``cnt`` distinct random characters drawn from ASCII letters and digits.

    :param cnt: number of characters; sampling is without replacement, so it
        cannot exceed the 62 available characters
    :return: the random string
    """
    alphabet = string.ascii_letters + string.digits
    picked = random.sample(alphabet, cnt)
    return ''.join(picked)
def gen_ran_str(cnt):
    """Return ``cnt`` distinct random lowercase ASCII letters.

    :param cnt: number of letters; sampling is without replacement, so it
        cannot exceed 26
    :return: the random string
    """
    # Same population and order as spelling the alphabet out from 'z' to 'a'.
    pool = list(reversed(string.ascii_lowercase))
    return ''.join(random.sample(pool, cnt))
def get_name_from_str(txt):
    """Evaluate ``txt`` to a sequence of dicts and collect each one's "Name".

    :param txt: text of a Python expression yielding an iterable of dicts
    :return: list with the "Name" value of every dict (None where missing)
    """
    # SECURITY NOTE(review): eval() executes arbitrary Python; only pass
    # trusted text here.
    records = eval(txt)
    return [record.get("Name") for record in records]
def readFileFormLocal(file_path):
    """Open a local file for binary reading and hand back the open handle.

    This function is called by name when a request uploads a file, so it
    must not be renamed. The caller owns the returned handle.

    :param file_path: path of the file to open
    :return: the file object opened in 'rb' mode, or None when opening
        failed (the error is logged)
    """
    try:
        return open(file_path, 'rb')
    except Exception as err:
        logger.error(err)
def getFileMD5(filename):
    """Return the hexadecimal MD5 digest of raw bytes or of a file's content.

    :param filename: one of
        bytes - the bytes themselves are hashed;
        list  - the first element is a path that is joined onto
                ``settings.BASE_DIR``;
        anything else - used as a file path as given.
    :return: the hex digest, or "" when the path is not an existing file
    """
    if isinstance(filename, bytes):
        # Hash in-memory content directly. The previous temp-file round trip
        # wrote into the working directory and left the file behind on error.
        return hashlib.md5(filename).hexdigest()
    if isinstance(filename, list):
        # NOTE(review): the BASE_DIR join is applied to list input only;
        # confirm this matches the intended handling of plain string paths.
        filename = os.path.join(settings.BASE_DIR, filename[0])
    if not os.path.isfile(filename):
        return ""
    myhash = hashlib.md5()
    with open(filename, 'rb') as pf:
        # Read in fixed-size chunks until read() returns the empty sentinel.
        for chunk in iter(lambda: pf.read(8096), b''):
            myhash.update(chunk)
    return myhash.hexdigest()
def getTimeStamp():
    """Return the current time as a millisecond-resolution timestamp.

    :return: milliseconds since the epoch, as a decimal string
    """
    millis = int(time.time() * 1000)
    return str(millis)
def getRandTimeStamp(hour_minute_second, days=0):
    """Build a timestamp from today's date (optionally shifted) and a given time of day.

    :param hour_minute_second: time of day as "HH:MM:SS", e.g. "00:10:00"
    :param days: number of days added to the current date before combining
        (a negative value moves the date back)
    :return: integer timestamp in seconds, computed with ``time.mktime``
    """
    base_day = datetime.datetime.now()
    if days != 0:
        base_day = base_day + datetime.timedelta(days=days)
    date_time = base_day.strftime('%Y-%m-%d') + ' ' + hour_minute_second
    logger.info("Convert the source date time:{}".format(date_time))
    # Parse into a time struct, then convert that struct to a timestamp.
    parsed = time.strptime(date_time, "%Y-%m-%d %H:%M:%S")
    return int(time.mktime(parsed))
def getSignature(secret, timestamp, nonce):
    """Compute a SHA-256 signature over the three values in sorted order.

    The three strings are sorted, concatenated and UTF-8 encoded before
    hashing, so the argument order does not affect the result.

    :param secret: secret string
    :param timestamp: timestamp string
    :param nonce: nonce string
    :return: hexadecimal SHA-256 digest
    """
    ordered_parts = [secret, timestamp, nonce]
    ordered_parts.sort()
    payload = ''.join(ordered_parts).encode("utf-8")
    return hashlib.sha256(payload).hexdigest()
def length(arg):
    """Log and return ``len(arg)``.

    :param arg: any object supporting ``len()``
    :return: its length
    """
    size = len(arg)
    logger.info('The length of {arg} is:{length}'.format(arg=arg, length=size))
    return size
def currenttime():
    """Log and return the current local time.

    :return: time formatted like 2019-12-31 11:45:30
    """
    # strftime falls back to the current local time when no time tuple is given.
    now_text = time.strftime("%Y-%m-%d %H:%M:%S")
    logger.info(now_text)
    return now_text
def anystring(arg):
    """Return the ``str()`` form of ``arg``.

    :param arg: any object
    :return: ``str(arg)``
    """
    text = str(arg)
    return text
def getTokenFromStr(s):
    """Extract the access token from a string and format it as a Bearer value.

    The token is the text between "&access_token=" and the last following
    "&token_type" (the match is greedy).

    :param s: text to search
    :return: "Bearer <token>" when found, otherwise ""
    """
    found = re.search(r"&access_token=(.*)&token_type", s)
    if not found:
        logger.info("no matched str found!")
        return ""
    token = found.group(1)
    logger.info(token)
    return "Bearer " + token
def getStateListFromArray(dataArray):
    """Collect the 'state' entry of every item in ``dataArray``.

    Each item is round-tripped through JSON (dumps, then loads) before its
    'state' key is read.

    :param dataArray: iterable of JSON-serialisable mappings
    :return: list of the 'state' values, in input order
    """
    return [json.loads(json.dumps(item))['state'] for item in dataArray]
def getOperatorIdList(data):
    """Find every numeric operatorId in the ``str()`` form of ``data``.

    Matches occurrences of ``'operatorId': '<digits>'`` exactly as they
    appear in the string representation.

    :param data: any object; it is converted with ``str()`` before searching
    :return: list of the matched digit strings, in order of appearance
    """
    pattern = re.compile(r"'operatorId': '(?P<operatorId>\d+)'")
    operator_ids = pattern.findall(str(data))
    logger.info("getOperatorIdList{OperatorIdLis}".format(OperatorIdLis=operator_ids))
    return operator_ids
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Python
1
https://gitee.com/flyincyclone/XApiTest.git
git@gitee.com:flyincyclone/XApiTest.git
flyincyclone
XApiTest
XApiTest
master

搜索帮助