# 代码拉取完成,页面将自动刷新  -- stray web-export banner line, commented out so the file parses
#!python3
"""
Case Runner
"""
from curses import is_term_resized
import functools
import hashlib
import json
import logging
import queue
import shutil
import threading
from io import RawIOBase, TextIOWrapper
from logging import handlers
import subprocess
import sys
import time
import random
from subprocess import CompletedProcess
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
from watchdog.observers import Observer
from watchdog.events import LoggingEventHandler, FileSystemEventHandler
from typing import Dict, List, Tuple
import os
import getopt
from multiprocessing import cpu_count
import linecache
import datetime
import re
import pika
from enum import Enum
from textual.app import App, ComposeResult
from textual.containers import ScrollableContainer
from textual.reactive import reactive
from textual.widgets import Button, Footer, Header, Static, Label
import yaml
# Global debug switch: when False, Logger.debug output is suppressed.
DEBUG = False
# Tool version string.
VERSION = "0.0.3"
# Output language for log/UI messages: "zh" or "en".
LANGUAGE = "zh"
# Shared thread pool (4 workers) for background jobs.
CCDLThreadPool = ThreadPoolExecutor(4)
# Lifecycle states of a fuzzing task.
FUZZ_UNKNOW = 0
FUZZ_WAITTING = 1
FUZZ_RUNNING = 2
FUZZ_FINISH = 3
FUZZ_EXITTING = 5

# Display labels for the states above; unknown ids fall back to "Unknow"
# (spellings kept exactly as elsewhere in the tool).
_FUZZ_STATUS_NAMES = {
    FUZZ_WAITTING: "Waitting",
    FUZZ_RUNNING: "Running",
    FUZZ_FINISH: "Finish",
    FUZZ_EXITTING: "Exitting",
}


def get_fuzz_status_name(fuzz_status_id: int) -> str:
    """Return the human-readable label for a FUZZ_* status id."""
    return _FUZZ_STATUS_NAMES.get(fuzz_status_id, "Unknow")
class Printer:
    """ANSI color helpers.

    ``print_<color>`` writes a colored line to stdout; ``to_<color>`` returns
    the colored string without printing it.
    """

    def _paint(self, code: int, msg: str) -> str:
        # Wrap msg in the ANSI escape for the given 30-series foreground code.
        return f"\033[0;{code}m{msg}\033[0m"

    def print_red(self, msg: str):
        print(self._paint(31, msg))

    def print_green(self, msg: str):
        print(self._paint(32, msg))

    def print_yellow(self, msg: str):
        print(self._paint(33, msg))

    def print_blue(self, msg: str):
        print(self._paint(34, msg))

    def print_magenta(self, msg: str):
        print(self._paint(35, msg))

    def print_cyan(self, msg: str):
        print(self._paint(36, msg))

    def to_red(self, msg: str) -> str:
        return self._paint(31, msg)

    def to_green(self, msg: str) -> str:
        return self._paint(32, msg)

    def to_yellow(self, msg: str) -> str:
        return self._paint(33, msg)

    def to_blue(self, msg: str) -> str:
        return self._paint(34, msg)

    def to_magenta(self, msg: str) -> str:
        return self._paint(35, msg)

    def to_cyan(self, msg: str) -> str:
        return self._paint(36, msg)
# Message logger: writes to a daily-rotating file and optionally to stdout.
# Logger.debug output is emitted only when the module-level DEBUG flag is True.
class Logger(object):
    """Rotating-file logger with a per-message language gate.

    Every emit method takes a ``language`` tag; the message is logged only
    when the tag is "all" or equals the module-level LANGUAGE setting.
    """

    # Maps the user-facing level name to the stdlib logging level.
    level_relations = {
        "debug": logging.DEBUG,
        "info": logging.INFO,
        "warning": logging.WARNING,
        "error": logging.ERROR,
        "crit": logging.CRITICAL,
    }

    def __init__(
        self,
        filename,
        level="debug",
        when="D",
        backCount=30,
        fmt="%(asctime)s - %(levelname)s: %(message)s",
        enable_stdout=False
    ):
        """Create (or re-configure) the logger named after ``filename``.

        filename: log file path; also used as the logger name.
        level: one of level_relations' keys; unknown names fall back to debug.
        when/backCount: rotation interval and number of kept backups.
        fmt: logging format string.
        enable_stdout: when True, also echo records to the console.
        """
        self.logger = logging.getLogger(filename)
        format_str = logging.Formatter(fmt)
        # Fall back to DEBUG instead of crashing (setLevel(None) raises)
        # when an unknown level name is given.
        self.logger.setLevel(self.level_relations.get(level, logging.DEBUG))
        th = handlers.TimedRotatingFileHandler(
            filename=filename, when=when, backupCount=backCount, encoding="utf-8"
        )
        th.setFormatter(format_str)
        # Only attach a stream handler when stdout echoing was requested;
        # the rotating file handler is always attached.
        if enable_stdout:
            sh = logging.StreamHandler()
            sh.setFormatter(format_str)
            self.logger.addHandler(sh)
        self.logger.addHandler(th)

    def debug(self, msg: str, language="zh"):
        # Debug output is additionally gated on the global DEBUG switch.
        if DEBUG and language in ["all", LANGUAGE]:
            self.logger.debug(msg)

    def info(self, msg: str, language="zh"):
        if language in ["all", LANGUAGE]:
            self.logger.info(msg)

    def warning(self, msg: str, language="zh"):
        if language in ["all", LANGUAGE]:
            self.logger.warning(msg)

    def error(self, msg: str, language="zh", stack_info=False):
        if language in ["all", LANGUAGE]:
            self.logger.error(msg, stack_info=stack_info, exc_info=False)

    def critical(self, msg: str, language="zh"):
        if language in ["all", LANGUAGE]:
            # BUG FIX: the keyword was misspelled "exec_info", which made every
            # critical() call raise TypeError; the stdlib name is "exc_info".
            self.logger.critical(msg, stack_info=True, exc_info=True)
# Module-wide logger instance, shared by code that has no per-task logger.
log = Logger("case_runner.log", level="debug", enable_stdout=True)
class TaskLogger(Logger):
    """Logger whose record format embeds the owning task's id."""

    def __init__(self, filename: str, task_id: int):
        # Same base format as Logger, prefixed with a "[TASK <id>]" tag.
        task_fmt = "%(asctime)s - %(levelname)s: [TASK {}] %(message)s".format(task_id)
        super().__init__(filename=filename, fmt=task_fmt)
class MsgQueue:
    """Thin wrapper around a blocking RabbitMQ connection.

    One queue (named by ``uuid``) is used for both publishing and consuming.
    When ``on_message_callback`` is given, the instance is set up as a
    consumer; otherwise it only publishes.
    """

    def __init__(self,
                 uuid: str,
                 on_message_callback=None,
                 user='guest',
                 password='guest') -> None:
        # Broker credentials.
        self.user_info = pika.PlainCredentials(user, password)
        self.on_message_callback = on_message_callback
        self.uuid = str(uuid)
        self.establish_connection()

    def establish_connection(self):
        """Connect to the local RabbitMQ broker and declare our queue."""
        # heartbeat=0 disables broker heartbeats so idle connections stay open.
        params = pika.ConnectionParameters('127.0.0.1', 5672, '/', self.user_info, heartbeat=0)
        self.connection = pika.BlockingConnection(params)
        self.channel = self.connection.channel()
        # queue_declare is idempotent: it creates the queue when missing and
        # is a no-op otherwise, so producer and consumer may start in any order.
        self.channel.queue_declare(self.uuid)
        if self.on_message_callback is not None:
            self.channel.basic_consume(
                queue=self.uuid,
                auto_ack=True,
                on_message_callback=self.on_message_callback
            )

    def publish(self, msg: dict):
        """Publish ``msg`` as JSON; on failure reconnect once and retry."""
        body = json.dumps(msg)
        try:
            self.channel.basic_publish(
                exchange="",
                routing_key=self.uuid,
                body=body
            )
        except Exception:
            # Send failed — re-establish the connection and retry once.
            self.establish_connection()
            self.channel.basic_publish(
                exchange="",
                routing_key=self.uuid,
                body=body
            )

    def start_consuming(self):
        """Block and dispatch incoming messages to the registered callback."""
        self.channel.start_consuming()
class Util:
    """Collection of small, reusable helper functions (all static)."""

    @staticmethod
    def mkdir(path: str, mode: str = "no_overwrite"):
        """Create directory ``path``.

        mode controls what happens when ``path`` already exists:
            no_overwrite (default): keep the existing directory untouched.
            overwrite: delete the existing directory and recreate it.
            ask_before_overwrite: prompt the user; raises when they decline.

        Raises NotADirectoryError when ``path`` exists but is not a directory.
        """
        if os.path.exists(path):
            # Reject anything that exists but is not a directory.
            if not os.path.isdir(path):
                raise NotADirectoryError(
                    f"path : {path} is not a directory, please check!"
                )
            is_delete = False
            if mode == "ask_before_overwrite":
                # Existing output directory: let the user decide.
                while True:
                    hint_msg = ""
                    if LANGUAGE == "en":
                        hint_msg = f"directory {path} is already exist, are you sure to overwrite it? [Y/n]: "
                    elif LANGUAGE == "zh":
                        hint_msg = f"目录 {path} 已经存在,是否想要覆盖它 [Y/n]: "
                    user_input = input(hint_msg).lower()
                    if user_input == "y":
                        is_delete = True
                        break
                    if user_input == "n":
                        is_delete = False
                        break
            elif mode == "overwrite":
                is_delete = True
            if not is_delete:
                if mode == "ask_before_overwrite":
                    raise Exception(
                        "用户选择不覆盖已存在的目录,程序结束"
                    )
                # no_overwrite (or user kept the directory): nothing to do.
                return
            # The isdir check above guarantees this is a directory, so the
            # old unreachable isfile/else branches were dropped.
            shutil.rmtree(path)
        # At this point the path is guaranteed not to exist.
        os.makedirs(path)

    @staticmethod
    def check_probability(prob):
        """Return True with probability ``prob`` (expected in [0, 1])."""
        return random.choices([True, False], weights=[prob, 1 - prob])[0]

    @staticmethod
    def copy_with_meta_data(src_path, dst_path):
        """Copy a file preserving metadata (mtime, permissions, ...)."""
        shutil.copy2(src_path, dst_path)

    @staticmethod
    def cal_timestamp_diff_to_str(past: float, now: float) -> str:
        """Format ``now - past`` as "Xh Ym Zs"; "None" when an input is missing."""
        if past is None or now is None:
            return "None"
        # Plain float subtraction of Unix timestamps; no need for the old
        # datetime round-trip.
        total_seconds = now - past
        hours = int(total_seconds // 3600)
        minutes = int((total_seconds % 3600) // 60)
        seconds = int(total_seconds % 60)
        return f"{hours}h {minutes}m {seconds}s"

    @staticmethod
    def cal_timestamp_diff_to_seconds(past: float, now: float) -> float:
        """Return ``now - past`` in seconds.

        NOTE: historically returns the string "None" (not None) when an input
        is missing; kept for backward compatibility with existing callers.
        """
        if past is None or now is None:
            return "None"
        return now - past

    @staticmethod
    def cal_timestamp_diff_to_seconds_from_now(past: float) -> float:
        """Seconds elapsed from ``past`` until now (same "None" convention)."""
        return Util.cal_timestamp_diff_to_seconds(past, time.time())

    @staticmethod
    def cal_timestamp_diff_to_str_from_now(past: float) -> str:
        """Elapsed time from ``past`` until now as "Xh Ym Zs"."""
        if past is None:
            return "None"
        return Util.cal_timestamp_diff_to_str(past, time.time())
class CVEChecker:
    """Triage a crash's ASAN report against known CVE signatures.

    Each supported CVE has a ``check_*`` predicate that inspects the raw
    report text (``self.buf``) for the sanitizer bug class, the crash
    ``file:line`` location, and the crashing function / its direct caller.
    """

    def __init__(self, cve_asan_report="", tlog: "TaskLogger" = None):
        # Maps "<binary>[-<version>]-<CVE id>" to its triage predicate.
        self.cve_func_map = {
            "swftophp-4.7-2016-9827": self.check_swftophp_2016_9827,
            "swftophp-4.7-2016-9829": self.check_swftophp_2016_9829,
            "swftophp-4.7-2016-9831": self.check_swftophp_2016_9831,
            "swftophp-4.7-2017-9988": self.check_swftophp_2017_9988,
            "swftophp-4.7-2017-11728": self.check_swftophp_2017_11728,
            "swftophp-4.7-2017-11729": self.check_swftophp_2017_11729,
            "swftophp-4.7.1-2017-7578": self.check_swftophp_2017_7578,
            "swftophp-4.8-2018-7868": self.check_swftophp_2018_7868,
            "swftophp-4.8-2018-8807": self.check_swftophp_2018_8807,
            "swftophp-4.8-2018-8962": self.check_swftophp_2018_8962,
            "swftophp-4.8-2018-11095": self.check_swftophp_2018_11095,
            "swftophp-4.8-2018-11225": self.check_swftophp_2018_11225,
            "swftophp-4.8-2018-11226": self.check_swftophp_2018_11226,
            "swftophp-4.8-2018-20427": self.check_swftophp_2018_20427,
            "swftophp-4.8.1-2019-9114": self.check_swftophp_2019_9114,
            "swftophp-4.8-2019-12982": self.check_swftophp_2019_12982,
            "swftophp-4.8-2020-6628": self.check_swftophp_2020_6628,
            "lrzip-9de7ccb-2017-8846": self.check_lrzip_2017_8846,
            "lrzip-ed51e14-2018-11496": self.check_lrzip_2018_11496,
            "cxxfilt-2016-4487": self.check_cxxfilt_2016_4487,
            "cxxfilt-2016-4489": self.check_cxxfilt_2016_4489,
            "cxxfilt-2016-4490": self.check_cxxfilt_2016_4490,
            "cxxfilt-2016-4491": self.check_cxxfilt_2016_4491,
            "cxxfilt-2016-4492": self.check_cxxfilt_2016_4492,
            "cxxfilt-2016-6131": self.check_cxxfilt_2016_6131,
            "objcopy-2017-8393": self.check_objcopy_2017_8393,
            "objcopy-2017-8394": self.check_objcopy_2017_8394,
            "objcopy-2017-8395": self.check_objcopy_2017_8395,
            "objdump-2017-8392": self.check_objdump_2017_8392,
            "objdump-2017-8396": self.check_objdump_2017_8396,
            "objdump-2017-8397": self.check_objdump_2017_8397,
            "objdump-2017-8398": self.check_objdump_2017_8398,
            "objdump-2.31.1-2018-17360": self.check_objdump_2018_17360,
            "strip-2017-7303": self.check_strip_2017_7303,
            "nm-2017-14940": self.check_nm_2017_14940,
            "readelf-2017-16828": self.check_readelf_2017_16828,
            "xmllint-2017-5969": self.check_xmllint_2017_5969,
            "xmllint-2017-9047": self.check_xmllint_2017_9047,
            "xmllint-2017-9048": self.check_xmllint_2017_9048,
            "cjpeg-1.5.90-2018-14498": self.check_cjpeg_2018_14498,
            "cjpeg-2.0.4-2020-13790": self.check_cjpeg_2020_13790,
        }
        self.buf = cve_asan_report
        self.tlog = tlog

    def get_supported_cve_list(self) -> list:
        """Return the identifiers of all CVEs this checker can triage."""
        return self.cve_func_map.keys()

    def extract_cve_type(self) -> str:
        """Return the sanitizer bug class (e.g. "heap-buffer-overflow"),
        or "unknow" when the report carries no AddressSanitizer header."""
        match = re.search(r"AddressSanitizer:\s*(.*?)\s*on", self.buf)
        if match is None:
            return "unknow"
        start_idx, end_idx = match.span()
        line = self.buf[start_idx:end_idx]
        return line.split()[1]

    def check_by_cve_str(self, cve_str: str):
        """Run the predicate registered for ``cve_str``; False if unsupported."""
        if cve_str not in self.cve_func_map:
            return False
        cve_func = self.cve_func_map[cve_str]
        return cve_func()

    def warn(self, msg, *_extra):
        """Log a warning together with the full replay log.

        BUG FIX: some call sites used to pass ``self.buf`` as a redundant
        second argument, which raised TypeError mid-triage; extra positional
        arguments are now accepted and ignored.  Also tolerates a missing
        TaskLogger instead of crashing on ``None``.
        """
        if self.tlog is not None:
            self.tlog.warning(f"{msg}, Check the following replay log: \n {self.buf}", "all")

    # Obtain the function where the crash had occurred (first non-asan frame).
    def get_crash_func(self):
        pattern = r"#[0-9]* 0x[0-9a-f]+ in .*"
        matches = re.findall(pattern, self.buf)
        if len(matches) == 0:
            return ""
        for match in matches:
            if "asan" not in match:
                return match.split()[3]

    # Get the direct caller of the function that crashed (second non-asan frame).
    def get_crash_func_caller(self):
        pattern = r"#[0-9]* 0x[0-9a-f]+ in .*"
        matches = re.findall(pattern, self.buf)
        if len(matches) == 0:
            return ""
        find_first_flag = False
        for match in matches:
            if "asan" not in match:
                if find_first_flag:
                    return match.split()[3]
                find_first_flag = True

    def get_crash_point(self) -> str:
        """Return the crash location as "file:line:col" (basename only)."""
        pattern = r"#[0-9]* 0x[0-9a-f]+ in .*"
        matches = re.findall(pattern, self.buf)
        if len(matches) == 0:
            return ""
        for match in matches:
            if "asan" not in match:
                return match.split()[-1].split("/")[-1]

    def get_crash_stack(self) -> list:
        """Parse the first backtrace into a list of
        {"file", "begin", "end", "cover_num"} dicts (innermost frame first)."""
        start_collect = False
        splited_buf = self.buf.split("\n")
        stack_info_list = []
        res_stack_info = []
        for line in splited_buf:
            line = line.strip()
            if line.startswith("#"):
                start_collect = True
            # A blank line after the frames ends the first backtrace.
            if line == '' and start_collect:
                break
            if start_collect:
                if line != '' and "asan" not in line:
                    # Only frames that carry a file:line:col suffix are kept.
                    pattern = r"#[0-9]* 0x[0-9a-f]+ in .*:[0-9]+:[0-9]"
                    match_obj = re.match(pattern, line)
                    if match_obj:
                        stack_info_list.append(line)
        for stack_info in stack_info_list:
            stack_file_line_info = stack_info.split()[-1].split("/")[-1]
            stack_file_line_info_split = stack_file_line_info.split(":")
            res_stack_info.append({
                "file": stack_file_line_info_split[0],
                "begin": stack_file_line_info_split[1],
                "end": stack_file_line_info_split[1],
                "cover_num": 1
            })
        return res_stack_info

    # Helper function for for-all check.
    def check_all(self, checklist):
        for str_to_check in checklist:
            if str_to_check not in self.buf:
                return False
        return True

    # Helper function for if-any check.
    def check_any(self, checklist):
        for str_to_check in checklist:
            if str_to_check in self.buf:
                return True
        return False

    # --- Per-CVE signature predicates.  Each returns True only when the
    # --- report matches that CVE's known bug class / location / call chain.

    def check_cxxfilt_2016_4487(self):
        if self.get_crash_func() == "register_Btype":
            if "cplus-dem.c:4319" in self.buf:
                return True
            else:
                # (fixed: the old call passed self.buf as an extra argument)
                self.warn("Unexpected crash point in register_Btype()")
                return False
        else:
            return False

    def check_cxxfilt_2016_4489(self):
        # Checking for "string_appendn" can be loose, since it has many call-sites.
        # Therefore, check for the specific call-site in gnu_special().
        return self.check_all(["cplus-dem.c:3007"])

    def check_cxxfilt_2016_4490(self):
        if self.get_crash_func() == "d_unqualified_name":
            if "cp-demangle.c:1596" in self.buf or "cp-demangle.c:1597" in self.buf:
                return True
            elif "cp-demangle.c:1576" in self.buf:
                # Although crash point is slightly different, has the same root
                # cause (integer overflow in d_source_name).
                return True
            else:
                # (fixed: the old call passed self.buf as an extra argument)
                self.warn("Unexpected crash point in d_unqualified_name()")
                return False
        else:
            return False

    def check_cxxfilt_2016_4491(self):
        return self.check_all(["stack-overflow", "d_print_comp", "d_print_mod", "d_print_array_type", "d_print_comp_inner", "d_print_mod_list"])

    def check_cxxfilt_2016_4492(self):
        if "stack-overflow" in self.buf:
            return False
        if self.get_crash_func() == "do_type":
            if self.check_any(["cplus-dem.c:3606", "cplus-dem.c:3781"]):
                # typevec[] accessing points.
                return True
            # If do_type()'s line num is gone, rely on the callsite's line num.
            elif "cplus-dem.c:4231" in self.buf:
                # do_arg() -> do_type()
                return True
            elif "cplus-dem.c:1548" in self.buf or "cplus-dem.c:1595" in self.buf:
                # iterate_demangle_function() -> demangle_signature() -> do_type()
                return True
            else:
                self.warn("Unexpected crash point in do_type")
                return False
        else:
            return False

    def check_cxxfilt_2016_6131(self):
        if self.check_all(["stack-overflow", "do_type"]):
            if self.check_all(["demangle_arm_hp_template", "demangle_class_name", "demangle_fund_type"]):
                self.warn("Unexpected crash point in do_type")
                return True
        return False

    def check_swftophp_2016_9827(self):
        if "heap-buffer-overflow" in self.buf:
            if "outputscript.c:1687:" in self.buf:
                return True
        return False

    def check_swftophp_2016_9829(self):
        if "heap-buffer-overflow" in self.buf:
            if "parser.c:1656:" in self.buf:
                return True
        return False

    def check_swftophp_2016_9831(self):
        if "heap-buffer-overflow" in self.buf:
            # Any BOF that occurs in line 66~69 corresponds to this CVE.
            if re.search(r"parser.c:6[6-9]:", self.buf) is not None:
                return True
        return False

    def check_swftophp_2017_9988(self):
        if "SEGV" in self.buf:
            if "parser.c:2995:" in self.buf:
                return True
        return False

    def check_swftophp_2017_11728(self):
        if "heap-buffer-overflow" in self.buf:
            if "decompile.c:868" in self.buf:
                if self.get_crash_func_caller() == "decompileSETMEMBER":
                    return True
        return False

    def check_swftophp_2017_11729(self):
        if "heap-buffer-overflow" in self.buf:
            if "decompile.c:868" in self.buf:
                if self.get_crash_func_caller() == "decompileINCR_DECR":
                    return True
        return False

    def check_swftophp_2017_7578(self):
        if "heap-buffer-overflow" in self.buf:
            # Any BOF that occurs in line 68~71 corresponds to this CVE.
            if re.search(r"parser.c:(68|69|70|71):", self.buf) is not None:
                return True
        return False

    def check_swftophp_2018_7868(self):
        # We should exclude SEGV because it's issue-122 (NULL dereference). Also,
        # exclude UAF because it's likely CVE-2018-8962.
        if "heap-buffer-overflow" in self.buf:
            if self.check_all(["getString", "sprintf"]):
                # If these are observed, it's likely CVE-2018-7873 or CVE-2018-7867.
                return False
            elif "decompile.c:398" in self.buf:
                return True
            elif "decompile.c:408" in self.buf:
                # This is CVE-2018-7871.
                return False
            elif "getName" in self.buf:
                self.warn("Unexpected heap BOF within getName")
        return False

    def check_swftophp_2018_8807(self):
        if "heap-use-after-free" in self.buf:
            # Consider the crash at "decompile.c:398" as the same CVE (referred to
            # the various stack traces in CVE-2018-8962).
            if self.check_any(["decompile.c:349", "decompile.c:398"]):
                if self.get_crash_func_caller() == "decompileCALLFUNCTION":
                    return True
        # Crash also occurs at the caller itself. Conservatively say no.
        return False

    def check_swftophp_2018_8962(self):
        possible_callers = ["decompileGETVARIABLE",
                            "decompileSingleArgBuiltInFunctionCall",
                            "decompilePUSHPARAM",
                            "decompileDELETE",
                            "decompileSETTARGET",
                            "decompileSUBSTRING",
                            "decompileNEWOBJECT"]
        if "heap-use-after-free" in self.buf:
            if self.check_any(["decompile.c:349", "decompile.c:398"]):
                if self.get_crash_func_caller() in possible_callers:
                    return True
        # Crash also occurs at the caller itself. Conservatively say no.
        return False

    def check_swftophp_2018_11095(self):
        # Accept both SEGV and BOF (cf. GitHub report and our PoC replay)
        if self.check_any(["heap-buffer-overflow", "SEGV"]):
            if "decompile.c:1843:" in self.buf:
                return True
            elif self.get_crash_func() == "decompileJUMP":
                self.warn("Unexpected crash point in decompileJUMP")
        return False

    def check_swftophp_2018_11225(self):
        if "heap-buffer-overflow" in self.buf:
            if "decompile.c:2015:" in self.buf:
                return True
            elif self.get_crash_func() == "decompile_SWITCH":
                self.warn("Unexpected crash point in decompile_SWITCH")
        return False

    def check_swftophp_2018_11226(self):
        if "heap-buffer-overflow" in self.buf:
            if "decompile.c:2015:" in self.buf:
                return True
            elif self.get_crash_func() == "decompile_SWITCH":
                self.warn("Unexpected crash point in decompile_SWITCH")
        return False

    def check_swftophp_2020_6628(self):
        if "heap-buffer-overflow" in self.buf:
            if "decompile.c:2015:" in self.buf:
                return True
            elif self.get_crash_func() == "decompile_SWITCH":
                self.warn("Unexpected crash point in decompile_SWITCH")
        return False

    def check_swftophp_2018_20427(self):
        if "SEGV" in self.buf:
            if "decompile.c:425:" in self.buf:
                return True
        return False

    def check_swftophp_2019_12982(self):
        if "heap-buffer-overflow" in self.buf:
            if "decompile.c:3120:" in self.buf:
                return True
        return False

    def check_swftophp_2019_9114(self):
        if "heap-buffer-overflow" in self.buf:
            # Possible crash points in strcpyext (all corresponds to this CVE).
            if re.search(r"decompile.c:2(54|56|59|61):", self.buf) is not None:
                if self.get_crash_func_caller() == "getName":
                    return True
                else:
                    self.warn("Unexpected caller of strcpyext")
        return False

    def check_lrzip_2017_8846(self):
        if "heap-use-after-free" in self.buf:
            if "stream.c:1747" in self.buf:
                # Following is the function modified in the final patch
                if "unzip_match" in self.buf:
                    return True
                else:
                    self.warn("Unexpected stack trace")
            elif self.get_crash_func() == "read_stream":
                self.warn("Unexpected crash point in read_stream")
        return False

    def check_lrzip_2018_11496(self):
        if "heap-use-after-free" in self.buf:
            if "stream.c:1756" in self.buf:
                # Not sure about this caller. Conservatively say no.
                if "read_u32" in self.buf:
                    return False
                elif "read_header" in self.buf:
                    return True
                else:
                    self.warn("Unexpected stack trace")
            elif self.get_crash_func() == "read_stream":
                self.warn("Unexpected crash point in read_stream")
        return False

    def check_objdump_2017_8392(self):
        if "heap-buffer-overflow" in self.buf:
            if "read_4_bytes" in self.buf:
                return True
        return False

    def check_objdump_2017_8396(self):
        if "heap-buffer-overflow" in self.buf:
            if self.get_crash_func() == "bfd_getl64":
                return True
        return False

    def check_objdump_2017_8397(self):
        if "heap-buffer-overflow" in self.buf:
            if self.get_crash_func() == "bfd_perform_relocation":
                return True
        return False

    def check_objdump_2017_8398(self):
        if "heap-buffer-overflow" in self.buf:
            if "process_extended_line_op" in self.buf:
                return True
        return False

    def check_objdump_2018_17360(self):
        if "heap-buffer-overflow" in self.buf:
            if "pe_print_edata" in self.buf:
                return True
        return False

    def check_objcopy_2017_8393(self):
        if "global-buffer-overflow" in self.buf:
            if "_bfd_elf_get_reloc_section" in self.buf:
                return True
        return False

    def check_objcopy_2017_8394(self):
        if "SEGV" in self.buf:
            if self.get_crash_func() == "filter_symbols":
                return True
        return False

    def check_objcopy_2017_8395(self):
        if "SEGV" in self.buf:
            if "cache_bread_1" in self.buf:
                return True
        return False

    def check_nm_2017_14940(self):
        # OOM kill (exit 137) while more regions were started than finished.
        if "Exit value is 137" in self.buf:
            starts = self.buf.count('@@@ start')
            ends = self.buf.count('@@@ end')
            if starts > 0 and starts > ends:
                return True
        return False

    def check_readelf_2017_16828(self):
        if "heap-buffer-overflow" in self.buf:
            if "display_debug_frames" in self.buf:
                return True
        return False

    def check_strip_2017_7303(self):
        if "SEGV" in self.buf:
            if "find_link" in self.buf:
                return True
        return False

    def check_xmllint_2017_5969(self):
        if "SEGV" in self.buf:
            if "valid.c:1181:" in self.buf:
                return True
        return False

    def check_xmllint_2017_9047(self):
        if "stack-buffer" in self.buf:  # Both over- and under-flow.
            if "valid.c:1279:" in self.buf:
                return True
        return False

    def check_xmllint_2017_9048(self):
        if "stack-buffer" in self.buf:  # Both over- and under-flow.
            if "valid.c:1323:" in self.buf:
                return True
        return False

    def check_cjpeg_2018_14498(self):
        if "heap-buffer-overflow" in self.buf:
            if "rdbmp.c:209:" in self.buf:
                return True
            elif self.get_crash_func() == "get_8bit_row":
                self.warn("Unexpected crash point in get_8bit_row")
        return False

    def check_cjpeg_2020_13790(self):
        if "heap-buffer-overflow" in self.buf:
            if "rdppm.c:434:" in self.buf:
                return True
            elif self.get_crash_func() == "get_rgb_row":
                self.warn("Unexpected crash point in get_rgb_row")
        return False

    def check_TODO(self):
        print("TODO: implement triage logic")
        return False
class Case:
    """Entity describing a single seed (test case).

    Holds the seed's file locations, coverage numbers, crash-triage results
    and scheduling metadata, plus dict (de)serialization helpers.
    """

    # Attributes handled by to_dict()/load_from_dict(), in serialized order
    # (kept identical to the historical to_dict key order).
    _DICT_FIELDS = (
        "file_name", "sha256", "file_path", "html_path", "html_relative_path",
        "text_path", "case_dir", "function_coverage", "line_coverage",
        "region_coverage", "is_crash", "target_coverage", "create_time",
        "distance", "min_max_scaled_distance", "perf_score",
        "text_relative_path", "stderr", "stderr_path", "stderr_relative_path",
        "from_crash_dir", "case_base_init_time", "is_true_crash", "cve_type",
        "cve_crash_function", "cve_crash_function_caller", "cve_crash_point",
        "crash_stack", "is_fit_target_stack", "is_arrived",
    )

    def __init__(self, tlog: "TaskLogger"):
        # Seed file name
        self.file_name = ""
        # Digest uniquely identifying the seed contents
        self.sha256 = ""
        # Path where the seed file is stored
        self.file_path = ""
        # Absolute path of the html report (index.html)
        self.html_path = ""
        # Relative path of the html report
        self.html_relative_path = ""
        # Absolute path of the text report (index.txt)
        self.text_path = ""
        # Relative path of the text report
        self.text_relative_path = ""
        # Directory holding everything belonging to this seed
        self.case_dir = ""
        # Function coverage
        self.function_coverage = 0
        # Line coverage
        self.line_coverage = 0
        # Region coverage
        self.region_coverage = 0
        # Whether the seed triggers a crash
        self.is_crash = False
        # Number of targets covered (per-target entries)
        self.target_coverage = []
        # Timestamp when this case was detected
        self.create_time = 0
        # Creation time of the CaseBase's first case, or its init timestamp
        self.case_base_init_time = 0
        # Distance of this case
        self.distance = -1.0
        # Min-max scaled distance of this case
        self.min_max_scaled_distance = -1.0
        # Score of this case
        self.perf_score = -1
        # stderr produced when executing the case
        self.stderr = ""
        # File path of the captured stderr
        self.stderr_path = ""
        # Relative path of the captured stderr
        self.stderr_relative_path = ""
        # Whether the case came from the crashes directory
        self.from_crash_dir = False
        # Whether the case may be discarded
        self.is_abandonable = True
        # Whether this is a confirmed (true) crash
        self.is_true_crash = False
        # Sanitizer bug class of the crash
        self.cve_type = ""
        # Function that crashed
        self.cve_crash_function = ""
        # Caller of the crashing function
        self.cve_crash_function_caller = ""
        # Crash trigger point: file:line:column
        self.cve_crash_point = ""
        # Crash call stack
        self.crash_stack = []
        # Whether the target call stack was matched
        self.is_fit_target_stack = False
        # Whether the target was reached
        self.is_arrived = False
        self.tlog = tlog

    def init_from_origin_path(
        self, orig_path: str, target_path: str, queue_first_case_create_time: float, from_crash_dir=False, is_abandonable=True
    ):
        """Populate this case from the seed file at ``orig_path`` and copy it
        into its own sha256-named directory under ``target_path``.

        ``queue_first_case_create_time`` is currently unused here; kept for
        interface compatibility with existing callers.
        """
        self.create_time = os.path.getctime(orig_path)
        with open(orig_path, "rb") as f:
            self.sha256 = hashlib.new("sha256", f.read()).hexdigest()
        self.case_dir = os.path.join(target_path, self.sha256)
        Util.mkdir(self.case_dir)
        # NOTE(review): the replace targets a literal double backslash; it
        # looks intended to normalize Windows-style paths before taking the
        # basename — confirm against actual callers.
        self.file_name = orig_path[orig_path.replace("\\\\", "/").rindex("/") + 1 :]
        self.file_path = os.path.join(self.case_dir, self.file_name)
        shutil.copyfile(orig_path, self.file_path)
        self.html_path = os.path.join(self.case_dir, "html")
        self.text_path = os.path.join(self.case_dir, "text")
        self.html_relative_path = f"./reports/{self.sha256}/html"
        self.text_relative_path = f"./reports/{self.sha256}/text"
        Util.mkdir(self.html_path)
        Util.mkdir(self.text_path)
        self.stderr_path = f"{self.case_dir}/stderr.txt"
        self.stderr_relative_path = f"./reports/{self.sha256}/stderr.txt"
        self.from_crash_dir = from_crash_dir
        self.is_abandonable = is_abandonable

    def to_string(self) -> str:
        """Human-readable multi-line dump of this case (unchanged format)."""
        return f"""
        --------------Case---------------------
        case_dir: {self.case_dir}
        file_name: {self.file_name}
        file_path: {self.file_path}
        html_path: {self.html_path}
        html_relative:path: {self.html_relative_path}
        text_path: {self.text_path}
        text_relative_path: {self.text_relative_path}
        sha256: {self.sha256}
        function_coverage: {self.function_coverage}
        line_coverage: {self.line_coverage}
        region_coverage: {self.region_coverage}
        target_coverage: {self.target_coverage}
        create_time: {datetime.datetime.fromtimestamp(self.create_time).strftime('%Y-%m-%d %H:%M:%S')}
        distance: {self.distance}
        min_max_scaled_distance: {self.min_max_scaled_distance}
        perf_score: {self.perf_score}
        is_crash: {self.is_crash}
        stderr: {self.stderr}
        stderr_path: {self.stderr_path}
        stderr_relative_path: {self.stderr_relative_path}
        from_crash_dir: {self.from_crash_dir}
        case_base_init_time: {self.case_base_init_time}
        is_true_crash: {self.is_true_crash}
        cve_type: {self.cve_type}
        cve_crash_function: {self.cve_crash_function}
        cve_crash_function_caller: {self.cve_crash_function_caller}
        cve_crash_point: {self.cve_crash_point}
        crash_stack: {self.crash_stack}
        is_fit_target_stack: {self.is_fit_target_stack}
        is_arrived: {self.is_arrived}
        ---------------------------------------
        """

    def to_dict(self) -> dict:
        """Serialize to a plain dict (same keys and order as before)."""
        return {name: getattr(self, name) for name in self._DICT_FIELDS}

    def load_from_dict(self, obj: dict):
        """Restore every known attribute present in ``obj``.

        Unknown keys are ignored; missing keys leave the attribute untouched
        (same semantics as the old 60-line if-chain this replaces).
        """
        for name in self._DICT_FIELDS:
            if name in obj:
                setattr(self, name, obj[name])

    def get_coverage_number_accumlate(self, target_coverage_list: list) -> int:
        """Count consecutively-covered trailing targets.

        For crashing cases the crash stack is compared (innermost frames
        last) against ``target_coverage_list``; otherwise this case's own
        target_coverage entries are scanned from the end until the first
        uncovered one.  (Method name — including its historical spelling —
        is part of the public interface and kept as-is.)
        """
        cnt = 0
        if self.is_crash:
            reversed_target_coverage_list = target_coverage_list[::-1]
            reversed_crash_stack_list = self.crash_stack[::-1]
            min_length = min(len(reversed_crash_stack_list), len(reversed_target_coverage_list))
            for i in range(min_length):
                cur_crash_info = reversed_crash_stack_list[i]
                cur_target_info = reversed_target_coverage_list[i]
                # A frame matches when the target's line range lies inside it,
                # in the same file, and the frame was covered at least once.
                if (cur_crash_info["file"] == cur_target_info["file"]
                        and cur_crash_info["begin"] <= cur_target_info["begin"]
                        and cur_crash_info["end"] >= cur_target_info["end"]
                        and cur_crash_info["cover_num"] > 0):
                    cnt += 1
                else:
                    break
        else:
            for item in reversed(self.target_coverage):
                if item["cover_num"] > 0:
                    cnt += 1
                else:
                    break
        return cnt

    def remove(self):
        """Delete this case's directory (best-effort) if it still exists."""
        if os.path.exists(self.case_dir):
            self.tlog.debug(f"种子{self.case_dir}将被删除", "zh")
            shutil.rmtree(self.case_dir, ignore_errors=True)
class ExecutableBinary:
    """A compiled target binary plus its command-line template.

    The "@@" placeholder in the argument string (AFL convention) is turned
    into a substitution slot and later replaced by the input-file path when
    building the concrete command line.
    """

    def __init__(self, binary_path: str, binary_args: str, tlog: "TaskLogger", stdin='file'):
        self.tlog = tlog
        # 'file': the input is passed as an argv path; 'stdin': it is fed on
        # standard input by the executor.
        self.stdin = stdin
        self.tlog.debug("开始初始化ExecutableBinary实例", "zh")
        # Path of the target binary.
        self.binary_path = binary_path
        self.tlog.debug(f"binary_path = {self.binary_path}", "zh")
        # Command-line argument template with "@@" rewritten to a %s slot.
        self.binary_execute_args = binary_args.replace("@@", " %s ")
        self.tlog.debug(f"binary_execute_args = {self.binary_execute_args}", "zh")

    def get_binary_path(self) -> str:
        return self.binary_path

    def get_binary_execute_args(self, file_path: str):
        # Substitute the input path when a slot exists; fixed args otherwise.
        if "%s" not in self.binary_execute_args:
            return self.binary_execute_args
        return self.binary_execute_args % (file_path)
"""
CmdExecutor 命令执行器
用于执行shell命令
"""
class CmdExecutor:
    """Runs shell commands for a given ExecutableBinary, logging each one."""

    def __init__(self, executable_binary: "ExecutableBinary", tlog: "TaskLogger"):
        self.tlog = tlog
        self.tlog.debug("开始初始化CmdExecutor实例", "zh")
        self.executable_binary = executable_binary

    def execute(
        self,
        cmd: str,
        stdout: TextIOWrapper = subprocess.PIPE,
        stderr: TextIOWrapper = subprocess.PIPE,
        stdin="file",
        stdin_file_path=""
    ) -> CompletedProcess:
        """Run ``cmd`` through the shell and return its CompletedProcess.

        stdin='file': the target reads its input from a path inside ``cmd``.
        stdin='stdin': the file at ``stdin_file_path`` is fed on standard input.
        An unrecognized mode is logged and (as before) returns None.
        """
        self.tlog.debug(f"正在执行: {cmd}", "zh")
        if stdin == 'stdin':
            # Feed the case file on standard input.
            with open(stdin_file_path, "r") as stdin_fd:
                return subprocess.run(cmd, stdin=stdin_fd, stdout=stdout, stderr=stderr, shell=True)
        if stdin == 'file':
            return subprocess.run(cmd, stdout=stdout, stderr=stderr, shell=True)
        self.tlog.error(f"未识别的stdin {stdin}")
"""
Html报告生成器
Analyst可以用来生成单个case的html报告,也可以根据所有case生成一个总的html报告,方便阅读使用
"""
class Analyst:
    """Report generator: per-case coverage (html/text) and ASAN crash reports."""
    def __init__(
        self,
        output_dir: str,
        cmd_executor: CmdExecutor,
        cmd_executor_crash: CmdExecutor,
        mode: str,
        output_format: str,
        report_root_dir: str,
        tlog: TaskLogger
    ):
        # output_dir: root directory receiving all generated reports.
        # cmd_executor: executor bound to the coverage-instrumented binary.
        # cmd_executor_crash: executor bound to the ASAN-instrumented binary.
        # mode: "all"/"collect" (re)create the output directory; other modes reuse it.
        # output_format: requested report format.
        # report_root_dir: root used for relative report paths (backslashes normalized).
        self.tlog = tlog
        self.tlog.debug("开始初始化Analyst实例", "zh")
        self.output_dir = output_dir
        self.case_html_reports_dir = os.path.join(self.output_dir, "reports")
        self.mode = mode
        if self.mode in ["all", "collect"]:
            self.init_output_dir()
        self.cmd_executor = cmd_executor
        self.cmd_executor_crash = cmd_executor_crash
        self.output_format = output_format
        self.report_root_dir = report_root_dir.replace("\\", "/")
    # Generate the ASAN report for a single case.
    def generate_crash_asan_report(self, case: Case):
        """Replay ``case`` against the ASAN-instrumented binary; the returned
        CompletedProcess's stderr carries the sanitizer report."""
        binary_path = self.cmd_executor_crash.executable_binary.get_binary_path()
        binary_execute_args = (
            self.cmd_executor_crash.executable_binary.get_binary_execute_args(
                case.file_path
            )
        )
        stdin = self.cmd_executor_crash.executable_binary.stdin
        return self.cmd_executor_crash.execute(
            f" {binary_path} {binary_execute_args} ",
            stderr=subprocess.PIPE,
            stdin=stdin,
            stdin_file_path=case.file_path
        )
# 生成单个case的profraw和profdata
def generate_single_profraw_profdata(self, case: Case):
binary_path = self.cmd_executor.executable_binary.get_binary_path()
stdin = self.cmd_executor.executable_binary.stdin
binary_execute_args = (
self.cmd_executor.executable_binary.get_binary_execute_args(case.file_path)
)
profraw_file_path = f"{case.case_dir}/default.profraw"
profdata_file_path = f"{case.case_dir}/default.profdata"
self.cmd_executor.execute(
f'LLVM_PROFILE_FILE="{profraw_file_path}" {binary_path} {binary_execute_args}',
stdin=stdin,
stdin_file_path=case.file_path
)
self.cmd_executor.execute(
f"llvm-profdata merge -sparse {profraw_file_path} -o {profdata_file_path}"
)
# 生成单个case的html报告
def generate_single_case_html(self, case: Case) -> CompletedProcess:
binary_path = self.cmd_executor.executable_binary.get_binary_path()
profdata_file_path = f"{case.case_dir}/default.profdata"
return self.cmd_executor.execute(
f"llvm-cov show {binary_path} -instr-profile={profdata_file_path} -format=html -output-dir={case.html_path}",
)
# 生成单个case的text报告, 必须要在
def generate_single_case_text(self, case: Case) -> CompletedProcess:
binary_path = self.cmd_executor.executable_binary.get_binary_path()
profdata_file_path = f"{case.case_dir}/default.profdata"
return self.cmd_executor.execute(
f"llvm-cov show {binary_path} -instr-profile={profdata_file_path} -format=text -output-dir={case.text_path}",
)
# 初始化输出目录
def init_output_dir(self):
Util.mkdir(self.output_dir, "overwrite")
Util.mkdir(self.case_html_reports_dir, "overwrite")
def get_coverage_by_text_path(self, text_file_path: str):
result = []
try:
with open(text_file_path, "r") as f:
txt_lines = f.readlines()
target_line = txt_lines[-3]
splited_line = target_line.split(" ")
trimed_line = [
item
for item in splited_line
if not (item == "" or item.find("%") == -1)
]
for line in trimed_line:
result.append(float(line[: line.find("%")]))
except Exception as e:
self.tlog.error("get_coverage_by_text_path::", "zh", stack_info=True)
self.tlog.error(e)
result = [0.0, 0.0, 0.0]
finally:
return result
def get_back_dir(self):
split_str = self.report_root_dir.split("/")
processed_split_str = []
for item in split_str:
if item == "..":
raise Exception(
# f"Invalid report root path : {self.report_root_dir}, the path can not contain '..' "
f"报告根目录错误(invalid report root path) : {self.report_root_dir}, 路径中不能包含'..' "
)
if item != "" and not item.isspace() and item != ".":
processed_split_str.append(item)
return "../" * len(processed_split_str)
def case_coverage_compare(case_a: Case, case_b: Case) -> int:
    """cmp-style comparator ordering cases by target-coverage progress.

    Each case's target_coverage looks like
        [{"file": "valid.c", "begin": 100, "end": 101, "cover_num": 45}, ...]
    A case that covers more targets sorts first; on equal depth, the case
    with the larger cover_num at the deepest differing target sorts first.
    """
    cov_a = case_a.target_coverage
    cov_b = case_b.target_coverage
    # Unequal depth: deeper coverage wins (comes earlier).
    if len(cov_a) != len(cov_b):
        return -1 if len(cov_a) > len(cov_b) else 1
    # Same depth: compare cover_num from the deepest target outwards.
    for entry_a, entry_b in zip(reversed(cov_a), reversed(cov_b)):
        if entry_a["cover_num"] != entry_b["cover_num"]:
            return -1 if entry_a["cover_num"] > entry_b["cover_num"] else 1
    return 0
class TargetCoverageChecker:
    """Checks how far cases progress along a user-supplied target list.

    The target file lists source locations, one per line, as
    `file.c: N` or `file.c: N - M`; targets_list holds dicts like
    {"file": "valid.c", "begin": 5000, "end": 5023}.
    """
    def __init__(self, target_file_path: str, tlog: TaskLogger):
        self.tlog = tlog
        self.target_file_path = target_file_path
        # e.g. [{"file": "valid.c", "begin": 5000, "end": 5023}, ...]
        self.targets_list = self.init_targets(self.target_file_path)
    def get_coverage_total_num(self) -> int:
        """Total number of configured targets."""
        return len(self.targets_list)
    def get_coverage_result(self, case: Case):
        """Fill case.target_coverage with hit counts for every target whose
        per-file report exists in the case's llvm-cov text report tree."""
        result = []
        text_report_root_path = f"{case.case_dir}/text/coverage"
        for target_file in self.targets_list:
            for root, dirs, files in os.walk(text_report_root_path, topdown=True):
                for _file in files:
                    if f"{target_file['file']}.txt" == _file:
                        # Found the report for the target's source file.
                        single_file_path = f"{root}/{_file}"
                        begin_line_no = target_file["begin"]
                        end_line_no = target_file["end"]
                        cover_num = self.get_single_file_cover_num_by_text_reports(
                            single_file_path, begin_line_no, end_line_no
                        )
                        # -1 means the report could not be parsed; skip it.
                        if cover_num == -1:
                            continue
                        result.append(
                            {
                                "file": _file,
                                "begin": begin_line_no,
                                "end": end_line_no,
                                "cover_num": cover_num,
                            }
                        )
        case.target_coverage = result
    def init_targets(self, target_path):
        """Parse the target file into a list of {"file", "begin", "end"} dicts.

        Supported line syntaxes:
            valid.c: 2001
            valid.c: 3000 - 4000
        """
        result = []
        try:
            with open(target_path, "r") as f:
                lines = f.readlines()
                for idx, line in enumerate(lines):
                    if line == "" or line.isspace():
                        continue
                    splited_str1 = line.split(":")
                    if len(splited_str1) != 2:
                        raise Exception("target文件语法错误")
                    # File name part.
                    file_name = splited_str1[0].strip()
                    # Line-number part: single line or "begin - end" range.
                    target_line_number = splited_str1[1]
                    if target_line_number.find("-") != -1:
                        start_number, end_number = tuple(target_line_number.split("-"))
                        start_number = int(start_number)
                        end_number = int(end_number)
                    else:
                        start_number = int(target_line_number)
                        end_number = int(target_line_number)
                    result.append(
                        {"file": file_name, "begin": start_number, "end": end_number}
                    )
        except Exception as e:
            self.tlog.error(e, "zh", stack_info=True)
            # NOTE(review): if open() itself fails, `idx` is unbound here and
            # this log call raises NameError — confirm and guard if needed.
            self.tlog.error(f"解析target文件时出现错误,出错行号: {idx + 1}", "zh", stack_info=True)
        return result
    def get_single_file_cover_num_by_text_reports(
        self, single_file_path: str, begin_line_no: int, end_line_no: int
    ) -> int:
        """Sum the llvm-cov hit counts over [begin_line_no, end_line_no] in a
        per-file text report; return -1 when the report cannot be parsed."""
        cnt = 0
        try:
            target_lines_list = list(range(begin_line_no, end_line_no + 1))
            for line_no in target_lines_list:
                # +3 presumably skips header lines of the report so report
                # line numbers align with source line numbers — TODO confirm.
                line_content = linecache.getline(single_file_path, line_no + 3)
                splited_lines = line_content.split("|")
                # Column 1 holds the execution count (may be non-numeric).
                cover_num = splited_lines[1].strip()
                if cover_num.isdigit():
                    cnt += int(cover_num)
        except Exception as e:
            self.tlog.warning(f"解析llvm-cov生成的文件 {single_file_path} 失败!忽略该文件解析结果")
            cnt = -1
        return cnt
    # Did the case reach the targets following the configured call stack?
    def check_if_stack_fit(self, case: Case) -> bool:
        if case.is_crash:
            # Compare the crash stack frame-by-frame against the targets.
            stack_len = min(len(self.targets_list), len(case.crash_stack))
            for i in range(stack_len):
                target_stack_info = self.targets_list[i]
                cur_stack_info = case.crash_stack[i]
                if cur_stack_info["file"] != target_stack_info["file"]:
                    return False
                if int(cur_stack_info["begin"]) < int(target_stack_info["begin"]):
                    return False
                if int(cur_stack_info["begin"]) > int(target_stack_info["end"]):
                    return False
        else:
            # Non-crashing case: every recorded target must have been hit.
            for cov_info in case.target_coverage:
                if cov_info["cover_num"] <= 0:
                    return False
        return True
    # Did the case reach the first (outermost) target at all?
    def check_if_is_arrived(self, case: Case) -> bool:
        if case.is_crash:
            if len(self.targets_list) == 0 or len(case.crash_stack) == 0:
                # No data to compare — treat as not arrived.
                return False
            target_stack_info = self.targets_list[0]
            cur_stack_info = case.crash_stack[0]
            if cur_stack_info["file"] != target_stack_info["file"]:
                return False
            if int(cur_stack_info["begin"]) < int(target_stack_info["begin"]):
                return False
            if int(cur_stack_info["begin"]) > int(target_stack_info["end"]):
                return False
        else:
            # NOTE(review): raises IndexError when target_coverage is empty —
            # confirm callers guarantee at least one entry.
            if case.target_coverage[0]["cover_num"] <= 0:
                return False
        return True
class CaseBase:
    """Seed repository.

    Receives newly discovered fuzzer outputs (seeds/crashes), analyzes them
    asynchronously with llvm-cov or an ASan replay, deduplicates by sha256,
    and reports aggregate statistics back to the owning FuzzTask through a
    callback.
    """
    def __init__(
        self,
        analyst: Analyst,
        cve_str: str,
        fuzz_task,
        tlog: TaskLogger,
        target_file_path=None
    ):
        self.tlog = tlog
        self.tlog.debug("开始初始化CaseBase实例", "zh")
        # Owning FuzzTask; analysis results flow back through its
        # update_and_publish_info_call_back().
        self.fuzz_task = fuzz_task
        self.case_list: List[Case] = []
        self.analyst = analyst
        # Incoming cases wait here until the worker service picks them up.
        self.wait_queue: queue.Queue = queue.Queue()
        # Serializes case_list access and the per-case analysis pipeline.
        self.case_list_lock = threading.Lock()
        self.init_time = time.time()
        self.cve_str = cve_str
        self.target_file_path = target_file_path
        # Optional: only created when a target file was supplied.
        self.target_coverage_checker = None
        if self.target_file_path is not None:
            self.target_coverage_checker = TargetCoverageChecker(self.target_file_path, self.tlog)
        self.tlog.debug("初始化CaseBase实例已完成", "zh")
    # A case is unique iff no already-collected case has the same sha256.
    def __check_if_case_is_unique(self, case: Case) -> bool:
        for casei in self.case_list:
            if casei.sha256 == case.sha256:
                return False
        return True
    # Enqueue a case for asynchronous processing.
    def add_case(self, case: Case):
        self.tlog.debug(f"[Thread] 当前线程: {threading.current_thread().name}", "zh")
        self.tlog.debug(
            f"[Thread] 线程: {threading.current_thread().name} 已加入到等待队列", "zh"
        )
        self.wait_queue.put(case)
    def get_queue_count(self) -> int:
        return self.wait_queue.qsize()
    # Analyze one case and record it; runs on the thread pool.
    # The whole pipeline is serialized by case_list_lock.
    def process_single_case(self, case: Case):
        html_report_execute_result = 0
        text_report_execute_result = 0
        try:
            self.case_list_lock.acquire()
            self.tlog.debug("以下种子被接收", "zh")
            self.tlog.debug(case.to_string(), "zh")
            # Drop duplicate seeds (same sha256) right away.
            case_is_unique = False
            if self.__check_if_case_is_unique(case):
                case_is_unique = True
            else:
                case_is_unique = False
                self.tlog.warning(f"以下种子是重复的 : \n {case.to_string()}", "zh")
            if not case_is_unique:
                # Duplicate seed: nothing to do (finally still releases).
                return
            if case_is_unique and not case.from_crash_dir:
                # Only non-crash seeds get llvm-cov analysis: crashing inputs
                # do not produce usable coverage data.
                self.tlog.debug("开始生成单个种子的profraw 和 profdata文件", "zh")
                self.analyst.generate_single_profraw_profdata(case)
                self.tlog.debug("开始生成单个种子的text报告", "zh")
                text_report_execute_result = (
                    self.analyst.generate_single_case_text(case)
                )
            case_add_to_queue_condition = False
            if not case.from_crash_dir:
                if(
                    text_report_execute_result.returncode == 0
                    and os.path.exists(f"{case.text_path}/index.txt")
                ):
                    case_add_to_queue_condition = True
            # Did the llvm-cov report generation succeed?
            if case_add_to_queue_condition:
                # Coverage available: optionally evaluate target coverage,
                # then record the seed.
                if self.target_coverage_checker is not None:
                    self.target_coverage_checker.get_coverage_result(case)
                    case.is_fit_target_stack = self.target_coverage_checker.check_if_stack_fit(case)
                    case.is_arrived = self.target_coverage_checker.check_if_is_arrived(case)
                self.case_list.append(case)
                self.tlog.info(
                    Printer().to_green(
                        f"当前已收集到的种子个数为 {len(self.case_list)}"
                    ),
                    "zh",
                )
            else:
                # Crash path: replay under ASan and parse the report.
                case.is_crash = True
                crash_execute_result = self.analyst.generate_crash_asan_report(case)
                case.stderr = crash_execute_result.stderr.decode("utf-8", errors="replace")
                # Extract crash details from the ASan report.
                cve_checker = CVEChecker(case.stderr)
                case.cve_type = cve_checker.extract_cve_type()
                case.cve_crash_function = cve_checker.get_crash_func()
                case.cve_crash_function_caller = cve_checker.get_crash_func_caller()
                case.cve_crash_point = cve_checker.get_crash_point()
                case.crash_stack = cve_checker.get_crash_stack()
                if self.target_coverage_checker is not None:
                    case.is_fit_target_stack = self.target_coverage_checker.check_if_stack_fit(case)
                    case.is_arrived = self.target_coverage_checker.check_if_is_arrived(case)
                if self.cve_str is not None:
                    case.is_true_crash = cve_checker.check_by_cve_str(self.cve_str)
                self.case_list.append(case)
                self.tlog.info(
                    Printer().to_yellow(
                        f"当前已收集到的种子个数为 {len(self.case_list)} [Crash]"
                    ),
                    "zh",
                )
            self.tlog.debug(f"{case.to_string()}", "all")
            self.statistic_case_list_and_publish()
        except Exception as e:
            self.tlog.error(e, "all", stack_info=True)
        finally:
            self.case_list_lock.release()
    # Recompute aggregate statistics over case_list and push them to FuzzTask.
    def statistic_case_list_and_publish(self):
        # Order cases by creation time so "first"/"last" are well defined.
        self.case_list.sort(key=lambda x: x.create_time)
        # Time/count of the latest (true-)crash seeds.
        last_crash_time = None
        last_true_crash_time = None
        true_crash_num = 0
        crash_num = 0
        max_target_depth_num = 0
        first_arrive_time = None
        first_crash_time = None
        first_true_crash_time = None
        first_arrive_file_name = "None"
        first_crash_file_name = "None"
        first_true_crash_file_name = "None"
        first_fit_stack_arrive_time = None
        first_fit_stack_arrive_file_name = "None"
        first_not_fit_stack_arrive_time = None
        first_not_fit_stack_arrive_file_name = "None"
        for case in self.case_list:
            if first_arrive_time is None and case.is_arrived:
                first_arrive_time = case.create_time
                first_arrive_file_name = case.file_name
            if first_fit_stack_arrive_time is None and case.is_arrived and case.is_fit_target_stack:
                first_fit_stack_arrive_time = case.create_time
                first_fit_stack_arrive_file_name = case.file_name
            if first_not_fit_stack_arrive_time is None and case.is_arrived and not case.is_fit_target_stack:
                first_not_fit_stack_arrive_time = case.create_time
                first_not_fit_stack_arrive_file_name = case.file_name
            # Collect statistics for crash-triggering seeds.
            if case.is_crash:
                if first_crash_time is None:
                    first_crash_time = case.create_time
                    first_crash_file_name = case.file_name
                last_crash_time = case.create_time
                crash_num += 1
                if case.is_true_crash:
                    if first_true_crash_time is None:
                        first_true_crash_time = case.create_time
                        first_true_crash_file_name = case.file_name
                    last_true_crash_time = case.create_time
                    true_crash_num += 1
            # Track the deepest target coverage seen so far.
            # NOTE(review): assumes target_coverage_checker is not None here;
            # with no target file this raises and is swallowed by the broad
            # except in process_single_case — confirm intended.
            cur_target_coverage_num = case.get_coverage_number_accumlate(self.target_coverage_checker.targets_list)
            if cur_target_coverage_num > max_target_depth_num:
                max_target_depth_num = cur_target_coverage_num
        max_target_depth = f"{max_target_depth_num}/{self.target_coverage_checker.get_coverage_total_num()}"
        path_num = len(self.case_list)
        resp = {
            "last_crash_time": last_crash_time,
            "last_true_crash_time": last_true_crash_time,
            "path_num": path_num,
            "crash_num": crash_num,
            "true_crash_num": true_crash_num,
            "max_target_depth": max_target_depth,
            "first_arrive_time": first_arrive_time,
            "first_crash_time": first_crash_time,
            "first_true_crash_time": first_true_crash_time,
            "first_arrive_file_name": first_arrive_file_name,
            "first_crash_file_name": first_crash_file_name,
            "first_true_crash_file_name": first_true_crash_file_name,
            "first_fit_stack_arrive_time": first_fit_stack_arrive_time,
            "first_fit_stack_arrive_file_name": first_fit_stack_arrive_file_name,
            "first_not_fit_stack_arrive_time": first_not_fit_stack_arrive_time,
            "first_not_fit_stack_arrive_file_name": first_not_fit_stack_arrive_file_name,
        }
        # Hand the statistics back to FuzzTask through the callback.
        self.fuzz_task.update_and_publish_info_call_back(resp)
    # Worker loop: drain wait_queue and dispatch cases to the thread pool.
    def process_service(self):
        cnt = 0
        while True:
            if not self.wait_queue.empty():
                cnt = 0
                # Take one case off the queue for processing.
                tmp_case = self.wait_queue.get()
                # task_done() is called before processing actually finishes on
                # purpose: deferring it until completion would limit us to one
                # worker thread at a time.
                self.wait_queue.task_done()
                CCDLThreadPool.submit(self.process_single_case, tmp_case)
            else:
                # Back off only after ~100 empty polls to stay responsive.
                cnt += 1
                if cnt > 100:
                    time.sleep(0.1)
    # Start process_service() on a daemon thread.
    def start_process_service(self):
        service = threading.Thread(target=self.process_service, daemon=True)
        service.start()
    def dumps(self) -> str:
        """Serialize all collected cases to a JSON string (thread-safe)."""
        result = []
        self.case_list_lock.acquire()
        for case in self.case_list:
            result.append(case.to_dict())
        self.case_list_lock.release()
        return json.dumps(result, indent=4)
    def loads(self, case_str: str):
        """Restore cases from a JSON string produced by dumps() (thread-safe)."""
        case_list = json.loads(case_str)
        self.case_list_lock.acquire()
        for item in case_list:
            case = Case(self.tlog)
            case.load_from_dict(item)
            self.case_list.append(case)
        self.case_list_lock.release()
class CCDLEventHandler(FileSystemEventHandler):
    """Watchdog handler feeding newly created fuzzer outputs into a CaseBase.

    Move/delete/modify events are only logged; a file-creation event turns
    the new file into a Case and enqueues it for analysis.
    """
    def __init__(self, case_base: CaseBase, from_crash_dir=False, tlog: TaskLogger = None):
        self.tlog = tlog
        self.tlog.debug("开始初始化CCDLEventHandler实例", "zh")
        super().__init__()
        self.case_base = case_base
        # True when this handler watches the crashes/ directory.
        self.from_crash_dir = from_crash_dir
        self.tlog.debug("初始化CCDLEventHandler实例已完成", "zh")
    def on_moved(self, event):
        if event.is_directory:
            self.tlog.debug(f"监测到[目录被移动] {event.src_path} -> {event.dest_path}", "zh")
        else:
            self.tlog.debug(f"监测到[文件被移动] {event.src_path} -> {event.dest_path}", "zh")
    def on_created(self, event):
        if event.is_directory:
            self.tlog.debug(f"检测到[目录被创建] {event.src_path} ", "zh")
        else:
            self.tlog.debug(f"检测到[文件被创建] {event.src_path} ", "zh")
            new_case = Case(self.tlog)
            new_case.init_from_origin_path(
                event.src_path,
                self.case_base.analyst.case_html_reports_dir,
                self.case_base.init_time,
                from_crash_dir=self.from_crash_dir,
            )
            # Skip the fuzzer's README.txt helper file — it is not a case.
            if new_case.file_name != "README.txt":
                self.case_base.add_case(new_case)
    def on_deleted(self, event):
        if event.is_directory:
            self.tlog.debug(f"监测到[目录被删除] {event.src_path} ", "zh")
        else:
            self.tlog.debug(f"监测到[文件被删除] {event.src_path} ", "zh")
    def on_modified(self, event):
        if event.is_directory:
            # BUG FIX: the language tag was " zh" (leading space) here,
            # inconsistent with every other log call; normalized to "zh".
            self.tlog.debug(f"监测到[目录被修改] {event.src_path} ", "zh")
        else:
            self.tlog.debug(f"监测到[文件被修改] {event.src_path} ", "zh")
class FuzzerExecutor:
    """Builds the fuzzer command line and manages the fuzzer child process."""
    def __init__(self, fuzzer: str, seed_path: str, output_path: str, binary_path: str, args: str, tlog: TaskLogger, log_path=None):
        self.tlog = tlog
        self.tlog.debug("初始化FuzzerExecutor")
        self.fuzzer = fuzzer
        self.seed_path = seed_path
        self.output_path = output_path
        self.binary_path = binary_path
        self.args = args
        self.cmd = None
        # The running fuzzer process; None until start_fuzz() spawns it.
        self.process: subprocess.Popen = None
        self.log_path = log_path
        # Only the DAFL fuzzer is wired up; self.cmd stays None otherwise.
        if fuzzer == 'DAFL':
            self.cmd = f"/fuzzer/DAFL/afl-fuzz -m none -d -i {self.seed_path} -o {self.output_path} -- {self.binary_path} {self.args}"
        self.tlog.info(f"FuzzerExecutor cmd = {self.cmd}")
        self.tlog.debug(f"FuzzerExecutor 日志输出路径 {self.log_path}")
        self.tlog.debug("初始化FuzzerExecutor结束")
    def start_fuzz(self):
        """Launch the fuzzer asynchronously on a helper thread."""
        self.tlog.info("开始运行fuzz程序")
        self.fuzzer_stdout = subprocess.PIPE
        if self.log_path is not None:
            self.tlog.info(f"fuzzer 输出重定向到 {self.log_path}")
            # Handle is deliberately kept open: the child process writes to
            # it for its entire lifetime (until stop_fuzz()).
            self.fuzzer_stdout = open(self.log_path, "w")
        else:
            self.tlog.info("fuzzer不输出日志")
        threading.Thread(target=self.run_fuzz_popen_thread).start()
        self.tlog.info("运行fuzz程序已启动")
    def run_fuzz_popen_thread(self):
        """Spawn the fuzzer process (runs on the thread from start_fuzz)."""
        # NOTE(review): this pops ASAN_OPTIONS from the *parent* environment,
        # which also affects every later subprocess of this process — confirm
        # that is intended rather than env.pop on the copy below.
        os.environ.pop("ASAN_OPTIONS", None)
        env = os.environ.copy()
        env["AFL_NO_AFFINITY"] = "1"
        env["AFL_SKIP_CRASHES"] = "1"
        self.process = subprocess.Popen(
            self.cmd,
            stdin=subprocess.PIPE,
            stdout=self.fuzzer_stdout,
            stderr=self.fuzzer_stdout,
            shell=True,
            env=env,
        )
    def stop_fuzz(self):
        """Stop the fuzzer process, retrying the kill if it does not exit."""
        self.tlog.info("开始停止fuzz程序")
        if self.process is not None:
            try:
                self.tlog.info("向进程发送kill信号...")
                # Send the termination signal.
                self.process.kill()
                self.tlog.info("等待进程结束...")
                # BUG FIX: wait() without a timeout can never raise
                # TimeoutExpired, so the fallback branch below was dead code.
                # Bound the first wait so the retry path is reachable.
                self.process.wait(timeout=30)
            except subprocess.TimeoutExpired:
                # The first kill showed no effect within the timeout;
                # send it again and wait without a bound.
                self.tlog.warning("kill信号无响应")
                self.tlog.info("再次向进程发送kill信号...")
                self.process.kill()
                self.tlog.info("等待进程结束...")
                self.process.wait()
            finally:
                self.tlog.info("fuzzer进程已停止")
        else:
            self.tlog.error("fuzzer进程未运行")
class FuzzTask:
    def __init__(self,
        fuzzer_name: str,
        task_name: str,
        root_dir_path: str,
        seed_path: str,
        binary_path: str,
        cov_instr_binary_path: str,
        asan_instr_binary_path: str,
        args: str,
        target_file_path: str,
        cve_str = None,
        ideal_find_crash_secs=86400,
        time_limit=86400,
        mode="crash",
        stdin="file",
    ):
        """Record configuration and preset all runtime fields for one task.

        Heavy initialization (directories, logger, executors, observers) is
        deferred to init_task().
        """
        # Stop condition mode; run() checks 'arrive' and 'crash'.
        self.mode = mode
        self.status = FUZZ_UNKNOW
        # Name of the fuzzer in use
        self.fuzzer_name = fuzzer_name
        # Task name
        self.task_name = task_name
        # Task start timestamp
        self.startup_time = None
        # Time the latest crash was found
        self.last_crash_time = None
        # Time the latest true crash was found
        self.last_true_crash_time = None
        # Time the crash point was first reached
        self.first_arrive_time = None
        # Time the first crash was found
        self.first_crash_time = None
        # Time the first true crash was found
        self.first_true_crash_time = None
        # Time of first arrival following the target call stack
        self.first_fit_stack_arrive_time = None
        # Time of first arrival not following the target call stack
        self.first_not_fit_stack_arrive_time = None
        # Number of paths (queue entries)
        self.path_num = 0
        # Number of crashes
        self.crash_num = 0
        # Number of true crashes
        self.true_crash_num = 0
        # Deepest target coverage reached (formatted as "covered/total")
        self.max_target_depth = ""
        # Ideal time budget (seconds) to find a true crash
        self.ideal_find_crash_secs = ideal_find_crash_secs
        # Maximum fuzzing time allowed (seconds)
        self.time_limit = time_limit
        # CaseRunner cache root directory
        self.root_dir_path = root_dir_path
        # Fuzzer cache root directory
        self.fuzzer_cache_path = None
        # Fuzzer output directory
        self.fuzzer_cache_output_path = None
        # Fuzzer analysis-result cache directory
        self.fuzzer_cache_ccdl_path = None
        # Fuzzer statistics output directory
        self.fuzzer_statistic_output_path = None
        # Fuzzer stdout log path
        self.fuzzer_stdout_log_path = None
        # Task logger
        self.tlog: TaskLogger = None
        # Task log file path
        self.tlog_path: str = None
        # Fuzzer executor
        self.executor: FuzzerExecutor = None
        # Directory-watching observer instances
        self.crash_observer = None
        self.queue_observer = None
        # Shell command executors (coverage / ASan builds)
        self.cmd_executor = None
        self.cmd_executor_crash = None
        # HTML/text report generator
        self.analyst = None
        # Seed repository
        self.case_base = None
        # Lock guarding the task's mutable statistics
        self.rw_lock = threading.Lock()
        # Seed directory
        self.seed_path = seed_path
        # Path of the binary to fuzz
        self.binary_path = binary_path
        # Path of the llvm-cov-instrumented binary
        self.cov_instr_binary_path = cov_instr_binary_path
        # Path of the ASan-instrumented binary
        self.asan_instr_binary_path = asan_instr_binary_path
        # Command-line arguments the binary needs at run time
        self.args = args
        # CVE identifier string
        self.cve_str = cve_str
        # Target call-stack file path
        self.target_file_path = target_file_path
        # Name of the first arriving seed
        self.first_arrive_file_name = None
        # Name of the first crashing seed
        self.first_crash_file_name = None
        # Name of the first truly-crashing seed
        self.first_true_crash_file_name = None
        # Name of the first seed arriving along the target call stack
        self.first_fit_stack_arrive_file_name = None
        # Name of the first seed arriving off the target call stack
        self.first_not_fit_stack_arrive_file_name = None
        # How the binary consumes input ('file' or 'stdin')
        self.stdin = stdin
def get_config_dict(self):
return {
"fuzzer_name": self.fuzzer_name,
"task_name": self.task_name,
"seed_path": self.seed_path,
"binary_path": self.binary_path,
"cov_instr_binary_path": self.cov_instr_binary_path,
"asan_instr_binary_path": self.asan_instr_binary_path,
"args": self.args,
"cve_str": self.cve_str,
"target_file_path": self.target_file_path,
"time_limit": self.time_limit,
}
    def dumps(self):
        """Write a human-readable statistics summary to the detail.txt file
        (fuzzer_statistic_output_detail_path, created by init_task())."""
        with open(self.fuzzer_statistic_output_detail_path, "w") as f:
            output_buf = f"""
Fuzz Name : {self.fuzzer_name}
Task Name : {self.task_name}
Run Time : {Util.cal_timestamp_diff_to_str_from_now(self.startup_time)} ({Util.cal_timestamp_diff_to_seconds_from_now(self.startup_time)})
First Arrive : {Util.cal_timestamp_diff_to_str(self.startup_time, self.first_arrive_time)} ({Util.cal_timestamp_diff_to_seconds(self.startup_time, self.first_arrive_time)})
First Arrive Seed : {self.first_arrive_file_name}
First Crash : {Util.cal_timestamp_diff_to_str(self.startup_time, self.first_crash_time)} ({Util.cal_timestamp_diff_to_seconds(self.startup_time, self.first_crash_time)})
First Crash Seed : {self.first_crash_file_name}
First True Crash : {Util.cal_timestamp_diff_to_str(self.startup_time, self.first_true_crash_time)} ({Util.cal_timestamp_diff_to_seconds(self.startup_time, self.first_true_crash_time)})
First True Crash Seed : {self.first_true_crash_file_name}
First Fit Stack Arv Time: {Util.cal_timestamp_diff_to_str(self.startup_time, self.first_fit_stack_arrive_time)} ({Util.cal_timestamp_diff_to_seconds(self.startup_time, self.first_fit_stack_arrive_time)})
First Fit Stack Arv Seed: {self.first_fit_stack_arrive_file_name}
Not Fit Stack Arv Time : {Util.cal_timestamp_diff_to_str(self.startup_time, self.first_not_fit_stack_arrive_time)} ({Util.cal_timestamp_diff_to_seconds(self.startup_time, self.first_not_fit_stack_arrive_time)})
Not Fit Stack Arv Seed : {self.first_not_fit_stack_arrive_file_name}
Path Number : {self.path_num}
Crash Number : {self.crash_num}
True Crash Number : {self.true_crash_num}
Max Target Depth : {self.max_target_depth}
"""
            f.write(output_buf)
        # with open(self.fuzzer_statistic_output_case_list_path, "w") as f:
        #     output_buf = self.case_base.dumps()
        #     f.write(output_buf)
    def to_dict(self):
        """Snapshot the task's runtime state/statistics for publishing.

        Each timestamp is accompanied by a pre-formatted "*_duration" string
        of the form "<human readable> (<seconds>)".
        """
        return {
            "fuzzer_name": self.fuzzer_name,
            "task_name": self.task_name,
            "startup_time": self.startup_time,
            "startup_time_duration": f"{Util.cal_timestamp_diff_to_str_from_now(self.startup_time)} ({Util.cal_timestamp_diff_to_seconds_from_now(self.startup_time)})",
            "last_crash_time": self.last_crash_time,
            "last_crash_time_duration": f"{Util.cal_timestamp_diff_to_str_from_now(self.last_crash_time)} ({Util.cal_timestamp_diff_to_seconds_from_now(self.last_crash_time)})",
            "last_true_crash_time": self.last_true_crash_time,
            "last_true_crash_time_duration": f"{Util.cal_timestamp_diff_to_str_from_now(self.last_true_crash_time)} ({Util.cal_timestamp_diff_to_seconds_from_now(self.last_true_crash_time)})",
            "first_arrive_time": self.first_arrive_time,
            "first_arrive_time_duration": f"{Util.cal_timestamp_diff_to_str(self.startup_time, self.first_arrive_time)} ({Util.cal_timestamp_diff_to_seconds(self.startup_time, self.first_arrive_time)})",
            "first_crash_time": self.first_crash_time,
            "first_crash_time_duration": f"{Util.cal_timestamp_diff_to_str(self.startup_time, self.first_crash_time)} ({Util.cal_timestamp_diff_to_seconds(self.startup_time, self.first_crash_time)})",
            "first_true_crash_time": self.first_true_crash_time,
            "first_true_crash_time_duration": f"{Util.cal_timestamp_diff_to_str(self.startup_time, self.first_true_crash_time)} ({Util.cal_timestamp_diff_to_seconds(self.startup_time, self.first_true_crash_time)})",
            "first_fit_stack_arrive_time": self.first_fit_stack_arrive_time,
            "first_fit_stack_arrive_time_duration": f"{Util.cal_timestamp_diff_to_str(self.startup_time, self.first_fit_stack_arrive_time)} ({Util.cal_timestamp_diff_to_seconds(self.startup_time, self.first_fit_stack_arrive_time)})",
            "first_not_fit_stack_arrive_time": self.first_not_fit_stack_arrive_time,
            "first_not_fit_stack_arrive_time_duration": f"{Util.cal_timestamp_diff_to_str(self.startup_time, self.first_not_fit_stack_arrive_time)} ({Util.cal_timestamp_diff_to_seconds(self.startup_time, self.first_not_fit_stack_arrive_time)})",
            "path_num": self.path_num,
            "crash_num": self.crash_num,
            "true_crash_num": self.true_crash_num,
            "max_target_depth": self.max_target_depth,
            "ideal_find_crash_secs": self.ideal_find_crash_secs,
            "time_limit": self.time_limit,
            "fuzzer_cache_output_path": self.fuzzer_cache_output_path,
            "task_id": self.task_id,
            "status": self.status,
            "status_name": get_fuzz_status_name(self.status),
            "first_arrive_file_name": self.first_arrive_file_name,
            "first_crash_file_name": self.first_crash_file_name,
            "first_true_crash_file_name": self.first_true_crash_file_name,
            "first_fit_stack_arrive_file_name": self.first_fit_stack_arrive_file_name,
            "first_not_fit_stack_arrive_file_name": self.first_not_fit_stack_arrive_file_name,
        }
def update_and_publish_info_call_back(self, msg: dict):
# 让更新具有原子性
self.rw_lock.acquire()
self.last_crash_time = msg["last_crash_time"]
self.last_true_crash_time = msg["last_true_crash_time"]
self.path_num = msg["path_num"]
self.crash_num = msg["crash_num"]
self.true_crash_num = msg["true_crash_num"]
self.max_target_depth = msg["max_target_depth"]
self.first_arrive_time = msg["first_arrive_time"]
self.first_crash_time = msg["first_crash_time"]
self.first_true_crash_time = msg["first_true_crash_time"]
self.first_arrive_file_name = msg["first_arrive_file_name"]
self.first_crash_file_name = msg["first_crash_file_name"]
self.first_true_crash_file_name = msg["first_true_crash_file_name"]
self.first_fit_stack_arrive_time = msg["first_fit_stack_arrive_time"]
self.first_fit_stack_arrive_file_name = msg["first_fit_stack_arrive_file_name"]
self.first_not_fit_stack_arrive_time = msg["first_not_fit_stack_arrive_time"]
self.first_not_fit_stack_arrive_file_name = msg["first_not_fit_stack_arrive_file_name"]
self.publish()
self.rw_lock.release()
def publish(self):
self.tlog.debug(f"向主进程发送数据: {self.to_dict()}")
self.mq.publish(self.to_dict())
    def init_task(
        self,
        task_id: int,
    ):
        """Materialize the task: directories, logger, message queue, executors,
        report generator, seed repository and directory watchers.

        The watchers are only scheduled here; they are started in run().
        """
        self.task_id = task_id
        # Lay out the per-task cache directory tree.
        self.fuzzer_cache_path = f"{self.root_dir_path}/{self.task_id}"
        self.fuzzer_cache_output_path = f"{self.fuzzer_cache_path}/output"
        self.fuzzer_cache_ccdl_path = f"{self.fuzzer_cache_path}/ccdl"
        self.fuzzer_statistic_output_path = f"{self.fuzzer_cache_path}/statistic"
        self.fuzzer_statistic_output_detail_path = f"{self.fuzzer_statistic_output_path}/detail.txt"
        self.fuzzer_statistic_output_case_list_path = f"{self.fuzzer_statistic_output_path}/case_list.json"
        self.tlog_path = f"{self.fuzzer_cache_path}/task.log"
        self.fuzzer_stdout_log_path = f"{self.fuzzer_cache_path}/fuzzer.log"
        self.fuzzer_cache_output_crash_path = f"{self.fuzzer_cache_output_path}/crashes"
        self.fuzzer_cache_output_queue_path = f"{self.fuzzer_cache_output_path}/queue"
        Util.mkdir(self.fuzzer_cache_path, "overwrite")
        Util.mkdir(self.fuzzer_cache_output_path, "overwrite")
        Util.mkdir(self.fuzzer_cache_ccdl_path, "overwrite")
        Util.mkdir(self.fuzzer_statistic_output_path, "overwrite")
        # The logger must exist before anything below logs through it.
        self.tlog = TaskLogger(self.tlog_path, task_id)
        self.tlog.debug("初始化logger完成")
        # Message queue towards the main process.
        self.tlog.debug("开始初始化消息队列")
        self.mq = MsgQueue(task_id)
        self.tlog.debug("消息队列初始化完成")
        # Task starts in the waiting state.
        self.tlog.debug("修改任务状态为 -> 等待态")
        self.status = FUZZ_WAITTING
        self.tlog.info("任务初始化完成")
        # Fuzzer process wrapper.
        self.tlog.debug("开始初始化FuzzerExecutor")
        self.executor = FuzzerExecutor(
            self.fuzzer_name,
            self.seed_path,
            self.fuzzer_cache_output_path,
            self.binary_path,
            self.args,
            self.tlog,
            self.fuzzer_stdout_log_path
        )
        self.tlog.debug("FuzzerExecutor初始化完成")
        # Shell executors for the coverage build and the ASan build.
        self.cmd_executor = CmdExecutor(ExecutableBinary(self.cov_instr_binary_path, self.args, self.tlog, self.stdin), self.tlog)
        self.cmd_executor_crash = CmdExecutor(ExecutableBinary(self.asan_instr_binary_path, self.args, self.tlog, self.stdin), self.tlog)
        # Report generator.
        self.analyst = Analyst(
            output_dir=self.fuzzer_cache_ccdl_path,
            cmd_executor=self.cmd_executor,
            cmd_executor_crash=self.cmd_executor_crash,
            mode="all",
            output_format="text",
            report_root_dir="",
            tlog=self.tlog
        )
        # Seed repository.
        self.case_base = CaseBase(
            analyst=self.analyst,
            cve_str=self.cve_str,
            fuzz_task=self,
            tlog=self.tlog,
            target_file_path=self.target_file_path
        )
        self.tlog.debug(f"self.fuzzer_cache_output_crash_path = {self.fuzzer_cache_output_crash_path}")
        self.tlog.debug(f"self.fuzzer_cache_output_queue_path = {self.fuzzer_cache_output_queue_path}")
        # Watchers for the fuzzer's crashes/ and queue/ output directories.
        self.crash_observer = Observer()
        crash_eventHandler = CCDLEventHandler(self.case_base, True, self.tlog)
        self.crash_observer.schedule(
            crash_eventHandler, self.fuzzer_cache_output_crash_path, recursive=False
        )
        self.queue_observer = Observer()
        queue_eventHandler = CCDLEventHandler(self.case_base, False, self.tlog)
        self.queue_observer.schedule(
            queue_eventHandler, self.fuzzer_cache_output_queue_path, recursive=False
        )
    def check(self) -> None:
        """Validate the task configuration before running.

        Raises internally on the first invalid field, then logs the error and
        terminates the whole process with exit code 1.
        """
        try:
            if self.binary_path is None:
                raise Exception(f"binary_path 为 None")
            if not os.path.isfile(self.binary_path):
                raise Exception(f"binary_path : {self.binary_path} 文件不存在")
            if self.asan_instr_binary_path is None:
                raise Exception(f"asan_instr_binary_path 为 None")
            if not os.path.isfile(self.asan_instr_binary_path):
                raise Exception(f"asan_instr_binary_path : {self.asan_instr_binary_path} 文件不存在")
            if self.cov_instr_binary_path is None:
                raise Exception(f"cov_instr_binary_path 为 None")
            if not os.path.isfile(self.cov_instr_binary_path):
                raise Exception(f"cov_instr_binary_path : {self.cov_instr_binary_path} 文件不存在")
            if self.cve_str is None:
                raise Exception(f"cve_str 为 None")
            if self.root_dir_path is None:
                raise Exception(f"root_dir_path 为 None")
            if self.target_file_path is None:
                raise Exception(f"target_file_path 为 None")
            if not os.path.isfile(self.target_file_path):
                raise Exception(f"target_file_path : {self.target_file_path} 文件不存在")
            if self.fuzzer_name is None:
                raise Exception(f"fuzzer_name 为 None")
            if self.seed_path is None:
                raise Exception(f"seed_path 为 None")
            if not os.path.isdir(self.seed_path):
                raise Exception(f"seed_path : {self.seed_path} 目录不存在")
            if self.task_name is None:
                raise Exception(f"task_name 为 None")
            if self.args is None:
                raise Exception(f"args 为 None")
        except Exception as e:
            # NOTE(review): `log` is not defined in this chunk — presumably a
            # module-level logger created earlier in the file. self.tlog cannot
            # be used here since it is only created in init_task(); confirm.
            log.error(e)
            sys.exit(1)
def run(self):
    """Execute the fuzz task end to end.

    Starts the fuzzer and the analysis services, ingests cases that already
    exist in the fuzzer's output directories, starts directory watchers for
    new cases, then polls once per second until the stop condition for
    ``self.mode`` ('arrive' / 'crash') is met or the time limit expires.
    Finally publishes the exiting/finished states, dumps results, and stops
    the watchers and the fuzzer process.
    """
    self.tlog.info("开始执行任务")
    # Move the task into the running state.
    self.tlog.debug("修改任务状态为 -> 运行态")
    self.status = FUZZ_RUNNING
    # Start the analysis side: the case base's seed-processing service.
    self.tlog.info("正在开启分析线程...")
    self.case_base.start_process_service()
    # Start the fuzzer itself and remember when it came up.
    self.startup_time = time.time()
    self.executor.start_fuzz()
    # Wait for the fuzzer to create both output directories.
    while not (os.path.exists(self.fuzzer_cache_output_crash_path)
               and os.path.exists(self.fuzzer_cache_output_queue_path)):
        time.sleep(0.1)
    # Ingest cases that already exist before the watchers start, so nothing
    # produced during fuzzer startup is lost (queue first, then crashes —
    # same order as the original implementation).
    self._ingest_existing_cases(self.fuzzer_cache_output_queue_path, False)
    self._ingest_existing_cases(self.fuzzer_cache_output_crash_path, True)
    # Watch both directories for cases created from here on.
    self.crash_observer.start()
    self.queue_observer.start()
    self.tlog.info("分析线程启动完毕...")
    # Monitor loop: publish status once per second until the stop condition
    # of the configured mode is met (any other mode exits immediately after
    # the first iteration, as before).
    while True:
        running_sec = Util.cal_timestamp_diff_to_seconds_from_now(self.startup_time)
        # try/finally so a failing publish() cannot leave the lock held.
        self.rw_lock.acquire()
        try:
            self.publish()
        finally:
            self.rw_lock.release()
        time.sleep(1)
        if (self.mode == 'arrive'
            and self.first_fit_stack_arrive_time is None
            and self.first_not_fit_stack_arrive_time is None
            and running_sec <= self.time_limit
        ):
            # 'arrive' mode: keep fuzzing until some stack arrival or timeout.
            continue
        if (self.mode == 'crash'
            and self.true_crash_num <= 0
            and running_sec <= self.time_limit):
            # 'crash' mode: keep fuzzing until a true crash or timeout.
            continue
        # Time limit reached or goal achieved: shut everything down.
        self.rw_lock.acquire()
        try:
            # Announce that we are exiting, then persist results.
            self.status = FUZZ_EXITTING
            self.publish()
            self.dumps()
            # Stop the directory watchers and the fuzzer process.
            self.queue_observer.stop()
            self.crash_observer.stop()
            self.executor.stop_fuzz()
            # Final state + final status message.
            self.status = FUZZ_FINISH
            self.publish()
        finally:
            self.rw_lock.release()
        break

def _ingest_existing_cases(self, dir_path: str, is_crash: bool) -> None:
    """Add every regular file already present in *dir_path* to the case base.

    ``is_crash`` tells the case whether the directory holds crashing inputs.
    """
    for entry in os.listdir(dir_path):
        case_file_path = os.path.join(dir_path, entry)
        if not os.path.isfile(case_file_path):
            continue
        new_case = Case(self.tlog)
        new_case.case_base_init_time = self.case_base.init_time
        new_case.init_from_origin_path(
            case_file_path,
            self.analyst.case_html_reports_dir,
            self.case_base.init_time,
            is_crash,
            False,
        )
        self.case_base.add_case(new_case)
"""
任务调度器
"""
class Scheduler:
    """Runs committed fuzz tasks on a bounded thread pool and mirrors each
    task's latest status message received over its message queue."""

    def __init__(self, max_process_thread_size=8):
        # Upper bound on concurrently running fuzz tasks.
        self.max_process_thread_size = int(max_process_thread_size)
        # Worker pool the tasks execute on.
        self.thread_pool = ThreadPoolExecutor(self.max_process_thread_size)
        # Latest status dict per task; list index == task id.
        self.task_status = []
        # One message queue per task.
        self.task_mq: List[MsgQueue] = []
        # One lock per task, guarding its status entry.
        self.task_status_lock: List[threading.Lock] = []

    def get_attribute(self, task_id: int, key: str) -> str:
        """Thread-safely read one field of a task's status; "None" if absent."""
        if task_id >= len(self.task_status_lock):
            return "None"
        with self.task_status_lock[task_id]:
            status = self.task_status[task_id]
            return status[key] if key in status else "None"

    def get_total_job_num(self) -> int:
        """Total number of tasks ever committed."""
        return len(self.task_status)

    def get_job_num(self, condition: int) -> int:
        """Count tasks currently in the given status (lockless snapshot)."""
        return sum(1 for status in self.task_status if status['status'] == condition)

    def on_receive_msg(self, ch, method, properties, body):
        """MQ consumer callback: replace the task's status with the message."""
        msg = json.loads(body)
        tid = int(msg['task_id'])
        with self.task_status_lock[tid]:
            self.task_status[tid] = msg

    def commit_task(self, fuzz_task: FuzzTask):
        """Register a task, wire up its message queue, and submit it to run."""
        # Exits the whole process if the task is misconfigured.
        fuzz_task.check()
        # The task id is simply the next list index.
        task_id = len(self.task_status)
        self.task_status.append({
            "task_id": task_id,
            "status": FUZZ_WAITTING,
            "status_name": "Waitting",
            "visit_cnt": 0,
        })
        self.task_status_lock.append(threading.Lock())
        mq = MsgQueue(task_id, self.on_receive_msg)
        self.task_mq.append(mq)
        self.start_mq_process(mq)
        # Hand the task its id / message queue, then run it on the pool.
        fuzz_task.init_task(task_id)
        self.thread_pool.submit(fuzz_task.run)

    def mq_process(self, mq: MsgQueue):
        """Blocking consume loop for one task's queue."""
        mq.start_consuming()

    def start_mq_process(self, mq: MsgQueue):
        """Run the consume loop on a daemon thread so shutdown isn't blocked."""
        threading.Thread(target=self.mq_process, args=(mq, ), daemon=True).start()
class AttributeDisplay(Static):
    """One "label : value" line, refreshed from the scheduler every second."""

    # Reactive value; textual calls watch_val whenever it changes.
    val = reactive("None")

    def __init__(self, sch: Scheduler, task_id: int, key: str, label=None):
        super().__init__()
        self.sch = sch
        self.task_id = task_id
        self.key = key
        # Fall back to the raw status key when no display label is given.
        self.label = key if label is None else label

    def on_mount(self) -> None:
        # Poll the scheduler once per second.
        self.update_timer = self.set_interval(1, self.update_val)
        self.update_timer.resume()

    def watch_val(self, val: str) -> None:
        # Left-pad the label to 35 columns so values line up.
        self.update(f"{self.label:35} : {val}")

    def update_val(self):
        self.val = self.sch.get_attribute(self.task_id, self.key)
class FuzzTaskDisplay(Static):
    """Panel widget listing every tracked attribute of one fuzz task."""

    # (status key, display label) pairs, rendered top to bottom in this order.
    _ATTRIBUTES = (
        ("task_id", "Task ID"),
        ("fuzzer_name", "Fuzzer"),
        ("task_name", "Task Name"),
        ("status_name", "Status"),
        ("startup_time_duration", "Running"),
        ("last_crash_time_duration", "Last Crash"),
        ("last_true_crash_time_duration", "Last True Crash"),
        ("path_num", "Path Num"),
        ("crash_num", "Crash Num"),
        ("true_crash_num", "True Crash Num"),
        ("max_target_depth", "Max Target Depth"),
        ("time_limit", "Time Limit(s)"),
        ("first_arrive_time_duration", "First Arrive Time"),
        ("first_arrive_file_name", "First Arrive Seed"),
        ("first_crash_time_duration", "First Crash Time"),
        ("first_crash_file_name", "First Crash Seed"),
        ("first_true_crash_time_duration", "First True Crash Time"),
        ("first_true_crash_file_name", "First True Crash Seed"),
        ("first_fit_stack_arrive_time_duration", "First Fit Stack Arrive Time"),
        ("first_fit_stack_arrive_file_name", "First Fit Stack Arrive Seed"),
        ("first_not_fit_stack_arrive_time_duration", "First Not Fit Stack Arrive Time"),
        ("first_not_fit_stack_arrive_file_name", "First Not Fit Stack Arrive Seed"),
        ("fuzzer_cache_output_path", "Output Path"),
    )

    def __init__(self, sch: Scheduler, task_id: int):
        super().__init__(classes="fuzz_task")
        self.task_id = task_id
        self.sch = sch

    def compose(self) -> ComposeResult:
        """Yield one AttributeDisplay row per tracked attribute."""
        for key, label in self._ATTRIBUTES:
            yield AttributeDisplay(sch=self.sch, task_id=self.task_id, key=key, label=label)
class FuzzTaskPanel(App):
    """Top-level textual app: a scrollable list of per-task display panels."""

    # CSS_PATH = "fuzzpanel.tcss"
    BINDINGS = [
        ("d", "toggle_dark", "Toggle dark mode"),
    ]

    def __init__(self, sch: Scheduler, task_list: List[FuzzTask]):
        super().__init__()
        self.sch = sch
        # One display widget per committed task.
        self.task_display_list = [FuzzTaskDisplay(self.sch, task.task_id) for task in task_list]

    def compose(self) -> ComposeResult:
        """Header + footer + a scrollable container of task panels."""
        yield Header()
        yield Footer()
        yield ScrollableContainer(*self.task_display_list, id="timers")

    def action_toggle_dark(self) -> None:
        """Flip dark mode on/off (bound to the 'd' key)."""
        self.dark = not self.dark
def version() -> str:
    """Return the multi-line CaseRunner name/version banner (used by --version and usage())."""
    return f"""
CaseRunner: 跑Fuzz神器
版本: {VERSION}
"""
def usage() -> str:
    """Return the CLI help text, with the version banner colored yellow."""
    header_info = version()
    return f"""{Printer().to_yellow(header_info)}
用法:
--help 展示帮助信息
--version -v 展示版本
--config-file -f [可选] 该参数指定了CaseRunner的任务配置文件路径。
参数默认为task_config.yaml
--list-supported-cve 打印CaseRunner支持分析的所有CVE
--debug -d [可选] 开启调试模式。若指定该参数,CaseRunner会输出调试信息。
"""
if __name__ == "__main__":
try:
opts, args = getopt.getopt(
sys.argv[1:],
"f:hdv",
[
"config-file=",
"help",
"list-supported-cve",
"debug",
"version",
]
)
config_file_path = "task_config.yaml"
max_job_size = 4
for opt, arg in opts:
if opt in ['-h', '--help']:
print(usage())
sys.exit()
elif opt in ['-f', '--config-file']:
config_file_path = arg
elif opt in ['--list-supported-cve']:
supported_cve_list = CVEChecker().get_supported_cve_list()
Printer().print_green("CaseRunner支持的CVE:")
print_str = "["
for supported_cve in supported_cve_list:
print_str += f" {supported_cve},"
print_str += "]"
Printer().print_blue(print_str)
sys.exit()
elif opt in ["-v", "--version"]:
print(version())
sys.exit()
elif opt in ["-d", "--debug"]:
DEBUG = True
if config_file_path is None or not os.path.isfile(config_file_path):
log.error("请指定任务配置文件")
sys.exit()
task_config = {}
with open(config_file_path, "r") as config_f:
task_config = yaml.load(config_f.read(), Loader=yaml.FullLoader)
task_mode = task_config["mode"]
output_path = task_config["output_path"]
task_config_list = task_config["tasks"]
if "job" in task_config:
max_job_size = task_config["job"]
fuzz_task_obj_list = []
sch = Scheduler(max_job_size)
for task in task_config_list:
stdin = 'file'
if 'stdin' in task:
stdin = task['stdin']
fuzz_task = FuzzTask(
fuzzer_name=task["fuzzer_name"],
task_name=task["task_name"],
root_dir_path=output_path,
seed_path=task["seed_path"],
binary_path=task["binary_path"],
cov_instr_binary_path=task["cov_instr_binary_path"],
asan_instr_binary_path=task["asan_instr_binary_path"],
args=task["args"],
target_file_path=task["target_file_path"],
cve_str=task["cve_str"],
time_limit=task["time_limit"],
mode=task_mode,
stdin=stdin
)
fuzz_task_obj_list.append(fuzz_task)
sch.commit_task(fuzz_task)
app = FuzzTaskPanel(sch, fuzz_task_obj_list)
app.run()
except getopt.GetoptError as e:
Printer().print_red(f"[ERROR] invalid args {e}")
print(usage())
except Exception as e:
log.error(e, "all")
# while True:
# if(sch.get_total_job_num() == sch.get_job_num(FUZZ_FINISH)):
# # 如果队列里所有的任务都完成了,则将输出一份综合的报告
# sch.gen_report()
# break
# time.sleep(1)
# NOTE(review): the two lines that followed here were web-page scrape residue
# (a Gitee content-moderation notice), not program code; converted to this
# comment so the file remains valid Python.