1 Star 0 Fork 15

tinytinyfool/脚本刷分

forked from yayuqi/脚本刷分 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
全民消防学习平台.py 23.78 KB
一键复制 编辑 原始数据 按行查看 历史
yayuqi 提交于 2023-05-19 17:11 . 第一次提交
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612
from tkinter.filedialog import askopenfilename
import tkinter as tk
from tkinter.messagebox import showinfo, showerror
import random
import threading
import requests
requests.packages.urllib3.disable_warnings()
class Qmxfxx:
def __init__(self, auth, logs, shebe):
self.shebe = shebe
self.headers = {
"Host": "qmxfxx.119.gov.cn",
"Connection": "keep-alive",
"Authorization": auth,
"Content-Type": "application/json",
"User-Agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 16_0_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, "
"like Gecko) Mobile/20A392 Ariver/1.1.0 AliApp(AP/10.3.0.6100) Nebula WK RVKType(0) "
"AlipayDefined(nt:WWAN,ws:390|780|3.0) AlipayClient/10.3.0.6100 Language/zh-Hans Region/CN "
"NebulaX/1.0.0 "
}
self.name = self.user()
logs = logs
if not self.name:
# print("token 失效")
logs.insert("end", "token 失效", 'tag_1')
logs.insert("end", "\n")
logs.see("end")
def get_info(self, url):
res = requests.get(url=url, headers=self.headers, verify=False)
if res.status_code != 200:
logs.insert("end", f"-----get请求-----\n{url}\n网络请求失败,错误码:{res.status_code}\n", 'tag_1')
logs.see("end")
return {}
return res.json()
def post_info(self, url, data):
res = requests.post(url=url, headers=self.headers, json=data, verify=False)
if res.status_code != 200:
# print("-----post请求-----")
# print(url, data)
# print(f" 网络请求失败,错误码:{res.status_code}")
logs.insert("end", f"-----post请求-----\n{url, data}\n:网络请求失败,错误码:{res.status_code}\n", 'tag_1')
logs.see("end")
return {}
return res.json()
# ============================== 视频(20) =================================================
def vido(self):
# 多线程获取视频ID
def get_id(number, data_ls):
ls = []
url = f"https://qmxfxx.119.gov.cn/{self.shebe}/mini/api/news/content/list?channel=5&offset={number}&limit=10"
res = self.get_info(url)
if not res:
logs.insert("end", f"{self.name, res}出现异常\n", 'tag_1')
logs.see("end")
return 0
for tdict in res["result"]["videoList"]:
ls.append(tdict["videoId"])
lock.acquire()
data_ls += ls
lock.release()
# 多线程发送请求
def send(d_id):
url = f"https://qmxfxx.119.gov.cn/{self.shebe}/mini/api/home/taskScord/completeTask?taskCode=CTWLAV&parameter1={d_id}&parameter2=CCTV"
res = self.get_info(url)
if not res:
logs.insert("end", f"{self.name, res}出现异常\n", 'tag_1')
logs.see("end")
return 0
# print(self.name, res)
logs.insert("end", f"{self.name, res}\n", 'tag_1')
logs.see("end")
# 创建多线程获取id
lock = threading.RLock()
vido_id = []
for i in range(1, 3):
t = threading.Thread(target=get_id, args=(i, vido_id))
t.start()
t.join()
# 创建多线程发送请求
for i in range(11):
if not vido_id:
logs.insert("end", f"{self.name}id获取失败\n", 'tag_1')
logs.see("end")
# print("id获取失败")
return 0
did = random.choice(vido_id)
t = threading.Thread(target=send, args=(did,))
t.start()
t.join()
# ============================== 文章(5) =================================================
def article(self):
# 获取文章ID
art_id = []
url = f"https://qmxfxx.119.gov.cn/{self.shebe}/mini/api/news/content/list?channel=3&offset={1}&limit=10"
res = self.get_info(url)
if not res:
logs.insert("end", f"{self.name, res}出现异常\n", 'tag_1')
logs.see("end")
return 0
for tdict in res["result"]["articleList"]:
art_id.append(tdict["articleId"])
def send(d_id):
url = f"https://qmxfxx.119.gov.cn/{self.shebe}/mini/api/home/taskScord/completeTask?taskCode=CTWLREADARTICLE&parameter1={d_id}&parameter2=XFZX"
res = self.get_info(url)
if not res:
logs.insert("end", f"{self.name, res}出现异常\n", 'tag_1')
logs.see("end")
return 0
# print(self.name, res)
logs.insert('end', f"{self.name, res}\n", 'tag_1')
logs.see("end")
for i in range(6):
did = random.choice(art_id)
t = threading.Thread(target=send, args=(did,))
# t.setDaemon(True)
t.start()
t.join()
# ============================== 音频(60) =================================================
def audio(self):
# 多线程获取视频ID
def get_id(number, data_ls):
ls = []
url = f"https://qmxfxx.119.gov.cn/{self.shebe}/mini/api/news/content/list?channel=8&offset={number}&limit=10"
res = self.get_info(url)
if not res:
logs.insert("end", f"{self.name, res}出现异常\n", 'tag_1')
logs.see("end")
return 0
for tdict in res["result"]["videoList"]:
ls.append(tdict["videoId"])
lock.acquire()
data_ls += ls
lock.release()
# 多线程发送请求
def send(d_id):
url = f"https://qmxfxx.119.gov.cn/{self.shebe}/mini/api/home/taskScord/completeTask?taskCode=CTWLAVTIME&parameter1={d_id}&parameter2=XFYP"
res = self.get_info(url)
if not res:
logs.insert("end", f"{self.name, res}出现异常\n", 'tag_1')
logs.see("end")
return 0
# print(self.name, res)
logs.insert("end", f"{self.name, res}\n", 'tag_1')
logs.see("end")
# 创建多线程获取id
lock = threading.RLock()
aud_id = []
for i in range(1, 4):
t = threading.Thread(target=get_id, args=(i, aud_id))
t.start()
t.join()
# print(vido_id)
# 创建多线程发送请求
for i in range(21):
if not aud_id:
# print("id获取失败")
logs.insert("end", f"{self.name}id获取失败\n", 'tag_1')
logs.see("end")
break
did = random.choice(aud_id)
t = threading.Thread(target=send, args=(did,))
t.start()
t.join()
# ============================== 每日答题(5) =================================================
def everyday(self):
# 获取题号
url = f"https://qmxfxx.119.gov.cn/{self.shebe}/mini/api/course/exam/randomExam/0/10048/-1/10048"
if not self.get_info(url):
logs.insert("end", f"{self.name, url}\nurl出现异常\n", 'tag_1')
logs.see("end")
return 0
ls = []
url = f"https://qmxfxx.119.gov.cn/{self.shebe}/mini/api/course/exam/loadExamPaper/10048/10048"
res = self.post_info(url, {})
if not res:
logs.insert("end", f"{self.name, res}出现异常\n", 'tag_1')
logs.see("end")
return 0
res = res["result"]["questions"]
for tdict in res["single"]:
questionId = tdict["questionId"]
checkOption = tdict["url"][:-1]
ls.append({"questionId": questionId, "status": 0, "checkOption": checkOption})
duoxuan = res["mutil"][0]
questionId = duoxuan["questionId"]
checkOption = duoxuan["url"][0:-1]
ls.append({"questionId": questionId, "status": 0, "checkOption": checkOption})
panduan = res["judge"][0]
questionId = panduan["questionId"]
context = panduan["url"]
ls.append({"questionId": questionId, "status": 0, "context": context})
url = f"https://qmxfxx.119.gov.cn/{self.shebe}/mini/api/course/exam/saveOrSubmitPaperQuestion/10048/10048/-1/submit"
if not self.post_info(url, ls):
logs.insert("end", f"{self.name, ls}\npost出现异常\n", 'tag_1')
logs.see("end")
return 0
url = f"https://qmxfxx.119.gov.cn/{self.shebe}/mini/api/home/taskScord/completeTask?taskCode=CTWLPRACTICEDAY"
res = self.get_info(url)
if not res:
logs.insert("end", f"{self.name, res}出现异常\n", 'tag_1')
logs.see("end")
return 0
# print(self.name, res)
logs.insert("end", f"{self.name, res}\n", 'tag_1')
logs.see("end")
# ============================== 挑战答题(5) =================================================
def challenge(self):
ls = []
data = {"userAnswer": ls}
# 获取题目
url = f"https://qmxfxx.119.gov.cn/{self.shebe}/mini/api/question/challenge"
res = self.post_info(url, data)
if not res:
logs.insert("end", f"{self.name, res}出现异常\n", 'tag_1')
logs.see("end")
return 0
for tdict in res["result"]:
questionId = tdict["questionId"]
for ls_dict in tdict["optionList"]:
if ls_dict["isRight"] == "T":
userAnswer = ls_dict["id"]
ls.append({"questionId": questionId, "userAnswer": userAnswer, "type": 1})
break
url = f"https://qmxfxx.119.gov.cn/{self.shebe}/mini/api/question/challenge/submit"
res = self.post_info(url, data)
if not res:
logs.insert("end", f"{self.name, res}出现异常\n", 'tag_1')
logs.see("end")
return 0
# print(self.name, res)
logs.insert("end", f"{self.name, res}\n", 'tag_1')
logs.see("end")
# ============================== 我要考试(5) =================================================
def examination(self):
url = f"https://qmxfxx.119.gov.cn/{self.shebe}/mini/api/users/checkTask?taskCode=CTWLAVTIME,CTWLAV"
if not self.get_info(url):
logs.insert("end", f"{self.name, url}\n出现异常\n", 'tag_1')
logs.see("end")
return 0
url = f"https://qmxfxx.119.gov.cn/{self.shebe}/mini/api/course/exam/examTotalQuestion"
if not self.get_info(url):
logs.insert("end", f"{self.name, url}\nurl出现异常\n", 'tag_1')
logs.see("end")
return 0
ls = []
url = f"https://qmxfxx.119.gov.cn/{self.shebe}/mini/api/course/exam/loadExamPaper/9999/9999"
res = self.post_info(url, {})
if not res:
logs.insert("end", f"{self.name, res}出现异常\n", 'tag_1')
logs.see("end")
return 0
res = res["result"]["questions"]["single"]
for tdict in res:
questionId = tdict["questionId"]
checkOption = tdict["url"][:-1]
ls.append({"questionId": questionId, "status": 0, "checkOption": checkOption})
url = f"https://qmxfxx.119.gov.cn/{self.shebe}/mini/api/course/exam/saveOrSubmitPaperQuestion/9999/9999/-1/submit"
if not self.post_info(url, ls):
logs.insert("end", f"{self.name, ls}\npost出现异常\n", 'tag_1')
logs.see("end")
return 0
url = f"https://qmxfxx.119.gov.cn/{self.shebe}/mini/api/home/taskScord/completeTask?taskCode=CTWLTEST"
res = self.get_info(url)
if not res:
logs.insert("end", f"{self.name, res}出现异常\n", 'tag_1')
logs.see("end")
return 0
# print(self.name, res)
logs.insert("end", f"{self.name, res}\n", 'tag_1')
logs.see("end")
# ============================== 双人对战(4) =================================================
def pk(self):
def send():
ls = []
data = {"userAnswer": ls}
url = f"https://qmxfxx.119.gov.cn/{self.shebe}/mini/api/question/pk"
res = self.post_info(url, data)
if not res:
logs.insert("end", f"{self.name, res}出现异常\n", 'tag_1')
logs.see("end")
return 0
res = res["result"]["list"]
for tdict in res:
questionId = tdict["questionId"]
for ls_dict in tdict["optionList"]:
if ls_dict["isRight"] == "T":
userAnswer = ls_dict["id"]
ls.append({"questionId": questionId, "userAnswer": userAnswer, "type": 1})
break
data["status"] = 1
url = f"https://qmxfxx.119.gov.cn/{self.shebe}/mini/api/question/pk/submit"
res = self.post_info(url, data)
if not res:
logs.insert("end", f"{self.name, res}出现异常\n", 'tag_1')
logs.see("end")
return 0
# print(self.name, res)
logs.insert("end", f"{self.name, res}\n", 'tag_1')
logs.see("end")
for _ in range(3):
t = threading.Thread(target=send)
t.start()
t.join()
# ============================== 每周答题(5) =================================================
def weekly(self):
# 获取题号
url = f"https://qmxfxx.119.gov.cn/{self.shebe}/mini/api/course/exam/randomExam/0/10049/-1/10049"
if not self.get_info(url):
logs.insert("end", f"{self.name, url}出现异常\n", 'tag_1')
logs.see("end")
return 0
ls = []
url = f"https://qmxfxx.119.gov.cn/{self.shebe}/mini/api/course/exam/loadExamPaper/10049/10049"
res = self.post_info(url, {})
if not res:
logs.insert("end", f"{self.name, res}出现异常\n", 'tag_1')
logs.see("end")
return 0
res = res["result"]["questions"]
for tdict in res["single"]:
questionId = tdict["questionId"]
checkOption = tdict["url"][:-1]
ls.append({"questionId": questionId, "status": 0, "checkOption": checkOption})
duoxuan = res["mutil"][0]
questionId = duoxuan["questionId"]
checkOption = duoxuan["url"][:-1]
ls.append({"questionId": questionId, "status": 0, "checkOption": checkOption})
panduan = res["judge"][0]
questionId = panduan["questionId"]
context = panduan["url"]
ls.append({"questionId": questionId, "status": 0, "context": context})
url = f"https://qmxfxx.119.gov.cn/{self.shebe}/mini/api/course/exam/saveOrSubmitPaperQuestion/10049/10049/-1/submit"
if not self.post_info(url, ls):
logs.insert("end", f"{self.name, res}出现异常\n", 'tag_1')
logs.see("end")
return 0
url = f"https://qmxfxx.119.gov.cn/{self.shebe}/mini/api/home/taskScord/completeTask?taskCode=CTWLPRACTICEWEEK"
res = self.get_info(url)
if not res:
logs.insert("end", f"{self.name, res}出现异常\n", 'tag_1')
logs.see("end")
return 0
# print(res)
logs.insert("end", f"{self.name, res}\n", 'tag_1')
logs.see("end")
# ============================== 执行所有任务 =================================================
def t_all(self):
self.audio()
self.vido()
self.article()
self.everyday()
self.challenge()
self.examination()
self.pk()
self.weekly()
# ============================== 获取用户 =================================================
def user(self):
us = f"https://qmxfxx.119.gov.cn/{self.shebe}/mini/api/users"
res = self.get_info(us)
if not res:
return False
yn = res.get("msg", "")
if yn != "succ":
return False
name = res["result"]["nickName"]
return name
# ============================== 查看分数 =================================================
def get_fen(self):
integral = f"https://qmxfxx.119.gov.cn/{self.shebe}/mini/api/users/activeScore"
fens = requests.get(url=integral, headers=self.headers, verify=False).json()["result"]
return fens
def create_windows():
"""初始化窗口"""
window = tk.Tk()
window.title("全民消防刷分")
window.geometry("680x450")
# window.iconbitmap("1.ico")
window.resizable(False, False)
# window.resizable(width=True, height=True)
return window
def deploy_check_button(window):
"""部署checkbutton控件"""
# 反选
def unselectall():
for index, item in enumerate(list1):
if v[index].get():
v[index].set("")
else:
v[index].set(item)
# 全选
def selectall():
for index, item in enumerate(list1):
v[index].set(item)
# frame1放置微信和支付宝选项 + 提示
frame1 = tk.Frame(window, pady=10, padx=50)
frame1.grid(row=0, column=0)
# frame2放置任务选项
frame2 = tk.Frame(window, pady=10, padx=50)
frame2.grid(row=0, column=1)
# 放置微信和支付宝
# 微信或支付宝(默认微信)
ts_txt = "请选择需要执行的操作:"
tk.Label(frame1, text=ts_txt, height=3).grid(row=0, column=0)
opt1 = tk.IntVar()
tk.Radiobutton(frame1, width=18, text='微信', variable=opt1, value=0).grid(row=1, column=0, sticky="w")
tk.Radiobutton(frame1, width=18, text='支付宝', variable=opt1, value=1).grid(row=2, column=0, sticky="w")
# 全选反选
opt2 = tk.IntVar()
tk.Radiobutton(frame2, text='全选', variable=opt2, value=1, command=selectall).grid(row=0, column=0, sticky="e" + 'w')
tk.Radiobutton(frame2, text='反选', variable=opt2, value=0, command=unselectall).grid(row=0, column=1,
sticky="e" + 'w')
list1 = ['视频(60分)', '音频(20分)', '文章(5分)', '每日答题(5分)', '挑战答题(5分)', '每周答题(5分)', '双人对战(5分)', '我要考试(5分)']
v = []
# 设置勾选框,每3个换行
for index, item in enumerate(list1):
v.append(tk.StringVar())
tk.Checkbutton(frame2, text=item, variable=v[-1], onvalue=item, offvalue='', ).grid(row=index // 3 + 1,
column=index % 3,
sticky='w')
return v, opt1
def file_button(window):
# 文件选择文本框
frame1 = tk.Frame(window, pady=10, padx=15, bg="#B100FD")
frame1.grid(row=0, column=0, sticky="nsw")
# 文件选择按钮
frame2 = tk.Frame(window, pady=10, padx=15)
frame2.grid(row=0, column=1, sticky="nse")
# 文本和下拉框
text = tk.Text(frame1, width=60, height=3, bg="#4BC383")
text.grid(row=0, column=0, sticky="nsw")
scroll = tk.Scrollbar(frame1)
text['yscrollcommand'] = scroll.set
scroll['command'] = text.yview
scroll.grid(row=0, column=1, sticky="ns")
# 提示信息
text.insert("0.0", "请输入token值或选择带有多个token值的文件")
# 按钮
def fileopen():
file_sql = askopenfilename()
if file_sql:
text.delete("0.0", "end")
text.insert("0.0", file_sql)
# print(text.get("0.0", "end"))
tk.Button(frame2, width=10, text='选择文件', font=("宋体", 14), command=fileopen).grid(row=0, column=0, ipady=5)
return text
def run_button(window):
def run(file, data, sbt):
logs.delete("0.0", "end")
ht = ("wechat", "alipay")[sbt.get()]
def qmxfrw(auth, ls):
button1['state'] = tk.DISABLED
qmxf = Qmxfxx(auth, logs, ht)
print(qmxf.name)
if not qmxf.name:
# showinfo('提示', 'token 失效')
button1['state'] = tk.ACTIVE
return 0
start = qmxf.get_fen()
logs.insert("end", f"{qmxf.name}开始任务,起始分数:{start}积分\n")
logs.see("end")
tdict = {
0: "qmxf.audio()",
1: "qmxf.vido()",
2: "qmxf.article()",
3: "qmxf.everyday()",
4: "qmxf.challenge()",
5: "qmxf.weekly()",
6: "qmxf.pk()",
7: "qmxf.examination()",
}
for index in ls:
exec(compile(tdict.get(index), '<string>', 'exec'))
if not rw_ls:
qmxf.t_all()
# showinfo("任务完成", f"{qmxf.name}获取了:{qmxf.get_fen() - start}积分")
logs.insert("end", f"任务完成:{qmxf.name}获取了:{qmxf.get_fen() - start}积分\n")
logs.see("end")
button1['state'] = tk.ACTIVE
rw_ls = []
for index in range(len(data)):
if data[index].get():
rw_ls.append(index)
auth_ls = []
file_auth = file.get("0.0", "end").strip()
if len(file_auth) > 280:
auth_ls.append(file_auth)
else:
try:
file_auth = open(file_auth, encoding="utf8")
for i in file_auth.readlines():
auth = i.strip()
if len(auth) > 280:
auth_ls.append(auth)
file_auth.close()
except:
showerror('错误', '请输入正确的文件')
return 0
# 多线程刷分
if not auth_ls:
showinfo("提示", "token值为空请确保token值的正确")
return 0
# button1['state'] = tk.DISABLED
for token in auth_ls:
t = threading.Thread(target=qmxfrw, args=(token, rw_ls))
t.setDaemon(True)
t.start()
# button1['state'] = tk.ACTIVE
return 1
def stop():
exit(0)
# 日志
frame1 = tk.Frame(window)
frame1.grid(row=0, column=0)
tk.Label(frame1, text="日志:").grid(row=0, column=0, sticky="nse")
tk.Label(frame1, width=45).grid(row=0, column=1, sticky="nse")
# 按钮
frame2 = tk.Frame(window)
frame2.grid(row=0, column=1)
# 开始按钮
button1 = tk.Button(frame2, text="开始", font=('宋体', '15', 'bold'), state=tk.ACTIVE,
command=lambda: run(file, data, sbt), width=13)
button1.grid(row=0, column=0, padx=10, sticky="n" + "s")
# 结束按钮
button2 = tk.Button(frame2, text="结束", font=('宋体', '15', 'bold'), command=stop)
button2.grid(row=0, column=1, padx=10, sticky="n" + "s")
def logs_button(window):
frame1 = tk.Frame(window)
frame1.grid(row=0, column=0)
logs = tk.Text(frame1, width=80, height=12, bg="#FFB6C1", fg="#00008B")
logs.grid(row=0, column=0, padx=10, pady=10, sticky="S" + "W" + "E" + "N")
scroll = tk.Scrollbar(frame1)
logs['yscrollcommand'] = scroll.set
scroll['command'] = logs.yview
scroll.grid(row=0, column=1, rowspan=1, sticky="S" + "W" + "E" + "N")
return logs
if __name__ == '__main__':
# 初始化窗口
window = create_windows()
# 创建大盒子
frame1 = tk.Frame(window)
frame1.grid(row=0, column=0)
frame2 = tk.Frame(window)
frame2.grid(row=1, column=0)
frame3 = tk.Frame(window)
frame3.grid(row=2, column=0)
frame4 = tk.Frame(window)
frame4.grid(row=3, column=0)
# 选择的值
data, sbt = deploy_check_button(frame1)
# 文件选择
file = file_button(frame2)
# 运行按钮
run_button(frame3)
# 日志框
logs = logs_button(frame4)
# 进入主循环
window.mainloop()
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Python
1
https://gitee.com/aaminuoshou/script-brushing.git
git@gitee.com:aaminuoshou/script-brushing.git
aaminuoshou
script-brushing
脚本刷分
master

搜索帮助