代码拉取完成,页面将自动刷新
同步操作将从 Elycio/autoreceive 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
import requests
import re
import time
from concurrent.futures import ThreadPoolExecutor
import threading
requests.packages.urllib3.util.ssl_.DEFAULT_CIPHERS = "TLS13-CHACHA20-POLY1305-SHA256:TLS13-AES-128-GCM-SHA256:TLS13-AES-256-GCM-SHA384:ECDHE:!COMPLEMENTOFDEFAULT"
# 这里是注释,不用做修改
# 把这下面变量cookie和csrf换成你的cookie和csrf,仅替换这里,其他的不需要替换
#
# 注意注意,现在单引号也不太行了,换成""" """两对三个单引号,不要留有空格、换行符、制表符等空白字符,由于叔叔往cookie里面塞了不正经的东西,使用单、双引号会导致程序无法正常读取内容
cookie = """cookie"""
csrf = 'csrf'
lock = threading.Lock()
pool = ThreadPoolExecutor(max_workers=6)
post_header = {
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/94.0.4606.81 Safari/537.36",
"content-length":"130",
"content-type":"application/x-www-form-urlencoded",
"dnt": "1",
"origin":"https://www.bilibili.com",
"referer":"https://www.bilibili.com/",
"sec-ch-ua":"\"Chromium\";v=\"94\", \"Google Chrome\";v=\"94\", \";Not A Brand\";v=\"99\"",
"sec-fetch-mode":"cors",
"sec-ch-ua-mobile":"?0",
"sec-fetch-site":"same-site",
"sec-fetch-dest": "empty",
"sec-ch-ua-platform": "\"Windows\"",
"cookie": cookie
}
#
get_header = {
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/94.0.4606.81 Safari/537.36",
"cookie": cookie
}
# 复制每日任务活动的task_id的值到这里
daily_task = {
# 直播任务1
"73e231bf": False,
"02695b61": False,
"7bd043f8": False,
"bc1db428": False,
"e86bd228": False,
"75d09981": False
}
# 复制里程碑活动的task_id的值到这里,一定要确定好顺序,活动界面从上到下,从左到右开始计数
milepost_task = {
# 3天
"531d8377": False,
# 5天
"978eaa3e": False,
# 10天
"f3c0edc2": False,
# 20天
"9585ed03": False,
# 30天
"c2e37b4b": False,
# 40天
"eafe63e9": False,
# 新增舰长
"2221eb5d": False
}
# 复制里程碑活动的task_id的值到这里,一定要确定好顺序,活动界面从上到下,从左到右开始计数
milepost_task_multi = [
"531d8377",
"978eaa3e",
"f3c0edc2",
"9585ed03",
"c2e37b4b",
"eafe63e9",
"2221eb5d"
]
reward_url = "https://api.bilibili.com/x/activity/mission/single_task"
receive_url = "https://api.bilibili.com/x/activity/mission/task/reward/receive"
# 领取兑换码
def get_reward(url, body):
response = requests.post(url, data=body, headers=post_header)
response.encoding = 'utf-8'
result = response.text
cd_key_match = re.match(r'.*"cdkey_content":"(.*?)",', result)
if cd_key_match:
# cd_key_list.append(cd_key_match.group(1)+'\n')
return True
if result.__contains__("任务奖励已领取") or result.__contains__("来晚了"):
print("领取结果为: " + response.text)
return True
print("领取失败,服务器返回结果:"+ response.text)
return False
# 判断是否完成相应任务
def get_reward_item_id(url:str, body):
response = requests.get(url, headers=get_header)
response.encoding = 'utf-8'
res = response.text
macthObj = re.match(r'.*task_id":(.*?),"act_id":(.*?),"group_id":(.*?),.*receive_id":(.*?),', res)
target = "0"
if macthObj:
body["task_id"] = macthObj.group(1)
body["act_id"] = macthObj.group(2)
body["group_id"] = macthObj.group(3)
body["receive_id"] = macthObj.group(4)
target = macthObj.group(4)
else:
print("请确认你的网址等是否正确")
if target.__eq__("0"):
if int(time.time()) % 10 == 0:
print("任务尚未完成或者奖池已空(10s显示一次):任务ID--"+url[-11:])
return target
# 日常版本
def milepost_task_handler():
body = {
"csrf": csrf,
"act_id": "74",
"task_id": "159",
"group_id": "1310",
"receive_id": "255231",
"receive_from": "missionLandingPage"
}
# 判断里程碑任务完成情况
for i in milepost_task.keys():
if milepost_task[i]:
continue
url = reward_url + "?csrf=" + body['csrf'] + "&id=" + i
receive_id = get_reward_item_id(url, body)
if receive_id == "0":
milepost_task[i] = False
continue
milepost_task[i] = get_reward(receive_url, body)
def daily_task_handler():
# 进行每日任务
b_flag = 0
body = {
"csrf": csrf,
"act_id": "74",
"task_id": "159",
"group_id": "1310",
"receive_id": "255231",
"receive_from": "missionLandingPage"
}
for i in daily_task.keys():
if daily_task[i]:
continue
url = reward_url + "?csrf=" + body['csrf'] + "&id=" + i
receive_id = get_reward_item_id(url, body)
if receive_id == "0":
daily_task[i] = False
continue
daily_task[i] = get_reward(receive_url, body)
b_flag = b_flag+1
if b_flag >= 6:
print("你已完成今日所有直播任务,开始尝试领取里程碑奖励")
milepost_task_handler()
print("本轮查询结束\n")
# 多线程端使用
def handler_task(index):
body = {
"csrf": csrf,
"act_id": "74",
"task_id": "159",
"group_id": "1310",
"receive_id": "255231",
"receive_from": "missionLandingPage"
}
url = reward_url + "?csrf=" + body['csrf'] + "&id=" + milepost_task_multi[index]
lock.acquire()
receive_id = get_reward_item_id(url, body)
if receive_id != "0":
print("本次要领取的物品编号为:" + milepost_task_multi[index])
get_reward(receive_url, body)
lock.release()
# 批量领取兑换码,多线程版
def reward_multi_threads(collection):
for i in collection:
index = int(i) - 1
pool.submit(handler_task, index)
# 循环领取奖励,多线程版
def cycle_reward_multi_threads():
print("请输入要领取的里程碑奖励次序(活动界面从上到下,从左到右,从1开始)")
print("多个任务以英文状态下的 , 隔开,例如 1,2,3")
ids = input("请输入要领取的里程碑奖励次序(活动界面从上到下,从左到右,从1开始)\n")
items = ids.split(",")
a_flag = 0
while True:
a_flag = a_flag+1
# 开启多线程处理所有任务
reward_multi_threads(items)
if a_flag > 50:
a_flag = 0
time.sleep(0.05)
if __name__ == '__main__':
print("领取的奖励请自行去活动页面查看,只抢里程碑的方式为多线程方式,容易造成内存溢出,请不要长时间运行")
print("1.领取日常任务,完成后领取里程碑奖励\n2.指定领取某些里程碑奖励,建议在每天即将刷新任务时开启程序")
print('请检查cookie和csrf是否正确\ncookie=%s\ncsrf=%s\n' % (cookie, csrf))
flag = input("请输入1或2\n")
if flag == "1":
while True:
daily_task_handler()
time.sleep(0.1)
elif flag == "2":
cycle_reward_multi_threads()
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。