1 Star 2 Fork 2

loa123/epicgames-claimer

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
main.py 3.92 KB
一键复制 编辑 原始数据 按行查看 历史
luminoleon 提交于 2022-01-08 10:20 . Bugfix
import importlib
import json
import os
import re
import shutil
import signal
import subprocess
import sys
import threading
import time
from typing import List, Tuple

import requests
import urllib3
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
# Silence urllib3's InsecureRequestWarning; the HTTP requests in this file are
# made with verify=False.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# File name of the claimer script that this launcher imports, runs and updates,
# and the name of the backup copy kept next to it.
CLAIMER_FILE_NAME = "epicgames_claimer.py"
CLAIMER_FILE_BAK_NAME = "epicgames_claimer.py.bak"

# GitHub API endpoints of the upstream repository. API_LATEST is queried for
# the newest release; API_TAGS is not referenced by the code visible here.
API_TAGS = "https://api.github.com/repos/luminoleon/epicgames-claimer/tags"
API_LATEST = "https://api.github.com/repos/luminoleon/epicgames-claimer/releases/latest"

# Log messages emitted by this launcher.
MESSAGE_START = "Claimer started"
MESSAGE_END = "Claim completed"
MESSAGE_UPDATING = "Found a new version. Updating ..."
MESSAGE_UPDATE_FAILED = "Update failed: "
MESSAGE_UPDATE_COMPLETED = "Update completed"
MESSAGE_RUN_FAILED = f"Failed to run {CLAIMER_FILE_NAME}: "
# Import the claimer script. If it is missing or cannot be imported (for
# example after an interrupted or corrupt update), restore the backup copy and
# import that instead.
try:
    import epicgames_claimer
except Exception:
    # `except Exception` rather than a bare `except` so that Ctrl-C and
    # SystemExit raised during the import are not mistaken for a broken script.
    shutil.copy(CLAIMER_FILE_BAK_NAME, CLAIMER_FILE_NAME)
    # The script file was (re)created while the interpreter is running, so the
    # import system's cached directory listing may be stale.
    importlib.invalidate_caches()
    import epicgames_claimer

# Parse the command-line arguments through the claimer's own parser.
args = epicgames_claimer.get_args(run_by_main_script=True)
# NOTE(review): presumably tells the claimer that scheduling is handled by this
# launcher rather than by the claimer itself -- confirm against get_args/main.
args.external_schedule = True
def get_current_version() -> List[str]:
    """Return the version of the local claimer script as a list of strings.

    The script is executed with ``--version`` using the current interpreter
    and the first dotted number sequence in its standard output is split on
    ``"."`` (e.g. ``"1.6.12"`` -> ``["1", "6", "12"]``).

    Raises:
        ValueError: If the output contains no version number.
    """
    # Use an argument list instead of a shell command string so that an
    # interpreter or script path containing spaces is passed through intact.
    completed = subprocess.run(
        [sys.executable, CLAIMER_FILE_NAME, "--version"],
        stdout=subprocess.PIPE,
        universal_newlines=True,
    )
    match = re.search(r"\d+\.(?:\d+\.)*\d+", completed.stdout)
    if match is None:
        raise ValueError(f"Unable to determine the version of {CLAIMER_FILE_NAME}")
    return match.group(0).split(".")
def get_latest_version(timeout: float = 30.0) -> Tuple[List[str], str]:
    """Query the GitHub API for the latest released claimer version.

    Args:
        timeout: Seconds to wait for the API before giving up, so a stalled
            connection cannot block the launcher forever.

    Returns:
        A tuple ``(version, url)`` where ``version`` is the release tag's
        dotted number split on ``"."`` and ``url`` is the download URL of the
        release asset named ``CLAIMER_FILE_NAME``, or ``""`` when the release
        has no such asset.

    Raises:
        requests.RequestException: On network errors, timeouts or a non-2xx
            API response (e.g. rate limiting).
        ValueError: If the release tag contains no version number.
    """
    # NOTE(review): verify=False disables TLS certificate checking for a
    # request that decides which code gets downloaded and executed. Kept for
    # backward compatibility; consider enabling verification.
    response = requests.get(API_LATEST, verify=False, timeout=timeout)
    # Fail with a clear HTTP error instead of a KeyError on an error payload.
    response.raise_for_status()
    release = response.json()
    tag_name = release["tag_name"]
    match = re.search(r"\d+\.(?:\d+\.)*\d+", tag_name)
    if match is None:
        raise ValueError(f"Release tag {tag_name!r} contains no version number")
    version = match.group(0).split(".")
    url = ""
    for asset in release["assets"]:
        if asset["name"] == CLAIMER_FILE_NAME:
            url = asset["browser_download_url"]
    return version, url
def need_update() -> Tuple[bool, str]:
    """Decide whether the local claimer script should be replaced.

    Returns:
        A tuple ``(found_a_new_version, latest_download_url)``. The flag is
        True when the local script is missing, or when the latest release has
        the same major version as the local script and a higher
        minor/patch/... version. A different major version never triggers an
        update.
    """
    latest_ver, latest_download_url = get_latest_version()
    # Check for the file before asking it for its version: a missing script
    # cannot report one. Always return a 2-tuple so callers can unpack it.
    if not os.path.exists(CLAIMER_FILE_NAME):
        return True, latest_download_url
    current_ver = get_current_version()
    latest_parts = [int(part) for part in latest_ver]
    current_parts = [int(part) for part in current_ver]
    # Releases with a different major version are not installed automatically.
    if latest_parts[0] != current_parts[0]:
        return False, latest_download_url
    # Pad the shorter version with zeros so "1.6" and "1.6.0" compare equal
    # and versions with fewer than three components do not raise IndexError.
    width = max(len(latest_parts), len(current_parts))
    latest_parts += [0] * (width - len(latest_parts))
    current_parts += [0] * (width - len(current_parts))
    return latest_parts > current_parts, latest_download_url
def update() -> None:
    """Download and activate a newer claimer script when one is available.

    The current script is copied to ``CLAIMER_FILE_BAK_NAME`` before it is
    overwritten, and the ``epicgames_claimer`` module is reloaded afterwards.
    If the downloaded script cannot be loaded, the backup is restored. Any
    failure is logged as a warning and never propagates to the caller.
    """
    try:
        found_a_new_version, latest_download_url = need_update()
        if not found_a_new_version:
            return
        epicgames_claimer.log(MESSAGE_UPDATING)
        # NOTE(review): verify=False disables TLS certificate checking for
        # code that is subsequently executed; kept for backward compatibility.
        response = requests.get(latest_download_url, verify=False, timeout=30)
        # Never overwrite the script with the body of an HTTP error page.
        response.raise_for_status()
        # Download first, back up second: a failed download leaves both the
        # script and the previous backup untouched.
        has_backup = os.path.exists(CLAIMER_FILE_NAME)
        if has_backup:
            shutil.copy(CLAIMER_FILE_NAME, CLAIMER_FILE_BAK_NAME)
        with open(CLAIMER_FILE_NAME, "wb") as f:
            f.write(response.content)
        try:
            importlib.reload(epicgames_claimer)
        except Exception:
            # The new script is unusable: put the previous one back so the
            # next run (and the next process start) still works.
            if has_backup:
                shutil.copy(CLAIMER_FILE_BAK_NAME, CLAIMER_FILE_NAME)
                importlib.reload(epicgames_claimer)
            raise
        epicgames_claimer.log(MESSAGE_UPDATE_COMPLETED)
    except Exception as e:
        epicgames_claimer.log(f"{MESSAGE_UPDATE_FAILED}{e}", level="warning")
def run_once() -> None:
    """Run the claimer a single time, optionally updating it first.

    The SIGINT handler that was active before the run is restored afterwards.
    After a successful run ``args.no_startup_notification`` is set to True.
    Any exception is logged as an error instead of being raised, so a failing
    run does not stop the scheduler.
    """
    try:
        if args.auto_update:
            update()
        original_sigint_handler = signal.getsignal(signal.SIGINT)
        # signal.signal() may only be called from the main thread; scheduled
        # runs execute on a scheduler worker thread, where calling it would
        # raise ValueError and mark an otherwise successful run as failed.
        in_main_thread = threading.current_thread() is threading.main_thread()
        try:
            epicgames_claimer.main(args)
        finally:
            # Restore the handler even when the run fails. getsignal() returns
            # None for handlers not installed from Python; those cannot be
            # passed back to signal.signal().
            if in_main_thread and original_sigint_handler is not None:
                signal.signal(signal.SIGINT, original_sigint_handler)
        args.no_startup_notification = True
    except Exception as e:
        epicgames_claimer.log(f"{MESSAGE_RUN_FAILED}{e}", "error")
def run_forever():
    """Run the claimer now and then repeatedly on the ``args.cron`` schedule.

    A background scheduler executes ``run_once`` according to the crontab
    expression while the calling thread sleeps. A KeyboardInterrupt stops the
    scheduler without waiting for running jobs and returns.
    """
    cron_expression = args.cron
    epicgames_claimer.log(f"Start schedule use crontab: {cron_expression}")

    scheduler = BackgroundScheduler()
    trigger = CronTrigger.from_crontab(cron_expression)
    scheduler.add_job(run_once, trigger)

    # First run happens immediately, before the scheduler takes over.
    run_once()

    try:
        scheduler.start()
        # Keep the main thread alive; the jobs run on scheduler threads.
        while True:
            time.sleep(3600)
    except KeyboardInterrupt:
        scheduler.shutdown(wait=False)
def main() -> None:
    """Entry point: log start, run once or on a schedule, then log completion."""
    epicgames_claimer.log(MESSAGE_START)
    if args.once:
        run_once()
    else:
        run_forever()
    epicgames_claimer.log(MESSAGE_END)


if __name__ == "__main__":
    main()
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/loa123/epicgames-claimer.git
git@gitee.com:loa123/epicgames-claimer.git
loa123
epicgames-claimer
epicgames-claimer
main

搜索帮助