wpaudit/modules/exploit_intel/gatherer.py
Huzaifa Shoukat 01712e5871 Bug Fix
2025-05-22 01:26:12 +05:00

231 lines
13 KiB
Python

import os
from core.utils import get_scan_filename_prefix # For log naming consistency
from .query_builder import build_search_queries
from .searchsploit_handler import search_searchsploit
from .metasploit_handler import search_metasploit
def run_scan(state, config):
    """
    Orchestrate the exploit-intelligence phase of a scan.

    Steps, in order:
      1. Publish an initial findings skeleton under the "exploit_intelligence" module key.
      2. Build structured search queries with build_search_queries(state) and record them.
      3. For every query (queries of type "CVE" first) search SearchSploit and Metasploit,
         but only for tools whose entry in the state's "tool_checks" has status "Found".
         Results are grouped under a correlation key (see _derive_correlation_key).
      4. Write the correlated results, guidance lists and autorun attempts back to state.

    The same Metasploit search term is only ever searched once per run; later queries that
    resolve to an already-searched term reuse the earlier results instead of re-invoking the
    handler. Identical result entries are not stored twice under one correlation key.

    Args:
        state: Scan state object. Must provide update_module_findings(),
            get_module_findings() and get_full_state().
        config: Configuration mapping. Read via .get() for
            "exploit_intel_autorun_lhost" (default "127.0.0.1") and
            "exploit_intel_autorun_lport_start" (default 4444).

    Returns:
        None. All output is written to ``state`` and printed to stdout.
    """
    print("\n[*] Phase Exploit Intel: Gathering Exploit Intelligence")
    module_key = "exploit_intelligence"

    # Publish the full skeleton up front so every key exists even on the early-exit path.
    initial_findings = {
        "search_queries_tried": [],
        "found_exploits": {},  # Legacy key; removed again when the phase completes
        "exploit_guidance": {
            "manual_searchsploit_cmds": [],
            "generated_msf_rc_files": [],
        },
        "autorun_attempts": [],  # List of dicts from Metasploit autorun
        "status": "Running",
    }
    state.update_module_findings(module_key, initial_findings)

    # 1. Build search queries: a list of dicts such as
    #    {"query_string": "WordPress Core 6.2", "type": "WordPress Core",
    #     "cves": ["CVE-2023-1234"], "component_name": "Core", "component_version": "6.2"}
    #    or, for a pure CVE query,
    #    {"query_string": "CVE-2023-1234", "type": "CVE", "cves": ["CVE-2023-1234"]}
    structured_queries_list = build_search_queries(state)
    current_findings = state.get_module_findings(module_key)
    # Store the structured queries for transparency.
    current_findings["search_queries_tried"] = structured_queries_list
    state.update_module_findings(module_key, current_findings)

    if not structured_queries_list:
        print(" [i] No specific items identified for exploit search. Skipping Exploit Intel phase.")
        current_findings["status"] = "Skipped (No Queries)"
        state.update_module_findings(module_key, current_findings)
        return

    # Local accumulators.
    # correlated_exploits maps a correlation key (CVE ID, normalised "name_version", or raw
    # query string) to:
    #   {"searchsploit": [], "metasploit": [], "related_queries": [], "associated_cves": set()}
    correlated_exploits = {}
    all_manual_searchsploit_cmds = []
    all_generated_msf_rc_files = []
    all_autorun_attempts = []
    # Metasploit results keyed by search term, so a term is never searched (or auto-run) twice.
    msf_results_by_term = {}

    tool_checks = state.get_full_state().get("tool_checks", {})
    searchsploit_available = tool_checks.get("searchsploit", {}).get("status") == "Found"
    msfconsole_available = tool_checks.get("msfconsole", {}).get("status") == "Found"
    searchsploit_install_guidance_printed = False  # Print the install advisory only once

    lhost_for_rc = config.get("exploit_intel_autorun_lhost", "127.0.0.1")
    # Passed as a one-element list; presumably so the handler can advance the port between
    # generated RC files -- confirm in metasploit_handler.
    current_lport_ref = [config.get("exploit_intel_autorun_lport_start", 4444)]
    base_scan_prefix_for_log = get_scan_filename_prefix(state, config)

    # Simple prioritisation: CVE-typed queries first, everything else afterwards.
    # .get() keeps a query dict without a "type" from aborting the whole phase.
    priority_queries = [q for q in structured_queries_list if q.get("type") == "CVE"]
    other_queries = [q for q in structured_queries_list if q.get("type") != "CVE"]
    ordered_queries_to_run = priority_queries + other_queries

    for query_obj in ordered_queries_to_run:
        query_string = query_obj["query_string"]
        query_type = query_obj.get("type")
        related_cves = query_obj.get("cves") or []
        component_key = _derive_correlation_key(query_obj)
        print(f"\n Processing query: '{query_string}' (Type: {query_type}, Key: {component_key})")

        # Create the correlation entry on first use, then record this query against it.
        if component_key not in correlated_exploits:
            correlated_exploits[component_key] = {
                "searchsploit": [],
                "metasploit": [],
                "related_queries": [],
                "associated_cves": set(),
            }
        entry = correlated_exploits[component_key]
        if query_string not in entry["related_queries"]:
            entry["related_queries"].append(query_string)
        if related_cves:
            entry["associated_cves"].update(related_cves)

        # 2. SearchSploit (uses the raw query string).
        if searchsploit_available:
            ss_raw_exploits, ss_manual_cmds = search_searchsploit(
                state, config, query_string, base_scan_prefix_for_log
            )
            if ss_raw_exploits:
                processed_ss_exploits = [
                    {
                        **exploit_detail,
                        "confidence": _calculate_searchsploit_confidence(exploit_detail, query_obj),
                    }
                    for exploit_detail in ss_raw_exploits
                ]
                # Several queries can share one key; do not store identical hits twice.
                _merge_unique(entry["searchsploit"], processed_ss_exploits)
            if ss_manual_cmds:
                _merge_unique(all_manual_searchsploit_cmds, ss_manual_cmds)
        else:
            if not searchsploit_install_guidance_printed:
                _print_searchsploit_advisory()
                searchsploit_install_guidance_printed = True
            print(f" [i] Skipping SearchSploit for '{query_string}' (Tool not available as per initial check).")

        # 3. Metasploit: search by the primary CVE when there is one, else by the query string.
        if msfconsole_available:
            msf_search_term = related_cves[0] if related_cves else query_string
            if msf_search_term in msf_results_by_term:
                # Re-running the handler would only repeat the search and any autorun it does.
                msf_exploits = msf_results_by_term[msf_search_term]
                print(f" [i] Reusing earlier Metasploit results for '{msf_search_term}' (already searched).")
            else:
                msf_exploits, msf_rc_files, msf_autoruns = search_metasploit(
                    state, config, msf_search_term, base_scan_prefix_for_log,
                    lhost_for_rc, current_lport_ref
                )
                msf_results_by_term[msf_search_term] = msf_exploits
                if msf_rc_files:
                    _merge_unique(all_generated_msf_rc_files, msf_rc_files)
                if msf_autoruns:
                    _merge_unique(all_autorun_attempts, msf_autoruns)
            if msf_exploits:
                _merge_unique(entry["metasploit"], msf_exploits)
        elif related_cves:
            # Only CVE-bearing queries get an explicit "skipped" notice.
            print(f" [i] Skipping Metasploit search for '{query_string}' (Tool not available).")

    # --- Final state update ---
    final_findings = state.get_module_findings(module_key)

    # Sets are not JSON serialisable; store the CVE IDs as sorted lists.
    for entry in correlated_exploits.values():
        entry["associated_cves"] = sorted(entry["associated_cves"])

    final_findings["found_exploits_correlated"] = correlated_exploits
    guidance = final_findings.setdefault("exploit_guidance", {})
    guidance["manual_searchsploit_cmds"] = sorted(set(all_manual_searchsploit_cmds))
    guidance["generated_msf_rc_files"] = sorted(set(all_generated_msf_rc_files))
    final_findings["autorun_attempts"] = all_autorun_attempts
    final_findings["status"] = "Completed"
    # Drop the legacy "found_exploits" key from the previous schema.
    final_findings.pop("found_exploits", None)

    state.update_module_findings(module_key, final_findings)
    print("\n[*] Exploit Intel phase finished.")


def _derive_correlation_key(query_obj):
    """Return the key under which results for ``query_obj`` are grouped.

    Preference order: the first related CVE; otherwise, for anything that is not a "CVE" or
    "Vulnerability Pattern" query, a normalised "<component_name>_<component_version>";
    otherwise the raw query string.
    """
    related_cves = query_obj.get("cves") or []
    if related_cves:
        return related_cves[0]
    if query_obj.get("type") not in ("CVE", "Vulnerability Pattern"):
        name = query_obj.get("component_name", "UnknownComponent")
        version = query_obj.get("component_version", "any")
        # NOTE(review): normalisation is applied to component-derived keys only; CVE IDs and
        # raw query strings are used verbatim -- confirm this matches the intended key format.
        return f"{name}_{version}".replace(" ", "_").lower()
    return query_obj["query_string"]


def _merge_unique(target, items):
    """Append each element of ``items`` to ``target`` unless an equal element is already present."""
    for item in items:
        if item not in target:
            target.append(item)


def _print_searchsploit_advisory():
    """Print the one-time installation guidance shown when SearchSploit is unavailable."""
    print(" --------------------------------------------------------------------------------")
    print(" [!] SearchSploit Tool Advisory:")
    print(" SearchSploit was not found or is not correctly configured in your PATH.")
    print(" This tool is part of Exploit-DB and is highly recommended for exploit intelligence.")
    print(" To install/update SearchSploit:")
    print(" 1. Ensure Git is installed on your system.")
    print(" 2. Clone the Exploit-DB repository (if you haven't already):")
    print(" `git clone https://github.com/offensive-security/exploitdb.git /opt/exploitdb`")
    print(" (Replace `/opt/exploitdb` with your preferred installation directory).")
    print(" 3. Ensure the `searchsploit` script from the cloned repository is executable and in your system's PATH.")
    print(" For Linux/macOS, you might symlink it: ")
    print(" `sudo ln -sf /opt/exploitdb/searchsploit /usr/local/bin/searchsploit`")
    print(" 4. Update its database regularly: `searchsploit -u`")
    print(" For Windows users: It is strongly recommended to use SearchSploit within WSL (Windows Subsystem for Linux)")
    print(" for best compatibility and ease of use. Install Git within WSL and follow the Linux steps.")
    print(" --------------------------------------------------------------------------------")
def _calculate_searchsploit_confidence(exploit_detail, query_obj):
"""
Calculates a basic confidence score for a SearchSploit result based on the query.
exploit_detail: dict from searchsploit_handler (e.g., {"title": "...", "path": "...", "id": "..."})
query_obj: structured query dict from query_builder
(e.g., {"query_string": "WordPress Core 6.2", "type": "WordPress Core",
"cves": ["CVE-2023-1234"], "component_name": "Core", "component_version": "6.2"})
Returns: string "High", "Medium", "Low", or "Minimal"
"""
title = exploit_detail.get("title", "").lower()
# Path might also contain version/CVE info; SearchSploit output often includes it in the path description
path_description = exploit_detail.get("path", "").lower()
exploit_text_content = title + " " + path_description
query_cves = query_obj.get("cves", [])
# Ensure component_name and component_version are strings before lowercasing
query_comp_name_raw = query_obj.get("component_name")
query_comp_name = str(query_comp_name_raw).lower() if query_comp_name_raw else ""
query_comp_version_raw = query_obj.get("component_version")
query_comp_version = str(query_comp_version_raw).lower() if query_comp_version_raw else ""
query_string_lower = query_obj.get("query_string", "").lower()
# Highest: Explicit CVE match in title or path description
if query_cves:
for cve in query_cves:
if str(cve).lower() in exploit_text_content:
return "High (CVE Match)"
# High: Exact name and version match
if query_comp_name and query_comp_version:
# SearchSploit titles often have version ranges or just major.minor.
# A simple "in" check is a good start.
if query_comp_name in exploit_text_content and query_comp_version in exploit_text_content:
return "High (Name & Version Match)"
# Check if major.minor matches if full version doesn't
if '.' in query_comp_version:
major_minor_version = '.'.join(query_comp_version.split('.')[:2])
if query_comp_name in exploit_text_content and major_minor_version in exploit_text_content:
return "High (Name & Major.Minor Version Match)"
# Medium: Name match only (version might be missing or broader in exploit title)
if query_comp_name and query_comp_name in exploit_text_content:
return "Medium (Name Match)"
# Low: Original query string (which might be a pattern or less specific) matches title/path
# This is for queries like "Vulnerability Pattern SomePattern"
# Ensure query_string_lower is not too generic by itself.
if query_string_lower and query_string_lower in exploit_text_content:
# Avoid overly generic matches if query_string was very broad (e.g., just "wordpress")
if len(query_string_lower.split()) > 1 or len(query_string_lower) > 10: # Heuristic for specificity
return "Low (Query String Match)"
return "Minimal (Keyword Hint)"