1 Star 4 Fork 2

Dout/ql

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
心喜.py 39.58 KB
一键复制 编辑 原始数据 按行查看 历史
Dout 提交于 2024-06-28 14:43 . aa
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970
"""
项目名称 心喜 小程序
变量: sso#备注
变量名: XSSONF
多账号 换行/回车
脚本作者: QGh3amllamll
版本 2.01
------更新记录----
1.22版本 更新自动获取抽奖 id 2023年10月31日13点33分
1.23版本 更新抽奖问题
2.00版本 修复任务 2024年1月28日02:34:25
2.01版本 修复获取 data 字段值是 None
"""
import os
import requests
import random
from datetime import datetime, timezone, timedelta
import time
import sys
import io
# 控制是否启用通知的变量
enable_notification = 1
# 只有在需要发送通知时才尝试导入notify模块
if enable_notification == 1:
try:
from notify import send
except ModuleNotFoundError:
print("警告:未找到notify.py模块。它不是一个依赖项,请勿错误安装。程序将退出。")
sys.exit(1)
#---------简化的框架--------
# 配置参数
BASE_URL = "https://api.xinc818.com/mini/"
# 获取北京日期的函数
def get_beijing_date():
beijing_time = datetime.now(timezone(timedelta(hours=8)))
return beijing_time.date()
def dq_time():
# 获取当前时间戳
dqsj = int(time.time())
# 将时间戳转换为可读的时间格式
dysj = datetime.fromtimestamp(dqsj).strftime('%Y-%m-%d %H:%M:%S')
return dqsj, dysj
# 获取环境变量
def get_env_variable(var_name):
value = os.getenv(var_name)
if value is None:
print(f'环境变量{var_name}未设置,请检查。')
return None
accounts = value.strip().split('\n')
num_accounts = len(accounts)
print(f'-----------本次账号运行数量:{num_accounts}-----------')
print(f'----------项目:心喜小程序-----------')
return accounts
# 封装请求头
def create_headers(sso):
headers = {
'Host': 'api.xinc818.com',
'Connection': 'keep-alive',
'sso': sso,
'xweb_xhr': '1',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/7.0.20.1781(0x6700143B) NetType/WIFI MiniProgramEnv/Windows WindowsWechat/WMPF WindowsWechat(0x63090819) XWEB/8531',
'Content-Type': 'application/json',
'Accept': '*/*',
'Sec-Fetch-Site': 'cross-site',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Dest': 'empty',
'Accept-Encoding': 'gzip, deflate, br',
'Accept-Language': 'zh-CN,zh;q=0.9'
}
return headers
def rwlb(sso): # 任务列表
urlrw = BASE_URL + 'dailyTask/daily' # 确保字符串连接正确
headers = create_headers(sso)
try:
response = requests.get(urlrw, headers=headers)
response_data = response.json()
if response_data["code"] != 0 or response_data["data"] is None:
print("错误响应或数据为空,跳过当前账号")
return None
for task in response_data["data"]:
task_id = task.get("id")
task_name = task.get("name")
task_status = task.get("status")
# 跳过不需要处理的任务
if task_name in ["完善个人资料", "购买商城商品", "参加活动", "申请试用", "提交反馈建议"]:
print(f"🚫🚫'{task_name}' ⛔⛔⛔跳过❌❌。")
continue
if task_status: # 如果任务状态为 True (已完成)
print(f"任务 '{task_name}' 已完成。")
#elif task_name == "参与讨论": # 测试bug
# selected_post = fetch_posts_data(sso)
# if selected_post:
# post_id = selected_post[0] # 假设列表的第一个元素是 post_id
# cytl(sso, post_id)
continue
print(f"任务 '{task_name}' 尚未完成。进行处理...")
if task_name == "分享心喜":
fx_xx(sso)
elif task_name == "签到打卡":
sign_dk(sso, task_id)
elif task_name == "想要商品":
xysp(sso)
elif task_name == "大转盘抽奖":
dzp_cj(sso, task_id)
elif task_name == "去商城浏览30秒":
ll_sp(sso, task_id)
elif task_name == "发帖":
hitokoto_content = fetch_hitokoto() # 获取一言内容
if hitokoto_content and not hitokoto_content.startswith("请求一言失败"):
rw_post(sso, hitokoto_content, task_id) # 使用一言内容作为发帖内容
else:
print("未能获取有效的一言内容,无法执行发帖操作。跳过此任务。")
elif task_name == "点赞用户":
selected_post = fetch_posts_data(sso)
if selected_post:
post_id = selected_post[0] # 从选中的帖子中获取 post_id
like_post(sso, post_id) # 执行点赞操作
else:
print("未能获取帖子数据,无法执行点赞操作。")
elif task_name == "关注用户":
selected_post = fetch_posts_data(sso)
if selected_post:
follow_user_id = selected_post[1] # 假设使用帖子的发布者ID作为关注对象
gz_user(sso, follow_user_id)
else:
print("未能获取帖子数据,无法执行关注操作。")
elif task_name == "参与讨论":
selected_post = fetch_posts_data(sso)
if selected_post:
post_id = selected_post[0] # 假设列表的第一个元素是 post_id
cytl(sso, post_id)
else:
print("未能获取帖子数据,无法参与讨论。")
elif task_name == "给主播留言":
selected_anchor = zb_list(sso)
if selected_anchor:
circle_id, related_id = selected_anchor
add_comment(sso, circle_id, related_id, task_id)
else:
print("未能获取主播数据,无法执行留言操作。")
# 在任务之间停止 1 到 3 秒
time.sleep(random.randint(1, 2))
return response_data
except requests.exceptions.RequestException as e:
print(f"请求失败: {e}")
return None
except ValueError:
print("响应内容不是有效的 JSON 格式")
return None
def fx_xx(sso): # 分享心喜
headers = create_headers(sso)
urlfx = BASE_URL + 'dailyTask/share'
try:
response = requests.get(urlfx, headers=headers)
if response.status_code == 200:
response_data = response.json() # 解析 JSON 响应
#print("分享心喜完整响应内容: ", response_data)
# 检查 data 是否存在
if response_data.get('data'):
task_name = response_data['data'].get('taskName', '未知任务')
single_reward = response_data['data'].get('singleReward', '未知奖励')
print(f"完成🎉 {task_name},🥂奖励: {single_reward}")
print()
else:
#print("分享心喜任务完成,但未获取到详细信息。")
print("🤡🤡分享心喜任务🤡🤡。")
time.sleep(3)
else:
print(f"请求失败,状态码:{response.status_code}, 响应内容: {response.text}")
except requests.exceptions.RequestException as e:
print(f"请求失败: {e}")
def sign_dk(sso, task_id): # 签到打卡
sign_in_url = BASE_URL + f'sign/in?dailyTaskId={task_id}' # 构造签到 URL
headers = create_headers(sso) # 使用 create_headers 函数创建 headers
try:
response = requests.get(sign_in_url, headers=headers) # 使用 GET 方法签到
if response.status_code == 200:
response_data = response.json()
#print(f"签到成功!响应内容: {response.text}")
# 在访问 taskResult 之前检查它是否存在
if response_data.get('data') and response_data['data'].get('taskResult'):
task_name = response_data['data']['taskResult'].get('taskName', '未知任务')
single_reward = response_data['data']['taskResult'].get('singleReward', '未知奖励')
print(f" 🎉: {task_name}, 奖励: {single_reward}")
print()
else:
print("签到成功,但未获取到任务结果。")
return True # 签到成功
else:
print(f"签到失败,状态码:{response.status_code}, 响应内容: {response.text}")
return False # 签到失败
except requests.exceptions.RequestException as e:
print(f"签到请求失败: {e}")
return False # 请求失败
def xysp(sso): # 想要商品
headers = {
'sso': sso,
'Host': 'cdn-api.xinc818.com' # 注意这里的 Host
}
random_page_num = random.randint(1, 15)
url_desire_goods = f'https://cdn-api.xinc818.com/mini/integralGoods?orderField=sort&orderScheme=DESC&pageSize=10&pageNum={random_page_num}'
try:
response = requests.get(url_desire_goods, headers=headers)
print(f"随机选择的页数为: {random_page_num}")
if response.status_code == 200:
response_json = response.json()
goods_list = response_json.get('data', {}).get('list', [])
if goods_list:
random_id = random.choice(goods_list)['id']
print("随机选取的商品ID:", random_id)
headers['Host'] = 'api.xinc818.com' # 更新headers为 api.xinc818.com
url_specific_good = f'https://api.xinc818.com/mini/integralGoods/{random_id}?type'
response_specific = requests.get(url_specific_good, headers=headers)
if response_specific.status_code == 200:
response_specific_json = response_specific.json()
outer_id = response_specific_json.get('data', {}).get('outerId', '')
print(f"提取的outerId: {outer_id}")
# 新的POST请求
url_submit = 'https://api.xinc818.com/mini/live/likeLiveItem'
data = {
"isLike": True,
"dailyTaskId": 20,
"productId": outer_id
}
print(data)
submit_response = requests.post(url_submit, headers=headers, json=data)
if submit_response.status_code == 200 and submit_response.headers.get('Content-Type') == 'application/json':
response_data = submit_response.json()
#print(submit_response.json())
if response_data is None:
print(f"响应的JSON数据为空,原始响应内容: {submit_response.text}")
return # 退出当前任务
if response_data.get("data"):
task_name = response_data["data"].get("taskName", "未知任务")
single_reward = response_data["data"].get("singleReward", 0)
print(f" 🎉: {task_name},奖励: {single_reward}")
else:
print("想要商品 请求成功但未完成任务,响应码或数据内容不正确")
return # 退出当前任务
else:
print(f"POST请求失败,状态码:{submit_response.status_code}, 响应内容: {submit_response.text}")
return # 退出当前任务
else:
print("商品列表为空,跳过当前账号")
return # 退出当前任务
else:
print(f"获取商品列表失败,状态码:{response.status_code}, 响应内容: {response.text}")
return # 退出当前任务
except requests.exceptions.RequestException as e:
print(f"请求失败: {e}")
return # 退出当前任务
def dzp_cj(sso, task_id):#大转盘抽奖
try:
headers = create_headers(sso)
# 获取抽奖活动列表
activity_list_url = BASE_URL + 'lottery/list'
activity_response = requests.get(activity_list_url, headers=headers)
if activity_response.status_code != 200:
print('获取活动列表失败,状态码:', activity_response.status_code)
return
activity_data = activity_response.json()
# 查找“幸运大转盘抽奖”的id
activity_id = None
for activity in activity_data['data']:
if activity['activityName'] == "幸运大转盘抽奖":
activity_id = activity['id']
break
if activity_id is None:
print("没有找到‘幸运大转盘抽奖’活动")
return
print(f"找到‘幸运大转盘抽奖’活动, ID: {activity_id}")
# 检查抽奖次数
lottery_url = BASE_URL + f'lottery/{activity_id}/freeNum'
lottery_response = requests.get(lottery_url, headers=headers)
if lottery_response.status_code != 200:
print('检查抽奖次数失败,状态码:', lottery_response.status_code)
return
lottery_data = lottery_response.json()
print('抽奖次数:', lottery_data['data'])
if lottery_data['data'] == 0:
print("没有抽奖机会,过程跳过。")
return
# 获取用户id
user_url = BASE_URL + 'user'
user_response = requests.get(user_url, headers=headers)
if user_response.status_code != 200:
print("获取用户ID失败,状态码: ", user_response.status_code)
return
user_data = user_response.json()
user_id = user_data['data']['id']
print(f"用户ID获取成功: {user_id}")
# 抽奖
lottery_draw_url = BASE_URL + 'lottery/draw'
payload = {
"activityId": activity_id,
"batch": False,
"isIntegral": False,
"userId": user_id,
"dailyTaskId": task_id # 使用变量task_id作为dailyTaskId
}
#print(payload)
lottery_draw_response = requests.post(lottery_draw_url, headers=headers, json=payload)
if lottery_draw_response.status_code == 200:
#print("抽奖成功!")
lottery_result = lottery_draw_response.json()
lottery_result_list = lottery_result['data']['lotteryResult']
if lottery_result_list:
prize_name = lottery_result_list[0].get('prizeName', '未知奖品')
print("中奖奖品🎉:", prize_name)
else:
print("未获取到抽奖结果")
else:
print("抽奖失败,状态码:", lottery_draw_response.status_code)
time.sleep(3)
except requests.exceptions.RequestException as e:
print(f"请求失败: {e}")
def ll_sp(sso, task_id): # 浏览商品30秒
browse_url = BASE_URL + f'dailyTask/browseGoods/{task_id}'
headers = create_headers(sso)
print(browse_url)
try:
# 模拟浏览前的准备请求
pre_browse_response = requests.get(browse_url, headers=headers)
if pre_browse_response.status_code != 200:
print(f"浏览前的请求失败,状态码:{pre_browse_response.status_code}")
return False
# 模拟用户浏览30秒
time.sleep(3) # 注意这里应该是30秒,但现在是3秒
# 模拟浏览后的完成请求
post_browse_response = requests.get(browse_url, headers=headers)
if post_browse_response.status_code == 200:
print("成功模拟浏览商品30秒")
# 打印响应内容,如果需要
print(post_browse_response.json()) # 更正的行
print(f"浏览后的响应内容: {post_browse_response.text}")
return True
else:
print(f"浏览后的请求失败,状态码:{post_browse_response.status_code}")
return False
except requests.exceptions.RequestException as e:
print(f"请求失败: {e}")
return False
def fetch_posts_data(sso):#获取帖子数据并提取 点赞用户id 和 关注用户publisherId,然后随机选择一个
url = "https://cdn-api.xinc818.com/mini/posts/sorts?sortType=SPAM&pageNum=1&pageSize=10&groupClassId=0"
headers = {
"Host": "cdn-api.xinc818.com",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36",
"sso": sso
}
try:
response = requests.get(url, headers=headers)
if response.status_code == 200:
data = response.json().get('data', {}).get('list', [])
extracted_data = [(item['id'], item['publisherId']) for item in data]
if extracted_data:
# 随机选择一个帖子
selected_post = random.choice(extracted_data)
return selected_post
else:
print("没有找到帖子数据")
return None
else:
print(f"请求失败,状态码:{response.status_code}, 响应内容: {response.text}")
return None
except requests.exceptions.RequestException as e:
print(f"请求异常: {e}")
return None
def like_post(sso, post_id): # 点赞用户
url = BASE_URL + "posts/like"
headers = create_headers(sso)
payload = {
"postsId": str(post_id),
"decision": True
}
try:
response = requests.put(url, headers=headers, json=payload)
if response.status_code == 200:
print(f"💨成功点赞帖子,帖子ID: {post_id}")
#print(f"点赞帖子 完整响应内容: {response.text}")
response_json = response.json()
# 检查 response_json['data'] 是否存在
if response_json.get('data'):
task_name = response_json['data'].get('taskName', '未知任务')
single_reward = response_json['data'].get('singleReward', '未知奖励')
print(f"任务名称: {task_name}, 单次奖励: {single_reward}")
print()
else:
#print("点赞成功,但未获取到任务详情。")
#print("👻👻点赞👻👻。")
print()
else:
print(f"点赞失败,状态码:{response.status_code}")
print(f"完整响应内容: {response.text}")
except requests.exceptions.RequestException as e:
print(f"请求异常: {e}")
def gz_user(sso, follow_user_id): # 关注用户
url = BASE_URL + "user/follow"
headers = create_headers(sso)
payload = {
"decision": True,
"followUserId": follow_user_id
}
try:
response = requests.put(url, headers=headers, json=payload)
if response.status_code == 200:
print(f"成功关注💗用户,用户ID: {follow_user_id}")
#print(f"完整响应内容: {response.text}") # 打印完整的响应内容
response_json = response.json()
# 检查 response_json['data'] 是否存在
if response_json.get('data'):
task_name = response_json['data'].get('taskName', '未知任务')
single_reward = response_json['data'].get('singleReward', '未知奖励')
print(f" 🎉: {task_name}, 奖励: {single_reward}")
print()
else:
#print("关注成功,但未获取到任务详情。")
print()
else:
print(f"关注用户失败,状态码:{response.status_code}")
print(f"完整响应内容: {response.text}")
except requests.exceptions.RequestException as e:
print(f"请求异常: {e}")
def user_info(sso):#"获取用户信息ID 积分
url = BASE_URL + "user"
headers = create_headers(sso)
try:
response = requests.get(url, headers=headers)
if response.status_code == 200:
response_json = response.json()
user_data = response_json.get('data', {})
user_id = user_data.get('id', '')
integral = user_data.get('integral', '')
history_integral = user_data.get('historyIntegral', '')
print(f"用户ID: {user_id}, 当前积分: {integral}, 历史积分: {history_integral}")
return user_id # 返回用户ID
else:
print(f"获取用户信息失败,状态码:{response.status_code}, 响应内容: {response.text}")
except requests.exceptions.RequestException as e:
print(f"请求异常: {e}")
return None # 如果请求失败或异常,返回None
def cytl(sso, post_id): # "参与讨论,对指定帖子发表随机评论
url = BASE_URL + "postsComments"
headers = create_headers(sso)
user_info_result = user_info(sso) # 获取用户信息的结果
# 定义一个评论内容的字典
comments = [
"非常好!",
"我同意!",
"这确实很重要。",
"我从这个评论学到了很多。",
"加油,",
"为你打call!",
"今天也要加油哦!",
"很有见地!",
"太精彩了!",
"赞同这个观点。",
"好好",
"真的很棒!",
"这个我喜欢!",
"太赞了!",
"很有帮助!",
"这是我见过的最好的观点!",
"太有创意了!",
"这让我思考良多。",
"绝对同意!",
"讲得太好了!",
"这才是重点!",
"我刚想到这个!",
"太同意了!",
"这解释得太清楚了!",
"这个分析很到位!",
"你抓住了核心!",
"这个角度很独特!",
"没想到这样的观点,太棒了!"
"情感丰富,真实感人!",
"每次看到你的评论都很期待!",
"你的观点总能给人启发!",
"你的评论总是那么独到!",
"期待你更多的分享!",
"你的观点太有深度了!",
"每次看到你的评论都很受益!",
"这分析太透彻了!",
"你的评论总能点亮我的思考!",
"看到你的评论,我的心情都好了!"
"这让我看到了不同的视角!",
"每个人的看法都很有意思!",
"太有创造力了,我喜欢!",
"这个讨论很有价值!",
"你的见解让人耳目一新!",
"这是一个很棒的开始!",
"从你的评论中学到了很多!",
"你的想法很有启发性!",
"这个观点很有趣,赞一个!",
"你的理解深度让我佩服!",
"这确实是个好问题,值得探讨。",
"谢谢分享,我受益匪浅!",
"这种观点很难得,很欣赏!",
"你的分析很到位,赞同!",
"这样的讨论太精彩了,期待更多!"
]
content = random.choice(comments) # 随机选择一个评论内容
if user_info_result:
user_id = user_info_result[0] # 仅提取 user_id
payload = {
"customizeImages": [],
"content": content,
"postsId": post_id,
"publisherId": user_id, # 使用提取的 user_id
"floorId": "",
"voice": ""
}
#print(payload)
try:
response = requests.post(url, headers=headers, json=payload)
if response.status_code == 200:
print(f"🙋参与讨论,帖子ID: {post_id}, 内容: '{content}'")
# 解析响应内容以获取taskName和singleReward
response_json = response.json()
#print("参与讨论完整响应内容: ", response_json)
task_name = response_json.get('data', {}).get('taskResult', {}).get('taskName', '')
single_reward = response_json.get('data', {}).get('taskResult', {}).get('singleReward', '')
print(f"完成🎉{task_name}, 奖励: {single_reward}🔝🔝🔝")
print()
else:
print(f"参与讨论失败,状态码:{response.status_code}, 响应内容: {response.text}")
except requests.exceptions.RequestException as e:
print(f"请求异常: {e}")
else:
print("未能获取用户ID,无法参与讨论。")
def zb_list(sso): # 主播列表
url = BASE_URL + "groups/defaultGroupList"
headers = create_headers(sso)
try:
response = requests.get(url, headers=headers)
if response.status_code == 200:
group_list = response.json()
# 提取所有主播的id和associatedAnchorId
anchors = [(group.get('id'), group.get('associatedAnchorId')) for group in group_list.get('data', [])]
# 随机选择一个主播
if anchors:
selected_anchor = random.choice(anchors)
group_id, associated_anchor_id = selected_anchor
#print(f"随机选取的群组ID: {group_id}, 关联主播ID: {associated_anchor_id}")
return group_id, associated_anchor_id # 返回随机选取的群组ID和关联主播ID
else:
print("没有可用的主播列表。")
return None, None
else:
print(f"获取主播列表失败,状态码:{response.status_code}, 响应内容: {response.text}")
return None, None
except requests.exceptions.RequestException as e:
print(f"请求异常: {e}")
return None, None
def add_comment(sso, circle_id, related_id, daily_task_id):#给主播留言
"""在指定圈子中对指定主播添加评论"""
url = BASE_URL + "anchorComment/addComment"
headers = create_headers(sso)
# 预定义的评论列表
comments = [
"加油,我们支持你!",
"你是最棒的!",
"永远支持你!",
"我们在这里,永远爱你!",
"你总是能给我们带来欢笑!",
"你的直播太棒了!",
"你总是能够打动我们!",
"你的笑容真的很治愈!",
"我们是你坚强的后盾!",
"为你打call!",
"今天也要加油哦!",
"一直在你身后支持你!",
"你是最亮的星!",
"很高兴认识你!",
"你的努力我们都看到了!",
"每一次直播都是一次享受!",
"你的粉丝们都在这里!",
"继续前进,我们会一直在这里!",
"你总是那么有活力!",
"我们看到的不只是努力的你!",
"每次看你的直播都很开心!",
"你的每一次努力我们都看在眼里!",
"你的直播总能给我带来好心情!",
"你的才华无人能及!",
"你的直播是我一天中最期待的时刻!",
"你的魅力真的无法抗拒!",
"在你的直播中总能找到快乐!",
"你的每个瞬间都充满了惊喜!",
"你的努力值得每一份赞赏!",
"你的存在让这个平台更加精彩!",
"期待你的每一次直播!",
"你的每个直播都值得反复观看!",
"你总是能带给我们满满的正能量!",
"你的每一次分享都很有价值!",
"每次看你的直播都能学到很多东西!",
"你的直播里总有无限的乐趣!",
"你的直播总是那么充满活力!",
"你的直播是我们的快乐源泉!",
"感谢你带来这么多美好的直播时光!",
"你的每一场直播都是一场视听盛宴!",
"你的每场直播都是我们的心灵鸡汤!",
"看到你的努力,我们都非常感动!",
"你的笑声太迷人了,每次听都很开心!",
"你的才华横溢,每场直播都令人期待!",
"感谢你总是带给我们这么多正能量!",
"你的每一次直播都给我留下深刻印象!",
"你是我们的超级明星,永远支持你!",
"你在直播中的每一刻都是那么的真实可爱!",
"你的直播是我一天中最放松的时光!",
"每次看到你,都觉得世界变得更美好了!",
"你的直播充满了温暖和力量!",
"你的存在就是我们的幸运!",
"你的直播总是那么富有创造力和想象力!",
"你是那么的不同凡响,总能带来惊喜!",
"你的直播是我的精神食粮!",
"看着你的成长和进步,我们都为你感到骄傲!",
"你总是那么的充满魅力和活力!",
"你的每一次直播都是一次美好的旅行!",
"你的存在让我们的生活充满了乐趣!",
"你是我们心中的英雄,永远支持你!",
"你的直播总是能点亮我们的生活!",
"每次听你说话都特别有感染力!",
"你的直播总是那么有趣,让人忍不住一直看!",
"你的每一次表演都是那么精彩,无法挪开眼!",
"你的直播总是给我带来好心情,谢谢你!",
"每次看你直播都有新的收获,真的很棒!",
"你的直播里总有无尽的正能量,真的很喜欢!",
"你的直播总是那么温馨,感觉像回到家一样!",
"你是我们的快乐小天使,每次看到你都特别开心!",
"你的每一次直播都是我们的期待!",
"你的直播中总有许多惊喜,让人意犹未尽!",
"每次看你的直播都能感受到你的用心!",
"你的直播就像一股清泉,沁人心脾!",
"你的每一次直播都是我们的精神食粮!",
"你的直播总能带给我不一样的感受,太棒了!",
"你的直播充满了智慧和趣味,真是太有才了!",
"你的直播总是能给我们带来欢乐和知识,感谢你!",
"每次看你直播都有种被治愈的感觉!",
"你的直播总是那么充满活力,真是太赞了!",
"你的直播给了我们很多快乐,永远支持你!",
]
# 随机选择一个评论内容
content = random.choice(comments)
payload = {
"content": content,
"circleId": circle_id,
"relatedId": related_id,
"contentType": 0, # contentType固定为0
"dailyTaskId": daily_task_id,
"topCommentId": 0, # topCommentId固定为0
}
#print(payload) # 打印请求内容
try:
response = requests.post(url, headers=headers, json=payload)
if response.status_code == 200:
response_data = response.json()
print(f"💬评论成功,圈子ID: {circle_id}, 主播ID: {related_id}, ✍️: '{content}'")
#print(f"评论 完整响应内容: {response_data}") # 打印完整的响应内容
# 检查 response_data['data'] 和 response_data['data']['taskResult'] 是否存在
if response_data.get('data') and response_data['data'].get('taskResult'):
task_name = response_data['data']['taskResult'].get('taskName', '未知任务')
single_reward = response_data['data']['taskResult'].get('singleReward', '未知奖励')
print(f" 🎉: {task_name}, 奖励: {single_reward}")
print()
else:
#print("评论成功,但未获取到任务详情。")
print()
else:
print(f"评论失败,状态码:{response.status_code}, 响应内容: {response.text}")
except requests.exceptions.RequestException as e:
print(f"请求异常: {e}")
def user_info(sso):
url = BASE_URL + "user"
headers = create_headers(sso)
try:
response = requests.get(url, headers=headers)
if response.status_code == 200:
response_json = response.json()
user_data = response_json.get('data', {})
if user_data is None: # 检查 user_data 是否为 None
#print(f"未能获取到 {sso} 的用户数据。")
return None
user_id = user_data.get('id', '')
integral = user_data.get('integral', '')
history_integral = user_data.get('historyIntegral', '')
#print(f"用户ID: {user_id}, 当前积分: {integral}, 历史积分: {history_integral}")
return user_id, integral, history_integral # 以元组形式返回
else:
print(f"获取用户信息失败,状态码:{response.status_code}, 响应内容: {response.text}")
except requests.exceptions.RequestException as e:
print(f"请求异常: {e}")
return None # 如果请求失败或异常,返回None
def fetch_hitokoto(): #一言
url_hitokoto = 'https://v1.hitokoto.cn/'
# 设置请求参数
params = {
'c': 'k', # 类型为哲学
'min_length': 10 # 设置返回句子的最小长度为 10
}
try:
response_hitokoto = requests.get(url_hitokoto, params=params)
if response_hitokoto.status_code == 200:
data = response_hitokoto.json()
return data.get('hitokoto')
else:
# 主要 API 请求失败,尝试备用 API
return fetch_hitokoto_backup()
except requests.exceptions.RequestException:
# 主要 API 请求异常,尝试备用 API
return fetch_hitokoto_backup()
def fetch_hitokoto_backup(): # 备用一言
url_backup = 'https://api.7585.net.cn/yan/api.php?charset=utf-8'
try:
response_backup = requests.get(url_backup)
if response_backup.status_code == 200:
return response_backup.text.strip() # 返回备用 API 的响应内容
else:
return f"备用API请求失败,状态码:{response_backup.status_code}"
except requests.exceptions.RequestException as e:
return f"备用API请求异常: {e}"
def rw_post(sso, content, task_id): # 发帖
headers = create_headers(sso)
url = BASE_URL + "posts"
sid = int(time.time() * 1000) # 生成时间戳
data = {
"topicNames": [],
"content": content,
"groupId": 0,
"groupClassifyId": 0,
"attachments": [],
"voteType": 0,
"commentType": "0",
"dailyTaskId": task_id,
"platform": "android",
"sid": sid
}
#print(data)
try:
response = requests.post(url, headers=headers, json=data)
if response.status_code == 200:
response_json = response.json()
print(f"💬发帖成功,✍️{content}, 任务ID: {task_id}, 时间戳: '{sid}'")
# 检查 response_json['data'] 是否存在
if response_json.get('data'):
task_name = response_json['data'].get('taskName', '未知任务')
single_reward = response_json['data'].get('singleReward', '未知奖励')
print(f" 🎉: {task_name}, 单次奖励: {single_reward}")
print()
else:
#print("发帖成功,但未获取到任务详情。")
print()
#print(f"发帖完整响应内容: {response.text}")
else:
print(f"发帖请求失败,状态码:{response.status_code}")
print(f"完整响应内容: {response.text}")
except requests.exceptions.RequestException as e:
print(f"请求异常: {e}")
def hqjljl(sso): # 奖励记录 积分
"""获取奖励记录,并返回当天的积分总和"""
records_url = BASE_URL + f'user/integralRecord?pageNum=1&pageSize=20'
headers = create_headers(sso) # 使用 create_headers 函数创建 headers
try:
response = requests.get(records_url, headers=headers)
if response.status_code == 200:
data = response.json()
total_points_today = 0
today = datetime.now(timezone(timedelta(hours=8))).date() # 获取当前日期
for record in data.get('data', {}).get('list', []):
# 提取 remark、changeValue 和 changeTime
remark = record.get('remark')
change_value = record.get('changeValue')
change_time = record.get('changeTime')
# 将时间戳转换为北京时间
beijing_time = datetime.fromtimestamp(change_time / 1000, timezone(timedelta(hours=8)))
formatted_time = beijing_time.strftime('%Y-%m-%d %H:%M:%S')
# 判断记录是否为当天的
if beijing_time.date() == today:
total_points_today += change_value
# 在循环内部打印每条记录的详细信息
#print(f"任务: {remark}, 积分: {change_value}, 时间: {formatted_time}")
print(f"今日积分: {total_points_today}")
return total_points_today # 返回当天的积分总和
else:
print(f"获取奖励记录失败,状态码:{response.status_code}, 响应内容: {response.text}")
return None # 获取失败时返回 None
except requests.exceptions.RequestException as e:
print(f"请求失败: {e}")
return None # 请求异常时返回 None
#本地测试用
os.environ['XSSONF1'] = '''
Wmeimob_eyJ0ebGciOiJIUzI1NiJ9.eyJzdWIiOiIx#大号
'''
class Tee:
def __init__(self, *files):
self.files = files
def write(self, obj):
for file in self.files:
file.write(obj)
file.flush() # 确保及时输出
def flush(self):
for file in self.files:
file.flush()
# 主函数
def main():
var_name = 'XSSONF'
tokens = get_env_variable(var_name)
if not tokens:
return
yxsl = len(tokens) # 账号总数
# 首先对每个账号运行 rwlb(sso)
for i, token in enumerate(tokens):
parts = token.split('#')
if len(parts) < 2:
print("令牌格式不正确。跳过处理。")
continue
sso = parts[0]
account_no = parts[1]
print(f'------账号 {i+1}/{yxsl} {account_no} -------')
rwlb(sso) # 任务列表
# 设置Tee类实例并开始捕获输出
original_stdout = sys.stdout # 保存原始stdout
string_io = io.StringIO() # 创建StringIO对象以捕获输出
sys.stdout = Tee(sys.stdout, string_io) # 将stdout重定向
# 所有账号运行完 rwlb(sso) 后再统一运行 hqjljl(sso) 和 user_info(sso)
for i, token in enumerate(tokens):
parts = token.split('#')
if len(parts) < 2:
continue # 如果令牌格式不正确,继续下一个
sso = parts[0]
account_no = parts[1]
print(f'---账号{i+1}/{yxsl} {account_no}---')
user_info_result = user_info(sso) # 获取 user_info 函数的返回值
if user_info_result is None:
print(f"由于某些原因/过期/不正确 跳过此账号。")
continue # 跳过此次循环的剩余部分
# 解包 user_info 函数返回的元组
user_id, integral, history_integral = user_info_result
print(f"🎊当前积分: {integral}, 历史积分: {history_integral}") # 打印积分信息
hqjljl(sso) # 处理奖励
# 捕获完成后,重置sys.stdout并获取内容
sys.stdout = original_stdout # 重置stdout
output_content = string_io.getvalue() # 获取捕获的输出
# 如果需要发送通知
if enable_notification == 1:
send("心喜-通知", output_content)
if __name__ == "__main__":
main()
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/mt1314/ql.git
git@gitee.com:mt1314/ql.git
mt1314
ql
ql
master

搜索帮助

0d507c66 1850385 C8b1a773 1850385