wpaudit/modules/exploit_intel/searchsploit_handler.py
Huzaifa Shoukat 8e8b4f8a84 Bug Fix
2025-05-21 22:39:24 +05:00

169 lines
11 KiB
Python

import json
import re
import os
from core.tool_runner import run_command
from core.utils import get_scan_filename_prefix, sanitize_filename
def search_searchsploit(state, config, query, base_scan_prefix_for_log):
"""
Runs SearchSploit for a given query and parses the results.
Returns a list of found exploits and a list of manual commands.
"""
print(f" Running SearchSploit for '{query}'...")
found_ss_exploits = []
manual_cmds = []
searchsploit_timeout = config.get("searchsploit_timeout", 60) # Increased timeout slightly
# Main output directory from config
main_output_dir = config.get("output_dir", "omegascythe_overlord_reports")
tool_logs_dir = os.path.join(main_output_dir, "tool_logs")
searchsploit_console_log = os.path.join(
tool_logs_dir,
f"{os.path.basename(base_scan_prefix_for_log)}_searchsploit_{sanitize_filename(query)}.log"
)
os.makedirs(tool_logs_dir, exist_ok=True)
# Directory for mirrored exploits
enable_mirroring = config.get("exploit_intel_mirror_searchsploit_exploits", False)
mirrored_exploits_base_dir = os.path.join(main_output_dir, config.get("mirrored_exploits_dir", "mirrored_exploits"), "searchsploit")
if enable_mirroring:
os.makedirs(mirrored_exploits_base_dir, exist_ok=True)
try:
# Always use -j for JSON output, and -v for verbose (might help if JSON fails)
# --disable-colour is good.
cmd_ss_search = ["searchsploit", "-j", "-v", "--disable-colour"]
is_cve = query.upper().startswith("CVE-")
if is_cve:
cmd_ss_search.append("--cve")
cmd_ss_search.append(query)
# SearchSploit -j writes JSON to stdout, so we capture that directly.
# The log_file_path for run_command will capture stderr or any non-JSON stdout.
process_ss_obj = run_command(
cmd_ss_search, "SearchSploit (Search)", config,
timeout=searchsploit_timeout,
return_proc=True,
log_file_path=searchsploit_console_log # Captures stderr and any non-JSON stdout
)
json_output_to_parse = ""
ss_data = None
# Since log_file_path is used, run_command's Popen stdout stream is consumed.
# We must read the content from the log file.
if process_ss_obj and process_ss_obj.returncode == 0: # Check return code of Popen object
if os.path.exists(searchsploit_console_log):
print(f" [i] SearchSploit: Reading output from log file: {searchsploit_console_log}")
with open(searchsploit_console_log, 'r', errors='ignore') as f_log_ss:
# The log file contains "Executing: ..." line and then the tool output.
# We need to find the JSON part.
full_log_content = f_log_ss.read()
# Searchsploit -j -v might print "SEARCH: ..." and other lines before JSON.
# The JSON output itself starts with '{' and ends with '}'.
# A more robust way is to find the actual JSON block.
json_block_match = re.search(r'{\s*"SEARCH":.*RESULTS_PAPER":\s*\[.*?\]\s*}', full_log_content, re.DOTALL)
if json_block_match:
json_output_to_parse = json_block_match.group(0)
try:
ss_data = json.loads(json_output_to_parse)
except json.JSONDecodeError as e_parse:
print(f" [?] SearchSploit: Failed to parse extracted JSON from log for '{query}'. Error: {e_parse}. Snippet: {json_output_to_parse[:200]}")
json_output_to_parse = "" # Reset if parsing failed
else:
print(f" [?] SearchSploit: Could not find valid JSON block in log file '{searchsploit_console_log}'.")
else:
print(f" [!] SearchSploit: Log file '{searchsploit_console_log}' not found, though command reported success.")
elif process_ss_obj: # Command had non-zero return code
print(f" [-] SearchSploit command failed for '{query}'. RC: {process_ss_obj.returncode}. Log: {searchsploit_console_log}")
else: # process_ss_obj is None (e.g., timeout in run_command before Popen)
print(f" [-] SearchSploit execution failed to start or timed out for '{query}'. Log: {searchsploit_console_log if os.path.exists(searchsploit_console_log) else 'not created'}")
if ss_data: # If ss_data was successfully parsed
try:
# Re-parse if it came from log or if direct stdout parsing was reset
if not isinstance(json_output_to_parse, dict) and not isinstance(json_output_to_parse, list):
ss_data = json.loads(json_output_to_parse)
else: # Already parsed if stdout was clean
ss_data = json_output_to_parse if (isinstance(json_output_to_parse, dict) or isinstance(json_output_to_parse, list)) else json.loads(json_output_to_parse)
ss_results = ss_data.get("RESULTS_EXPLOIT", [])
# If RESULTS_EXPLOIT is empty, but the root object is a list (older searchsploit versions or different query types?)
if not ss_results and isinstance(ss_data, list):
ss_results = ss_data
# If RESULTS_EXPLOIT is not found, but there's a top-level list of results (e.g. for --cve without matches but other info)
elif not ss_results and isinstance(ss_data.get("RESULTS_CVE"), list): # Check for CVE results structure
ss_results = ss_data.get("RESULTS_CVE") # This might not have EDB-ID directly, handle carefully
for exploit in ss_results:
if not isinstance(exploit, dict): continue # Skip non-dict items
edb_id = exploit.get("EDB-ID")
exploit_info = {
"title": exploit.get("Title"), "path": exploit.get("Path"),
"EDB-ID": edb_id, "type": exploit.get("Type"),
"platform": exploit.get("Platform"),
"date_published": exploit.get("Date"), # Added Date
"author": exploit.get("Author"), # Added Author
"mirrored_path": None # Placeholder for mirrored exploit
}
found_ss_exploits.append(exploit_info)
if edb_id:
manual_cmd_str = f"searchsploit -m {edb_id}"
print(f" -> Found EDB-ID {edb_id}: '{exploit.get('Title')}'. Manual mirror: {manual_cmd_str}")
manual_cmds.append({
"query": query, "EDB-ID": edb_id,
"title": exploit.get("Title"), "command": manual_cmd_str
})
# Mirror exploit if enabled
if enable_mirroring:
mirror_cmd = ["searchsploit", "-m", edb_id, "--path-only"] # Get path first to avoid clutter
# We need a way to tell searchsploit *where* to mirror, or it uses default CWD.
# Searchsploit -m EDBID mirrors to current working directory.
# We need to run it from the target mirrored_exploits_base_dir or copy after.
# For simplicity, let's try to get the path and then copy.
# However, `searchsploit -p EDBID` gives the path *within the searchsploit dir*.
# `searchsploit -m EDBID` copies it to CWD.
# Let's use a temporary CWD for mirroring or handle paths carefully.
# Simpler: mirror to CWD then move, or mirror directly if searchsploit supports output dir for -m.
# Searchsploit -m EDB-ID -o /target/dir is NOT a standard option.
# So, we run `searchsploit -m EDB-ID` and it will download to the CWD of the run_command.
# We need to ensure run_command's CWD is controllable or files are moved.
# For now, let's assume run_command executes in a predictable CWD or we handle it post-execution.
# This part is tricky with run_command. Let's just log the command.
# Actual mirroring implementation might need `subprocess.run` with `cwd` argument.
# For now, we'll just record the intent and path.
# If searchsploit_handler is enhanced to *return* the mirrored path, that's better.
# Placeholder for actual mirroring logic - for now, just note it.
# In a more advanced version, this would call a helper to mirror and get the new path.
print(f" Mirroring enabled. Command to mirror EDB-ID {edb_id}: searchsploit -m {edb_id}")
# exploit_info["mirrored_path"] = "Path/To/Mirrored/Exploit" # Update this if mirrored
if found_ss_exploits:
print(f" [+] SearchSploit: Found {len(found_ss_exploits)} results for '{query}'.")
else:
print(f" [-] SearchSploit: No results found for '{query}' in parsed JSON.")
except json.JSONDecodeError as e:
print(f" [-] SearchSploit: JSON decode error for '{query}'. Log: '{searchsploit_console_log}'. Error: {e}. Content snippet: {json_output_to_parse[:200]}")
elif process_ss_obj and process_ss_obj.returncode == 0 and not json_output_to_parse.strip():
print(f" [-] SearchSploit: Command successful but no JSON output for '{query}'. Log: {searchsploit_console_log}")
elif process_ss_obj and hasattr(process_ss_obj, 'returncode'): # Command failed
print(f" [-] SearchSploit command failed for '{query}'. RC: {process_ss_obj.returncode}. Log: {searchsploit_console_log}")
else: # Process object None (e.g. timeout by run_command)
print(f" [-] SearchSploit execution failed to start or timed out for '{query}'. Log: {searchsploit_console_log if os.path.exists(searchsploit_console_log) else 'not created'}")
except Exception as e:
print(f" [-] SearchSploit: General error processing for '{query}': {type(e).__name__} - {e}")
state.add_tool_error(f"Searchsploit General Error for query '{query}': {e}")
return found_ss_exploits, manual_cmds