# (web page residue, not part of the program) Code pull finished; the page will refresh automatically.
import sys
import signal
import threading
import asyncio
import aiohttp
import conf_loader
import notifier
import bili_sched
import printer
import bili_statistics
from console_cmd import ConsoleCmd
from tasks.login import LoginTask
from tasks.live_daily_job import (
HeartBeatTask,
OpenSilverBoxTask,
RecvDailyBagTask,
SignTask,
WatchTvTask,
SignFansGroupsTask,
SendGiftTask,
ExchangeSilverCoinTask
)
from tasks.main_daily_job import (
JudgeCaseTask,
BiliMainTask,
DahuiyuanTask
)
from tasks.manga_daily_job import (
ShareComicTask,
MangaSignTask,
)
from tasks.utils import UtilsTask
# 弹幕
from danmu.bili_danmu_monitor import DanmuPrinter, DanmuRaffleMonitor
from danmu.yj_monitor import TcpYjMonitorClient
from danmu import raffle_handler
# 实物抽奖
from substance.monitor_substance_raffle import SubstanceRaffleMonitor
from dyn.monitor_dyn_raffle import DynRaffleMonitor
# Event loop used by every run_until_complete() call and by the long-running
# tasks in this script.
# asyncio.get_event_loop() raises RuntimeError on interpreters that no longer
# create a loop implicitly, so fall back to creating and installing one.
try:
    loop = asyncio.get_event_loop()
except RuntimeError:
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

# Load each configuration section through conf_loader.
dict_user = conf_loader.read_user()
dict_bili = conf_loader.read_bili()
dict_color = conf_loader.read_color()
dict_ctrl = conf_loader.read_ctrl()
dict_task = conf_loader.read_task()

# Configure the printer from the colour section and the 'danmu' entry of
# print_control.
printer.init_config(dict_color, dict_ctrl['print_control']['danmu'])
############################################################################
############################################################################
# 👇 Register users
async def init_users():
    """Create the shared ``notifier.Users`` container and add every user.

    Reads the global/custom task control and arrangement sections from
    ``dict_task`` and the user list from ``dict_user``. Each user is added
    with the per-username custom sections when present, otherwise with
    empty dicts.

    Raises:
        AssertionError: if ``dict_user['users']`` is empty, i.e. no user
            has been configured.
    """
    global_task_control = dict_task['global_task_control']
    custom_task_control = dict_task['custom_task_control']
    global_task_arrangement = dict_task['global_task_arrangement']
    custom_task_arrangement = dict_task['custom_task_arrangement']

    users = notifier.Users(
        global_task_control=global_task_control,
        global_task_arrangement=global_task_arrangement,
        dict_bili=dict_bili,
        force_sleep=bili_sched.force_sleep)
    notifier.init(users=users)

    # At least one user is required. An explicit raise (rather than a bare
    # ``assert``) keeps the check alive under ``python -O`` while preserving
    # the exception type the original produced.
    user_infos = dict_user['users']
    if not user_infos:
        raise AssertionError(
            "dict_user['users'] is empty: at least one user must be configured")

    for user_info in user_infos:
        username = user_info['username']
        await notifier.add_user(
            user_info=user_info,
            custom_task_control=custom_task_control.get(username, {}),
            custom_task_arrangement=custom_task_arrangement.get(username, {}))


loop.run_until_complete(init_users())
############################################################################
############################################################################
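# 👇 Register recurring tasks
# Intervals are in hours; every time a sleep period ends the timer is reset to zero and the interval is counted again from the current time
# The values below mean "run once every N hours"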
def add_daily_jobs():
    """Register every recurring task with ``bili_sched``.

    The table pairs each task class with the ``every_hours`` value passed
    to ``bili_sched.add_daily_jobs``; entries are registered top to bottom.
    """
    schedule = (
        (HeartBeatTask, 6),
        (OpenSilverBoxTask, 6),
        (RecvDailyBagTask, 3),
        (SignTask, 6),
        (WatchTvTask, 6),
        (SignFansGroupsTask, 6),
        (SendGiftTask, 2),
        (ExchangeSilverCoinTask, 6),
        (JudgeCaseTask, 0.75),
        (BiliMainTask, 4),
        (MangaSignTask, 6),
        (ShareComicTask, 6),
        (DahuiyuanTask, 6),
    )
    for task_cls, hours in schedule:
        bili_sched.add_daily_jobs(task_cls, every_hours=hours)


add_daily_jobs()
############################################################################
############################################################################
# Run LoginTask through the notifier before the remaining start-up steps.
loop.run_until_complete(notifier.exec_task(LoginTask))
other_control = dict_ctrl['other_control']
# Area ids as returned by UtilsTask.fetch_blive_areas (executed via the notifier).
area_ids = loop.run_until_complete(notifier.exec_func(UtilsTask.fetch_blive_areas))
area_duplicated = other_control['area_duplicated']
if area_duplicated:
    # Double the sequence so each area id appears twice; one raffle monitor
    # is created per entry further down.
    area_ids *= 2
bili_statistics.init(area_num=len(area_ids), area_duplicated=area_duplicated)
# Room id later handed to DanmuPrinter and ConsoleCmd.
default_roomid = other_control['default_monitor_roomid']
############################################################################
############################################################################
# 👇 Register monitors
# (aiohttp session handling is awkward; presumably that is why the session is created inside a coroutine)
async def init_monitors():
    """Create the danmu printer and every enabled monitor.

    One ``aiohttp.ClientSession`` is created here and shared by the danmu
    printer and the per-area raffle monitors. The optional monitors are
    switched on by entries of ``other_control``.

    Returns:
        A ``(danmu_printer, monitors)`` tuple: the ``DanmuPrinter`` instance
        and the list of monitor objects, in creation order.
    """
    # NOTE(review): nothing in the visible code closes this session.
    http_session = aiohttp.ClientSession()

    # Danmu printing for the default room.
    danmu_out = DanmuPrinter(
        room_id=default_roomid, area_id=-1, session=http_session)

    # Danmu raffle monitoring: one monitor per entry of area_ids.
    collected = [
        DanmuRaffleMonitor(room_id=0, area_id=area_id, session=http_session)
        for area_id in area_ids
    ]

    # yjmonitor danmu monitoring, enabled only when a key is configured.
    tcp_addr = other_control['yjmonitor_tcp_addr']
    tcp_key = other_control['yjmonitor_tcp_key']
    if tcp_key:
        collected.append(
            TcpYjMonitorClient(key=tcp_key, url=tcp_addr, area_id=0))

    if other_control['substance_raffle']:
        collected.append(SubstanceRaffleMonitor())

    if other_control['dyn_raffle']:
        join_now = other_control['join_dyn_raffle_at_once']
        collected.append(DynRaffleMonitor(should_join_immediately=join_now))

    return danmu_out, collected


danmu_printer, monitors = loop.run_until_complete(init_monitors())
############################################################################
############################################################################
# Hand the monitors and the configured sleep ranges to the scheduler.
bili_sched.init(monitors=monitors, sleep_ranges=dict_ctrl['other_control']['sleep_ranges'])

# Initialise the console: start ConsoleCmd.cmdloop in its own thread. It is
# skipped only on Linux when SIGHUP has a non-default handler
# (NOTE(review): presumably to detect running under nohup; confirm).
if sys.platform != 'linux' or signal.getsignal(signal.SIGHUP) == signal.SIG_DFL:
    console_thread = threading.Thread(
        target=ConsoleCmd(loop, default_roomid, danmu_printer).cmdloop)
    console_thread.start()
else:
    console_thread = None

tasks = [monitor.run() for monitor in monitors]
other_tasks = [
    bili_sched.run(),
    raffle_handler.run(),
    danmu_printer.run()
]

# asyncio.wait() no longer accepts bare coroutine objects (deprecated in
# Python 3.8, rejected since 3.11), so schedule every awaitable on the loop
# first. ensure_future() also passes already-created futures through.
pending = [asyncio.ensure_future(job, loop=loop) for job in tasks + other_tasks]
# asyncio.wait() raises ValueError on an empty collection, hence the guard.
if pending:
    loop.run_until_complete(asyncio.wait(pending))
loop.run_forever()
if console_thread is not None:
    console_thread.join()
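# (web page residue, not part of the program) This section may contain content unsuitable for display, so the page does not show it. You can review and modify it yourself with the relevant editing features.
# (web page residue, not part of the program) If you confirm that the content involves no inappropriate language / pure advertising / violence / vulgar or pornographic material / infringement / piracy / false information / worthless content, and nothing that violates applicable national laws and regulations, you can click Submit to appeal and we will handle it as soon as possible.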