代码拉取完成,页面将自动刷新
# -*- coding: utf-8 -*-
"""
Copyright (c) [2021] Huawei Technologies Co.,Ltd.ALL rights reserved.
This program is licensed under Mulan PSL v2.
You can use it according to the terms and conditions of the Mulan PSL v2.
http://license.coscl.org.cn/MulanPSL2
THIS PROGRAM IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details.
@Author : KotorinMinami
@email : huangshuo4@gmail.com
@Date : 2024-2-28 00:42:43
@License : Mulan PSL v2
@Version : 1.0
@Desc : 软件包的安装卸载
"""
import os
import sys
import subprocess
import tempfile
import argparse
SCRIPT_PATH = os.path.dirname(os.path.abspath(__file__))
sys.path.append(SCRIPT_PATH)
import mugen_log
import ssh_cmd
def local_cmd(cmd, conn=None):
"""本地命令执行
Args:
cmd ([str]): 需要执行的命令
conn ([class], optional): 建立和远端的连接. Defaults to None.
Returns:
[list]: 命令执行后的返回码,命令执行结果
"""
exitcode, output = subprocess.getstatusoutput(cmd)
return exitcode, output
def rpm_install(pkgs, node=1, tmpfile=""):
"""安装软件包
Args:
pkgs ([str]): 软包包名,"bc" or "bc vim"
node (int, optional): 节点号. Defaults to 1.
tmpfile (str, optional): 软件包及其依赖包的缓存文件. Defaults to "".
Returns:
[list]: 错误码,安装的包的列表
"""
if pkgs == "":
mugen_log.logging("error", "the following arguments are required:pkgs")
sys.exit(1)
localtion = os.environ.get("NODE" + str(node) + "_LOCALTION")
if localtion == "local":
conn = None
func = local_cmd
else:
conn = ssh_cmd.pssh_conn(
os.environ.get("NODE" + str(node) + "_IPV4"),
os.environ.get("NODE" + str(node) + "_PASSWORD"),
os.environ.get("NODE" + str(node) + "_SSH_PORT"),
os.environ.get("NODE" + str(node) + "_USER"),
)
func = ssh_cmd.pssh_cmd
result = func(conn=conn, cmd="zypper install -y -d --no-recommends " + pkgs)[1]
if "is already installed" in result and "Nothing to do." in result:
mugen_log.logging("info", "pkgs:(%s) is already installed" % pkgs)
return 0, None
repoCode, repoList = func(
conn=conn,
cmd=" zypper lr | grep -E [0-9] | awk -F \"|\" '{print$3}' | sed 's/^[ \\t]*//'",
)
if repoCode != 0:
return repoCode, repoList
depCode , depList = func(conn=conn, cmd="zypper install -y -d --no-recommends "
+ pkgs
+ ' | grep -A 1000 "going to be installed:" | grep -E "^ "'
)
if depCode != 0:
return depCode, depList
exitcode, result = func(conn=conn, cmd="zypper install -y --no-recommends " + pkgs)
if tmpfile == "":
tmpfile = tempfile.mkstemp(dir="/tmp")[1]
with open(tmpfile, "a+") as f:
for depPkg in depList.split("\n"):
f.write(depPkg + " ")
if exitcode == 0:
result = f.name
return exitcode, result
def rpm_remove(node=1, pkgs="", tmpfile=""):
"""卸载软件包
Args:
node (int, optional): 节点号. Defaults to 1.
pkgs (str, optional): 需要卸载的软件包. Defaults to "".
tmpfile (str, optional): 安装时所有涉及的包. Defaults to "".
Returns:
list: 错误码,卸载列表或错误信息
"""
if pkgs == "" and tmpfile == "":
mugen_log.logging(
"error", "Packages or package files these need to be removed must be added"
)
sys.exit(1)
localtion = os.environ.get("NODE" + str(node) + "_LOCALTION")
if localtion == "local":
conn = None
func = local_cmd
else:
conn = ssh_cmd.pssh_conn(
os.environ.get("NODE" + str(node) + "_IPV4"),
os.environ.get("NODE" + str(node) + "_PASSWORD"),
os.environ.get("NODE" + str(node) + "_SSH_PORT"),
os.environ.get("NODE" + str(node) + "_USER"),
)
func = ssh_cmd.pssh_cmd
depList = ""
if tmpfile != "":
with open(tmpfile, "r") as f:
depList = f.read()
exitcode = func(conn=conn, cmd="zypper remove -y -U " + pkgs + " " + depList)[0]
if localtion != "local":
ssh_cmd.pssh_close(conn)
return exitcode
if __name__ == "__main__":
parser = argparse.ArgumentParser(
usage="rpm_manage.py install|remove [-h] [--node NODE] [--pkgs PKG] [--tempfile TEPMFILE]",
description="manual to this script",
)
parser.add_argument(
"operation", type=str, choices=["install", "remove"], default=None
)
parser.add_argument("--node", type=int, default=1)
parser.add_argument("--pkgs", type=str, default="")
parser.add_argument("--tempfile", type=str, default="")
args = parser.parse_args()
if sys.argv[1] == "install":
exitcode, output = rpm_install(args.pkgs, args.node, args.tempfile)
if output is not None:
print(output)
sys.exit(exitcode)
elif sys.argv[1] == "remove":
exitcode = rpm_remove(args.node, args.pkgs, args.tempfile)
sys.exit(exitcode)
else:
mugen_log.logging(
"error",
"usage: rpm_manage.py install|remove [-h] [--node NODE] [--pkg PKG] [--tempfile TEPMFILE]",
)
sys.exit(1)
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。