mirror of
https://gh.wpcy.net/https://github.com/JulesJujuu/wpaudit.git
synced 2026-04-17 08:42:18 +08:00
169 lines
11 KiB
Python
169 lines
11 KiB
Python
import json
|
|
import re
|
|
import os
|
|
from core.tool_runner import run_command
|
|
from core.utils import get_scan_filename_prefix, sanitize_filename
|
|
|
|
def _parse_json_from_log(full_log_content, query, log_path):
    """
    Extract and parse the SearchSploit JSON blob from a mixed console log.

    The log file contains an "Executing: ..." banner and possibly stderr
    noise around the JSON, so we grab everything from the first
    '{"SEARCH":' marker through the final closing brace (greedy match).

    Returns the parsed object (dict, or list for some output forms),
    or None if no JSON block was found or it failed to parse.
    """
    # Greedy .* under DOTALL runs to the LAST '}' in the log, so trailing
    # noise after the JSON does not truncate the match. (The old pattern
    # required a "RESULTS_PAPER" key and silently failed without it.)
    json_block_match = re.search(r'\{\s*"SEARCH":.*\}', full_log_content, re.DOTALL)
    if not json_block_match:
        print(f"      [?] SearchSploit: Could not find valid JSON block in log file '{log_path}'.")
        return None
    json_text = json_block_match.group(0)
    try:
        return json.loads(json_text)
    except json.JSONDecodeError as e_parse:
        print(f"      [?] SearchSploit: Failed to parse extracted JSON from log for '{query}'. Error: {e_parse}. Snippet: {json_text[:200]}")
        return None


def _collect_results(ss_data, query, enable_mirroring):
    """
    Normalise parsed SearchSploit JSON into exploit-info dicts and
    manual `searchsploit -m` mirror commands.

    Handles the three observed shapes of searchsploit -j output:
    a dict with "RESULTS_EXPLOIT", a dict with "RESULTS_CVE" (CVE
    queries), or a bare list (older versions / other query types).
    """
    if isinstance(ss_data, list):
        # Bug fix: the original called ss_data.get(...) before this check,
        # which raises AttributeError whenever the root object is a list.
        ss_results = ss_data
    elif isinstance(ss_data, dict):
        # Empty RESULTS_EXPLOIT falls through to RESULTS_CVE, then to [].
        ss_results = ss_data.get("RESULTS_EXPLOIT") or ss_data.get("RESULTS_CVE") or []
    else:
        ss_results = []

    found_exploits = []
    manual_cmds = []
    for exploit in ss_results:
        if not isinstance(exploit, dict):
            continue  # Skip non-dict items defensively.

        edb_id = exploit.get("EDB-ID")
        found_exploits.append({
            "title": exploit.get("Title"), "path": exploit.get("Path"),
            "EDB-ID": edb_id, "type": exploit.get("Type"),
            "platform": exploit.get("Platform"),
            "date_published": exploit.get("Date"),
            "author": exploit.get("Author"),
            "mirrored_path": None,  # Placeholder until mirroring is implemented.
        })

        if edb_id:
            manual_cmd_str = f"searchsploit -m {edb_id}"
            print(f"        -> Found EDB-ID {edb_id}: '{exploit.get('Title')}'. Manual mirror: {manual_cmd_str}")
            manual_cmds.append({
                "query": query, "EDB-ID": edb_id,
                "title": exploit.get("Title"), "command": manual_cmd_str,
            })

            if enable_mirroring:
                # `searchsploit -m EDB-ID` copies the exploit into the CWD and
                # has no output-directory option; run_command's CWD is not
                # controllable from here, so we only record the intent for now.
                # A future version should mirror via subprocess.run(cwd=...)
                # and fill in "mirrored_path" above.
                print(f"        Mirroring enabled. Command to mirror EDB-ID {edb_id}: searchsploit -m {edb_id}")

    return found_exploits, manual_cmds


def search_searchsploit(state, config, query, base_scan_prefix_for_log):
    """
    Run SearchSploit for a given query and parse the JSON results.

    Args:
        state: Scan state object; receives failures via add_tool_error().
        config: Dict-like configuration (timeouts, output dirs, mirroring flag).
        query: Free-text search term or CVE id; a "CVE-" prefix adds --cve.
        base_scan_prefix_for_log: Path prefix used to name the console log.

    Returns:
        Tuple (found_exploits, manual_cmds): a list of exploit-info dicts and
        a list of manual `searchsploit -m` command records. Both are empty on
        failure — this function never raises.
    """
    print(f"    Running SearchSploit for '{query}'...")
    found_ss_exploits = []
    manual_cmds = []

    searchsploit_timeout = config.get("searchsploit_timeout", 60)

    # Console log lives under <output_dir>/tool_logs and captures stderr plus
    # any non-JSON stdout emitted by the tool.
    main_output_dir = config.get("output_dir", "omegascythe_overlord_reports")
    tool_logs_dir = os.path.join(main_output_dir, "tool_logs")
    searchsploit_console_log = os.path.join(
        tool_logs_dir,
        f"{os.path.basename(base_scan_prefix_for_log)}_searchsploit_{sanitize_filename(query)}.log"
    )
    os.makedirs(tool_logs_dir, exist_ok=True)

    # Directory for mirrored exploit copies (created eagerly if enabled, even
    # though actual mirroring is not yet implemented — see _collect_results).
    enable_mirroring = config.get("exploit_intel_mirror_searchsploit_exploits", False)
    mirrored_exploits_base_dir = os.path.join(
        main_output_dir, config.get("mirrored_exploits_dir", "mirrored_exploits"), "searchsploit"
    )
    if enable_mirroring:
        os.makedirs(mirrored_exploits_base_dir, exist_ok=True)

    try:
        # -j: JSON output; -v: verbose (helps diagnose when JSON fails);
        # --disable-colour: keep the log free of ANSI escapes.
        cmd_ss_search = ["searchsploit", "-j", "-v", "--disable-colour"]
        if query.upper().startswith("CVE-"):
            cmd_ss_search.append("--cve")
        cmd_ss_search.append(query)

        # With log_file_path set, run_command consumes the Popen stdout stream
        # into the log file, so the JSON must be read back from that file.
        process_ss_obj = run_command(
            cmd_ss_search, "SearchSploit (Search)", config,
            timeout=searchsploit_timeout,
            return_proc=True,
            log_file_path=searchsploit_console_log,
        )

        ss_data = None
        if process_ss_obj and process_ss_obj.returncode == 0:
            if os.path.exists(searchsploit_console_log):
                print(f"      [i] SearchSploit: Reading output from log file: {searchsploit_console_log}")
                with open(searchsploit_console_log, 'r', errors='ignore') as f_log_ss:
                    ss_data = _parse_json_from_log(f_log_ss.read(), query, searchsploit_console_log)
            else:
                print(f"      [!] SearchSploit: Log file '{searchsploit_console_log}' not found, though command reported success.")
        elif process_ss_obj:  # Non-zero return code.
            print(f"      [-] SearchSploit command failed for '{query}'. RC: {process_ss_obj.returncode}. Log: {searchsploit_console_log}")
        else:  # run_command returned None (failed to start or timed out).
            log_hint = searchsploit_console_log if os.path.exists(searchsploit_console_log) else 'not created'
            print(f"      [-] SearchSploit execution failed to start or timed out for '{query}'. Log: {log_hint}")

        if ss_data is not None:
            found_ss_exploits, manual_cmds = _collect_results(ss_data, query, enable_mirroring)
            if found_ss_exploits:
                print(f"      [+] SearchSploit: Found {len(found_ss_exploits)} results for '{query}'.")
            else:
                print(f"      [-] SearchSploit: No results found for '{query}' in parsed JSON.")

    except Exception as e:
        # Broad catch is deliberate: exploit lookup is best-effort intel and
        # must never abort the wider scan. The failure is recorded on state.
        print(f"      [-] SearchSploit: General error processing for '{query}': {type(e).__name__} - {e}")
        state.add_tool_error(f"Searchsploit General Error for query '{query}': {e}")

    return found_ss_exploits, manual_cmds
|