1 Star 0 Fork 4

anolis/cve-diagnose

Create your Gitee Account
Explore and code with more than 12 million developers,Free private repositories !:)
Sign up
文件
Clone or Download
get_fixcve_allrpm.py 13.91 KB
Copy Edit Raw Blame History
# -*- coding:utf-8 -*-
import logging
import os
import re
import time
from pprint import pprint

import requests

from autodiagconfig import hostname, fix_dict, trigger_cve_pkgver_dict
def loggerr():
    """
    Return the module logger, writing DEBUG-and-above records to a file.

    The log file is ``<cwd>/log_path/<YYYYmmddHHMMSS>autocrediagsc.log``;
    the ``log_path`` directory is created when it does not exist yet.

    The logger is looked up by ``__name__``, so every call returns the same
    object. A file handler is attached only on the first call; later calls
    return the already-configured logger instead of stacking another
    handler (which would write every record more than once).

    Returns:
        logging.Logger: the configured logger.
    """
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.DEBUG)

    # Already configured by an earlier call: do not add a second handler.
    if any(isinstance(handler, logging.FileHandler) for handler in logger.handlers):
        return logger

    log_path = os.path.join(os.getcwd(), 'log_path')
    # exist_ok avoids the check-then-create race of exists() + mkdir().
    os.makedirs(log_path, exist_ok=True)

    logname = os.path.join(
        log_path, '{}autocrediagsc.log'.format(time.strftime('%Y%m%d%H%M%S')))

    # mode='w' truncates a file left over from a run in the same second,
    # which replaces the former "remove the file, then sleep" sequence.
    fh = logging.FileHandler(logname, mode='w', encoding='utf-8')
    fh.setLevel(logging.DEBUG)
    fh.setFormatter(
        logging.Formatter('%(levelname)s %(asctime)s-%(name)s : %(message)s'))
    logger.addHandler(fh)
    return logger
class AllRpmVersion:
    """
    Crawl the mirror rooted at ``hostname`` (for example
    https://mirrors.openanolis.cn/anolis/) and collect, for every OS
    version and repository found there, the rpm package names with the
    architecture suffix removed.
    """

    # Listing entries that are never descended into: the parent-directory
    # link plus the two directories the crawl has always skipped.
    _SKIPPED_DIRS = ('../', 'isos/', 'updateinfo/')

    # File-name tails removed from package names. riscv64 is included
    # because d2_page() crawls the riscv64/ tree as well.
    _ARCH_SUFFIXES = (
        '.src.rpm',
        '.x86_64.rpm',
        '.aarch64.rpm',
        '.noarch.rpm',
        '.riscv64.rpm',
    )

    def __init__(self, loger):
        """
        Fetch the top-level listing and remember the entries found there.

        Args:
            loger: logger used for progress messages.
        """
        self.loger = loger
        self.all_ver_list = self.get_dirname(self.get_request(hostname).text)
        self.loger.info("all_ver_list>>>{}".format(self.all_ver_list))

    @staticmethod
    def get_request(urlname, timeout=60):
        """
        Fetch a page.

        Args:
            urlname: URL to request.
            timeout: seconds to wait for the server before giving up;
                without it a stalled mirror would block the crawl forever.

        Returns:
            The ``requests`` response object. The HTTP status is not
            checked here, so an error page is returned like any other.
        """
        resp = requests.get('{}'.format(urlname), timeout=timeout)
        return resp

    @staticmethod
    def get_dirname(pagename):
        """
        Extract the link texts from a directory-listing page.

        Only lines that start with ``<a`` are considered. The entries in
        ``_SKIPPED_DIRS`` are left out, and anchor lines that contain no
        ``>`` are ignored.

        Args:
            pagename: HTML text of the listing page.

        Returns:
            list of link texts in page order, e.g. ``['7.7/', '8/']``.
        """
        dirname_list = []
        for line in pagename.strip().split('\n'):
            if not line.startswith('<a'):
                continue
            pieces = line.strip().split('>')
            if len(pieces) < 2:
                # Malformed anchor without a closing '>': nothing to read.
                continue
            name = pieces[1].strip().split('<')[0]
            if name in AllRpmVersion._SKIPPED_DIRS:
                continue
            dirname_list.append(name)
        return dirname_list

    def get_all_pkg(self):
        """
        Walk every OS version listed at ``hostname``.

        Returns:
            list of package names (architecture suffix removed) gathered
            from all versions and repositories; duplicates are kept.
        """
        all_ver_pkg_list = []
        for ver in self.all_ver_list:
            # Listed beside the version directories but not one of them
            # (presumably the repository signing key - confirm).
            if ver == 'RPM-GPG-KEY-ANOLIS':
                continue
            pathnamever = hostname + ver
            self.loger.info("pathnamever>>>{}".format(pathnamever))
            pagename = self.get_request(pathnamever).text
            rpm_repo_name_list = self.get_dirname(pagename)
            all_pkg_unarch_osver_list = []
            self.handle_info(rpm_repo_name_list, pathnamever, all_pkg_unarch_osver_list,
                             all_ver_pkg_list)
        return all_ver_pkg_list

    def handle_info(self, rpm_repo_name_list, pathnamever, all_pkg_unarch_osver_list, all_ver_pkg_list):
        """
        Collect the packages of every repository of one OS version.

        For each repository the path is ``pathnamever + reponame`` (e.g.
        https://mirrors.openanolis.cn/anolis/7.7/Plus/) and its listing
        gives the architecture directories (e.g.
        ``['aarch64/', 'source/', 'x86_64/']``).

        Args:
            rpm_repo_name_list: repository directory names of this version.
            pathnamever: URL of the OS-version directory.
            all_pkg_unarch_osver_list: list extended in place with the
                packages of this OS version.
            all_ver_pkg_list: list extended in place with the same
                packages; this is the overall result list.
        """
        for reponame in rpm_repo_name_list:
            repo_allpkg_unarch_list = []
            pathnamerepo = pathnamever + reponame
            self.loger.info('pathnamerepo>>>{}'.format(pathnamerepo))
            arch_page_list = self.get_dirname(self.get_request(pathnamerepo).text)
            if not arch_page_list:
                continue
            self.d2_page(arch_page_list, pathnamever, reponame, repo_allpkg_unarch_list)
            self.loger.info("repo_allpkg_list>>>{}".format(repo_allpkg_unarch_list))
            self.loger.info("{}".format(len(repo_allpkg_unarch_list)))
            self.loger.info("========================")
            self.loger.info("")
            all_pkg_unarch_osver_list.extend(repo_allpkg_unarch_list)
        all_ver_pkg_list.extend(all_pkg_unarch_osver_list)

    def d2_page(self, second_page_list, pathnamever, reponame, repo_allpkg_unarch_list):
        """
        Visit the package directory of each architecture of one repository,
        e.g. https://mirrors.openanolis.cn/anolis/7.7/Plus/aarch64/os/Packages.

        ``source/`` keeps its packages directly under ``Packages``; the
        binary architectures use ``os/Packages``. ``loongarch64/`` is
        skipped silently, any other entry is skipped with a warning.

        Args:
            second_page_list: architecture directory names of the repo.
            pathnamever: URL of the OS-version directory.
            reponame: repository directory name.
            repo_allpkg_unarch_list: list extended in place with the
                packages found.
        """
        for d2 in second_page_list:
            if d2 == "loongarch64/":
                continue
            if d2 == "source/":
                pathnamepkg = pathnamever + reponame + d2 + "Packages"
            elif d2 in ("aarch64/", "x86_64/", "riscv64/"):
                pathnamepkg = pathnamever + reponame + d2 + "os/Packages"
            else:
                self.loger.warning("abnormal codition,this path is ignored:{}".format(d2))
                self.loger.warning("{}".format(pathnamever + reponame + d2))
                continue
            self.loger.info('pathnamepkg>>>{}'.format(pathnamepkg))
            self.all_href_rpm_end(pathnamepkg, repo_allpkg_unarch_list)

    def all_href_rpm_end(self, pathnamepkg, repo_allpkg_unarch_list):
        """
        Read one ``Packages`` listing and record every rpm linked from it.

        The package file name is taken from the text after the first ``=``
        of each line starting with ``<a`` (the href value in the listings
        this was written for), with the surrounding double quotes removed.

        Args:
            pathnamepkg: URL of the ``Packages`` directory.
            repo_allpkg_unarch_list: list extended in place.
        """
        resp3 = self.get_request(pathnamepkg)
        for line in resp3.text.strip().split('\n'):
            if not line.startswith('<a'):
                continue
            attr_parts = line.strip().split('>')[0].strip().split("=")
            if len(attr_parts) < 2:
                # Anchor without any attribute value: no file name to read.
                continue
            pkgname = attr_parts[1].strip("\"")
            self.del_arch_end(pkgname, repo_allpkg_unarch_list)

    @staticmethod
    def del_arch_end(pkgname, repo_allpkg_unarch_list):
        """
        Append ``pkgname`` without its architecture suffix to the list.

        Names that end in none of ``_ARCH_SUFFIXES`` are ignored.

        Args:
            pkgname: rpm file name, e.g. ``polkit-0.112-26.an7.x86_64.rpm``.
            repo_allpkg_unarch_list: list extended in place.
        """
        for suffix in AllRpmVersion._ARCH_SUFFIXES:
            if pkgname.endswith(suffix):
                # Slice the tail off; splitting on the suffix would cut at
                # its first occurrence anywhere in the name.
                repo_allpkg_unarch_list.append(pkgname[:-len(suffix)])
                return
class VulnerableVersions:
    """
    Work out which rpm builds published on the mirror are still vulnerable.

    All builds of the packages named in ``fix_dict`` are collected, then
    the fix builds themselves, every build at or above a fix build and
    (when ``trigger_cve_pkgver_dict`` is non-empty) every build below the
    build that introduced the CVE are removed. What is left is reported as
    vulnerable.
    """

    # Distribution tag inside a release string: "an" or "el" directly
    # followed by a digit (an7, el8, an8_5 ...). Requiring the digit keeps
    # the letters "el"/"an" inside names such as "kernel" from matching.
    _DIST_TAG_RE = re.compile(r'(?:an|el)(?=[0-9])')

    def __init__(self, loger):
        """
        Args:
            loger: logger used for progress messages.
        """
        self.loger = loger
        self.gt_fixver_list = []      # mirror builds >= a fix build
        self.lt_triggerver_list = []  # mirror builds < the CVE-introducing build
        self.gtfix = "gtfix"
        self.lttrigger = "lttrigger"

    def allpkgdelarch(self):
        """
        Crawl the mirror through ``AllRpmVersion``.

        Returns:
            list of all package names with the architecture removed.
        """
        protal_client = AllRpmVersion(self.loger)
        return protal_client.get_all_pkg()

    def get_fixpkg_all_ver(self):
        """
        Select every mirror build of the packages named in ``fix_dict``.

        Package names are compared with the trailing ``-version-release``
        cut off. Each build is returned once, in sorted order, even when
        several ``fix_dict`` keys share the same package name.

        Returns:
            list of ``name-version-release`` strings.
        """
        unique_pkgs = sorted(set(self.allpkgdelarch()))
        self.loger.info("repeat_allpkg_delarch_list, lenth>>>{}".format(len(unique_pkgs)))
        wanted_names = {rpmname.rsplit('-', 2)[0] for rpmname in fix_dict}
        fixrpm_allver_list = [
            pkg for pkg in unique_pkgs if pkg.rsplit('-', 2)[0] in wanted_names
        ]
        self.loger.info("fixrpm_allver_list >>>{}".format(fixrpm_allver_list))
        self.loger.info("fixrpm_allver_list,lenth >>>{}".format(len(fixrpm_allver_list)))
        return fixrpm_allver_list

    def filter_fixver(self):
        """
        Produce the list of vulnerable builds.

        Returns:
            list of mirror builds that are neither a fix build, nor at or
            above a fix build, nor below the CVE-introducing build.

        Raises:
            Exception: when the mirror has no build at all of the packages
                named in ``fix_dict`` (the configuration is probably wrong).
        """
        fixrpm_allver_list = self.get_fixpkg_all_ver()
        self.loger.info("all cve rpm>>>{}".format(fixrpm_allver_list))
        self.loger.info("all cve rpm lenth>>>{}".format(len(fixrpm_allver_list)))
        if not fixrpm_allver_list:
            raise Exception(
                "error:Not in https://mirrors.openanolis.cn/anolis/ pkg found in,please check-->fix_dict.keys()")

        # Drop the fix builds themselves. A fix build that is not on the
        # mirror is reported instead of aborting the run with the bare
        # ValueError that list.remove() would raise.
        fix_builds = {fixver for fixverlist in fix_dict.values() for fixver in fixverlist}
        absent = fix_builds.difference(fixrpm_allver_list)
        if absent:
            self.loger.warning("fix versions not found on the mirror>>>{}".format(sorted(absent)))
        self.del_gtlt(fixrpm_allver_list, fix_builds)

        # Drop builds at/above a fix build or below the triggering build.
        self.del_mirrors_gtfixver_lttriggerver_pkg(fixrpm_allver_list)
        self.loger.info("gt_fixver_list>>>{}".format(self.gt_fixver_list))
        self.loger.info("gt_fixver_list lenth>>>{}".format(len(self.gt_fixver_list)))
        self.loger.info("lt_triggerver_list>>>{}".format(self.lt_triggerver_list))
        self.loger.info("lt_triggerver_list lenth>>>{}".format(len(self.lt_triggerver_list)))
        vuknlist = fixrpm_allver_list
        self.loger.info("vuknlist >>>{}".format(vuknlist))
        self.loger.info("vuknlist,lenth >>>{}".format(len(vuknlist)))
        return vuknlist

    def del_mirrors_gtfixver_lttriggerver_pkg(self, fixrpm_allver_list):
        """
        Classify every build, then remove the non-vulnerable ones in place.

        Args:
            fixrpm_allver_list: candidate builds; modified in place.
        """
        for item in fixrpm_allver_list:
            self.get_mirrors_gtfixver_lttriggerver(item)
        self.del_gtlt(fixrpm_allver_list, self.gt_fixver_list)
        self.del_gtlt(fixrpm_allver_list, self.lt_triggerver_list)

    @staticmethod
    def del_gtlt(fixrpm_allver_list, gtltlist):
        """
        Remove from ``fixrpm_allver_list``, in place, every occurrence of
        every entry of ``gtltlist``.

        Args:
            fixrpm_allver_list: list to prune; the same list object is kept.
            gtltlist: iterable of builds to remove.
        """
        unwanted = set(gtltlist)
        fixrpm_allver_list[:] = [
            pkg for pkg in fixrpm_allver_list if pkg not in unwanted
        ]

    @staticmethod
    def _split_dist_tag(pkg):
        """
        Split a build string at its last ``an``/``el`` distribution tag.

        ``polkit-0.112-26.an7.1`` gives ``('polkit-0.112-26.', '7')``.

        Returns:
            ``(text before the tag, OS token up to the next dot)``, or
            ``None`` when the string carries no such tag.
        """
        last = None
        for last in VulnerableVersions._DIST_TAG_RE.finditer(pkg):
            pass
        if last is None:
            return None
        return pkg[:last.start()], pkg[last.end():].split('.', 1)[0]

    @staticmethod
    def _rpmvercmp(left, right):
        """
        Compare two version (or release) strings segment by segment.

        Simplified form of rpm's comparison: the strings are cut into runs
        of digits and runs of letters, separators are ignored, digit runs
        compare as integers, letter runs as strings, a digit run is newer
        than a letter run, and when all shared segments are equal the
        string with segments left over is newer. ``~`` and ``^`` get no
        special treatment.

        Returns:
            negative, zero or positive int, like a classic ``cmp``.
        """
        left_segments = re.findall(r'[0-9]+|[A-Za-z]+', left)
        right_segments = re.findall(r'[0-9]+|[A-Za-z]+', right)
        for one, two in zip(left_segments, right_segments):
            one_is_num, two_is_num = one.isdigit(), two.isdigit()
            if one_is_num and two_is_num:
                one, two = int(one), int(two)
            elif one_is_num != two_is_num:
                return 1 if one_is_num else -1
            if one != two:
                return -1 if one < two else 1
        return (len(left_segments) > len(right_segments)) - (len(left_segments) < len(right_segments))

    @staticmethod
    def _compare_nvr(left, right):
        """
        Compare two ``name-version-release`` strings (dist tag cut off).

        Different names are ordered as plain strings; equal names are
        ordered by version first and release second, each with
        ``_rpmvercmp``. Strings that do not split into three ``-`` parts
        are compared as a whole with ``_rpmvercmp``.

        Returns:
            negative, zero or positive int.
        """
        left_parts = left.rsplit('-', 2)
        right_parts = right.rsplit('-', 2)
        if len(left_parts) != 3 or len(right_parts) != 3:
            return VulnerableVersions._rpmvercmp(left, right)
        if left_parts[0] != right_parts[0]:
            return -1 if left_parts[0] < right_parts[0] else 1
        return (VulnerableVersions._rpmvercmp(left_parts[1], right_parts[1])
                or VulnerableVersions._rpmvercmp(left_parts[2], right_parts[2]))

    def get_mirrors_gtfixver_lttriggerver(self, item):
        """
        Classify one mirror build against the fix and trigger builds.

        The build is split at its distribution tag, e.g.
        ``polkit-0.112-26.an7.1`` becomes ``polkit-0.112-26.`` and ``7``;
        an ``el`` tag is treated like ``an``. A build without such a tag
        cannot be matched to an OS release, so it is logged and left
        unclassified.

        Args:
            item: mirror build, ``name-version-release``.
        """
        parsed = self._split_dist_tag(item)
        if parsed is None:
            self.loger.warning("no an/el dist tag, version not compared>>>{}".format(item))
            return
        mirrorver, mirroros = parsed
        self.parms_dict(item, fix_dict, mirrorver, mirroros, self.gtfix, self.gt_fixver_list)
        if trigger_cve_pkgver_dict:
            self.parms_dict(item, trigger_cve_pkgver_dict, mirrorver, mirroros, self.lttrigger,
                            self.lt_triggerver_list)

    def parms_dict(self, item, pardict, mirrorver, mirroros, vertype, gtltlist):
        """
        Compare one mirror build with every build listed in ``pardict``.

        ``item`` is appended to ``gtltlist`` once for every reference build
        that has the same OS token and package name and for which

        * ``vertype == "gtfix"``: the mirror build is at or above it;
        * ``vertype == "lttrigger"``: the mirror build is below it.

        Versions are compared numerically per segment, so ``-26`` is newer
        than ``-9``.

        Args:
            item: mirror build, ``name-version-release``.
            pardict: mapping of a build string to a list of build strings
                (``fix_dict`` or ``trigger_cve_pkgver_dict``).
            mirrorver: ``item`` up to its distribution tag.
            mirroros: OS token of ``item``, e.g. ``7``.
            vertype: ``"gtfix"`` or ``"lttrigger"``.
            gtltlist: list extended in place.
        """
        mirrorspkgname = item.rsplit('-', 2)[0]
        for k, v in pardict.items():
            parpkgname = k.rsplit('-', 2)[0]
            for parmpkg in v:
                parveros = self.handle_parms_fixver_triggerver_list_pkg(parmpkg)
                if parveros[1] != mirroros:
                    # Built for another OS release: not comparable.
                    continue
                self.loger.info("parveros[1]>>{} mirroros>>{}".format(parveros[1], mirroros))
                if parpkgname != mirrorspkgname:
                    continue
                order = self._compare_nvr(mirrorver, parveros[0])
                if vertype == "gtfix" and order >= 0:
                    self.inf(vertype, parpkgname, mirrorspkgname, mirrorver, parveros[0])
                    gtltlist.append(item)
                if vertype == "lttrigger" and order < 0:
                    self.inf(vertype, parpkgname, mirrorspkgname, mirrorver, parveros[0])
                    gtltlist.append(item)

    def inf(self, vertype, parpkgname, mirrorspkgname, mirrorver, parveros0):
        """Log the details of one successful comparison."""
        self.loger.info("vertype>>{}".format(vertype))
        self.loger.info("parpkgname>>{} mirrorspkgname>>{}".format(parpkgname, mirrorspkgname))
        self.loger.info("mirrorver>>{} parveros[0]>>{}".format(mirrorver, parveros0))

    @staticmethod
    def handle_parms_fixver_triggerver_list_pkg(pkg):
        """
        Split a configured build string into its version part and OS token
        so it can be compared with a mirror build.

        ``polkit-0.115-13.an8_5.1`` gives ``('polkit-0.115-13.', '8_5')``.
        An ``el`` tag is accepted as well.

        Args:
            pkg: build string from ``fix_dict`` or
                ``trigger_cve_pkgver_dict``.

        Returns:
            tuple ``(parver, argos)``.

        Raises:
            IndexError: when ``pkg`` contains no ``an`` at all.
        """
        parsed = VulnerableVersions._split_dist_tag(pkg)
        if parsed is not None:
            return parsed
        # No tag followed by a digit: keep the historical split on the last
        # "an", including its IndexError when there is none.
        trifixpkg_split_list = pkg.rsplit('an', 1)
        parver = trifixpkg_split_list[0]
        argos = trifixpkg_split_list[1].split('.', 1)[0]
        return parver, argos

    def modify_str(self):
        """
        Format the vulnerable builds for pasting into a shell array.

        Returns:
            str: each build in single quotes followed by a newline, every
            line after the first indented by one space; ``""`` when there
            is nothing to report.
        """
        vunver_result_list = self.filter_fixver()
        if not vunver_result_list:
            return ""
        return "\n ".join("'" + pkg + "'" for pkg in vunver_result_list) + "\n"
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Shell
1
https://gitee.com/anolis/cve-diagnose.git
git@gitee.com:anolis/cve-diagnose.git
anolis
cve-diagnose
cve-diagnose
master

Search