mirror of
https://gh.wpcy.net/https://github.com/JulesJujuu/wpaudit.git
synced 2026-04-17 08:42:18 +08:00
229 lines
12 KiB
Python
229 lines
12 KiB
Python
import re
|
|
|
|
def add_exploit_query_item(queries_list, item_type, name, version=None, cves=None):
    """Append structured exploit-search queries for one item to ``queries_list``.

    Up to two kinds of entries are appended (the list is mutated in place):

    * one name/version query, when ``name`` still contains text after cleaning;
    * one ``"CVE"`` query per valid CVE identifier found in ``cves``.

    Every entry is a dict with the keys ``type``, ``name``, ``version``,
    ``query_string`` and ``cve_id``.

    Args:
        queries_list: List that receives the query dicts.
        item_type: Label stored under ``"type"`` for the name/version query,
            e.g. "WordPress Core", "WordPress Plugin", "Software Component".
        name: Item name; falsy values produce no name/version query.
        version: Optional version, appended to the search string when present.
        cves: Optional CVE identifier or iterable of identifiers. Both
            ``CVE-YYYY-NNNN`` (any letter case) and the bare ``YYYY-NNNN``
            form are accepted; everything else is ignored.

    Returns:
        None.
    """
    # Strip characters that are noise in a search term. The name is also
    # trimmed so a whitespace-only name cannot yield an empty query string.
    clean_name = re.sub(r'[^\w\s.-]', '', str(name)).strip() if name else ''
    clean_version = re.sub(r'[^\w\d.-]', '', str(version)) if version else ''

    if clean_name:
        queries_list.append({
            "type": item_type,
            "name": clean_name,
            "version": clean_version if clean_version else None,
            # Full search string for tools like searchsploit.
            "query_string": f"{clean_name} {clean_version}".strip(),
            "cve_id": None,  # Not a CVE-specific query
        })

    if not cves:
        return

    # A lone string would otherwise be iterated character by character.
    if isinstance(cves, str):
        cves = [cves]

    for raw_id in cves:
        if not isinstance(raw_id, str):
            continue
        # fullmatch keeps trailing junk out of the identifier; the optional
        # prefix lets bare "YYYY-NNNN" ids through instead of dropping them.
        match = re.fullmatch(r"(?:CVE-)?(\d{4}-\d{4,})", raw_id.strip(), re.IGNORECASE)
        if not match:
            continue
        cve_id = f"CVE-{match.group(1)}"
        queries_list.append({
            "type": "CVE",
            "name": None,  # Name not relevant for a direct CVE query
            "version": None,  # Version not relevant for a direct CVE query
            "query_string": cve_id,  # The CVE id is the search term itself
            "cve_id": cve_id,
        })
|
|
|
|
# Nuclei tags that name a well-known technology worth an exploit search.
_KNOWN_TECH_TAGS = frozenset({
    'apache', 'nginx', 'php', 'mysql', 'joomla', 'drupal', 'tomcat', 'iis',
    'struts', 'jenkins', 'oracle', 'mssql', 'postgresql', 'mongodb', 'redis',
    'memcached', 'elasticsearch', 'kubernetes', 'docker', 'spring',
})

# Template ids containing any of these substrings are treated as generic or
# informational and do not get a "Vulnerability Pattern" query.
_EXCLUDED_TEMPLATE_KEYWORDS = (
    "generic-", "detect", "version", "http-missing", "exposed-", "default-credentials",
    "tech-detect", "info-leak", "misconfig", "http-request", "favicon", "waf-detect",
    # WP specific but often informational
    "wordpress-login", "wordpress-users", "wordpress-xmlrpc", "wordpress-config-backup",
)


def _as_list(value):
    """Return ``value`` as a list: None -> [], list/tuple -> list, scalar -> [scalar]."""
    if value is None:
        return []
    if isinstance(value, (list, tuple)):
        return list(value)
    return [value]


def _cve_refs(vuln):
    """Collect the CVE ids of one vulnerability record, unique and in first-seen order."""
    found = []
    if vuln.get("cve"):
        found.extend(_as_list(vuln["cve"]))
    found.extend(_as_list((vuln.get("references") or {}).get("cve")))
    # Order-preserving dedup keeps the output deterministic between runs
    # (a set would not) and does not require the entries to be hashable.
    unique = []
    for cve in found:
        if cve not in unique:
            unique.append(cve)
    return unique


def _all_cves(item):
    """Flatten the CVE ids of every vulnerability listed under ``item``."""
    return [cve for vuln in _as_list(item.get("vulnerabilities")) for cve in _cve_refs(vuln)]


def _version_number(item):
    """Return ``item["version"]["number"]``, tolerating a missing or null version."""
    # NOTE(review): WPScan appears to emit "version": null when it cannot
    # detect a version, which a chained .get({}) does not survive - confirm.
    return (item.get("version") or {}).get("number")


def _collect_wpscan_queries(state, queries):
    """Add queries for the core, main theme and plugins reported by WPScan."""
    wps_data = state.get_module_findings("wpscan_results", {}).get("data")
    if not wps_data:
        return

    core = wps_data.get("version")
    if core:
        number = core.get("number")
        # Search for "WordPress <number>"; the bare number alone is not a
        # usable search term. Without a number only the CVE queries are added.
        add_exploit_query_item(queries, "WordPress Core", "WordPress" if number else None,
                               number, cves=_all_cves(core))

    theme = wps_data.get("main_theme")
    if theme:
        add_exploit_query_item(queries, "WordPress Theme", theme.get("slug"),
                               _version_number(theme), cves=_all_cves(theme))

    for slug, p_info in (wps_data.get("plugins") or {}).items():
        p_info = p_info or {}
        add_exploit_query_item(queries, "WordPress Plugin", slug,
                               _version_number(p_info), cves=_all_cves(p_info))


def _collect_nuclei_queries(state, queries):
    """Add CVE, software-component and template-based queries from Nuclei findings."""
    findings = state.get_module_findings("nuclei_results", {}).get("findings", [])
    for finding in findings:
        info = finding.get("info") or {}
        classification = info.get("classification") or {}

        # CVE ids may sit directly on `info` or under `info.classification`.
        cve_ids = _as_list(info.get("cve-id") or classification.get("cve-id"))
        if cve_ids:
            add_exploit_query_item(queries, "CVE", None, cves=cve_ids)  # Name is None for pure CVE search

        # Tags may arrive as a list or as one comma-separated string.
        tags = info.get("tags") or []
        if isinstance(tags, str):
            tags = [tag.strip() for tag in tags.split(",")]
        tech_from_tags = next(
            (tag for tag in tags if isinstance(tag, str) and tag in _KNOWN_TECH_TAGS), None)

        # Classification product/vendor wins over the tag-derived technology.
        product = classification.get("product")
        vendor = classification.get("vendor")
        if product:
            search_item_name = f"{vendor} {product}" if vendor else product
        else:
            search_item_name = tech_from_tags

        if search_item_name:
            # Heuristic: take a version from the finding name (e.g.
            # "Apache Struts 2.3.37 RCE") only if the name mentions the item.
            finding_name_lower = (info.get("name") or "").lower()
            version_match = re.search(r'(\d+(\.\d+){1,3})', finding_name_lower)  # x.y up to x.y.z.a
            version_from_name = None
            if version_match and search_item_name.lower() in finding_name_lower:
                version_from_name = version_match.group(1)
            add_exploit_query_item(queries, "Software Component", search_item_name.strip(),
                                   version=version_from_name)

        # Use the template for specific, non-generic vulnerabilities.
        template_id = finding.get("template-id", "")
        if template_id and not any(kw in template_id.lower() for kw in _EXCLUDED_TEMPLATE_KEYWORDS):
            # Prefer the descriptive template name; fall back to the id when
            # the name is missing or empty.
            add_exploit_query_item(queries, "Vulnerability Pattern", info.get("name") or template_id)


def _collect_nmap_queries(state, queries):
    """Add one "Service" query per open port with an identified product."""
    nmap_results = state.get_module_findings("nmap_results", {})
    for port_info in nmap_results.get("open_ports", []):
        product = port_info.get("product")
        if not product:  # Only add if product is identified
            continue
        version = port_info.get("version")

        # Keep the first word only, e.g. "Apache httpd" -> "Apache".
        product_clean = product.split(' ')[0]
        # A bare "httpd" is too generic to search; qualify it as Apache.
        # Other products that merely contain "httpd" (lighttpd,
        # "Microsoft IIS httpd", ...) must not be relabelled as Apache.
        if product_clean.lower() == "httpd":
            product_clean = "Apache httpd"

        version_clean = version.split(' ')[0] if version else None  # First part of version string
        if product_clean and len(product_clean) > 2:  # Avoid overly short product names
            add_exploit_query_item(queries, "Service", product_clean, version_clean)


def _collect_wp_analyzer_queries(state, queries):
    """Add per-vulnerability queries from the wp_analyzer core/theme/plugin findings."""
    wp_analyzer_findings = state.get_module_findings("wp_analyzer", {})

    # Core vulnerabilities: one query per vulnerability title, plus its CVEs.
    core_vuln_data = wp_analyzer_findings.get("core_vulnerabilities", {})
    if core_vuln_data.get("detected_version") and core_vuln_data.get("potential_vulnerabilities"):
        core_version = core_vuln_data["detected_version"]
        for vuln in core_vuln_data["potential_vulnerabilities"]:
            query_name = f"WordPress Core {vuln.get('title', '')}".strip()
            add_exploit_query_item(queries, "WordPress Core Vulnerability", query_name,
                                   version=core_version, cves=_cve_refs(vuln))

    # Extension vulnerabilities: themes and plugins share the same record shape.
    ext_vuln_data = wp_analyzer_findings.get("extension_vulnerabilities", {})
    for key, label in (("vulnerable_themes", "Theme"), ("vulnerable_plugins", "Plugin")):
        for detail in ext_vuln_data.get(key) or []:
            ext_name = detail["name"]
            ext_version = detail.get("version")
            for vuln in detail.get("vulnerabilities", []):
                query_name = f"{label} {ext_name} {vuln.get('title', '')}".strip()
                add_exploit_query_item(queries, f"WordPress {label} Vulnerability", query_name,
                                       version=ext_version, cves=_cve_refs(vuln))


def _dedupe_queries(queries):
    """Drop repeated queries, keeping the first occurrence of each key in order."""
    unique_queries = []
    seen_keys = set()
    for query_dict in queries:
        # CVE queries are identified by their CVE id, all others by the
        # search string itself.
        if query_dict["type"] == "CVE" and query_dict.get("cve_id"):
            unique_key = query_dict["cve_id"]
        else:
            unique_key = query_dict["query_string"]
        if unique_key not in seen_keys:
            seen_keys.add(unique_key)
            unique_queries.append(query_dict)
    return unique_queries


def build_search_queries(state):
    """Build the unique, ordered list of exploit-search queries for a scan.

    Findings are read through ``state.get_module_findings(name, default)`` for
    the modules "wpscan_results", "nuclei_results", "nmap_results" and
    "wp_analyzer", in that order, and turned into query dicts via
    ``add_exploit_query_item``.

    Args:
        state: Object exposing ``get_module_findings(module_name, default)``.

    Returns:
        list[dict]: Query dicts in first-encounter order with duplicates
        removed (CVE queries keyed by CVE id, the rest by query string).
    """
    raw_queries_list = []
    _collect_wpscan_queries(state, raw_queries_list)
    _collect_nuclei_queries(state, raw_queries_list)
    _collect_nmap_queries(state, raw_queries_list)
    _collect_wp_analyzer_queries(state, raw_queries_list)
    # A single pass over everything yields the same result as deduplicating
    # part-way through and again at the end.
    return _dedupe_queries(raw_queries_list)
|