代码拉取完成,页面将自动刷新
同步操作将从 CunKai/ComfyUI-Manager 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
import subprocess
import sys
import os
import traceback
import git
import configparser
import re
import json
import yaml
import requests
from tqdm.auto import tqdm
from git.remote import RemoteProgress
def download_url(url, dest_folder, filename=None):
# Ensure the destination folder exists
if not os.path.exists(dest_folder):
os.makedirs(dest_folder)
# Extract filename from URL if not provided
if filename is None:
filename = os.path.basename(url)
# Full path to save the file
dest_path = os.path.join(dest_folder, filename)
# Download the file
response = requests.get(url, stream=True)
if response.status_code == 200:
with open(dest_path, 'wb') as file:
for chunk in response.iter_content(chunk_size=1024):
if chunk:
file.write(chunk)
else:
print(f"Failed to download file from {url}")
config_path = os.path.join(os.path.dirname(__file__), "config.ini")
nodelist_path = os.path.join(os.path.dirname(__file__), "custom-node-list.json")
working_directory = os.getcwd()
if os.path.basename(working_directory) != 'custom_nodes':
print(f"WARN: This script should be executed in custom_nodes dir")
print(f"DBG: INFO {working_directory}")
print(f"DBG: INFO {sys.argv}")
# exit(-1)
class GitProgress(RemoteProgress):
def __init__(self):
super().__init__()
self.pbar = tqdm(ascii=True)
def update(self, op_code, cur_count, max_count=None, message=''):
self.pbar.total = max_count
self.pbar.n = cur_count
self.pbar.pos = 0
self.pbar.refresh()
def gitclone(custom_nodes_path, url, target_hash=None):
repo_name = os.path.splitext(os.path.basename(url))[0]
repo_path = os.path.join(custom_nodes_path, repo_name)
# Clone the repository from the remote URL
repo = git.Repo.clone_from(url, repo_path, recursive=True, progress=GitProgress())
if target_hash is not None:
print(f"CHECKOUT: {repo_name} [{target_hash}]")
repo.git.checkout(target_hash)
repo.git.clear_cache()
repo.close()
def gitcheck(path, do_fetch=False):
try:
# Fetch the latest commits from the remote repository
repo = git.Repo(path)
if repo.head.is_detached:
print("CUSTOM NODE CHECK: True")
return
current_branch = repo.active_branch
branch_name = current_branch.name
remote_name = current_branch.tracking_branch().remote_name
remote = repo.remote(name=remote_name)
if do_fetch:
remote.fetch()
# Get the current commit hash and the commit hash of the remote branch
commit_hash = repo.head.commit.hexsha
remote_commit_hash = repo.refs[f'{remote_name}/{branch_name}'].object.hexsha
# Compare the commit hashes to determine if the local repository is behind the remote repository
if commit_hash != remote_commit_hash:
# Get the commit dates
commit_date = repo.head.commit.committed_datetime
remote_commit_date = repo.refs[f'{remote_name}/{branch_name}'].object.committed_datetime
# Compare the commit dates to determine if the local repository is behind the remote repository
if commit_date < remote_commit_date:
print("CUSTOM NODE CHECK: True")
else:
print("CUSTOM NODE CHECK: False")
except Exception as e:
print(e)
print("CUSTOM NODE CHECK: Error")
def switch_to_default_branch(repo):
show_result = repo.git.remote("show", "origin")
matches = re.search(r"\s*HEAD branch:\s*(.*)", show_result)
if matches:
default_branch = matches.group(1)
repo.git.checkout(default_branch)
def gitpull(path):
# Check if the path is a git repository
if not os.path.exists(os.path.join(path, '.git')):
raise ValueError('Not a git repository')
# Pull the latest changes from the remote repository
repo = git.Repo(path)
if repo.is_dirty():
repo.git.stash()
commit_hash = repo.head.commit.hexsha
try:
if repo.head.is_detached:
switch_to_default_branch(repo)
current_branch = repo.active_branch
branch_name = current_branch.name
remote_name = current_branch.tracking_branch().remote_name
remote = repo.remote(name=remote_name)
remote.fetch()
remote_commit_hash = repo.refs[f'{remote_name}/{branch_name}'].object.hexsha
if commit_hash == remote_commit_hash:
print("CUSTOM NODE PULL: None") # there is no update
repo.close()
return
remote.pull()
repo.git.submodule('update', '--init', '--recursive')
new_commit_hash = repo.head.commit.hexsha
if commit_hash != new_commit_hash:
print("CUSTOM NODE PULL: Success") # update success
else:
print("CUSTOM NODE PULL: Fail") # update fail
except Exception as e:
print(e)
print("CUSTOM NODE PULL: Fail") # unknown git error
repo.close()
def checkout_comfyui_hash(target_hash):
repo_path = os.path.abspath(os.path.join(working_directory, '..')) # ComfyUI dir
repo = git.Repo(repo_path)
commit_hash = repo.head.commit.hexsha
if commit_hash != target_hash:
try:
print(f"CHECKOUT: ComfyUI [{target_hash}]")
repo.git.checkout(target_hash)
except git.GitCommandError as e:
print(f"Error checking out the ComfyUI: {str(e)}")
def checkout_custom_node_hash(git_custom_node_infos):
repo_name_to_url = {}
for url in git_custom_node_infos.keys():
repo_name = url.split('/')[-1]
if repo_name.endswith('.git'):
repo_name = repo_name[:-4]
repo_name_to_url[repo_name] = url
for path in os.listdir(working_directory):
if path.endswith("ComfyUI-Manager"):
continue
fullpath = os.path.join(working_directory, path)
if os.path.isdir(fullpath):
is_disabled = path.endswith(".disabled")
try:
git_dir = os.path.join(fullpath, '.git')
if not os.path.exists(git_dir):
continue
need_checkout = False
repo_name = os.path.basename(fullpath)
if repo_name.endswith('.disabled'):
repo_name = repo_name[:-9]
if repo_name not in repo_name_to_url:
if not is_disabled:
# should be disabled
print(f"DISABLE: {repo_name}")
new_path = fullpath + ".disabled"
os.rename(fullpath, new_path)
need_checkout = False
else:
item = git_custom_node_infos[repo_name_to_url[repo_name]]
if item['disabled'] and is_disabled:
pass
elif item['disabled'] and not is_disabled:
# disable
print(f"DISABLE: {repo_name}")
new_path = fullpath + ".disabled"
os.rename(fullpath, new_path)
elif not item['disabled'] and is_disabled:
# enable
print(f"ENABLE: {repo_name}")
new_path = fullpath[:-9]
os.rename(fullpath, new_path)
fullpath = new_path
need_checkout = True
else:
need_checkout = True
if need_checkout:
repo = git.Repo(fullpath)
commit_hash = repo.head.commit.hexsha
if commit_hash != item['hash']:
print(f"CHECKOUT: {repo_name} [{item['hash']}]")
repo.git.checkout(item['hash'])
except Exception:
print(f"Failed to restore snapshots for the custom node '{path}'")
# clone missing
for k, v in git_custom_node_infos.items():
if not v['disabled']:
repo_name = k.split('/')[-1]
if repo_name.endswith('.git'):
repo_name = repo_name[:-4]
path = os.path.join(working_directory, repo_name)
if not os.path.exists(path):
print(f"CLONE: {path}")
gitclone(working_directory, k, v['hash'])
def invalidate_custom_node_file(file_custom_node_infos):
global nodelist_path
enabled_set = set()
for item in file_custom_node_infos:
if not item['disabled']:
enabled_set.add(item['filename'])
for path in os.listdir(working_directory):
fullpath = os.path.join(working_directory, path)
if not os.path.isdir(fullpath) and fullpath.endswith('.py'):
if path not in enabled_set:
print(f"DISABLE: {path}")
new_path = fullpath+'.disabled'
os.rename(fullpath, new_path)
elif not os.path.isdir(fullpath) and fullpath.endswith('.py.disabled'):
path = path[:-9]
if path in enabled_set:
print(f"ENABLE: {path}")
new_path = fullpath[:-9]
os.rename(fullpath, new_path)
# download missing: just support for 'copy' style
py_to_url = {}
with open(nodelist_path, 'r', encoding="UTF-8") as json_file:
info = json.load(json_file)
for item in info['custom_nodes']:
if item['install_type'] == 'copy':
for url in item['files']:
if url.endswith('.py'):
py = url.split('/')[-1]
py_to_url[py] = url
for item in file_custom_node_infos:
filename = item['filename']
if not item['disabled']:
target_path = os.path.join(working_directory, filename)
if not os.path.exists(target_path) and filename in py_to_url:
url = py_to_url[filename]
print(f"DOWNLOAD: {filename}")
download_url(url, working_directory)
def apply_snapshot(target):
try:
path = os.path.join(os.path.dirname(__file__), 'snapshots', f"{target}")
if os.path.exists(path):
if not target.endswith('.json') and not target.endswith('.yaml'):
print(f"Snapshot file not found: `{path}`")
print("APPLY SNAPSHOT: False")
return None
with open(path, 'r', encoding="UTF-8") as snapshot_file:
if target.endswith('.json'):
info = json.load(snapshot_file)
elif target.endswith('.yaml'):
info = yaml.load(snapshot_file, Loader=yaml.SafeLoader)
info = info['custom_nodes']
else:
# impossible case
print("APPLY SNAPSHOT: False")
return None
comfyui_hash = info['comfyui']
git_custom_node_infos = info['git_custom_nodes']
file_custom_node_infos = info['file_custom_nodes']
checkout_comfyui_hash(comfyui_hash)
checkout_custom_node_hash(git_custom_node_infos)
invalidate_custom_node_file(file_custom_node_infos)
print("APPLY SNAPSHOT: True")
if 'pips' in info:
return info['pips']
else:
return None
print(f"Snapshot file not found: `{path}`")
print("APPLY SNAPSHOT: False")
return None
except Exception as e:
print(e)
traceback.print_exc()
print("APPLY SNAPSHOT: False")
return None
def restore_pip_snapshot(pips, options):
non_url = []
local_url = []
non_local_url = []
for k, v in pips.items():
if v == "":
non_url.append(k)
else:
if v.startswith('file:'):
local_url.append(v)
else:
non_local_url.append(v)
failed = []
if '--pip-non-url' in options:
# try all at once
res = 1
try:
res = subprocess.check_call([sys.executable, '-m', 'pip', 'install'] + non_url)
except:
pass
# fallback
if res != 0:
for x in non_url:
res = 1
try:
res = subprocess.check_call([sys.executable, '-m', 'pip', 'install', x])
except:
pass
if res != 0:
failed.append(x)
if '--pip-non-local-url' in options:
for x in non_local_url:
res = 1
try:
res = subprocess.check_call([sys.executable, '-m', 'pip', 'install', x])
except:
pass
if res != 0:
failed.append(x)
if '--pip-local-url' in options:
for x in local_url:
res = 1
try:
res = subprocess.check_call([sys.executable, '-m', 'pip', 'install', x])
except:
pass
if res != 0:
failed.append(x)
print(f"Installation failed for pip packages: {failed}")
def setup_environment():
config = configparser.ConfigParser()
config.read(config_path)
if 'default' in config and 'git_exe' in config['default'] and config['default']['git_exe'] != '':
git.Git().update_environment(GIT_PYTHON_GIT_EXECUTABLE=config['default']['git_exe'])
setup_environment()
try:
if sys.argv[1] == "--clone":
gitclone(sys.argv[2], sys.argv[3])
elif sys.argv[1] == "--check":
gitcheck(sys.argv[2], False)
elif sys.argv[1] == "--fetch":
gitcheck(sys.argv[2], True)
elif sys.argv[1] == "--pull":
gitpull(sys.argv[2])
elif sys.argv[1] == "--apply-snapshot":
options = set()
for x in sys.argv:
if x in ['--pip-non-url', '--pip-local-url', '--pip-non-local-url']:
options.add(x)
pips = apply_snapshot(sys.argv[2])
if pips and len(options) > 0:
restore_pip_snapshot(pips, options)
sys.exit(0)
except Exception as e:
print(e)
sys.exit(-1)
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。