mirror of
https://gh.wpcy.net/https://github.com/JulesJujuu/wpaudit.git
synced 2026-04-17 08:42:18 +08:00
231 lines
13 KiB
Python
231 lines
13 KiB
Python
import os
|
|
from core.utils import get_scan_filename_prefix # For log naming consistency
|
|
from .query_builder import build_search_queries
|
|
from .searchsploit_handler import search_searchsploit
|
|
from .metasploit_handler import search_metasploit
|
|
|
|
def run_scan(state, config):
    """
    Run the exploit-intelligence phase and record its results on ``state``.

    Queries are obtained from ``build_search_queries(state)`` and processed
    with CVE-type queries first. Each query is passed to SearchSploit and to
    Metasploit when the matching entry in the state's "tool_checks" reports
    the status "Found". Hits are grouped under a correlation key (first
    related CVE, else a normalised "<name>_<version>" string, else the raw
    query string) and stored in the "exploit_intelligence" module findings,
    together with the manual SearchSploit commands, generated Metasploit RC
    files and autorun attempts returned by the handlers.

    Returns None. If no queries are built, the status is set to
    "Skipped (No Queries)" and the function returns early.
    """
    print("\n[*] Phase Exploit Intel: Gathering Exploit Intelligence")
    module_key = "exploit_intelligence"

    # Seed the findings so every expected key exists from the start.
    state.update_module_findings(module_key, {
        "search_queries_tried": [],
        "found_exploits": {},  # Legacy key; dropped again once the phase completes
        "exploit_guidance": {
            "manual_searchsploit_cmds": [],
            "generated_msf_rc_files": [],
        },
        "autorun_attempts": [],  # Dicts reported by the Metasploit handler
        "status": "Running",
    })

    # 1. Build the structured query dicts, e.g.
    #    {"query_string": "WordPress Core 6.2", "type": "WordPress Core",
    #     "cves": ["CVE-2023-1234"], "component_name": "Core", "component_version": "6.2"}
    #    or, for a CVE query,
    #    {"query_string": "CVE-2023-1234", "type": "CVE", "cves": ["CVE-2023-1234"]}
    queries = build_search_queries(state)

    # Keep the queries in the findings for transparency.
    findings = state.get_module_findings(module_key)
    findings["search_queries_tried"] = queries
    state.update_module_findings(module_key, findings)

    if not queries:
        print(" [i] No specific items identified for exploit search. Skipping Exploit Intel phase.")
        findings["status"] = "Skipped (No Queries)"
        state.update_module_findings(module_key, findings)
        return

    # Local accumulators. ``correlated`` maps a correlation key to
    # {"searchsploit": [], "metasploit": [], "related_queries": [], "associated_cves": set()}.
    correlated = {}
    manual_cmds = []
    rc_files = []
    autoruns = []

    tool_checks = state.get_full_state().get("tool_checks", {})
    have_searchsploit = tool_checks.get("searchsploit", {}).get("status") == "Found"
    have_msfconsole = tool_checks.get("msfconsole", {}).get("status") == "Found"
    advisory_shown = False  # The install advisory is printed at most once

    lhost = config.get("exploit_intel_autorun_lhost", "127.0.0.1")
    # One-element list so the Metasploit handler receives a mutable reference.
    lport_ref = [config.get("exploit_intel_autorun_lport_start", 4444)]
    log_prefix = get_scan_filename_prefix(state, config)

    # CVE queries first, everything else afterwards. ``sorted`` is stable, so
    # the relative order inside each group is unchanged.
    ordered = sorted(queries, key=lambda q: q["type"] != "CVE")

    for query in ordered:
        query_string = query["query_string"]
        query_type = query["type"]
        cves = query.get("cves", [])

        # Pick the correlation key: first CVE when there is one, the raw query
        # string for CVE/pattern queries, a normalised name_version otherwise.
        if cves:
            component_key = cves[0]
        elif query_type in ("CVE", "Vulnerability Pattern"):
            component_key = query_string
        else:
            comp_name = query.get('component_name', 'UnknownComponent')
            comp_version = query.get('component_version', 'any')
            component_key = f"{comp_name}_{comp_version}".replace(" ", "_").lower()

        print(f"\n Processing query: '{query_string}' (Type: {query_type}, Key: {component_key})")

        bucket = correlated.setdefault(
            component_key,
            {"searchsploit": [], "metasploit": [], "related_queries": [], "associated_cves": set()},
        )
        if query_string not in bucket["related_queries"]:
            bucket["related_queries"].append(query_string)
        if cves:
            bucket["associated_cves"].update(cves)

        # 2. SearchSploit (always given the raw query string)
        if have_searchsploit:
            ss_hits, ss_cmds = search_searchsploit(state, config, query_string, log_prefix)
            if ss_hits:
                scored_hits = [
                    {**hit, "confidence": _calculate_searchsploit_confidence(hit, query)}
                    for hit in ss_hits
                ]
                bucket["searchsploit"].extend(scored_hits)
            if ss_cmds:
                for cmd in ss_cmds:
                    if cmd not in manual_cmds:
                        manual_cmds.append(cmd)
        else:
            if not advisory_shown:
                _print_searchsploit_install_advisory()
                advisory_shown = True

            print(f" [i] Skipping SearchSploit for '{query_string}' (Tool not available as per initial check).")

        # 3. Metasploit (searched by the primary CVE, falling back to the query string)
        if have_msfconsole:
            msf_term = cves[0] if cves else query_string
            msf_hits, msf_rcs, msf_runs = search_metasploit(
                state, config, msf_term, log_prefix, lhost, lport_ref
            )
            if msf_hits:
                bucket["metasploit"].extend(msf_hits)
            if msf_rcs:
                for rc in msf_rcs:
                    if rc not in rc_files:
                        rc_files.append(rc)
            if msf_runs:
                for attempt in msf_runs:
                    if attempt not in autoruns:
                        autoruns.append(attempt)
        elif cves:
            print(f" [i] Skipping Metasploit search for '{query_string}' (Tool not available).")

    # --- Final state update ---
    final_findings = state.get_module_findings(module_key)

    # Sets are not JSON-serialisable; store the CVEs as sorted lists instead.
    for entry in correlated.values():
        entry["associated_cves"] = sorted(entry["associated_cves"])
    # Hits inside the searchsploit/metasploit lists are not deduplicated here.

    final_findings["found_exploits_correlated"] = correlated
    guidance = final_findings["exploit_guidance"]
    guidance["manual_searchsploit_cmds"] = sorted(set(manual_cmds))
    guidance["generated_msf_rc_files"] = sorted(set(rc_files))
    final_findings["autorun_attempts"] = autoruns
    final_findings["status"] = "Completed"

    # Drop the key left over from the previous findings schema.
    final_findings.pop("found_exploits", None)

    state.update_module_findings(module_key, final_findings)

    print("\n[*] Exploit Intel phase finished.")


def _print_searchsploit_install_advisory():
    """Print the SearchSploit installation advisory, one line per message."""
    advisory_lines = (
        " --------------------------------------------------------------------------------",
        " [!] SearchSploit Tool Advisory:",
        " SearchSploit was not found or is not correctly configured in your PATH.",
        " This tool is part of Exploit-DB and is highly recommended for exploit intelligence.",
        " To install/update SearchSploit:",
        " 1. Ensure Git is installed on your system.",
        " 2. Clone the Exploit-DB repository (if you haven't already):",
        " `git clone https://github.com/offensive-security/exploitdb.git /opt/exploitdb`",
        " (Replace `/opt/exploitdb` with your preferred installation directory).",
        " 3. Ensure the `searchsploit` script from the cloned repository is executable and in your system's PATH.",
        " For Linux/macOS, you might symlink it: ",
        " `sudo ln -sf /opt/exploitdb/searchsploit /usr/local/bin/searchsploit`",
        " 4. Update its database regularly: `searchsploit -u`",
        " For Windows users: It is strongly recommended to use SearchSploit within WSL (Windows Subsystem for Linux)",
        " for best compatibility and ease of use. Install Git within WSL and follow the Linux steps.",
        " --------------------------------------------------------------------------------",
    )
    for line in advisory_lines:
        print(line)
|
|
|
|
|
|
def _calculate_searchsploit_confidence(exploit_detail, query_obj):
    """
    Rate how well a SearchSploit result matches the query that produced it.

    Args:
        exploit_detail: Result dict from the SearchSploit handler. Only the
            "title" and "path" values are read; missing or ``None`` values
            are treated as empty text.
        query_obj: Structured query dict. The "cves", "component_name",
            "component_version" and "query_string" values are read; all of
            them are optional and ``None`` is treated as absent.

    Returns:
        The first label that applies, checked in this order:
        "High (CVE Match)", "High (Name & Version Match)",
        "High (Name & Major.Minor Version Match)", "Medium (Name Match)",
        "Low (Query String Match)", "Minimal (Keyword Hint)".
    """
    def _lowered(value):
        # None / empty values become "" instead of raising on .lower().
        return str(value).lower() if value else ""

    # Title and path are searched together; either may carry the CVE/version.
    haystack = _lowered(exploit_detail.get("title")) + " " + _lowered(exploit_detail.get("path"))

    # Highest: an explicit CVE identifier appears in the result text.
    for cve in query_obj.get("cves") or []:
        cve_text = _lowered(cve)
        # Skip empty entries: "" is a substring of everything.
        if cve_text and cve_text in haystack:
            return "High (CVE Match)"

    comp_name = _lowered(query_obj.get("component_name"))
    comp_version = _lowered(query_obj.get("component_version"))
    name_found = bool(comp_name) and comp_name in haystack

    # High: component name plus the version (or its major.minor prefix).
    if name_found and comp_version:
        if _contains_version_token(haystack, comp_version):
            return "High (Name & Version Match)"
        major_minor = ".".join(comp_version.split(".")[:2])
        # Only worth a second look when it is shorter than the full version.
        if major_minor != comp_version and _contains_version_token(haystack, major_minor):
            return "High (Name & Major.Minor Version Match)"

    # Medium: name only (the version may be absent or broader in the title).
    if name_found:
        return "Medium (Name Match)"

    # Low: the whole query string appears, provided it is specific enough
    # (more than one word, or longer than 10 characters).
    query_text = _lowered(query_obj.get("query_string"))
    if query_text and query_text in haystack:
        if len(query_text.split()) > 1 or len(query_text) > 10:
            return "Low (Query String Match)"

    return "Minimal (Keyword Hint)"


def _contains_version_token(text, version):
    """
    Return True when ``version`` occurs in ``text`` as a standalone token.

    An occurrence is rejected when it is directly preceded by a digit or a
    dot, or directly followed by a digit, so "4.1" is not found inside
    "14.10" or "3.4.1" but is found in "4.1", "<= 4.1" and "4.1.9".
    """
    position = text.find(version)
    while position != -1:
        end = position + len(version)
        before = text[position - 1] if position > 0 else ""
        after = text[end] if end < len(text) else ""
        if not (before.isdigit() or before == ".") and not after.isdigit():
            return True
        position = text.find(version, position + 1)
    return False
|