代码拉取完成,页面将自动刷新
import json
import os
import platform
import time
from pathlib import Path
import folder_paths
import nodes
class Loader:
def __init__(self):
pass
__ROOT_PATH = os.path.dirname(os.path.abspath(__file__))
__TEMPLATE_PATH = os.path.join(__ROOT_PATH, "resources/template.json")
__TIMESTAMP_PATH = os.path.join(__ROOT_PATH, "resources/timestamp.json")
__CONFIG_PATH = os.path.join(__ROOT_PATH, "config.json")
__GIT_PATH = Path(os.path.join(__ROOT_PATH, ".git"))
__DAY_SECONDS = 24 * 60 * 60
__WEEK_SECONDS = 7 * __DAY_SECONDS
__MONTH_SECONDS = 30 * __DAY_SECONDS
def __log(self, text):
print("\033[92m[Allor]\033[0m: " + text)
def __error(self, text):
print("\033[91m[Allor]\033[0m: " + text)
def __notification(self, text):
print("\033[94m[Allor]\033[0m: " + text)
def __new_line(self):
print()
def __create_config(self):
with open(self.__CONFIG_PATH, "w", encoding="utf-8") as f:
json.dump(self.__template(), f, ensure_ascii=False, indent=4)
def __create_timestamp(self):
with open(self.__TIMESTAMP_PATH, "w", encoding="utf-8") as f:
json.dump({"timestamp": 0}, f, ensure_ascii=False, indent=4)
def __get_template(self):
with open(self.__TEMPLATE_PATH, "r") as f:
template = json.load(f)
if "__comment" in template:
del template["__comment"]
return template
def __get_config(self):
with open(self.__CONFIG_PATH, "r") as f:
return json.load(f)
def __get_timestamp(self):
with open(self.__TIMESTAMP_PATH, "r") as f:
return json.load(f)
def __update_config(self, template, source):
def update_source(__template, __source):
for k, v in __template.items():
if k not in __source:
if isinstance(v, dict):
__source[k] = {}
else:
__source[k] = v
if isinstance(v, dict):
__source[k] = update_source(v, __source[k])
return __source
def delete_keys(__template, __source):
keys_to_delete = [k for k in __source if k not in __template]
for k in keys_to_delete:
del __source[k]
return __source
def sync_order(__template, __source):
new_source = {}
for key in __template:
if key in __source:
if isinstance(__template[key], dict):
new_source[key] = sync_order(__template[key], __source[key])
else:
new_source[key] = __source[key]
return new_source
source = update_source(template, source)
source = delete_keys(template, source)
source = sync_order(template, source)
with open(self.__CONFIG_PATH, "w", encoding="utf-8") as f:
json.dump(source, f, ensure_ascii=False, indent=4)
def __update_timestamp(self):
with open(self.__TIMESTAMP_PATH, "w", encoding="utf-8") as f:
json.dump({"timestamp": time.time()}, f, ensure_ascii=False, indent=4)
__template = __get_template
__config = __get_config
__timestamp = __get_timestamp
def __get_fonts_folder_path(self):
system = platform.system()
user_home = os.path.expanduser('~')
config_font_path = os.path.join(folder_paths.base_path, *self.__config()["fonts"]["folder_path"].replace("\\", "/").split("/"))
if not os.path.exists(config_font_path):
os.makedirs(config_font_path, exist_ok=True)
paths = [config_font_path]
if self.__config()["fonts"]["system_fonts"]:
if system == "Windows":
paths.append(os.path.join(os.environ["WINDIR"], "Fonts"))
elif system == "Darwin":
paths.append(os.path.join("/Library", "Fonts"))
elif system == "Linux":
paths.append(os.path.join("/usr", "share", "fonts"))
paths.append(os.path.join("/usr", "local", "share", "fonts"))
if self.__config()["fonts"]["user_fonts"]:
if system == "Darwin":
paths.append(os.path.join(user_home, "Library", "Fonts"))
elif system == "Linux":
paths.append(os.path.join(user_home, ".fonts"))
return [path for path in paths if os.path.exists(path)]
def __get_keys(self, json_obj, prefix=''):
keys = []
for k, v in json_obj.items():
if isinstance(v, dict):
keys.extend(self.__get_keys(v, prefix + k + '.'))
else:
keys.append(prefix + k)
return set(keys)
def __check_json_keys(self, json1, json2):
keys1 = self.__get_keys(json1)
keys2 = self.__get_keys(json2)
return keys1 == keys2
def setup_config(self):
if not os.path.exists(self.__CONFIG_PATH):
self.__log("Creating config.json")
self.__create_config()
else:
if not self.__check_json_keys(self.__template(), self.__config()):
self.__log("Updating config.json")
self.__update_config(self.__template(), self.__config())
def setup_timestamp(self):
if not os.path.exists(self.__TIMESTAMP_PATH):
self.__log("Creating timestamp.json")
self.__create_timestamp()
def check_updates(self):
branch_name = self.__config()["updates"]["branch_name"]
update_frequency = self.__config()["updates"]["update_frequency"].lower()
valid_frequencies = ["always", "day", "week", "month", "never"]
time_difference = time.time() - self.__timestamp()["timestamp"]
if update_frequency == valid_frequencies[0]:
it_is_time_for_update = True
elif update_frequency == valid_frequencies[1]:
it_is_time_for_update = time_difference >= self.__DAY_SECONDS
elif update_frequency == valid_frequencies[2]:
it_is_time_for_update = time_difference >= self.__WEEK_SECONDS
elif update_frequency == valid_frequencies[3]:
it_is_time_for_update = time_difference >= self.__MONTH_SECONDS
elif update_frequency == valid_frequencies[4]:
it_is_time_for_update = False
else:
self.__error(f"Unknown update frequency - {update_frequency}, available: {valid_frequencies}")
return
if it_is_time_for_update:
if not (self.__GIT_PATH.exists() or self.__GIT_PATH.is_dir()):
self.__error("Root directory of Allor is not a git repository. Update canceled.")
return
try:
import git
from git import Repo
from git import GitCommandError
# noinspection PyTypeChecker, PyUnboundLocalVariable
repo = Repo(self.__ROOT_PATH, odbt=git.db.GitDB)
current_commit = repo.head.commit.hexsha
repo.remotes.origin.fetch()
latest_commit = getattr(repo.remotes.origin.refs, branch_name).commit.hexsha
if current_commit == latest_commit:
if self.__config()["updates"]["notify_if_no_new_updates"]:
self.__notification("No new updates.")
else:
if self.__config()["updates"]["notify_if_has_new_updates"]:
self.__notification("New updates are available.")
if self.__config()["updates"]["auto_update"]:
update_mode = self.__config()["updates"]["update_mode"].lower()
valid_modes = ["soft", "hard"]
if repo.active_branch.name != branch_name:
try:
repo.git.checkout(branch_name)
except GitCommandError:
self.__error(f"An error occurred while switching to the branch {branch_name}.")
return
if update_mode == "soft":
try:
repo.git.pull()
except GitCommandError:
self.__error("An error occurred during the update. "
"It is recommended to use \"hard\" update mode. "
"But be careful, it erases all personal changes from Allor repository.")
elif update_mode == "hard":
repo.git.reset('--hard', 'origin/' + branch_name)
else:
self.__error(f"Unknown update mode - {update_mode}, available: {valid_modes}")
return
self.__notification("Update complete.")
self.__update_timestamp()
except ImportError:
self.__error("GitPython is not installed.")
def setup_rembg(self):
os.environ["U2NET_HOME"] = folder_paths.models_dir + "/onnx"
def setup_paths(self):
fonts_folder_path = self.__get_fonts_folder_path()
folder_paths.folder_names_and_paths["onnx"] = ([os.path.join(folder_paths.models_dir, "onnx")], {".onnx"})
folder_paths.folder_names_and_paths["fonts"] = (fonts_folder_path, {".otf", ".ttf"})
def setup_override(self):
override_nodes_len = 0
def override(function):
start_len = nodes.NODE_CLASS_MAPPINGS.__len__()
nodes.NODE_CLASS_MAPPINGS = dict(
filter(function, nodes.NODE_CLASS_MAPPINGS.items())
)
return start_len - nodes.NODE_CLASS_MAPPINGS.__len__()
if self.__config()["override"]["postprocessing"]:
override_nodes_len += override(lambda item: not item[1].CATEGORY.startswith("image/postprocessing"))
if self.__config()["override"]["transform"]:
override_nodes_len += override(lambda item: not item[0] == "ImageScale" and not item[0] == "ImageScaleBy" and not item[0] == "ImageInvert")
if self.__config()["override"]["debug"]:
nodes.VAEDecodeTiled.CATEGORY = "latent"
nodes.VAEEncodeTiled.CATEGORY = "latent"
override_nodes_len += override(lambda item: not item[1].CATEGORY.startswith("_for_testing"))
self.__log(str(override_nodes_len) + " nodes were overridden.")
def get_modules(self):
modules = dict()
if self.__config()["modules"]["AlphaChanel"]:
from .modules import AlphaChanel
modules.update(AlphaChanel.NODE_CLASS_MAPPINGS)
if self.__config()["modules"]["Clamp"]:
from .modules import Clamp
modules.update(Clamp.NODE_CLASS_MAPPINGS)
if self.__config()["modules"]["ImageBatch"]:
from .modules import ImageBatch
modules.update(ImageBatch.NODE_CLASS_MAPPINGS)
if self.__config()["modules"]["ImageComposite"]:
from .modules import ImageComposite
modules.update(ImageComposite.NODE_CLASS_MAPPINGS)
if self.__config()["modules"]["ImageContainer"]:
from .modules import ImageContainer
modules.update(ImageContainer.NODE_CLASS_MAPPINGS)
if self.__config()["modules"]["ImageDraw"]:
from .modules import ImageDraw
modules.update(ImageDraw.NODE_CLASS_MAPPINGS)
if self.__config()["modules"]["ImageEffects"]:
from .modules import ImageEffects
modules.update(ImageEffects.NODE_CLASS_MAPPINGS)
if self.__config()["modules"]["ImageFilter"]:
from .modules import ImageFilter
modules.update(ImageFilter.NODE_CLASS_MAPPINGS)
if self.__config()["modules"]["ImageNoise"]:
from .modules import ImageNoise
modules.update(ImageNoise.NODE_CLASS_MAPPINGS)
if self.__config()["modules"]["ImageSegmentation"]:
from .modules import ImageSegmentation
modules.update(ImageSegmentation.NODE_CLASS_MAPPINGS)
if self.__config()["modules"]["ImageText"]:
from .modules import ImageText
modules.update(ImageText.NODE_CLASS_MAPPINGS)
if self.__config()["modules"]["ImageTransform"]:
from .modules import ImageTransform
modules.update(ImageTransform.NODE_CLASS_MAPPINGS)
modules_len = dict(
filter(
lambda item: item[1],
self.__config()["modules"].items()
)
).__len__()
nodes_len = modules.__len__()
self.__log(str(modules_len) + " modules were enabled.")
self.__log(str(nodes_len) + " nodes were loaded.")
return modules
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。