代码拉取完成,页面将自动刷新
# -*- coding: utf-8 -*-
# debugtalk.py
import string
import random
import os
import hashlib
import subprocess
import time
import pymysql
from elasticsearch import Elasticsearch
from chardet.universaldetector import UniversalDetector
import traceback
import urllib
import re
import json
from django.conf import settings
import datetime
import logging
logger = logging.getLogger('DebugTalk')
def getEncodeType(filePath):
bigdata = open(filePath, 'rb')
detector = UniversalDetector()
for line in bigdata.readlines():
detector.feed(line)
if detector.done:
break
detector.close()
bigdata.close()
encode_type = getUnifiedEncodeType(detector.result['encoding'])
logger.info("File encode type of {} : {}".format(filePath, encode_type))
return encode_type
def getUnifiedEncodeType(sType):
sType = sType.lower()
if sType in ['utf-8-sig', 'utf-8'] or 'utf' in sType:
return 'utf-8'
if sType in ['gb2312'] or 'gb' in sType:
return 'gbk'
def execute_sql(database_type, database_host, database_user, database_password, database_name, database_port, *sqlCmd):
"""Because the function is called by Hook, forbid to modify the function name.
Execute SQL cmd.
:param database_type: database type, MySQL
:param database_host: host
:param database_user: database user
:param database_password: database password
:param database_name: database name
:param database_port: database port
:param sqlCmd: SQLs
:return:
"""
if database_type.lower() == "mysql":
executeSql(database_host, database_user, database_password, database_name, database_port, *sqlCmd)
else:
logger.error(" Currently Do not support Database:{} ".format(database_type.lower()))
def executeSql(host, dbUser, dbPw, dbName, dbPort, *sqlCmd):
"""Because the function is called by Hook, forbid modify the function name.
Execute MySQL cmd.
:param host: host
:param dbUser: database user
:param dbPw: database password
:param dbName: database name
:param dbPort: database port
:param sqlCmd:
:return:
"""
logger.info("============Database Connect============")
try:
db = pymysql.connect(host, dbUser, dbPw, dbName, dbPort, autocommit=True)
cursor = db.cursor()
for ss in sqlCmd:
logger.info(ss)
if isinstance(ss, list):
logger.info("Execute SQL file")
_execute_sql_with_file(ss[0], cursor)
else:
logger.info("Execute SQL command")
cursor.execute(ss)
except Exception as e:
print(e)
logger.error("Execute sql failed", exc_info=True)
# traceback.print_exc()
# db.rollback()
raise e
finally:
logger.info("============Database Close!============")
print("db close!")
db.close()
def execute_bat(fStr):
"""Because the function is called by Hook, forbid modify the function name.
Execute BAT.
:param fStr:
:return:
"""
if not isinstance(fStr, list):
return
subprocess.call(fStr[0])
def execute_es(es, *eslines):
"""Because the function is called by Hook, forbid modify the function name.
Execute ES.
:param es:
:param eslines:
:return:
"""
logger.info("execute es")
_es = []
_es.append(es)
es = Elasticsearch(_es)
_params = {"refresh": "true"}
for esline in eslines:
logger.info("1==================")
logger.info(esline)
try:
esline = esline.replace('\xa0', '')
esline = esline.replace(' ', '')
esline = esline.replace('@@', ' ')
esDict = eval(esline)
oType = esDict["type"]
_index = esDict["index"]
_type = esDict["doc_type"]
_doc = esDict["doc"]
logger.info(type(_doc))
logger.info(_doc)
logger.info(oType)
# insert
if oType.lower() == "index":
logger.info("2==================")
_id = esDict.get("id", "")
if _id:
logger.info("no id=============")
ret = es.index(index=_index, doc_type=_type, body=_doc, id=_id, params=_params)
else:
ret = es.index(index=_index, doc_type=_type, body=_doc, params=_params)
logger.info(ret)
# delete
if oType.lower() == "delete":
time.sleep(1.5)
ret= es.delete_by_query(index=_index, body=_doc, params=_params)
logger.info(ret)
# update
if oType.lower() == "update":
ret = es.update_by_query(index=_index, body=_doc, params=_params)
logger.info(ret)
except:
logger.info(traceback.print_exc())
raise
def _execute_sql_with_file(file_path, cur):
"""
Execute SQL File, ignore line start with '--'
:param file_path:
:param cur: cur of db
:return:
"""
eType = getEncodeType(file_path)
with open(os.path.join(settings.BASE_DIR, file_path), 'r+', encoding=eType) as file:
f = ''
for line in file:
if line.startswith('--'):
logger.info(line)
else:
f = ''.join([f, line])
sql_list = f.split(';')[:-1]
sql_list = [x.replace('\n', ' ') if '\n' in x else x for x in sql_list]
for sql_cmd in sql_list:
logger.info(sql_cmd)
cur.execute(sql_cmd)
#db.commit()
def execute_select_sql(host, db_usr, db_pwd, db_name, db_port, select_sql):
"""
sqlCmd - select name from test where ID = 'test'
only one result
:param host: host
:param db_usr: database user
:param db_pwd: database password
:param db_name: database name
:param db_port: database name
:return:
None - No result
Tuple - one result
None - more one resutl
"""
logger.info("============dbConnect==selectSql==========")
try:
logger.info("=try+++selectSql==========")
db = pymysql.connect(host, db_usr, db_pwd, db_name, db_port)
cursor = db.cursor()
logger.info(select_sql)
cursor.execute(select_sql)
db.commit()
data = cursor.fetchall()
logger.info("+++++++++++++++++++++++")
logger.info(data)
if len(data) == 0:
return None
elif len(data) == 1:
return data[0][0]
else:
return None
pirnt("result is too much")
except Exception as e:
print(e)
logger.error("execute sql failed", exc_info=True)
# traceback.print_exc()
db.rollback()
raise e
finally:
logger.info("db close!")
print("db close!")
db.close()
def gen_refid(cnt):
ran_str = ''.join(random.sample(string.ascii_letters + string.digits, cnt))
return ran_str
def gen_ran_str(cnt):
return ''.join(random.sample(
['z', 'y', 'x', 'w', 'v', 'u', 't', 's', 'r', 'q', 'p', 'o', 'n', 'm', 'l', 'k', 'j', 'i', 'h', 'g', 'f', 'e',
'd', 'c', 'b', 'a'], cnt))
def get_name_from_str(txt):
txt = eval(txt)
ret = []
for dic in txt:
ret.append(dic.get("Name"))
return ret
def readFileFormLocal(file_path):
"""
Because the function is called by upload file in request, forbid modify the function name.
:param file_path:
:return: file data(binary)
"""
try:
data = open(file_path, 'rb')
return data
except Exception as e:
logger.error(e)
def getFileMD5(filename):
flag = False
if isinstance(filename, bytes):
flag = True
tmp_file = "tmp" + gen_ran_str(5)
with open(tmp_file, 'wb') as pf:
pf.write(filename)
filename = tmp_file
elif isinstance(filename, list):
filename = filename[0]
filename = os.path.join(settings.BASE_DIR, filename)
if not os.path.isfile(filename):
return ""
myhash = hashlib.md5()
with open(filename, 'rb') as pf:
while True:
b = pf.read(8096)
if not b:
break
myhash.update(b)
if flag:
os.remove(filename)
return myhash.hexdigest()
def getTimeStamp():
"""
毫秒级时间戳
:return:
"""
return str(int(time.time() * 1000))
def getRandTimeStamp(hour_minute_second, days=0):
"""
获取当前日期,年月日,自定义时分秒计算时间戳
:param hour_minute_second: 00:10:00
:param days: reduce days*86400
:return:
"""
if days == 0:
date_time = datetime.datetime.now().strftime('%Y-%m-%d') + ' ' + hour_minute_second
else:
date_time = (datetime.datetime.now() + datetime.timedelta(days=days)).strftime('%Y-%m-%d') + ' ' + hour_minute_second
logger.info("Convert the source date time:{}".format(date_time))
# 转换成时间数组
timeArray = time.strptime(date_time, "%Y-%m-%d %H:%M:%S")
# 转换成时间戳
timestamp = time.mktime(timeArray)
return int(timestamp)
def getSignature(secret, timestamp, nonce):
"""
# noinspection SpellCheckingInspection
:param secret:
:param timestamp:
:param nonce:
:return:
"""
tmpArr = sorted([secret, timestamp, nonce])
content = ''.join(tmpArr)
hsobj = hashlib.sha256()
hsobj.update(content.encode("utf-8"))
return hsobj.hexdigest()
def length(arg):
logger.info('The length of {arg} is:{length}'.format(arg=arg,length=len(arg)))
return len(arg)
def currenttime():
"""
: return time format: 2019-12-31 11:45:30
"""
_ = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
logger.info(_)
return _
def anystring(arg):
return str(arg)
def getTokenFromStr(s):
pa = re.compile(r"&access_token=(.*)&token_type")
match = re.search(pa, s)
if match:
ret = match.group(1)
logger.info(ret)
return "Bearer " + ret
else:
logger.info("no matched str found!")
return ""
def getStateListFromArray(dataArray):
state_list = []
for data in dataArray:
json_str = json.loads(json.dumps(data))
state_list.append(json_str['state'])
return state_list
def getOperatorIdList(data):
res = re.findall(r"'operatorId': '(?P<operatorId>\d+)'", str(data))
logger.info("getOperatorIdList{OperatorIdLis}".format(OperatorIdLis=res))
return res
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。