1 Star 0 Fork 0

sysdl132/snapd

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
spread-shellcheck 9.30 KB
一键复制 编辑 原始数据 按行查看 历史
#!/usr/bin/env python3
# Copyright (C) 2018 Canonical Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 3 as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
import os
import subprocess
import argparse
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import cpu_count
from typing import Dict
import yaml
# default shell that shellcheck is told to assume (-s flag), overridable
# via the SHELLCHECK_SHELL environment variable
SHELLCHECK_SHELL = os.getenv('SHELLCHECK_SHELL', 'bash')
# set to non-empty to ignore all errors
NO_FAIL = os.getenv('NO_FAIL')
# set to non empty to enable 'set -x'
D = os.getenv('D')
# set to non-empty to enable verbose logging
V = os.getenv('V')
# set to a number to use these many threads (defaults to the CPU count)
N = int(os.getenv('N') or cpu_count())
# file with list of files that can fail validation
CAN_FAIL = os.getenv('CAN_FAIL')
# names of the spread.yaml/task.yaml sections that hold shell scripts
SECTIONS = ['prepare', 'prepare-each', 'restore', 'restore-each',
            'debug', 'debug-each', 'execute', 'repack']
def parse_arguments():
    """Parse the command line arguments.

    Returns an argparse namespace with: shell, no_errors, verbose,
    can_fail, max_procs and paths.
    """
    parser = argparse.ArgumentParser(description='spread shellcheck helper')
    parser.add_argument('-s', '--shell', default='bash',
                        help='shell')
    # fixed: help string had a stray trailing space
    parser.add_argument('-n', '--no-errors', action='store_true',
                        default=False, help='ignore all errors')
    parser.add_argument('-v', '--verbose', action='store_true',
                        default=False, help='verbose logging')
    # fixed: help string read 'files that are can fail validation'
    parser.add_argument('--can-fail', default=None,
                        help=('file with list of files that can fail '
                              'validation'))
    parser.add_argument('-P', '--max-procs', default=N, type=int, metavar='N',
                        help=('run these many shellchecks in parallel '
                              '(default: %(default)s)'))
    parser.add_argument('paths', nargs='+', help='paths to check')
    return parser.parse_args()
class ShellcheckRunError(Exception):
    """Raised when shellcheck reports problems for one script.

    The raw shellcheck output is kept in ``stderr``.
    """

    def __init__(self, stderr):
        super().__init__()
        # raw bytes produced by the shellcheck process
        self.stderr = stderr
class ShellcheckError(Exception):
    """Collects per-section shellcheck failures for a single file.

    Truthiness reflects whether any section has failed.
    """

    def __init__(self, path):
        super().__init__()
        # file the failures belong to
        self.path = path
        # maps section name -> shellcheck error output
        self.sectionerrors = {}

    def addfailure(self, section, error):
        """Record the shellcheck output for one failed section."""
        self.sectionerrors[section] = error

    def __len__(self):
        return len(self.sectionerrors)
class ShellcheckFailures(Exception):
    """Aggregates the paths of all files that failed validation."""

    def __init__(self, failures=None):
        super().__init__()
        # set of file paths that failed the check
        self.failures = set(failures) if failures else set()

    def merge(self, otherfailures):
        """Fold another ShellcheckFailures instance into this one."""
        self.failures |= otherfailures.failures

    def __len__(self):
        return len(self.failures)

    def intersection(self, other):
        return self.failures.intersection(other)

    def difference(self, other):
        return self.failures.difference(other)

    def __iter__(self):
        return iter(self.failures)
def checksection(data, env: Dict[str, str]):
    """Run shellcheck over one spread section script.

    data: the shell snippet stored under a spread section key.
    env: spread environment definitions, rendered as KEY="value"
    exports so shellcheck knows about the variables.

    Raises ShellcheckRunError when shellcheck finds problems.
    """
    # spread shell snippets are executed under 'set -e' shell, make sure
    # shellcheck knows about that
    script_data = []
    script_data.append('set -e')
    for key, value in env.items():
        value = str(value)
        # Unpack the special "$(HOST: ...) syntax and tell shellcheck not to
        # worry about the use of echo to print variable value.
        if value.startswith("$(HOST:") and value.endswith(")"):
            script_data.append("# shellcheck disable=SC2116")
            value = "$({})".format(value[len("$(HOST:"):-1])
        # XXX: poor man's shell key=value assignment with values in double
        # quotes so that one value can refer to another value.
        if '"' in value:
            # fixed: the original used '\"' which equals '"' in Python,
            # making the replace a no-op; embedded double quotes must be
            # backslash-escaped so the assignment stays well-formed
            value = value.replace('"', '\\"')
        # converts
        # FOO: "$(HOST: echo $foo)" -> FOO="$(echo $foo)"
        # FOO: "$(HOST: echo \"$foo\")" -> FOO="$(echo \"$foo\")"
        # FOO: "foo" -> FOO="foo"
        script_data.append("{}=\"{}\"".format(key, value))
        script_data.append("export {}".format(key))
    script_data.append(data)
    # pass arguments as a list with shell=False so the (environment
    # supplied) shell name cannot be interpreted as shell syntax
    proc = subprocess.Popen(["shellcheck", "-s", SHELLCHECK_SHELL, "-x", "-"],
                            stdout=subprocess.PIPE,
                            stdin=subprocess.PIPE)
    stdout, _ = proc.communicate(input='\n'.join(script_data).encode('utf-8'),
                                 timeout=10)
    if proc.returncode != 0:
        raise ShellcheckRunError(stdout)
def checkfile(path):
    """Check all spread shell sections of a single YAML file.

    Raises ShellcheckError listing every section that failed.
    """
    logging.debug("checking file %s", path)
    with open(path) as inf:
        data = yaml.load(inf, Loader=yaml.CSafeLoader)

    errors = ShellcheckError(path)

    # TODO: handle stacking of environment from other places that influence
    # it: spread.yaml -> global env + backend env + suite env -> task.yaml
    # (task env + variant env).
    env = {}
    for key, value in data.get("environment", {}).items():
        if "/" in key:
            # TODO: re-check with each variant's value set.
            key = key.split('/', 1)[0]
        env[key] = value

    def run_check(snippet, label, fmt, *fmt_args):
        # helper: shellcheck one snippet, collecting failures under label
        logging.debug(fmt, *fmt_args)
        try:
            checksection(snippet, env)
        except ShellcheckRunError as serr:
            errors.addfailure(label, serr.stderr.decode('utf-8'))

    for section in SECTIONS:
        if section in data:
            run_check(data[section], section,
                      "%s: checking section %s", path, section)

    if path.endswith('spread.yaml') and 'suites' in data:
        # spread.yaml additionally holds per-suite sections
        for suite in data['suites'].keys():
            suite_data = data['suites'][suite]
            for section in SECTIONS:
                if section in suite_data:
                    run_check(suite_data[section],
                              'suites/' + suite + '/' + section,
                              "%s (suite %s): checking section %s",
                              path, suite, section)

    if errors:
        raise errors
def findfiles(indir):
    """Yield the path of every spread.yaml/task.yaml file under *indir*.

    Walks the tree top-down.
    """
    wanted = ('spread.yaml', 'task.yaml')
    for root, _, files in os.walk(indir, topdown=True):
        yield from (os.path.join(root, fname)
                    for fname in files if fname in wanted)
def checkpath(loc, max_workers):
    """Check one file, or every spread YAML file under a directory.

    Per-file checks run in a thread pool of *max_workers* threads.
    Raises ShellcheckFailures with the paths of all files that failed.
    """
    # a directory is scanned for spread.yaml/task.yaml files, a plain
    # file is checked directly
    locations = findfiles(loc) if os.path.isdir(loc) else [loc]

    failed = []

    def check1path(path):
        # return the error instead of raising so executor.map() can
        # deliver it to the collecting loop below
        try:
            checkfile(path)
        except ShellcheckError as err:
            return err
        return None

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        for serr in executor.map(check1path, locations):
            if serr is None:
                continue
            logging.error(('shellcheck failed for file %s in sections: '
                           '%s; error log follows'),
                          serr.path, ', '.join(serr.sectionerrors.keys()))
            for section, error in serr.sectionerrors.items():
                logging.error("%s: section '%s':\n%s",
                              serr.path, section, error)
            failed.append(serr.path)

    if failed:
        raise ShellcheckFailures(failures=failed)
def loadfilelist(flistpath):
    """Load the list of files that are allowed to fail validation.

    Each non-empty, non-comment line names one path; returns them as a
    set.

    Fixed: the original added an empty string to the set for every blank
    line, which later surfaced as a spurious 'whitelisted but validated
    successfully' hard failure in main(); blank lines are now skipped.
    """
    flist = set()
    with open(flistpath) as inf:
        for line in inf:
            entry = line.strip()
            # skip blank lines and (possibly indented) comments
            if entry and not entry.startswith('#'):
                flist.add(entry)
    return flist
def main(opts):
    """Check every requested path and report the outcome.

    Honours opts.can_fail (whitelist of files allowed to fail) and
    opts.no_errors / the NO_FAIL environment variable (demote failures
    to a warning). Raises SystemExit(1) on unexpected failures.
    """
    failures = ShellcheckFailures()
    for pth in (opts.paths or ['.']):
        try:
            checkpath(pth, opts.max_procs)
        except ShellcheckFailures as sf:
            failures.merge(sf)

    if not failures:
        return

    if opts.can_fail:
        can_fail = loadfilelist(opts.can_fail)

        # failures outside the whitelist are always fatal
        unexpected = failures.difference(can_fail)
        if unexpected:
            logging.error(('validation failed for the following '
                           'non-whitelisted files:\n%s'),
                          '\n'.join([' - ' + f for f in
                                     sorted(unexpected)]))
            raise SystemExit(1)

        # whitelisted entries that no longer fail should be removed
        # from the whitelist, flag them too
        did_not_fail = can_fail - failures.intersection(can_fail)
        if did_not_fail:
            logging.error(('the following files are whitelisted '
                           'but validated successfully:\n%s'),
                          '\n'.join([' - ' + f for f in
                                     sorted(did_not_fail)]))
            raise SystemExit(1)

        # no unexpected failures
        return

    logging.error('validation failed for the following files:\n%s',
                  '\n'.join([' - ' + f for f in sorted(failures)]))
    if NO_FAIL or opts.no_errors:
        logging.warning("ignoring errors")
    else:
        raise SystemExit(1)
if __name__ == '__main__':
    opts = parse_arguments()
    # verbose logging can be requested via -v or the V/D env variables
    level = logging.DEBUG if (opts.verbose or D or V) else logging.INFO
    logging.basicConfig(level=level)
    # environment variables override the command line switches
    if CAN_FAIL:
        opts.can_fail = CAN_FAIL
    if NO_FAIL:
        opts.no_errors = True
    main(opts)
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/sysdl132/snapd.git
git@gitee.com:sysdl132/snapd.git
sysdl132
snapd
snapd
latest

搜索帮助

0d507c66 1850385 C8b1a773 1850385