1 Star 0 Fork 10

ieping/薅羊毛

forked from hotootop/薅羊毛 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
apps.py 92.53 KB
一键复制 编辑 原始数据 按行查看 历史
hotootop 提交于 2021-02-02 11:09 . 主要代码

from time import sleep
from typing import Dict
from random import choice
import re
from random import randint, uniform
from base import (switch_task, TaskException,
DoneException, RetryException,)
from myauto import affirm_decorator, watcher_decorator, watchers_decorator
APPS_CONTENT = ['快手','头条', '淘宝', '中青', '惠头条', '微视', '抖火',
'快看']
APPS_SIGN = ['快手','头条', '番茄', '火山', '淘宝', '中青', '惠头条', '微视',
'快看', '抖火',
# '拼多多'
]
APPS_TIMED = ['头条', '火山']
APPS_PEROID = ['头条', '抖火', '火山']
APPS_NIGHT = ['京东',]
APPS_HOUR = ['猎豹','快手','快看', '惠头条']
APPS_TIME = {'快手': 20,'头条': 20, '番茄': 10, '火山': 20, '淘宝': 20, '中青': 3,
'惠头条': 20, '微视': 5, '抖火':10, '快看':3}
class FanQie:
def __init__(self, u, log, package):
self.u = u
self.log = log
self.u.open_app(package)
self.package = package
def affirm_started(self):
if self.u.find('福利', timeout=40):
self.log.info('成功开启')
else:
self.log.info('未进入程序,重启')
raise RetryException
def into_tasks_page(self, dispose_sign=True):
if self.u.find('继续阅读'):
self.u.click_('sr', 'id')
if self.u.find('afu', 'id'):
self.u.back()
self.u.click_('加入书架')
if not self.u.click_('福利'):
self.u.tap((0.501, 0.967))
if dispose_sign:
self.dispose_pop_up_sign_in()
if self.u.find('现金', timeout=10, contains=True):
self.log.info('进入任务界面')
else:
self.log.info('未能进入任务界面,重启')
raise RetryException
def dispose_pop_up_sign_in(self):
self.log.info('处理签到弹窗')
sleep(5)
self.u.click_('以后再说')
if self.u.click_('看视频再领300金币'):
self._dispose_ad()
self.u.click_('立即领取')
def get_cash_data(self, today_cash):
self.log.info('获取资金')
if not self.u.click_('跳转'):
self.u.click_('现金收', contains=True)
self.u.tap((0.151, 0.145))
sleep(3)
desc_text = self.u.class_attributes('android.view.View')
if not desc_text:
self.log.info('获取资金出错,重试')
raise RetryException
for i in desc_text:
if i:
re_cash = re.match(r'(\d{1,2}\.\d{1,2})$',i)
if re_cash:
self.log.info(f'现金为{re_cash.group()}')
today_cash['番茄']['now_cash'] = float(re_cash.group())
break
for i in desc_text:
if '+' in i:
today_cash['番茄']['get_cash'] = float(i[2:])
break
for i in desc_text[::-1]:
if '累计收益' in i:
today_cash['番茄']['all_cash'] = float(i[4:-1])
break
sleep(randint(1, 3))
if today_cash['番茄']['now_cash'] > 15:
self.get_cash()
self.u.back()
def get_cash(self):
self.log.info('获取资金')
self.u.click_('跳转') # 页面:我的
self.u.click_('去提现')
self.u.click_('提现15.00元')
def open_treasure_chest(self):
self.log.info('开宝箱')
if not self.u.click_('开宝箱得金币'):
self.log.info('完善')
self.u.tap((0.744, 0.787), (1080, 2040))
if self.u.click_('看视频再领300金币'):
self._dispose_ad()
else:
self.log.info('宝箱未刷新')
def watch_ad(self):
self.log.info('看广告')
if self.u.find('现金金额:'):
self.u.swipe_page('down', 0.7)
if self.u.click_('立即观看'):
self.log.info('看广告')
self._dispose_ad()
else:
self.log.info('广告看完')
def _dispose_ad(self):
self.log.info('等待广告')
if self.u.long_wait_click('tt_video_ad_close', 'id'):
self.log.info('关闭广告')
return True
else:
self.log.info('处理广告出现错误。')
return False
def content_task(self, task_control):
self.log.info('看内容任务')
self.u.swipe_page('down', 0.7)
if not self.u.click_('阅读30秒最高奖励'):
self.u.click_('阅读', contains=True)
self.log.info('看小说')
if randint(0, 1):
self.u.swipe_page('down', 0.7)
# 使用递归选择书籍,因为有书籍没更新。
self._choice_book()
if self.u.click_('加入书架'):
self.u.click_('免费阅读')
if self.u.find('书籍简介'):
self.u.swipe_page('right', 0.7)
while True:
# 书籍更新中
if self.u.click_('看视频再领', contains=True):
self._dispose_ad()
if self.u.click_('去书架'):
self._choice_book()
if self.u.click_('加入书架'):
self.u.click_('免费阅读')
if self.u.find('书籍简介'):
self.u.swipe_page('right', 0.7)
switch_task(task_control, '番茄')
self.u.click_('wn', 'id')
if (self.u.find('继续阅读下一章 >') or
self.u.find('成为会员无广告畅读')):
self.u.swipe_page('right', 0.7)
if self.u.click_('看视频免30分钟广告'):
# 番茄出现问题,看视频免和得金币会提前一页。
self.log.info('看视频免广告')
if self.u.find('上一章'):
self.u.tap((0.7, 0.7))
if not self.u.find('看视频免30分钟广告'):
if self._dispose_ad():
if self.u.find('恭喜获得免广告权益30分钟'):
self.u.tap((0.498, 0.563), (720, 1600))
self.u.swipe_page('right', 0.7)
else:
self.u.swipe_page('right', 0.7)
self.u.click_('看视频免30分钟广告')
# 番茄出现问题,看视频免和得金币会提前一页。
if self._dispose_ad():
if self.u.find('恭喜获得免广告权益30分钟'):
self.u.tap((0.498, 0.563), (720, 1600)) #我知道了
self.u.swipe_page('right', 0.7)
self.log.info('完成')
if self.u.click_('看视频领300金币'):
self.log.info('看视频得金币')
if self.u.find('上一章'):
self.u.tap((0.7, 0.7))
if not self.u.find('看视频领300金币'):
self._dispose_ad()
self.u.swipe_page('right', 0.7)
else:
self.u.swipe_page('right', 0.7)
if self.u.click_('看视频领300金币'):
self._dispose_ad()
self.u.swipe_page('right', 0.7)
self.log.info('完成')
# 弹窗广告
self.u.click_('sr', 'id', 1)
if self.u.find('点评此书'):
self.u.tap((0.786, 0.863), (1080, 2040))
self.u.click_('ao9', 'id')
self.u.click_('ao9', 'id')
# 翻页
self.u.tap((round(uniform(0.7, 0.8), 2), round(uniform(0.7, 0.8), 2)))
if self.u.find('想法'):
self.u.back()
# 若点进去评论,推出
if self.u.find('本章评论'):
self.u.back()
self.u.swipe_page('right', 0.7)
# 章结束后评论
if self.u.find('aex', 'id', 1) or self.u.find('a8l', 'id', 1):
self.u.swipe_page('right', 0.7)
sleep(5)
self.u.click_('a6u', 'id', 1)
sleep(round(uniform(0.1, 1.5),1))
def _choice_book(self):
if self.u.find('书架', 'text', 6):
pass
else:
sleep(3)
descs_text = self.u.class_attributes('android.widget.TextView')
texts = []
for i in descs_text:
if '章' in i:
texts.append(i)
try:
self.u.click_(choice(texts), 'text')
except IndexError:
self.log.info('未能进入选择界面, 重启')
self.u.back()
try:
self.u.click_(choice(texts), 'text')
except IndexError:
self.log.info('重启')
raise RetryException
if self.u.click_('去书架'):
self._choice_book()
else:
return True
class FanQieMain:
def __init__(self, app, task_control, today_cash, timed_task={}):
self.app = app
self.task_control = task_control
self.today_cash = today_cash
def main(self):
while True:
try:
self.app.log.info('开机等待')
self.app.affirm_started()
if self.task_control.get('番茄') == '签到':
self._first()
raise TaskException
elif self.task_control.get('番茄') == '周期':
self._period()
self.task_control['番茄'] = '内容'
raise TaskException
elif self.task_control.get('番茄') == '内容':
self._content()
else:
self._content()
except RetryException:
self.app.log.info('重启')
self.app.u.close_app(self.app.package)
sleep(5)
self.app.u.open_app(self.app.package)
def _first(self):
self.app.into_tasks_page()
sleep(2)
self.app.get_cash_data(self.today_cash)
self.app.open_treasure_chest()
self.app.watch_ad()
def _period(self):
sleep(5)
self.app.into_tasks_page(False)
sleep(6)
self.app.open_treasure_chest()
if self.app.u.find('活动说明', 'text', 3):
self.app.u.back()
self.app.watch_ad()
def _content(self):
self.app.into_tasks_page(False)
sleep(6)
self.app.content_task(self.task_control)
class HuoShan:
def __init__(self, u, log, package):
self.u = u
self.log = log
self.u.open_app(package)
self.package = package
def affirm_started(self):
if self.u.find('首页', timeout=40):
self.log.info('成功开启')
else:
self.log.info('未进入程序,重启')
raise RetryException
def into_tasks_page(self, dispose_sign=True):
self.u.click_('红包')
if self.u.find('活动说明') or self.u.find('a6p', 'id'):
self.u.back()
if dispose_sign:
sleep(10)
self.dispose_pop_up_sign_in()
if self.u.find('活动说明') or self.u.find('a6p', 'id'):
self.u.back()
if self.u.find('现金余额(元)', timeout=10) or self.u.find('去赚钱'):
self.log.info('进入任务界面')
else:
self.log.info('未能进入任务界面,重启')
raise RetryException
def dispose_pop_up_child(self):
self.log.info('处理青少年弹窗')
def dispose_pop_up_sign_in(self):
self.log.info('处理签到弹窗')
sleep(4)
# 邀请弹窗
self.log.info('完善')
self.u.click_('//android.app.Dialog/android.view.View[1]/android.view.View[2]/android.view.View[1]', 'xpath')
if self.u.find('活动说明') or self.u.find('a6p', 'id'):
self.u.back()
if self.u.find('好友看视频奖励'):
if not self.u.click_('好的'):
self.u.click_('7X8wOKVRL2TPZ+aAAAAABJRU5ErkJggg==') # 关闭按钮
if self.u.click_('看广告', contains=True):
#增加看广告
# 有时有广告有时没广告。没广告是邀请好有
sleep(randint(1, 3))
self._dispose_ad()
self.u.click_('//android.app.Dialog/android.view.View[1]/android.view.View[2]/android.view.View[1]', 'xpath')
if self.u.find('活动说明') or self.u.find('a6p', 'id', 1):
self.u.back()
def get_cash_data(self, today_cash):
self.log.info('获取资金')
if self.u.find('好友看视频奖励'):
self.u.click_('好的')
self.u.click_('现金余额(元)')
sleep(5)
desc_text = self.u.class_attributes('android.view.View')
for i in desc_text:
if i:
re_cash = re.match(r'(\d{1,2}\.\d{1,2})$',i)
if re_cash:
self.log.info(f'现金为{re_cash.group()}')
today_cash['火山']['now_cash'] = float(re_cash.group())
break
for i in desc_text:
if '+' in i:
today_cash['火山']['get_cash'] = float(i[2:])
break
for i in desc_text[::-1]:
if '累计收益' in i:
today_cash['火山']['all_cash'] = float(i[4:-1])
break
if today_cash['火山'].get('now_cash', 0) > 0.5:
self.do_cash()
sleep(randint(1, 3))
self.u.back()
if self.u.find('活动说明', 1) or self.u.find('a6p', 'id', 1):
self.u.back()
def do_cash(self):
self.log.info('提现')
self.u.click_('去提现',)
sleep(randint(1,3))
if not self.u.click_('看视频'):
self.u.click_('0.30')
if not self.u.find('明天再来看视频可提现' ):
self._dispose_ad()
if not self.u.click_('看视频'):
self.u.click_('0.30')
self.u.click_('a6p', 'id')
if not self.u.click_('天天提'):
self.u.click_('0.20')
self.u.click_('a6p', 'id')
self.log.info('完成提现0.5元')
self.u.back()
def open_treasure_chest(self):
self.log.info('开宝箱')
if self.u.click_('开宝箱得金币'):
sleep(randint(2, 3))
if self.u.click_('金币翻8倍', contains=True):
self._dispose_ad()
else:
self.u.click_('javascript:;', 3)
def watch_ad(self):
self.log.info('看广告')
if self.u.click_('去赚钱'): # 广告
if self.u.find('活动说明') or self.u.find('a6p', 'id'):
self.u.back()
self._dispose_ad()
else:
self.log.info('广告未刷新')
def _dispose_ad(self):
self.log.info('等待广告')
if self.u.long_wait_click('关闭广告'):
self.log.info('关闭广告')
return True
else:
self.log.info('退出广告遇到问题')
return False
def watch_content(self, task_control):
self.log.info('看内容')
if self.u.click_('累积观看视频', contains=True):
while True:
switch_task(task_control, '火山')
num = randint(3, 8)
sleep(num)
if self.u.find('a3n', 'id', 2):
self.log.info('刷到广告')
sleep(randint(15, 20))
self.u.click_('领取', 'text', 5)
sleep(randint(1, 3))
self.u.swipe_page('down', 0.75)
if self.u.find('100金币'):
self.log.info('刷到抽奖')
self._do_lucky_draw()
num = randint(2, 6)
sleep(num)
if num == 5:
# 点赞
self.u.click_('x6', 'id', 2)
self.u.click_('p9', 'id', 2)
self.u.swipe_page('down', 0.75)
else:
self.log.info('完成任务')
raise DoneException
def _do_lucky_draw(self):
for i in range(4, -1, -1):
if i == 0:
self.u.swipe_page('down', 0.75)
else:
if self.u.click_('剩余{}次'.format(i)):
self.log.info('完善')
self.u.tap((0.542, 0.633), xy_=(1080, 2040))
sleep(5)
self.log.info('完成抽奖')
def do_sleep_task(self, awake_sleep):
self.log.info('领睡觉福利')
self.u.swipe_page('down', 0.75)
sleep(randint(1, 3))
self.u.click_('去领取')
if awake_sleep == 'awake':
if self.u.click_('我睡醒了'):
self.u.click_('领取', contains=True)
sleep(randint(1, 3))
elif awake_sleep == 'sleep':
if self.u.click_('我睡醒了'):
self.u.click_('领取', contains=True)
sleep(randint(1, 3))
self.u.click_('我要睡了')
if self.u.click_('9j1MaKG5QbLACL+oc7BNiiLbfh4FIyCYQIAoPdXAcqOUb0AAAAASUVORK5CYII='):
self.log.info('完成睡觉福利')
self.u.back()
def do_timed_task(self, timed_task):
sleep(randint(4, 6))
if (timed_task.get('task') == 'awake') and ('火山' not in timed_task.get('awake')):
self.do_sleep_task('awake')
timed_task.get('awake').append('火山')
elif (timed_task.get('task') == 'sleep') and ('火山' not in timed_task.get('sleep')):
self.do_sleep_task('sleep')
timed_task.get('sleep').append('火山')
class HuoShanMain:
def __init__(self, app, task_control, today_cash, timed_task={}):
self.app = app
self.task_control = task_control
self.today_cash = today_cash
self.timed_task = timed_task
def main(self):
while True:
try:
self.app.log.info('开机等待')
self.app.affirm_started()
if self.task_control.get('火山') == '签到':
self._first()
raise TaskException
elif self.task_control.get('火山') == '定时':
sleep(5)
self.app.into_tasks_page(False)
self.app.do_timed_task(self.timed_task)
self.task_control['火山'] = '内容'
raise TaskException
elif self.task_control.get('火山') == '周期':
self._period()
self.task_control['火山'] = '内容'
raise TaskException
elif self.task_control.get('火山') == '内容':
self._content()
else:
self._content()
except RetryException:
self.app.log.info('遇到严重错误,重来')
self.app.u.close_app(self.app.package)
sleep(1)
self.app.u.open_app(self.app.package)
sleep(5)
def _first(self):
sleep(10)
self.app.dispose_pop_up_child()
sleep(randint(1, 3))
self.app.into_tasks_page()
self.app.get_cash_data(self.today_cash)
self.app.watch_ad()
self.app.open_treasure_chest()
def _period(self):
self.app.into_tasks_page(False)
self.app.watch_ad()
self.app.open_treasure_chest()
def _content(self):
self.app.into_tasks_page(False)
self.app.u.swipe_page('down', 0.6)
self.app.watch_content(self.task_control)
class JingDong:
def __init__(self, u, log, package):
self.u = u
self.log = log
self.u.open_app(package)
self.package = package
def sign_in(self):
self.log.info('签到')
sleep(3)
self.u.click_('close', 'id', 3) #弹窗关闭按钮
self.u.click_('现金签到')
sleep(2)
if self.u.click_('立即签到'):
sleep(randint(1, 3))
self.u.back()
self.u.back()
self.u.click_('残忍离开')
else:
self.u.back()
if not self.u.find('赚钱'):
self.u.back()
self.u.click_('残忍离开')
self.u.click_('不开启')
self.u.click_('不想开启')
def affirm_started(self):
self.u.click_('close', 'id', timeout=20)
if self.u.find('现金签到', timeout=40):
self.log.info('成功开启')
else:
self.log.info('未进入程序,重启')
raise RetryException
def into_tasks_page(self):
self.log.info('转到任务页面')
self.u.click_('close', 'id', 3)
if not self.u.click_('赚钱'):
self.u.tap((0.5, 0.969), (1080, 2040))
if self.u.find('每日任务', timeout=10):
self.log.info('进入任务界面')
else:
self.log.info('未能进入任务界面,重启')
raise RetryException
def get_cash_data(self, today_cash):
sleep(3)
desc_text = self.u.class_attributes('android.widget.TextView')
for i in desc_text:
if '约' in i:
now_cash = i[1:-1]
self.log.info('现金为:'+ now_cash)
today_cash['京东']['now_cash'] = float(now_cash)
today_cash['京东']['get_cash'] = 0
today_cash['京东']['all_cash'] = float(now_cash)
break
def watch_product(self, task_control):
self.log.info('逛商品')
if not self.u.find('已完成'):
self.u.click_('逛商品赚金币')
sleep(3)
while True:
switch_task(task_control, '京东')
self.u.click_('点击逛下一个', 'text', randint(1, 3))
self.u.swipe_page('down', uniform(0.55, 0.7))
sleep(randint(1, 2))
if self.u.find('立即领取'):
self.log.info('完善')
if self.u.find('今日已完成'):
self.log.info('完成逛商品')
self.log.info('完善')
self.u.click_("""//*[@resource-id="com.jd.jdlite:id/ll_content"]/android.widget.FrameLayout[1]""",
'xpath')
break
else:
self.log.info('已完成逛商品')
def watch_sales(self, task_control):
self.log.info('逛活动')
if len(self.u.find('已完成')) == 1:
self.u.click_('逛活动赚金币')
sleep(5)
while True:
switch_task(task_control, '京东')
self.u.swipe_page('down', 0.6)
sleep(randint(1, 2))
if self.u.find('仅可购买自营文娱部分商品'):
self.log.info('完善')
self.u.tap( (0.94, 0.154), (1080,2040))
self.u.click_('close', 'id')
self.u.click_('点击逛下一个')
if self.u.find('今日已完成'):
self.log.info('完成逛活动')
self.log.info('完善')
self.u.click_("""//*[@resource-id="com.jd.jdlite:id/ll_content"]/android.widget.FrameLayout[1]""",
'xpath')
break
else:
self.log.info('已完成逛活动')
def watch_video(self, task_control):
self.log.info('看视频')
if self.u.click_('看视频赚金币'):
sleep(3)
if self.u.find('边看边买') or self.u.find('tv_task_bottom_1', 'id'):
for i_ in range(3):
try:
self.u.swipe_page('down', 0.6)
texts_ = self.u.class_attributes('android.widget.TextView')
texts = []
for i in texts_:
if len(i)>13:
texts.append(i)
self.u.click_(choice(texts), 'text')
while True:
switch_task(task_control, '京东')
sleep(randint(5, 15))
if self.u.find('今日已完成','text', 2):
self.log.info('完成看视频')
self.log.info('完善')
self.u.click_("""//*[@resource-id="com.jd.jdlite:id/ll_content"]/android.widget.FrameLayout[1]""",
'xpath')
break
self.u.click_('点击逛下一个', 'text', randint(1, 4))
self.u.click_('继续看视频')
self.u.swipe_page('down', 0.7)
break
except IndexError:
continue
else:
self.log.info('已完成看视频')
def do_other(self):
self.log.info('其他参与任务')
self.u.swipe_page('down', 0.7)
sleep(3)
for text in ['领88元打车券', '分享赚金币', '红包大富翁', '天天抽奖',
'推推赚大钱', '水果免费领', '一元赢大奖','砍价免费拿','去签到' ]:
if self.u.find('任务中心', 'text'):
if text == '水果免费领':
self.u.swipe_page('down', 0.5)
if self.u.click_(text, 'text', 2):
sleep(uniform(0.5, 1))
self.u.back()
sleep(1)
if self.u.find('恭喜获赠今日额外神券'):
self.log.info('完善')
self.u.tap((0.81, 0.25), (720, 1600))
if self.u.click_('立即签到'):
if self.u.find('立即使用'):
self.log.info('完善')
self.u.tap( (0.813, 0.206), (1080,2040))
self.u.back()
self.u.click_('一键签到')
self.u.click_('残忍离开')
self.u.click_('残忍拒绝')
self.u.click_('不想开启')
sleep(uniform(0.5, 1))
class JingDongMain:
def __init__(self, app, task_control={}, today_cash={}, timed_task={}):
self.app = app
self.task_control = task_control
self.today_cash = today_cash
def main(self):
while True:
try:
self.app.log.info('开机等待')
self.app.affirm_started()
if self.task_control.get('京东') == '签到':
self.app.sign_in()
self.app.into_tasks_page()
self.app.get_cash_data(self.today_cash)
raise TaskException
else:
self._other()
raise DoneException
except RetryException:
self.app.u.close_app(self.app.package)
sleep(5)
self.app.u.open_app(self.app.package)
def _other(self):
sleep(15)
self.app.into_tasks_page()
# 向下移动
self.app.watch_product(self.task_control)
sleep(randint(1, 3))
self.app.watch_sales(self.task_control)
sleep(randint(1, 4))
self.app.watch_video(self.task_control)
sleep(randint(1, 4))
self.app.do_other()
class KuaiShou:
def __init__(self, u, log, package):
self.u = u
self.log = log
self.u.open_app(package)
self.package = package
self.tasks_data: Dict = {}
def affirm_started(self):
if self.u.find('left_btn', 'id', timeout=40):
self.log.info('成功开启')
else:
self.log.info('未进入程序,重启')
raise RetryException
def dispose_pop_up_child(self):
self.log.info('处理青少年弹窗')
def dispose_pop_up_sign_in(self):
breakpoint()
self.log.info('处理签到弹窗')
self.u.click_('立即签到', 'text', 5)
close_xy = {(1080, 2040): (0.114, 0.194), (720, 1600): (0.12, 0.241)}
if self.u.find('邀请好友赚更多', 'text', 3) or self.u.find('打开签到提醒', 'text'):
self.u.tap(close_xy.get(self.u.xy, (0.114, 0.2)))
if self.u.find('将存入「人气助力」明细'):
self.log.info('完善')
if self.u.find('本周已发奖', contains=True):
self.u.back()
def into_tasks_page(self, dispose_sign=True):
self.u.click_('close', 'id', 3)
if self.u.find('立即加购'):
self.u.swipe_page('down', 0.7)
if not self.u.click_('left_btn', 'id', 10):
self.u.back()
self.u.click_('left_btn', 'id', 3)
self.u.click_('去赚钱')
if dispose_sign:
self.dispose_pop_up_sign_in()
if self.u.find('网络不给力,点击屏幕重试'):
self.u.tap((0.56, 0.56))
if self.u.find('日常任务', timeout=20):
self.log.info('进入任务界面')
else:
self.log.info('未能进入任务界面,重启')
raise RetryException
def get_cash_data(self, today_cash):
"""获取当前主页的现金信息"""
self.log.info('获取资金')
if not self.u.click_('现金余额(元)'):
if not self.u.click_('现金收益'):
self.u.click_p(r"png/kuaishou/cash.png")
sleep(3)
for i in range(3):
if not self.u.find('现金明细'):
sleep(2)
else:
break
else:
self.log.info('不成功,重启')
raise RetryException
desc_text = self.u.class_attributes('android.view.View')
if desc_text:
for i in desc_text:
if i:
re_cash = re.match(r'(\d{1,2}\.\d{1,2})$',i)
if re_cash:
self.log.info(f'现金为{re_cash.group()}')
today_cash['快手']['now_cash'] = float(re_cash.group())
break
for i in desc_text:
if i.startswith('+') and i.endswith('元'):
today_cash['快手']['get_cash'] = float(i[1:-1])
break
for i in desc_text[::-1]:
if '已累计' in i:
if '.' in i:
re_cash = re.search(r'(\d+\.\d{0,2})',i)
else:
re_cash = re.search(r'(\d+)',i)
if re_cash:
self.log.info(f'累计现金为{re_cash.group()}')
today_cash['快手']['all_cash'] = float(re_cash.group())
break
sleep(randint(1, 3))
self.u.back()
def open_treasure_chest(self):
self.log.info('开宝箱')
if self.u.click_('开宝箱得金币'):
if self.u.click_('看精彩视频赚更多'):
if not self.u.click_('领取奖励'):
self._dispose_ad()
else:
self.log.info('点击看宝箱广告出错')
elif self.u.find('明日再来'):
self.log.info('宝箱已开完')
else:
self.log.info('宝箱未刷新')
def watch_ad(self):
self.u.swipe_page(pct=0.8)
self.log.info('看广告')
sleep(1)
if self.u.click_('福利') or self.u.click_('福利 领金币'):
self._dispose_ad()
else:
self.log.info('广告已看完')
def _dispose_ad(self):
self.log.info('等待广告')
if self.u.long_wait_click('video_close_icon', 'id'):
self.log.info('关闭广告')
return True
else:
self.log.info('广告出错,请检查')
return False
def watch_content(self, task_control):
self.log.info('看内容')
while True:
switch_task(task_control, '快手')
self.u.swipe_page('down', 0.8)
if self.u.find('点击重播', timeout=1):
continue
self.u.tap(uniform(0.2, 1), uniform(0, 0.4))
sleep(randint(1, 4))
# self.u.double_tap((0.5, 0.5))
return True
class KuaiShouMain:
def __init__(self, app, task_control={}, today_cash={}, timed_task={}):
self.app = app
self.task_control = task_control
self.today_cash = today_cash
def main(self):
while True:
try:
self.app.log.info('开机等待')
self.app.affirm_started()
if self.task_control.get('快手') == '签到':
self._first()
raise TaskException
elif self.task_control.get('快手') == '小时':
self._period()
raise TaskException
else:
self._content()
except RetryException:
self.app.log.info('重启')
self.app.u.close_app(self.app.package)
sleep(5)
self.app.u.open_app(self.app.package)
def _first(self):
sleep(10)
# 处理青少年模式弹窗
self.app.dispose_pop_up_child()
self.app.into_tasks_page()
self.app.get_cash_data(self.today_cash)
self.app.open_treasure_chest()
self.app.watch_ad()
def _content(self):
self.app.watch_content(self.task_control)
def _period(self):
self.app.into_tasks_page(False)
self.app.open_treasure_chest()
self.app.watch_ad()
class TaoBao:
def __init__(self, u, log, package):
self.u = u
self.log = log
self.u.open_app(package)
self.package = package\
def affirm_started(self):
if self.u.find('视频', timeout=40):
self.log.info('成功开启')
else:
self.log.info('未进入程序,重启')
raise RetryException
def sign_in(self):
if not self.u.click_('gold_icon_center', 'id'):
# 可能没有元宝中心,而是一个蛋
self.u.click_('//*[@resource-id="android:id/content"]/android.widget.FrameLayout[1]', 'xpath')
self.u.click_('否')
sleep(5)
self.log.info('打开淘宝,进入任务页面')
if self.u.click_('TB1frLSOpP7gK0jSZFjXXc5aXXa-146-54'):
self.log.info('成功签到')
self.open_egg()
def get_cash_data(self, today_cash):
self.log.info('获取资金')
if not self.u.find('赚元宝 换红包 提现金'):
self.log.info('有问题,请检查')
raise RetryException
desc_text = [i for i in self.u.class_attributes('android.view.View') if i]
for i in desc_text:
if '价值' in i:
now_cash = i[2:-2]
self.log.info('现金为:'+ now_cash)
today_cash['淘宝']['now_cash'] = float(now_cash)
today_cash['淘宝']['get_cash'] = 0
today_cash['淘宝']['all_cash'] = float(now_cash)
break
def open_egg(self):
self.log.info('开蛋')
self.u.click_('gold_icon_center', 'id')
if self.u.find('赚元宝 换红包 提现金'):
self.u.click_('TB1qYr6pmslXu8jSZFuXXXg7FXa-500-500')
self.u.click_('TB1TJ.yhhvbeK8jSZPfXXariXXa-80-80')
def watch_live(self, task_control):
self.log.info('看直播')
for i in range(1, 4):
self.log.info(f'第{i}次尝试进入直播页面')
if not self.u.click_('直播'):
self.u.click_('淘宝直播')
self.u.click_('//*[@resource-id="com.taobao.live:id/taobao_live_base_list_recycler_view"]/android.widget.FrameLayout[1]', )
if self.u.click_(f'//*[@resource-id="com.taobao.live:id/taobao_live_base_list_recycler_view"]/android.widget.FrameLayout[{i}]', 'xpath'):
self.log.info('进入直播间')
break
else:
self.log.info('未能进入直播页面,重启')
raise RetryException
while True:
switch_task(task_control, '淘宝')
# 淘宝直播金蛋出现后,要点击,不然卡着,不刷元宝
if not self.u.find('gold_turns', 'id', 3):
self.log.info('金蛋')
for i in range(3):
sleep(10)
self.u.click_('gold_icon_center', 'id')
sleep(1)
self.u.click_('否')
if self.u.find('提现'):
self.open_egg()
sleep(1)
self.u.back()
sleep(5)
if self.u.find('gold_turns', 'id'):
break
continue
else:
sleep(randint(20, 30))
if randint(0, 1):
self.u.swipe_page()
def watch_video(self, task_control):
self.log.info('看视频')
self.u.click_('视频')
sleep(3)
while True:
switch_task(task_control, '淘宝')
self.u.swipe_page('down', 0.75)
sleep_num = randint(4, 10)
sleep(sleep_num)
if sleep_num == 8 and randint(0, 1):
self.u.click_('favor_frame_layout', 'id', 3)
elif sleep_num == 9 and randint(0, 1):
self.u.click_('iv_add', 'id', 3)
elif sleep_num == 6:
self.open_egg()
self.u.back()
class TaoBaoMain:
def __init__(self, app, task_control={}, today_cash={}, timed_task={}):
self.app = app
self.task_control = task_control
self.today_cash = today_cash
def main(self):
while True:
try:
self.app.log.info('开机等待')
self.app.affirm_started()
if self.task_control.get('淘宝') == '签到':
self._first()
raise TaskException
else:
self._other()
except RetryException:
self.app.u.close_app(self.app.package)
sleep(5)
self.app.u.open_app(self.app.package)
def _first(self):
self.app.sign_in()
sleep(5)
self.app.get_cash_data(self.today_cash)
def _other(self):
while True:
if randint(0, 1):
self.app.watch_video(self.task_control)
else:
self.app.watch_live(self.task_control)
class TouTiao:
def __init__(self, u, log, package):
self.u = u
self.log = log
self.u.open_app(package)
self.package = package
self.error_count = 0
def affirm_started(self):
# 首页
if self.u.find('首页', timeout=40):
self.log.info('成功开启')
else:
self.log.info('未进入程序,重启')
raise RetryException
def into_tasks_page(self, dispose_sign=True):
if not self.u.click_('任务', 'text', 3):
self.u.tap((0.501, 0.962))
if dispose_sign:
self.dispose_pop_up_sign_in()
if self.u.find('网络不给力,点击屏幕重试', timeout=5):
self.u.tap((0.6, 0.6))
sleep(3)
if self.u.find('任务中心', timeout=25) or self.u.find('去查看'):
self.log.info('进入任务界面')
else:
self.log.info('未能进入任务界面,重启')
raise RetryException
def dispose_pop_up_sign_in(self):
self.log.info('处理签到弹窗')
sleep(6)
if self.u.click_('看视频再领', 'text', 4):
self._dispose_ad()
self.log.info('已签到')
def get_cash_data(self, today_cash):
sleep(3)
if not self.u.click_('跳转箭头', 'text', 3):
self.u.click_('现金收', contains=True)
self.u.tap((0.151, 0.145))
sleep(5)
self.u.click_("现金收益", 'text', 3)
sleep(3)
desc_text = self.u.class_attributes('android.view.View')
for i in desc_text:
if i:
re_cash = re.match(r'(\d{1,2}\.\d{1,2})$',i)
if re_cash:
self.log.info(f'现金为{re_cash.group()}')
today_cash['头条']['now_cash'] = float(re_cash.group())
break
for i in desc_text:
if '+' in i:
today_cash['头条']['get_cash'] = float(i[2:])
break
for i in desc_text[::-1]:
if '元' in i:
today_cash['头条']['all_cash'] = float(i[:-1])
break
sleep(randint(1, 3))
self.u.back()
def open_treasure_chest(self):
self.log.info('开宝箱')
self.u.find('现金收益', 'text', 10)
if not self.u.click_('开宝箱得金币', 'text', 3):
self.log.info('完善')
self.log.info('宝箱没刷新')
if self.u.click_('看完视频再领', 'text', 10):
self._dispose_ad()
def _dispose_ad(self):
self.log.info('等待广告')
if self.u.find('award-modal-close.e13c3ba2', 'text', 3):
if self.u.click_('9j1MaKG5QbLACL+oc7BNiiLbfh4FIyCYQIAoPdXAcqOUb0AAAAASUVORK5CYII=', 'text', 3):
self.log.info('没有广告')
return True
else:
self.log.info('广告出错,请检查')
sleep(5)
if self.u.long_wait_click('关闭', timeout=15):
self.log.info('关闭广告')
return True
if self.u.click_('关闭试玩', timeout=4):
self.u.click_('关闭')
self.log.info('关闭广告')
return True
else:
self.log.info('关闭广告错误,请检查')
def watch_content(self, task_control):
self.log.info('看内容')
self.u.click_('推荐', 'text')
self.u.swipe_page('down', uniform(0.4, 0.7))
while True:
switch_task(task_control, '头条')
num = randint(1, 3)
for i in range(num):
self.u.swipe_page('down', uniform(0.4, 0.7))
# 通过首页text类获取所有id,再随机点击id进入文章。
# a1p 文章, b_ 广告 post_text_pre_layout 推荐 bz视频和文章,广告, iq视频
# aem 视频广告
# 图文和视频, 不包含广告广告
if (self.u.click_('km', 'id') or self.u.click_('post_text_pre_layout', 'id')
or self.u.click_('iq','id') or self.u.click_('l8', 'id') or
self.u.click_('jn', 'id') or self.u.click_('bz', 'id') or
self.u.click_('post_text', 'id')) :
sleep(2)
if not (self.u.find('a6g', 'id', 4) or self.u.find('a6w', 'id', 4)) :
self.u.back()
self.log.info('没有红包界面,可能是广告,返回')
continue
else:
self.log.info('进入页面,开始浏览')
else:
self.log.info('没能定位到,翻页')
continue
num = randint(10, 20)
# 收藏视频
if self.u.find('a7l', 'id'):
self.log.info('刷到视频')
sleep(num*10+60)
if num == 19:
self.u.click_('a7l', 'id')
# 文章
elif self.u.find('d', 'id'): # 收藏文章
self.log.info('刷到文章')
self._swipe(base_num=10)
# 等于10时,收藏文章
if num == 10:
self.u.click_('d', 'id')
elif self.u.find('icon_image', 'id'):
self._swipe(base_num=10, is_article=False)
# 点赞头条
if num == 20 :
self.u.click_('icon_image', 'id')
elif self.u.find('b91', 'id', 2):
self._swipe(base_num=10, is_article=False)
if num == 20 : # 点赞头条
self.u.click_('b91', 'id', 2)
else:
self._swipe()
# 返回主页面
# 退出详细页面回到主页面
self.u.back()
def _swipe(self, base_num=2, down_num=20, is_article:bool=True):
"""翻页子方法,因为头条的内容需要选择进入详情,才算金币"""
i = 0
while i < randint(base_num, down_num):
i += 1
if self.u.find('重播'):
break
sleep(randint(1, 4))
self.u.swipe_page('down', uniform(0.4, 0.7))
if self.u.find('暂无评论,点击抢沙发'):
break
if is_article:
# 在文章内的点赞按钮 ,只有文章里面有
if self.u.find('a62', 'id'):
break
def do_food_welfare(self):
self.log.info('拿餐补')
if not self.u.find('任务中心'):
self.into_tasks_page()
self.u.click_('去领取')
sleep(2)
self.u.click_('领取', contains=True)
if not self.u.click_('看视频再领'):
self.log.info('完善')
self.u.tap( (0.484, 0.59), (1080, 2040))
self._dispose_ad()
self.u.back()
def do_sleep_task(self, awake_sleep: str):
self.log.info('睡醒拿补贴')
sleep(10)
result = self.u.find('去查看', 'text')
if result:
result[-1].click()
self.log.info('完善')
sleep(5)
if awake_sleep == 'awake':
if self.u.click_('我睡醒了', 'text', 3):
self.u.click_('领取', 'text', 3, contains=True)
if self.u.click_('看视频再领', 'text', 3):
self._dispose_ad()
elif awake_sleep == 'sleep':
if self.u.click_('我睡醒了', 'text', 3):
self.log.info('睡醒拿补贴')
self.u.click_('领取', 'text', 3, contains=True)
if self.u.click_('看视频再领', 'text', 3):
self._dispose_ad()
self.u.click_('我要睡了', 'text', 5)
self.u.back()
def do_timed_task(self, timed_task):
sleep(randint(4, 6))
if not self.u.find('去查看'):
self.u.swipe_page(pct=0.5)
if (timed_task.get('task') == 'awake') and ('头条' not in timed_task.get('awake')):
self.do_sleep_task('awake')
self.do_food_welfare()
timed_task.get('awake').append('头条')
elif (timed_task.get('task') == 'sleep') and ('头条' not in timed_task.get('sleep')):
self.do_sleep_task('sleep')
self.do_food_welfare()
timed_task.get('sleep').append('头条')
elif (timed_task.get('task') == 'breakfast') and ('头条' not in timed_task.get('breakfast')):
self.do_food_welfare()
timed_task.get('breakfast').append('头条')
elif (timed_task.get('task') == 'dinner') and ('头条' not in timed_task.get('dinner')):
self.do_food_welfare()
timed_task.get('dinner').append('头条')
class TouTiaoMain:
"""执行类,无继承"""
def __init__(self, app, task_control={}, today_cash={}, timed_task={}):
self.app = app
self.task_control = task_control
self.today_cash = today_cash
self.timed_task = timed_task
def main(self):
while True:
try:
self.app.log.info('开机等待')
sleep(10)
self.app.affirm_started()
if self.task_control.get('头条') == '签到':
self.app.u.creat_watcher('头条', 'close button')
self._first()
self.app.u.remove_watcher('头条')
raise TaskException
elif self.task_control.get('头条') == '定时':
self.app.u.creat_watcher('头条', 'close button')
self.app.into_tasks_page(False)
self.app.do_timed_task(self.timed_task)
self.task_control['头条'] = '内容'
self.app.u.remove_watcher('头条')
raise TaskException
elif self.task_control.get('头条') == '周期':
self.app.u.creat_watcher('头条', 'close button')
self._period()
self.task_control['头条'] = '内容'
self.app.u.remove_watcher('头条')
raise TaskException
elif self.task_control.get('头条') == '内容':
self._content()
else:
self._content()
except RetryException:
self.app.log.info('遇到严重错误,重来')
self.app.u.close_app(self.app.package)
sleep(5)
self.app.u.open_app(self.app.package)
except ValueError:
self.app.log.info('未获得现金数据,重启')
self.app.u.close_app(self.app.package)
sleep(5)
self.app.u.open_app(self.app.package)
def _first(self):
self.app.into_tasks_page()
self.app.get_cash_data(self.today_cash)
self.app.open_treasure_chest()
def _period(self):
self.app.into_tasks_page(False)
sleep(3)
self.app.open_treasure_chest()
def _content(self):
self.app.u.click_('首页', 'text', 10)
self.app.watch_content(self.task_control)
class WeiShi:
def __init__(self, u, log, package):
self.u = u
self.log = log
self.u.open_app(package)
self.package = package
def affirm_started(self):
# 首页
if self.u.find('推荐', timeout=40):
self.log.info('成功开启')
else:
self.log.info('未进入程序,重启')
raise RetryException
def dispose_pop_up_child(self):
self.log.info('处理青少年弹窗')
self.u.click_('知道了', 'text', 10)
def sign_in(self):
self.u.click_('我')
self.u.click_('福利中心', timeout=10)
self.u.click_('签到领红包', timeout=10)
def get_cash_data(self, today_cash):
self.log.info('完善')
"""获取当前主页的现金信息"""
today_cash['微视']['now_cash'] = 0
today_cash['微视']['get_cash'] = 0
today_cash['微视']['all_cash'] = 0
# self.log.info('获取资金')
# # 从我菜单进入钱包
# sleep(8)
# self.u.click_('钱包', 'text', 3)
# for i in range(4):
# if self.u.find('账户余额(元)', 'text', 3):
# break
# else:
# sleep(5)
# else:
# raise RetryException
# self.log.info('有问题,重启')
# try:
# self._get_cash_data(today_cash)
# except ValueError:
# self.log.info('获取资金出问题重试')
# try:
# self._get_cash_data(today_cash)
# except ValueError:
# raise RetryException
# sleep(2)
# if today_cash['微视']['now_cash'] > 2:
# self.u.click_('立即提现')
# sleep(3)
# self.u.back()
def _get_cash_data(self, today_cash):
desc_text = [i for i in self.u.class_attributes('android.view.View') if i]
today_cash['微视']['now_cash'] = float(desc_text[desc_text.index('账户余额(元)')+1])
today_cash['微视']['get_cash'] = 0
today_cash['微视']['all_cash'] = float(desc_text[desc_text.index('收入总额:')+1])
self.log.info(f'现金信息为:{today_cash["微视"]}')
return True
def watch_video(self, task_control):
self.log.info('看视频')
sleep(3)
# 好像得点击红包才能领,不点击不算数
i = 0
while True:
i+=1
switch_task(task_control, '微视', self.before)
self.u.swipe_page('down', 0.75)
sleep_num = randint(2, 4)
sleep(sleep_num)
if sleep_num == 10:
self.u.double_tap((0.53, 0.56))
if i > 10:
# 不能用xpath定位,如果用xpath与人工点不同。会跳到里面去
self.u.tap((0.772, 0.06), (720, 1600))
sleep(3)
if self.u.find('rps', 'id'):
# 微视的任务界面没有任何可以定位,除了这个
self.u.back()
elif not self.u.find('推荐', 'text'):
self.u.back()
else:
self.u.tap((0.498, 0.749))
i = 0
if self.u.find('不感兴趣 减少推送', 'text'):
self.u.back()
if self.u.find('排行', 'text'):
self.u.back()
if self.u.find('相关推荐', 'text'):
self.u.back()
def before(self):
# 结束工作前,点击红包按钮收红包
self.u.tap((0.772, 0.06), (720, 1600))
self.u.tap((0.505, 0.75))
class WeiShiMain:
def __init__(self, app, task_control={}, today_cash={}, timed_task={}):
self.app = app
self.task_control = task_control
self.today_cash = today_cash
def main(self):
while True:
try:
self.app.log.info('开机等待')
self.app.affirm_started()
if self.task_control.get('微视') == '签到':
self._first()
raise TaskException
else:
self._content()
except RetryException:
self.app.u.close_app(self.app.package)
self.app.u.open_app(self.app.package)
sleep(5)
def _first(self):
self.app.log.info('开启')
self.app.dispose_pop_up_child()
self.app.sign_in()
self.app.get_cash_data(self.today_cash)
def _content(self):
self.app.watch_video(self.task_control)
class ZhongQing:
def __init__(self, u, log, package):
self.u = u
self.log = log
self.u.open_app(package)
self.package = package
self.u.creat_watcher('忽略', '忽略')
def affirm_started(self):
# 首页
if self.u.find('热点', timeout=40):
self.log.info('成功开启')
else:
self.log.info('未进入程序,重启')
raise RetryException
def into_tasks_page(self):
self.log.info('打开中青')
self.u.click_('ob', 'id')
# 任务按钮
self.u.click_('r9', 'id')
self.u.click_('t3', 'id')
if self.u.find('火爆转发', timeout=10):
self.log.info('进入任务界面')
else:
self.log.info('未能进入任务界面,重启')
raise RetryException
def get_cash_data(self, today_cash):
sleep(6)
now_cash = 0
result = self.u.class_attributes('android.widget.TextView')
for i in result:
if i:
re_cash = re.match(r'(\d{1,2}\.\d{1,2})$',i)
if re_cash:
self.log.info(f'现金为{re_cash.group()}')
now_cash = float(re_cash.group())
break
today_cash['中青']['now_cash'] = now_cash
today_cash['中青']['all_cash'] = now_cash
today_cash['中青']['get_cash'] = 0
def sign_in(self):
self.log.info('打开签到')
#签到领现金
self.u.click_('a74', 'id')
if self.u.click_('看视频', 'text', 5, contains=True):
self._dispose_ad()
# 退出广告的关闭按钮没有属性
self.log.info('已签到')
# 签到奖励
if self.u.click_('幸运奖励', 'text', 3):
self.u.click_('看视频', 'text', 5, contains=True)
self._dispose_ad()
def _dispose_ad(self):
self.log.info('等待广告')
if self.u.long_wait_click('tt_video_ad_close_layout','id'):
self.log.info('关闭广告')
return True
else:
self.log.info('广告出错,请检查')
return False
def watch_article(self, task_control):
# 首页文章
breakpoint()
self.log.info('完善')
self.u.click_('r7', 'id', 3)
self.log.info('看文章')
while True:
switch_task(task_control, '中青', self.do_cash)
self.u.swipe_page('down', 0.75)
if (self.u.click_('ua', 'id', 1) or self.u.click_('a_s', 'id', 1) or
self.u.click_('alt', 'id', 1) or self.u.click_('als', 'id', 1) or
self.u.click_('wb', 'id', 1) or self.u.click_('acf', 'id', 1) or
self.u.click_('a2i', 'id', 1)):
sleep(1)
# 视频会有广告
if (self.u.click_('ob', 'id') or
self.u.click_('qn', 'id')
or self.u.find('转发朋友圈', 'text', 1)):
self.log.info('刷到视频')
sleep(randint(10, 20))
self.u.back()
continue
else:
self.log.info('刷到文章')
# 网页广告会有关闭按钮
self.u.click_('pu', 'id')
if self.u.click_('评论'):
self.u.back()
self.u.back()
continue
if self.u.click_('点击领取') or self.u.find('待完成'):
self.u.back()
self.u.click_('lx', 'id')
self._swipe()
self.u.back()
sleep(randint(1, 3))
else:
self.u.click_('r7', 'id', 3)
def watch_video(self, task_control):
sleep(3)
self.log.info('完善')
if not self.u.click_('r_', 'id'):
self.u.click_('uc', 'id')
self.log.info('看视频')
if not self.u.click_('r_', 'id'):
self.u.click_('uc', 'id')
# mix
self.u.click_('视频')
# note3
self.u.click_('t4', 'id')
while True:
switch_task(task_control, '中青', self.do_cash)
self.u.swipe_page('down', 0.7)
if not self.u.click_('ah5', 'id'):
self.u.click_('aup', 'id')
# note3
# 视频
self.u.click_('akx', 'id')
# 关闭按钮
self.u.click_('pu', 'id')
self.u.click_('ob', 'id', 2)
sleep(randint(1,3))
if not self.u.find('gj', 'id'):
self.u.back()
continue
sleep(randint(10, 30))
def _swipe(self, base_num=2, down_num=20):
"""翻页子方法,因为中青的内容需要选择进入详情,才算金币"""
i = 0
while i < randint(base_num, down_num):
i += 1
sleep(randint(1, 2))
if self.u.find('评论', 'text', 1):
self.u.back()
self.u.click_('查看全文,奖励更多', 1)
self.u.swipe_page('down', uniform(0.6, 0.75))
self.u.click_('查看全文,奖励更多',1)
if self.u.find('阅读进度', 'text', 1):
self.u.back()
def do_cash(self):
for i in range (4):
# t3 note3
if self.u.click_('r9', 'id',) or self.u.click_('t3', 'id'):
break
else:
self.u.back()
else:
raise RetryException
self.u.click_('去提现', 'text')
self.u.click_('0.1元天天提')
self.u.click_('立即提现')
if self.u.find('提现成功'):
self.log.info('提现0.1元')
self.u.remove_watcher('忽略')
class ZhongQingMain:
def __init__(self, app, task_control={}, today_cash={}, timed_task={}):
self.app = app
self.task_control = task_control
self.today_cash = today_cash
def main(self):
while True:
try:
self.app.log.info('开机等待')
self.app.affirm_started()
self.app.u.click_('忽略')
if self.task_control.get('中青') == '签到':
self._first()
raise TaskException
elif self.task_control.get('中青') == '内容':
self._other()
else:
self._other()
except RetryException:
self.app.log.info('遇到严重错误,重来')
self.app.u.close_app(self.app.package)
sleep(5)
self.app.u.open_app(self.app.package)
def _first(self):
sleep(10)
self.app.into_tasks_page()
self.app.get_cash_data(self.today_cash)
self.app.sign_in()
def _other(self):
sleep(5)
if not randint(0,3):
self.app.watch_article(self.task_control)
else:
self.app.watch_video(self.task_control)
class Hui:
def __init__(self, u, log, package):
self.u = u
self.log = log
self.u.open_app(package)
self.package = package
def affirm_started(self):
if self.u.find('任务中心', timeout=40):
self.log.info('成功开启')
else:
self.log.info('未进入程序,重启')
raise RetryException
def into_tasks_page(self):
self.u.click_('任务中心')
if self.u.find('走路赚', timeout=10):
self.log.info('进入任务界面')
else:
self.log.info('未进入任务界面,重启')
raise RetryException
def get_cash_data(self, today_cash):
self.u.click_('我的')
if self.u.find('兑换提现', timeout=10):
self.log.info('进入金币界面')
else:
self.log.info('未进入金币界面,重启')
raise RetryException
text = self.u.class_attributes('android.widget.TextView')
if text:
all_cash = round(float(text[text.index('历史总金币')+1].replace(',',''))/10000, 1)
else:
all_cash = 0
self.log.info('未能 获取总金额,请检查')
today_cash['惠头条']['all_cash'] = all_cash
today_cash['惠头条']['get_cash'] = 0
self.u.click_('兑换提现')
sleep(2)
result = self.u.find('tv_tips_gold_coin', 'id')
if result:
now_cash = round(float(result.info['text'][:-3].replace(',',''))/10000, 1)
else:
self.log.info('未能获取到现金,请检查')
now_cash = 0
today_cash['惠头条']['now_cash'] = now_cash
self.log.info(f'惠头条现金为{now_cash}')
if now_cash > 1.1:
self.do_cash()
self.u.back()
def do_cash(self):
self.log.info('提现')
self.u.click_('立即提现')
self.u.click_('继续兑换1.00元')
self.u.back()
def sign_in(self):
self.log.info('打开签到')
self.u.click_('img_close', 'id')
self.u.click_('sign_btn_container', 'id', 3)
sleep(5)
if self.u.click_('rl_count', 'id', 3): # 获得金币的提醒界面关闭按钮
self.u.back()
def watch_ad(self):
self.log.info('领奖励')
self.u.click_('img_close', 'id')
if self.u.click_('领奖励', 'text', 3, contains=True):
self._dispose_ad()
sleep(3)
self.u.click_('rl_count', 'id')
def watch_content_ad(self):
if not self.u.click_('头条'):
self.u.click_('刷新')
if self.u.click_('恭喜获得', 'text'):
self.log.info('首行广告')
if self.u.find('下载', 'text'):
self.u.back()
self._dispose_ad()
self.u.click_('iv_dialog_close', 'id')
if self.u.click_('点击领取'):
self.log.info('时段广告')
if self.u.click_('金币翻倍'):
self._dispose_ad()
self.u.click_('iv_dialog_close', 'id')
# 金币广告会自动下载放弃
def _dispose_ad(self):
self.log.info('等待广告')
sleep(randint(40,45))
for i in range(3):
self.u.click_('禁止')
if self.u.find('//*[@resource-id="com.xiaomi.market:id/top_bar_back_iv"]', 'xpath'):
self.u.back()
self.log.info(f'第{i}次尝试关闭广告')
if self.u.click_('tt_video_ad_close_layout', 'id'):
break
elif self.u.click_('tt_video_ad_close', 'id'):
break
elif self.u.click_('ciq','id', 2):
break
elif self.u.click_('ksad_end_close_btn', 'id'):
break
elif self.u.click_('day_of_week', 'id'):
self.u.click_('android.widget.ImageView', 'class')
break
elif self.u.click_('android.widget.ImageView', 'class'):
break
elif self.u.click_p(r'png/hui/back.jpg'):
break
else:
self.log.info('完善')
self.u.tap((0.096, 0.074), (720,1600))
if self.u.click_('rl_count', 'id'):
break
else:
self.log.info('广告出错,请检查')
return False
self.u.click_('放弃奖励')
sleep(3)
self.u.click_('rl_count', 'id', 3)
def watch_video(self, task_control):
self.log.info('看视频')
self.u.click_('img_close', 'id')
for i in range(3):
self.log.info(f'第{i}次尝试进入视频')
self.u.click_('视频')
if self.u.find('scr', 'id'):
break
self.log.info('进入视频页面')
while True:
switch_task(task_control, '惠头条')
self.u.swipe_page('down', 0.7)
if self.u.find('//*[@resource-id="com.xiaomi.market:id/detail_download_layout"]', 'xpath'):
self.u.back()
continue
if results := self.u.find('src', 'id'):
for result in results:
if '广告' not in result.info['text']:
result.click()
break
else:
self.log.info('未能定位的任何标题,重启')
raise RetryException()
self.u.click_('关闭广告')
sleep(3)
if not self.u.find('iv_hongbao', 'id'):
self.u.back()
self.log.info('可能进入广告页面')
continue
else:
self.log.info('进入页面开始观看')
sleep(randint(10, 120))
self.u.back()
def watch_content(self, task_control):
self.log.info('看内容')
if not self.u.click_('头条'):
self.u.click_('刷新')
self.u.click_('img_close', 'id')
self.u.swipe_page('down', 0.65)
while True:
switch_task(task_control, '惠头条')
self.u.swipe_page('down', 0.65)
if self.u.click_('tv_news_src', 'id'):
if not self.u.find('iv_hongbao', 'id'):
self.u.back()
self.log.info('可能进入广告页面')
continue
self._swipe(2, 5)
def _swipe(self, base_num=2, down_num=20):
if self.u.click_('关闭广告'):
self.log.info('刷到视频')
sleep(randint(10,20))
self.u.back()
else:
i = 0
while i < randint(base_num, down_num):
i += 1
sleep(randint(1, 2))
self.u.swipe_page('down', uniform(0.5, 0.7))
self.u.click_('展开全文', 'text', contains=True)
# 关闭翻倍卡
self.u.click_('iv_card_discard', 'id')
if self.u.click_('feeds_title', 'id'):
self.u.click_('残忍退出')
if not self.u.find('iv_hongbao', 'id'):
self.u.back()
self.u.click_('残忍退出')
self.log.info('可能进入广告页面')
return True
else:
self.u.back()
class HuiMain():
def __init__(self, app, task_control={}, today_cash={}, timed_task={}):
self.app = app
self.task_control = task_control
self.today_cash = today_cash
def main(self):
while True:
try:
self.app.log.info('开机等待')
self.app.affirm_started()
if self.task_control.get('惠头条') == '签到':
self._first()
raise TaskException
elif self.task_control.get('惠头条') == '内容':
self._other()
elif self.task_control.get('惠头条') == '小时':
self._period()
raise TaskException
else:
self._other()
except RetryException:
self.app.log.info('遇到严重错误,重来')
self.app.u.close_app(self.app.package)
sleep(5)
self.app.u.open_app(self.app.package)
# @watcher_decorator(['惠_0', '惠_1'], [])
def _first(self):
self.app.into_tasks_page()
self.app.sign_in()
self.app.get_cash_data(self.today_cash)
def _other(self):
if randint(0, 1):
self.app.watch_video(self.task_control)
else:
self.app.watch_content(self.task_control)
def _period(self):
self.app.into_tasks_page()
self.app.watch_ad()
self.app.watch_content_ad()
class PinDuo:
def __init__(self, u, log, package):
self.u = u
self.log = log
self.u.open_app(package)
self.get_cash = 0
def test(self):
breakpoint()
def sign_in(self):
"""签到"""
# 签到领钱
breakpoint()
sleep(6)
self.u.click_('fi', 'id') # 大的弹窗
self.u.click_('webp') # 弹窗一个大红包的关闭按钮
self.u.click_('avt', 'id') # 朋友界面
self.u.click_('blv', 'id') # 好友动态
self.u.click_('f8', 'id') # 多多买菜弹窗
if not self.u.click_('签到'):
if self.u.find('限时秒杀', 'text'):
self.u.back()
if self.u.click_('立即签到'):
self.u.click_('明日再来')
self.u.click_('签到')
self._cash()
self.u.click_('签到一下')
if self.u.click_('签到领红包'):
if self.u.click_('继续逛街'):
self._content(45)
self._chest()
if self.u.click_('继续完成任务', 'text', 3) or self.u.click_('定时领红包'):
sleep(3)
if self.u.find('签到定时红包'):
self.u.click_('继续领取')
sleep(2)
if not self.u.click_('//android.webkit.WebView/android.view.View[2]/android.view.View[1]/android.view.View[1]/android.view.View[1]/android.view.View[1]/android.view.View[3]', 'xpath'):
pass
if self.u.click_('逛街再开1次红包'):
self._content(25)
if self.u.click_('继续逛街赚签到金'):
self.u.back()
self._chest()
self.u.click_('签到')
self.u.click_('签到领红包')
sleep(3)
if (self.u.click_('继续完成任务') or self.u.click_('逛街得签到金')
or self.u.click_('继续逛街')):
self._content(40)
self._chest()
if self.u.find('去领更多签到金'):
self.u.tap( (0.888, 0.349), (1080, 2040))
self.u.click_('放弃翻倍,直接领取')
self._chest()
self.u.back()
self.u.click_('放弃并退出')
self.u.click_('直接离开')
self.u.click_('eh', 'id')
def do_cash(self):
self.u.click_('签到')
if self.u.click_('去提现'):
self.u.click_('提现')
def _content(self, seconds):
self._coupon()
self._chest()
if self.u.click_('立即收下'): # 翻倍卡
self.u.click_('去领签到金')
i = 0
while i < seconds:
num = randint(1, 3)
if self.u.find('立即下单使用'):
self.u.tap((0.891, 0.259), (1080, 2040))
self.u.swipe_page('down', uniform(0.55, 0.7))
i += num
def _chest(self):
if self.u.find('送你个宝箱,打开就能召唤宝贝'):
self.u.tap((0.457, 0.486) , (1080, 2040))
if not self.u.click_('//android.webkit.WebView/android.view.View[3]/android.view.View[1]/android.view.View[1]/android.view.View[1]/android.view.View[1]', 'xpath'):
self.u.tap((0.921, 0.21) , (720, 1600), calc=True)
self.u.back()
self.u.click_('不赚钱,离开')
def _coupon(self):
if self.u.find('恭喜你,获得优惠券奖励'):
if not self.u.click_('android.view.View', 'class'):
self.u.tap((0.888, 0.258), (1080, 2040))
def _cash(self):
if self.u.find('成功解锁微信提现特权'):
self.u.back()
if self.u.click_('去提现'):
self.u.click_('提现')
self.u.click_('我知道了,明天继续提现')
def do_ticket(self):
# 签到领无门槛券
self.u.click_('个人中心', 'text', 3)
self.u.click_('省钱月卡', 'text', 3)
if self.u.find('查看加速福利', 'text', 3):
breakpoint()
self.u.click_('无门槛券', 'text', 3)
self.u.click_('点击领更多活跃度', 'text', 3)
# 完成签到和
results = self.u.find('去完成', 'text')
if results:
for result in results:
result.click()
sleep(randint(1, 3))
self.u.back()
class PinDuoMain:
"""执行类,无继承"""
def __init__(self, app, task_control={}, today_cash={}, timed_task={}):
self.app = app
self.task_control = task_control
self.today_cash = today_cash
def main(self):
"""主函数循环,因为经常调试,所以分为正常和不正常两种。
看内容,更新文本需要传入tasks_data参数,其他不需要,单独的方法获取现金"""
# 正常启动
self.app.log.info('打开拼多多')
self.app.log.info('开机等待')
sleep(5)
self.app.test()
while True:
try:
print(self.task_control)
if self.task_control.get('拼多多') == '签到':
self.app.sign_in()
raise TaskException
else:
pass
except RetryException:
self.app.u.close_app(self.app.package)
sleep(5)
self.app.u.open_app(self.app.package)
class KuaiKan:
def __init__(self, u, log, package):
self.u = u
self.log = log
self.u.open_app(package)
self.package = package
def affirm_started(self):
if self.u.find('首页', timeout=40):
self.log.info('成功开启')
else:
self.log.info('未进入程序,重启')
raise RetryException
def into_tasks_page(self, dispose_sign=True):
# 红包
self.u.click_('background', 'id')
if dispose_sign:
self.dispose_pop_up_sign_in()
self.u.click_('close', 'id')
if self.u.find('福利中心'):
self.log.info('进入任务界面')
else:
self.log.info('未进入任务界面,重启')
raise RetryException
def dispose_pop_up_sign_in(self):
if self.u.click_('看视频再领', contains=True):
self._dispose_ad()
def get_cash_data(self, today_cash):
self.log.info('获取资金')
if attr := self.u.class_attributes('android.view.View'):
try:
now_cash = float(attr[attr.index('金币余额')+1])/10000
today_cash['快看']['now_cash'] = now_cash
today_cash['快看']['get_cash'] = 0
today_cash['快看']['all_cash'] = float(attr[attr.index('累计获取金币')+1])/10000
self.log.info(f'快看现金为{now_cash}')
except ValueError:
self.log.info('没有获取到资金文本')
def watch_content(self, task_control):
self.log.info('看内容')
self.u.click_('资讯')
while True:
switch_task(task_control, '快看', self.do_cash)
self.u.swipe_page('down', uniform(0.4, 0.7))
self.u.click_('title', 'id')
# 红包和金蛋
if not (self.u.find('gold', 'id') or self.u.find('background', 'id')):
self.u.back()
continue
else:
self._swipe()
self.u.back()
def _swipe(self, base_num=2, down_num=20):
i = 0
while i < randint(base_num, down_num):
i += 1
if (self.u.find('没有更多评论了') or self.u.find('暂无评论,点击抢沙发')
or self.u.find('重播')):
break
sleep(randint(1, 2))
self.u.swipe_page('down', uniform(0.4, 0.7))
def do_cash(self):
for i in range(4):
if not self.u.find('福利'):
self.u.back()
else:
break
self.u.click_('福利')
self.u.click_('总金币')
self.u.click_('去分享领取')
if self.u.click_('立即分享领取', timeout=4):
self.u.click_('分享链接')
self.u.remove_watcher('no_4')
self.u.click_('网赚分享群', timeout=10)
self.u.click_('分享')
self.u.click_('返回快看点')
self.u.click_('提取', timeout=5)
self.u.back()
self.u.creat_watcher('no_4', '取消')
def get_gold(self):
self.log.info('领金币')
if not self.u.find('已达上限'):
self.u.click_('reward_text', 'id')
if self.u.click_('翻倍领取'):
self._dispose_ad()
self.u.click_('close', 'id', timeout=4)
else:
self.log.info('今日领完,已达上限')
if self.u.find('看视频再领', contains=True):
if self.u.click_('看视频'):
self._dispose_ad()
self.u.click_('close', 'id', timeout=4)
def show_off(self):
self.u.click_('晒晒收入')
self.u.click_('微信')
self.u.click_('勇往直前的疯、大萝卜、阿飞')
self.u.click_('分享')
self.u.click_('返回快看点')
def do_dial(self):
breakpoint()
self.log.info('玩转盘')
self.u.click_('超级大转盘')
if not self.u.find('今日剩余 : 0次', timeout=4):
if self.u.click_('//*[@resource-id="container"]/android.view.View[6]/android.view.View[3]', 'xpath'):
if self.u.find('继续抽奖', timeout=5):
double_xy = {(720, 1600):(0.5, 0.6)}
self.u.tap(double_xy.get(self.u.xy, (0.5, 0.6)))
self._dispose_ad()
self.u.back()
def do_bank(self):
breakpoint()
look_xy = {(720, 1600):(0.837, 0.931),
(1080, 1920): (0.848, 0.921)}
self.u.click_('bg', 'id')
sleep(3)
if pos := look_xy.get(self.u.xy):
self.u.tap(pos)
self.u.click_('支付0金币')
self.u.back()
def watch_ad(self):
self.log.info('看广告')
self.u.swipe_page('down', 0.6)
self.u.swipe_page('down', 0.6)
if not self.u.find('广告任务50金币/次,今日 10/10次'):
if self.u.click_('金币悬赏任务'):
self._dispose_ad()
else:
self.log.info('广告已看完')
def _dispose_ad(self):
self.log.info('等待广告')
if self.u.long_wait_click('video_close_icon', 'id'):
self.log.info('关闭广告')
return True
else:
self.log.info('广告出错,请检查')
return False
class KuaiKanMain:
"""执行类,无继承"""
def __init__(self, app, task_control={}, today_cash={}, timed_task={}):
self.app = app
self.task_control = task_control
self.today_cash = today_cash
self.timed_task = timed_task
def main(self):
while True:
try:
self.app.log.info('开机等待')
self.app.affirm_started()
if self.task_control.get('快看') == '签到':
self._first()
raise TaskException
elif self.task_control.get('快看') == '小时':
self._period()
self.task_control['快看'] = '内容'
raise TaskException
elif self.task_control.get('快看') == '内容':
self.app.watch_content(self.task_control)
else:
self.app.watch_content(self.task_control)
except RetryException:
self.app.log.info('遇到严重错误,重来')
self.app.u.close_app(self.app.package)
self.app.u.open_app(self.app.package)
sleep(5)
def _first(self):
self.app.into_tasks_page()
self.app.dispose_pop_up_sign_in()
self.app.get_gold()
self.app.do_dial()
self.app.watch_ad()
def _period(self):
self.app.into_tasks_page(False)
self.app.get_gold()
self.app.do_dial()
self.app.watch_ad()
self.app.do_bank()
class DouHuo:
def __init__(self, u, log, package):
self.u = u
self.log = log
self.u.open_app(package)
self.package = package
def affirm_started(self):
# 首页
if self.u.find('d23', 'id', timeout=40):
self.log.info('成功开启')
else:
self.log.info('未进入程序,重启')
raise RetryException
def into_tasks_page(self, dispose_sign=True):
# 金币按钮
self.u.click_('b77', 'id')
self.u.click_('eyb', 'id')
if dispose_sign:
self.dispose_pop_up_sign_in()
if self.u.find('火苗管理', timeout=20):
self.log.info('进入任务界面')
else:
self.log.info('未能进入任务界面,重启')
raise RetryException
def dispose_pop_up_child(self):
self.log.info('处理青少年弹窗')
def dispose_pop_up_sign_in(self):
self.log.info('处理签到弹窗')
if not self.u.click_('//*[@resource-id="com.ss.android.ugc.live:id/gdm"]/android.webkit.WebView[1]/android.webkit.WebView[1]/android.view.View[1]/android.view.View[3]/android.view.View[2]/android.view.View[1]/android.view.View[3]', 'xpath'):
self.u.tap((0.5,0.78), (1080, 1920))
self._dispose_ad()
def get_cash_data(self, today_cash):
self.log.info('获取资金')
if attr := self.u.attribute('约', contains=True):
today_cash['抖火']['now_cash'] = float(attr[1:-1])
today_cash['抖火']['get_cash'] = 0
today_cash['抖火']['all_cash'] = float(attr[1:-1])
self.log.info(f'抖火现金为{attr}')
else:
self.log.info('没有获取到现金')
def watch_ad(self):
self.log.info('看广告')
if self.u.click_('去领取'):
self._dispose_ad()
else:
self.log.info('广告未刷新')
def do_lucky_draw(self):
self.log.info('抽奖')
self.u.click_('去抽奖')
if text := self.u.attribute('今日剩余','text', contains=True):
times = int(text[-2])
if times != 0:
for i in range(times):
self.u.click_('lottery_has_chance')
if self.u.click_('看视频', contains=True):
self._dispose_ad()
elif self.u.click_('领取奖励'):
pass
else:
self.u.click_('卖出', contains=True)
else:
self.u.back()
def _dispose_ad(self):
self.log.info('等待广告')
if self.u.long_wait_click('关闭广告'):
self.log.info('成功关闭广告')
else:
self.log.info('关闭广告遇到问题,请检查')
def watch_content(self, task_control):
self.log.info('刷内容')
if self.u.find('biq', 'id', 5) or self.u.find('bod', 'id'):
self.log.info('进入页面')
else:
self.log.info('没进入页面重启')
raise RetryException
self.u.click_('biq', 'id')
while True:
switch_task(task_control, '抖火')
self.u.click_('biq', 'id')
self.u.swipe_page()
if not self.u.find('bod', 'id'):
self.u.back()
continue
if not self.u.find('返回'):
self.u.back()
continue
if self.u.find('点击进入直播间'):
continue
num = randint(1, 30)
sleep(randint(1, 10))
if num == 3:
# 点赞
self.u.click_('ctf', 'id')
class DuoHuoMain:
"""执行类,无继承"""
def __init__(self, app, task_control={}, today_cash={}, timed_task={}):
self.app = app
self.task_control = task_control
self.today_cash = today_cash
self.timed_task = timed_task
def main(self):
while True:
try:
self.app.log.info('开机等待')
self.app.affirm_started()
if self.task_control.get('抖火') == '签到':
self._first()
raise TaskException
elif self.task_control.get('抖火') == '周期':
self._period()
self.task_control['抖火'] = '内容'
raise TaskException
elif self.task_control.get('抖火') == '内容':
self._content()
else:
self._content()
except RetryException:
self.app.log.info('遇到严重错误,重来')
self.app.u.close_app(self.app.package)
self.app.u.open_app(self.app.package)
sleep(5)
def _first(self):
sleep(10)
self.app.dispose_pop_up_sign_in()
self.app.into_tasks_page()
self.app.get_cash_data(self.today_cash)
self.app.watch_ad()
def _period(self):
self.app.into_tasks_page(False)
self.app.watch_ad()
def _content(self):
self.app.watch_content(self.task_control)
class LeiBao:
def __init__(self, u, log, package):
self.u = u
self.log = log
self.u.open_app(package)
self.package = package
def task(self):
self.u.click_('垃圾清理', timeout=10)
self.u.long_wait_click('清理垃圾', contains=True, timeout=40)
self.u.find('已清理垃圾', contains=True, timeout=15)
return True
class LeiBaoMain:
"""执行类,无继承"""
def __init__(self, app, task_control={}, today_cash={}, timed_task={}):
self.app = app
self.task_control = task_control
self.today_cash = today_cash
def main(self):
self.app.log.info('打开猎豹')
self.app.log.info('开机等待')
sleep(10)
self.app.task()
APPS = {'快手': (KuaiShou, KuaiShouMain, 'com.kuaishou.nebula'),
'头条': (TouTiao, TouTiaoMain, 'com.ss.android.article.lite'),
'番茄': (FanQie, FanQieMain, 'com.dragon.read'),
'火山': (HuoShan, HuoShanMain, 'com.ss.android.ugc.livelite'),
'中青': (ZhongQing, ZhongQingMain, 'cn.youth.news'),
'淘宝': (TaoBao, TaoBaoMain, 'com.taobao.live'),
'惠头条': (Hui, HuiMain, 'com.cashtoutiao'),
'微视': (WeiShi, WeiShiMain, 'com.tencent.weishi'),
'拼多多':(PinDuo, PinDuoMain, 'com.xunmeng.pinduoduo'),
'猎豹': (LeiBao, LeiBaoMain, 'com.cleanmaster.lite_cn'),
'快看': (KuaiKan, KuaiKanMain, 'com.yuncheapp.android.pearl'),
'抖火': (DouHuo, DuoHuoMain, 'com.ss.android.ugc.live'),
'京东': (JingDong, JingDongMain, 'com.jd.jdlite')
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Python
1
https://gitee.com/ieping/collect-wool.git
git@gitee.com:ieping/collect-wool.git
ieping
collect-wool
薅羊毛
master

搜索帮助