mirror of
https://gh.wpcy.net/https://github.com/JulesJujuu/wpaudit.git
synced 2026-04-17 08:42:18 +08:00
233 lines
11 KiB
Python
233 lines
11 KiB
Python
import orjson # Changed from json
|
|
from datetime import datetime
|
|
import os
|
|
import threading
|
|
import shutil # For backing up state file
|
|
import copy # For deepcopy in get_full_state
|
|
|
|
class ScanState:
|
|
"""
|
|
Manages the state of the scan, holding findings and metadata.
|
|
Uses a lock for basic thread safety on modifications.
|
|
"""
|
|
def __init__(self, target_info: dict, config_used: dict):
|
|
self._lock = threading.Lock()
|
|
self._state = {
|
|
"scan_metadata": {
|
|
"start_time": datetime.now().isoformat(),
|
|
"end_time": None,
|
|
"target_info": target_info,
|
|
"config_used": config_used,
|
|
"report_file_prefix": self._generate_prefix(target_info, config_used)
|
|
},
|
|
"tool_checks": {},
|
|
"phases_executed": [],
|
|
"findings": {}, # Main findings grouped by module/tool
|
|
"summary_points": [],
|
|
"critical_alerts": [],
|
|
"remediation_suggestions": {}, # Key: unique finding ID, Value: dict with details
|
|
"tool_errors": []
|
|
}
|
|
# Ensure 'settings' exists in config_used or provide a default dict
|
|
settings = config_used.get('settings', {})
|
|
self.output_dir = settings.get('output_dir', 'wpaudit_reports') # Updated default
|
|
self._ensure_output_dir()
|
|
|
|
|
|
def _generate_prefix(self, target_info, config_used):
|
|
"""Generates the unique file prefix for this scan run."""
|
|
settings = config_used.get('settings', {}) # Ensure 'settings' exists
|
|
output_dir = settings.get('output_dir', 'wpaudit_reports') # Updated default
|
|
report_prefix = settings.get('report_prefix', 'wpaudit_report') # Updated default
|
|
hostname = target_info.get("sanitized_hostname", "unknown_target")
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
return os.path.join(output_dir, f"{report_prefix}_{hostname}_{timestamp}")
|
|
|
|
def _ensure_output_dir(self):
|
|
"""Creates the output directory if it doesn't exist."""
|
|
if not os.path.exists(self.output_dir):
|
|
try:
|
|
os.makedirs(self.output_dir)
|
|
print(f"[+] Created output directory: {self.output_dir}")
|
|
except OSError as e:
|
|
print(f"[!!!] CRITICAL ERROR: Could not create output directory '{self.output_dir}': {e}")
|
|
# Depending on severity, you might want to exit or just log the error
|
|
# sys.exit(1) # Uncomment to exit if directory creation fails
|
|
|
|
|
|
def get_report_file_prefix(self):
|
|
"""Gets the base file path prefix for reports and logs for this run."""
|
|
with self._lock:
|
|
return self._state["scan_metadata"]["report_file_prefix"]
|
|
|
|
def mark_phase_executed(self, phase_name: str):
|
|
"""Records that a phase has been executed."""
|
|
with self._lock:
|
|
if phase_name not in self._state["phases_executed"]:
|
|
self._state["phases_executed"].append(phase_name)
|
|
|
|
def add_tool_check_result(self, tool_key: str, result: dict):
|
|
"""Adds the result of a tool availability check."""
|
|
with self._lock:
|
|
self._state["tool_checks"][tool_key] = result
|
|
|
|
def add_finding(self, module_key: str, data: dict):
|
|
"""Adds findings for a specific module/tool."""
|
|
with self._lock:
|
|
# Initialize the key if it doesn't exist
|
|
if module_key not in self._state["findings"]:
|
|
self._state["findings"][module_key] = {}
|
|
# Append or update data - depends on how module data is structured
|
|
# Simple update for now, might need merging logic later
|
|
self._state["findings"][module_key].update(data)
|
|
|
|
def update_module_findings(self, module_key: str, data: any):
|
|
""" Overwrites or sets the findings data for a module. """
|
|
with self._lock:
|
|
self._state["findings"][module_key] = data
|
|
|
|
def get_module_findings(self, module_key: str, default=None):
|
|
"""Retrieves findings for a specific module."""
|
|
with self._lock:
|
|
return self._state["findings"].get(module_key, default)
|
|
|
|
def get_specific_finding(self, module_key: str, path_keys: list, default=None):
|
|
"""
|
|
Retrieves a specific nested finding from a module's findings.
|
|
:param module_key: The key for the module (e.g., 'wp_analyzer').
|
|
:param path_keys: A list of keys representing the path to the nested finding
|
|
(e.g., ['configuration_audit', 'wp_debug_check', 'status']).
|
|
:param default: The value to return if the path is not found.
|
|
:return: The specific finding or the default value.
|
|
"""
|
|
with self._lock:
|
|
module_data = self._state["findings"].get(module_key)
|
|
if module_data is None:
|
|
return default
|
|
|
|
current_level = module_data
|
|
for key in path_keys:
|
|
if isinstance(current_level, dict) and key in current_level:
|
|
current_level = current_level[key]
|
|
else:
|
|
return default
|
|
return current_level
|
|
|
|
def update_specific_finding(self, module_key: str, path_keys: list, value: any):
|
|
"""
|
|
Updates or sets a specific nested finding within a module's findings.
|
|
Creates intermediate dictionaries if they don't exist.
|
|
:param module_key: The key for the module.
|
|
:param path_keys: A list of keys representing the path to the nested finding.
|
|
:param value: The value to set for the specific finding.
|
|
"""
|
|
with self._lock:
|
|
if module_key not in self._state["findings"]:
|
|
self._state["findings"][module_key] = {}
|
|
|
|
current_level = self._state["findings"][module_key]
|
|
|
|
for i, key in enumerate(path_keys):
|
|
if i == len(path_keys) - 1: # Last key, set the value
|
|
current_level[key] = value
|
|
else: # Not the last key, ensure path exists
|
|
if key not in current_level or not isinstance(current_level[key], dict):
|
|
current_level[key] = {} # Create dict if not exists or not a dict
|
|
current_level = current_level[key]
|
|
|
|
def add_summary_point(self, message: str):
|
|
"""Adds a high-level summary point."""
|
|
with self._lock:
|
|
self._state["summary_points"].append(message)
|
|
|
|
def add_critical_alert(self, message: str):
|
|
"""Adds a critical alert message."""
|
|
with self._lock:
|
|
self._state["critical_alerts"].append(message)
|
|
|
|
def add_remediation_suggestion(self, finding_id: str, details: dict):
|
|
"""Adds a remediation suggestion linked to a finding ID."""
|
|
with self._lock:
|
|
# Ensure details has required fields like description, severity, remediation
|
|
if "description" not in details or "severity" not in details:
|
|
print(f"[!] Warning: Remediation suggestion for '{finding_id}' missing required fields.")
|
|
self._state["remediation_suggestions"][finding_id] = details
|
|
|
|
def add_tool_error(self, message: str):
|
|
"""Logs an error encountered while running a tool."""
|
|
with self._lock:
|
|
self._state["tool_errors"].append(message)
|
|
|
|
def finalize_scan(self):
|
|
"""Marks the scan end time."""
|
|
with self._lock:
|
|
self._state["scan_metadata"]["end_time"] = datetime.now().isoformat()
|
|
|
|
def get_full_state(self):
|
|
"""Returns a copy of the entire state dictionary."""
|
|
with self._lock:
|
|
# Return a deep copy to prevent external modification
|
|
return copy.deepcopy(self._state)
|
|
|
|
def save_state(self):
|
|
"""Saves the current state to the primary JSON report file using orjson."""
|
|
filepath = f"{self.get_report_file_prefix()}_FULL_REPORT.json"
|
|
backup_filepath = f"{filepath}.bak"
|
|
current_state_data = self.get_full_state() # Get a thread-safe copy
|
|
|
|
# Create backup of existing state file
|
|
if os.path.exists(filepath):
|
|
try:
|
|
shutil.copy2(filepath, backup_filepath)
|
|
# print(f"[i] Backed up existing state file to: {backup_filepath}") # Optional: verbose logging
|
|
except Exception as e_backup:
|
|
print(f"[!] Warning: Could not create backup of state file '{filepath}': {e_backup}")
|
|
|
|
try:
|
|
# orjson expects bytes, so we encode to UTF-8.
|
|
# OPT_INDENT_2 provides pretty printing.
|
|
# default is used for objects orjson can't serialize directly (e.g. datetime if not handled)
|
|
# However, our datetime objects are already ISO strings.
|
|
json_bytes = orjson.dumps(current_state_data, option=orjson.OPT_INDENT_2)
|
|
with open(filepath, 'wb') as f: # Open in binary mode for orjson
|
|
f.write(json_bytes)
|
|
# print(f"[+] Scan state saved: {filepath}") # Optional: verbose logging
|
|
except Exception as e:
|
|
print(f"[!!!] Error saving scan state to '{filepath}' using orjson: {e}")
|
|
# Attempt to restore from backup if save failed
|
|
if os.path.exists(backup_filepath):
|
|
try:
|
|
shutil.copy2(backup_filepath, filepath)
|
|
print(f"[i] Restored state file from backup: {backup_filepath}")
|
|
except Exception as e_restore:
|
|
print(f"[!!!] CRITICAL: Failed to restore state file from backup '{backup_filepath}': {e_restore}")
|
|
|
|
|
|
# Example usage
|
|
if __name__ == "__main__":
|
|
# Mock config and target info
|
|
# Ensure 'settings' key exists as per new expectation in _generate_prefix and __init__
|
|
mock_config_used = {
|
|
"settings": {
|
|
"output_dir": "test_state_reports",
|
|
"report_prefix": "test_scan"
|
|
}
|
|
}
|
|
mock_target_info = {"url": "http://example.com", "hostname": "example.com", "ip": "93.184.216.34", "sanitized_hostname": "example_com"}
|
|
|
|
state = ScanState(mock_target_info, mock_config_used)
|
|
state.mark_phase_executed("preflight")
|
|
state.add_tool_check_result("nmap", {"status": "Found", "path": "/usr/bin/nmap"})
|
|
state.add_finding("preflight", {"robots_txt_info": {"status": "Found", "disallowed_paths": ["/admin"]}})
|
|
state.add_critical_alert("Found admin directory in robots.txt")
|
|
state.add_remediation_suggestion("robots_admin", {"description": "Admin directory exposed in robots.txt", "severity": "low", "remediation": "Consider removing sensitive paths from robots.txt"})
|
|
state.finalize_scan()
|
|
state.save_state()
|
|
|
|
print("\n--- Example Scan State (using orjson for potential binary output in real use) ---")
|
|
# For printing, we might still use standard json if orjson output is bytes
|
|
try:
|
|
print(orjson.dumps(state.get_full_state(), option=orjson.OPT_INDENT_2).decode())
|
|
except Exception: # Fallback if orjson specific options cause issues with print
|
|
import json
|
|
print(json.dumps(state.get_full_state(), indent=2, default=str))
|