1 Star 0 Fork 1

zcy0776/GOOD_ROBOT_LZG

forked from lzg114514/GOOD_ROBOT_LZG 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
main.py 11.16 KB
一键复制 编辑 原始数据 按行查看 历史
lzg114514 提交于 2023-11-09 14:55 . 添加查看服务器信息的API
from flask import *
import hmac
import base64
import traceback
import func.systemInfo
from func import *
os.system("title GOOD ROBOT LZG")
# 消息数字签名计算核对
def check_sig(timestamp):
app_secret = 'NLXegfN6gDZNZIppuOWMlUnY_ibEPGZ_2pd87xtz9iBgGjja4lvu7XJmrJldY1i4'
app_secret_enc = app_secret.encode('utf-8')
string_to_sign = '{}\n{}'.format(timestamp, app_secret)
string_to_sign_enc = string_to_sign.encode('utf-8')
hmac_code = hmac.new(app_secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
sign = base64.b64encode(hmac_code).decode('utf-8')
return sign
# 处理自动回复消息
c_m = command_and_md(req_data=None, NONE_REQ_DATA=True)
with open("templates/error_report_template.html", 'r') as f:
error_report_template = f.read()
def handle_info(req_data):
global c_m, error_report_template
text_info = req_data['text']['content'].strip()
webhook_url = req_data['sessionWebhook']
sendernick = req_data['senderNick']
Banned_list = []
with open("data/Banned.json", 'r') as f:
Banned_list = json.load(f)
try:
with open("data/all_cmds.json", 'r') as f:
c = json.load(f)
if sendernick in Banned_list:
send_text_msg("抱歉,您已被封禁", webhook_url)
elif text_info == 'DEBUG__':
text = "DEBUG__TEXT"
send_text_msg(text, webhook_url)
elif text_info in c:
send_text_msg(c[text_info], webhook_url)
elif os.path.exists(f"./markdown/{text_info}.md"):
mdMsg = open(f"./markdown/{text_info}.md", 'r').read()
send_md_msg("mdMsg", mdMsg, webhook_url)
elif text_info.lower() == "help":
c_m.changeReqData(req_data)
c_m.help()
elif text_info[:4].lower() == "cmd$" or text_info[:8].lower() == "command$":
temp_text_info = text_info
text_info = text_info.replace("\$", "__GOOD~ROBOT-LZG=DOLLAR+CMD>SPLIT?SIGN__")
cmd = text_info.split("$")
for i in range(len(cmd)):
cmd[i] = cmd[i].replace("__GOOD~ROBOT-LZG=DOLLAR+CMD>SPLIT?SIGN__", "$")
text_info = temp_text_info
if (cmd[1].lower() == "help") and len(cmd) == 2:
c_m.changeReqData(req_data)
c_m.help()
elif (cmd[1] == "division_by_zero") and len(cmd)==2:
c_m.changeReqData(req_data)
c_m.division_by_zero()
elif (cmd[1].lower() == "unknown_request_warning") and len(cmd)==3 and (cmd[2]!=""):
c_m.changeReqData(req_data)
c_m.unknown_request_warning()
elif (cmd[1] == "ar" or cmd[1] == "addref") and len(cmd) == 4 and (cmd[2]!="" and cmd[3]!=""):
c_m.changeReqData(req_data)
c_m.addref()
elif (cmd[1] == "delref" or cmd[1] == "dr") and len(cmd) == 3 and (cmd[2]!=""):
c_m.changeReqData(req_data)
c_m.delref()
elif cmd[1] == "raiseerror" and len(cmd) == 3 and (cmd[2]!=""):
c_m.changeReqData(req_data)
c_m.raiseerror()
elif cmd[1] == "ref" and len(cmd) == 2:
c_m.changeReqData(req_data)
c_m.ref()
elif cmd[1] == "runservercommand" and len(cmd) == 4 and (cmd[2]!="" and cmd[3]!=""):
c_m.changeReqData(req_data)
c_m.runservercommand()
elif cmd[1] == "systeminfo" and len(cmd) == 2:
c_m.changeReqData(req_data)
c_m.SystemInfo()
elif cmd[1] == "minisysteminfo" and len(cmd) == 2:
c_m.changeReqData(req_data)
c_m.miniSystemInfo()
elif cmd[1] == "ban" and len(cmd) == 3 and (cmd[2]!=""):
c_m.changeReqData(req_data)
c_m.ban()
elif cmd[1] == "pardon" and len(cmd) == 3 and (cmd[2]!=""):
c_m.changeReqData(req_data)
c_m.pardon()
elif (cmd[1] == "banned_list" or cmd[1] == "bl") and len(cmd) == 2:
c_m.changeReqData(req_data)
c_m.banned_list()
elif cmd[1].lower() == "bomb" and len(cmd) == 3 and (cmd[2]!=""):
c_m.changeReqData(req_data)
c_m.bomb()
elif cmd[1].lower() == "guijiao" and len(cmd) == 3 and (cmd[2]!=""):
c_m.changeReqData(req_data)
c_m.gui_jiao()
elif (cmd[1].lower() == "growgrass" or cmd[1].lower() == "gg") and len(cmd) == 4 and (cmd[2]!="" and cmd[3]!=""):
c_m.changeReqData(req_data)
c_m.grow_grass()
elif (cmd[1].lower() == "get_f_from_m") and len(cmd) == 5 and (cmd[2]!="" and cmd[3]!="" and cmd[4]!=""):
c_m.changeReqData(req_data)
c_m.getFileFromMediaId()
elif cmd[1] == "banip" and len(cmd) == 3 and (cmd[2]!=""):
c_m.changeReqData(req_data)
c_m.ban_ip()
elif cmd[1] == "pardonip" and len(cmd) == 3 and (cmd[2]!=""):
c_m.changeReqData(req_data)
c_m.pardon_ip()
elif (cmd[1] == "banned_ip_list" or cmd[1] == "bil") and len(cmd) == 2:
c_m.changeReqData(req_data)
c_m.banned_ip_list()
elif (cmd[1].lower() == "saytext") and len(cmd) == 3 and (cmd[2]!=""):
c_m.changeReqData(req_data)
c_m.sayText()
elif (cmd[1].lower() == "saymd") and len(cmd) == 3 and (cmd[2]!=""):
c_m.changeReqData(req_data)
c_m.sayMd()
elif cmd[1] == "md":
if (cmd[2] == "addref" or cmd[2] == "ar") and len(cmd) == 5 and (cmd[3]!="" and cmd[4]!=""):
c_m.changeReqData(req_data)
c_m.add()
elif (cmd[2] == "delref" or cmd[2] == "dr") and len(cmd) == 4 and (cmd[3]!=""):
c_m.changeReqData(req_data)
c_m.del_()
elif cmd[2] == "delall" and len(cmd) == 3:
c_m.changeReqData(req_data)
c_m.delall()
elif cmd[2] == "ref" and len(cmd) == 3:
c_m.changeReqData(req_data)
c_m.mdref()
else:
send_md_msg("未知命令", "# 未知命令\n### " + text_info, webhook_url)
c_m.changeReqData(req_data)
c_m.help()
else:
send_md_msg("未知命令", "# 未知命令\n### "+text_info, webhook_url)
c_m.changeReqData(req_data)
c_m.help()
else:
send_text_msg("byd故意找茬是吧(", webhook_url)
except Exception as e:
f = open("data/error.json", 'r+')
error_dict = json.load(f)
error_id = str(int(time.time())) + random_string(2)
error_time = time.ctime()
error_type = e.__class__.__name__
error_basicInfo = str(e)[:45] + "..." if len(str(e)) > 45 else str(e)
traceback_info = traceback.format_exc()
error_dict[error_id] = [
error_time, error_type, error_basicInfo, traceback_info
]
f.truncate(0)
f.seek(0)
json.dump(error_dict, f)
f.close()
url_detail = urllib.parse.quote_plus("http://180.166.0.98:1458/error_report/error?id="+error_id)
url_detail_dingtalk = f"dingtalk://dingtalkclient/page/link?url={url_detail}&pc_slide=true"
url_detail_web = f"dingtalk://dingtalkclient/page/link?url={url_detail}&pc_slide=false"
url_history = f"dingtalk://dingtalkclient/page/link?url={urllib.parse.quote_plus('http://180.166.0.98:1458/error_report/error_history')}&pc_slide=false"
send_actionCard_msg("Error Report", e.__class__.__name__,
[getActionCardBtnData("钉钉内查看详情", url_detail_dingtalk),
getActionCardBtnData("浏览器查看详情", url_detail_web),
getActionCardBtnData("查看历史错误", url_history)], webhook_url)
app = Flask(__name__)
@app.before_request
def get_ip():
with open("data/Banned_ips.json", 'r') as f:
if request.remote_addr in json.load(f):
return jsonify({"error":"You have been blocked."}), 403
if unknown_request_warning_state:
ip = request.remote_addr
ip_data = json.loads(requests.get(f"http://ip-api.com/json/{ip}?lang=zh-CN").text)
if not (ip_data["city"]=="杭州" and (ip_data["isp"]=="Hangzhou Alibaba Advertising Co" or ip_data["isp"]=="Hangzhou Alibaba Advertising Co., Ltd.")):
data = f"IP: {ip}\nMethod: {request.method}\n{ip_data['country']}({ip_data['countryCode']})-{ip_data['regionName']}-{ip_data['city']}\n{ip_data['isp']}"
send_text_msg(data,
"https://oapi.dingtalk.com/robot/send?access_token=da2bd9f80755e951ef8766bca5f23aebdf23e2e52336b55c41de4c3b98deb203")
@app.route("/error_report/error_history", methods=["GET"])
def error_history_page():
return render_template("error_history.html")
@app.route("/error_report/data", methods=["GET","POST"])
def get_error_json():
try:
with open("data/error.json", 'r') as f:
error_dict = json.load(f)
return jsonify(error_dict)
except:
return jsonify({"error":f"Internal Server Error\n{traceback.format_exc()}"})
@app.route("/error_report/error", methods=["GET"])
def error_info_page():
id = request.args.get("id")
with open("data/error.json", 'r') as f:
error_dict = json.load(f)
try:
error = error_dict[id]
except KeyError:
return jsonify({"error":f"找不到错误编号为{id}的错误"}) , 404
return error_report_template.replace("__ERROR_TYPE__", error[1]).replace("__ERROR__", error[2]).replace("__TRACE_BACK__", error[3].replace("\n", "<br>"))
@app.route("/guijiao", methods=["GET"])
def guijiao_audio_download():
id = request.args.get("id")
try:
if id+".wav" not in os.listdir("func\\HZYSS\\tempAudioOutput"):
return jsonify({"error":f"找不到id为{id}的音频"}), 404
return send_file(os.path.realpath(f"func\\HZYSS\\tempAudioOutput\\{id}.wav").replace("\\","/"))
except:
return jsonify({"error": traceback.format_exc()})
@app.route("/systeminfo", methods=["GET", "POST"])
def web_systeminfo():
return jsonify(func.systemInfo.GetFullSystemData())
@app.route("/", methods=["POST"])
def get_data():
# 第一步验证:是否是post请求
if request.method == "POST":
# print(request.headers)
# 签名验证 获取headers中的Timestamp和Sign
timestamp = request.headers.get('Timestamp')
sign = request.headers.get('Sign')
# 第二步验证:签名是否有效
if check_sig(timestamp) == sign:
# 获取、处理数据
req_data = json.loads(str(request.data, 'utf-8'))
# 调用数据处理函数
handle_info(req_data)
# print('验证通过')
return 'hhh'
print('验证不通过')
return 'ppp'
if __name__ == '__main__':
try:
app.config['JSON_AS_ASCII'] = False
app.run(host='0.0.0.0', port=1458)
except:
os.system("pause")
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Python
1
https://gitee.com/bs0776/GOOD_ROBOT_LZG.git
git@gitee.com:bs0776/GOOD_ROBOT_LZG.git
bs0776
GOOD_ROBOT_LZG
GOOD_ROBOT_LZG
master

搜索帮助

0d507c66 1850385 C8b1a773 1850385