wpaudit/modules/exploit_intel/query_builder.py
2025-05-22 12:01:10 +05:00

229 lines
12 KiB
Python

import re
def add_exploit_query_item(queries_list, item_type, name, version=None, cves=None):
    """
    Append structured exploit-search query dicts to ``queries_list`` (in place).

    Up to ``1 + len(cves)`` dicts are appended, each with the keys ``type``,
    ``name``, ``version``, ``query_string`` and ``cve_id``:

    * one name/version query, only when ``name`` is non-empty after sanitizing;
    * one ``"CVE"`` query per valid CVE identifier found in ``cves``.

    Args:
        queries_list: List that receives the new query dicts.
        item_type: Label stored under ``"type"`` for the name/version query,
            e.g. "WordPress Core", "WordPress Plugin", "Software Component".
        name: Product/item name. Falsy values (``None``, ``""``) produce no
            name/version query.
        version: Optional version, appended to the query string when present.
        cves: Optional iterable of CVE identifiers, or a single identifier
            string. Accepted forms (case-insensitive, surrounding whitespace
            ignored) are ``CVE-YYYY-NNNN`` and the bare ``YYYY-NNNN`` form.
            Anything else, including non-string entries, is skipped.

    Returns:
        None. ``queries_list`` is mutated.
    """
    # Keep only word characters, whitespace, dots and hyphens, then collapse
    # the whitespace runs that removed punctuation leaves behind (e.g. "a < b").
    clean_name = " ".join(re.sub(r'[^\w\s.-]', '', str(name)).split()) if name else ''
    clean_version = re.sub(r'[^\w.-]', '', str(version)) if version else ''

    if clean_name:
        query_string = f"{clean_name} {clean_version}" if clean_version else clean_name
        queries_list.append({
            "type": item_type,
            "name": clean_name,
            "version": clean_version if clean_version else None,
            "query_string": query_string,  # Full search string for tools like searchsploit
            "cve_id": None,  # Not a CVE-specific query
        })

    if not cves:
        return
    if isinstance(cves, str):
        # A lone identifier would otherwise be iterated character by character.
        cves = [cves]

    for raw_cve in cves:
        if not isinstance(raw_cve, str):
            continue
        # fullmatch (not match) so trailing garbage cannot leak into the ID;
        # the "CVE-" prefix is optional so bare "YYYY-NNNN" references are
        # accepted as well.
        match = re.fullmatch(r"(?:CVE-)?([0-9]{4}-[0-9]{4,})", raw_cve.strip(), re.IGNORECASE)
        if match is None:
            continue
        cve_id = f"CVE-{match.group(1)}"
        queries_list.append({
            "type": "CVE",
            "name": None,  # Name not relevant for a direct CVE query
            "version": None,  # Version not relevant for a direct CVE query
            "query_string": cve_id,  # The CVE ID itself is the search term
            "cve_id": cve_id,
        })
# Nuclei tags that name a well-known technology worth an exploit search.
_KNOWN_TECH_TAGS = frozenset({
    'apache', 'nginx', 'php', 'mysql', 'joomla', 'drupal', 'tomcat', 'iis',
    'struts', 'jenkins', 'oracle', 'mssql', 'postgresql', 'mongodb', 'redis',
    'memcached', 'elasticsearch', 'kubernetes', 'docker', 'spring',
})

# Substrings that mark a Nuclei template ID as informational / detection-only.
_EXCLUDED_TEMPLATE_KEYWORDS = (
    "generic-", "detect", "version", "http-missing", "exposed-", "default-credentials",
    "tech-detect", "info-leak", "misconfig", "http-request", "favicon", "waf-detect",
    "wordpress-login", "wordpress-users", "wordpress-xmlrpc", "wordpress-config-backup",
)

# Matches x.y, x.y.z and x.y.z.a version numbers.
_VERSION_IN_NAME_RE = re.compile(r'(\d+(\.\d+){1,3})')


def _as_list(value):
    """Normalize None / scalar / list / tuple into a plain list."""
    if value is None:
        return []
    if isinstance(value, (list, tuple)):
        return list(value)
    return [value]


def _collect_cves(vulns):
    """Collect CVE strings from vulnerability dicts, de-duplicated in first-seen order."""
    found = []
    for vuln in vulns or []:
        if not isinstance(vuln, dict):
            continue
        found.extend(_as_list(vuln.get("cve")))
        references = vuln.get("references")
        if isinstance(references, dict):
            found.extend(_as_list(references.get("cve")))
    # dict.fromkeys keeps insertion order, unlike set(), so output is stable across runs.
    return list(dict.fromkeys(c for c in found if isinstance(c, str) and c))


def _version_number(component):
    """Return component["version"]["number"], tolerating a missing or null version."""
    version = component.get("version") if isinstance(component, dict) else None
    return version.get("number") if isinstance(version, dict) else None


def _component_vulns(component):
    """Return the component's "vulnerabilities" list, or [] when absent."""
    if not isinstance(component, dict):
        return []
    return component.get("vulnerabilities") or []


def _add_wpscan_queries(queries, wps_data):
    """Add core, main-theme and plugin queries from the "wpscan_results" data dict."""
    if not isinstance(wps_data, dict):
        return

    core = wps_data.get("version")
    if core:
        number = _version_number(wps_data)
        # The product name is "WordPress" and the number is its version; the
        # name query is only emitted when a version number is actually known.
        add_exploit_query_item(
            queries, "WordPress Core", "WordPress" if number else None, number,
            cves=_collect_cves(_component_vulns(core)),
        )

    theme = wps_data.get("main_theme")
    if isinstance(theme, dict) and theme:
        add_exploit_query_item(
            queries, "WordPress Theme", theme.get("slug"), _version_number(theme),
            cves=_collect_cves(_component_vulns(theme)),
        )

    plugins = wps_data.get("plugins")
    if isinstance(plugins, dict):
        for slug, p_info in plugins.items():
            add_exploit_query_item(
                queries, "WordPress Plugin", slug, _version_number(p_info),
                cves=_collect_cves(_component_vulns(p_info)),
            )


def _add_nuclei_queries(queries, findings):
    """Add CVE, software-component and vulnerability-pattern queries from Nuclei findings."""
    for finding in findings or []:
        if not isinstance(finding, dict):
            continue
        info = finding.get("info") or {}
        classification = info.get("classification") or {}

        # CVE IDs are read from info["cve-id"], falling back to
        # classification["cve-id"]. NOTE(review): the fallback assumes Nuclei
        # nests CVE IDs under classification - confirm against real output.
        cve_ids = _as_list(info.get("cve-id")) or _as_list(classification.get("cve-id"))
        if cve_ids:
            add_exploit_query_item(queries, "CVE", None, cves=cve_ids)  # Name is None for pure CVE search

        tags = info.get("tags") or []
        if isinstance(tags, str):
            # A tag string may be comma-separated.
            tags = [t.strip() for t in tags.split(",")]
        tech_from_tags = next(
            (t for t in tags if isinstance(t, str) and t in _KNOWN_TECH_TAGS), None
        )

        product = classification.get("product")
        vendor = classification.get("vendor")
        search_item_name = None
        if product:
            search_item_name = f"{vendor} {product}" if vendor else str(product)
        elif tech_from_tags:
            search_item_name = tech_from_tags

        finding_name = info.get("name") or ""
        if search_item_name:
            # Heuristic: take a version out of the finding name (e.g.
            # "Apache Struts 2.3.37 RCE"), but only when the name also mentions
            # the item, so the version is plausibly related to it.
            version_from_name = None
            finding_name_lower = str(finding_name).lower()
            version_match = _VERSION_IN_NAME_RE.search(finding_name_lower)
            if version_match and search_item_name.lower() in finding_name_lower:
                version_from_name = version_match.group(1)
            add_exploit_query_item(
                queries, "Software Component", search_item_name.strip(), version=version_from_name
            )

        # Use the template for specific, non-generic vulnerabilities only.
        template_id = finding.get("template-id") or ""
        template_id_lower = str(template_id).lower()
        if template_id and not any(kw in template_id_lower for kw in _EXCLUDED_TEMPLATE_KEYWORDS):
            # Prefer the more descriptive template name when there is one.
            add_exploit_query_item(queries, "Vulnerability Pattern", finding_name or template_id)


def _add_nmap_queries(queries, open_ports):
    """Add one "Service" query per Nmap open port that has an identified product."""
    for port_info in open_ports or []:
        if not isinstance(port_info, dict):
            continue
        product = port_info.get("product")
        if not product:
            continue
        product_tokens = str(product).split()
        if not product_tokens:
            continue
        # Keep only the first word, e.g. "Apache httpd" -> "Apache".
        product_clean = product_tokens[0]
        # A product reported as just "httpd" is assumed to be Apache. Compare the
        # token exactly: a substring test also hits "lighttpd" / "... IIS httpd".
        if product_clean.lower() == "httpd":
            product_clean = "Apache httpd"

        version = port_info.get("version")
        version_tokens = str(version).split() if version else []
        version_clean = version_tokens[0] if version_tokens else None

        if len(product_clean) > 2:  # Avoid overly short product names
            add_exploit_query_item(queries, "Service", product_clean, version_clean)


def _add_extension_vuln_queries(queries, entries, label, item_type):
    """Add one query per vulnerability of each vulnerable theme/plugin entry."""
    for entry in entries or []:
        if not isinstance(entry, dict):
            continue
        ext_name = entry.get("name") or ""
        ext_version = entry.get("version")
        for vuln in entry.get("vulnerabilities") or []:
            if not isinstance(vuln, dict):
                continue
            query_name = f"{label} {ext_name} {vuln.get('title') or ''}".strip()
            add_exploit_query_item(
                queries, item_type, query_name, version=ext_version, cves=_collect_cves([vuln])
            )


def _add_wp_analyzer_queries(queries, wp_findings):
    """Add core/theme/plugin vulnerability queries from the "wp_analyzer" findings."""
    core_vuln_data = wp_findings.get("core_vulnerabilities") or {}
    core_version = core_vuln_data.get("detected_version")
    core_vulns = core_vuln_data.get("potential_vulnerabilities")
    if core_version and core_vulns:
        for vuln in core_vulns:
            if not isinstance(vuln, dict):
                continue
            # The vulnerability title becomes part of the query string.
            query_name = f"WordPress Core {vuln.get('title') or ''}".strip()
            add_exploit_query_item(
                queries, "WordPress Core Vulnerability", query_name,
                version=core_version, cves=_collect_cves([vuln]),
            )

    ext_vuln_data = wp_findings.get("extension_vulnerabilities") or {}
    _add_extension_vuln_queries(
        queries, ext_vuln_data.get("vulnerable_themes"), "Theme", "WordPress Theme Vulnerability"
    )
    _add_extension_vuln_queries(
        queries, ext_vuln_data.get("vulnerable_plugins"), "Plugin", "WordPress Plugin Vulnerability"
    )


def _deduplicate_queries(queries):
    """Drop repeated queries, keeping the first occurrence of each key."""
    unique = []
    seen_keys = set()
    for query in queries:
        # CVE queries are keyed by CVE ID, everything else by its query string.
        if query["type"] == "CVE" and query.get("cve_id"):
            key = query["cve_id"]
        else:
            key = query["query_string"]
        if key not in seen_keys:
            seen_keys.add(key)
            unique.append(query)
    return unique


def build_search_queries(state):
    """
    Build a de-duplicated list of structured exploit-search query dicts.

    Findings are read through ``state.get_module_findings(key, default)`` for
    the keys "wpscan_results", "nuclei_results", "nmap_results" and
    "wp_analyzer", in that order. Each source contributes queries via
    ``add_exploit_query_item``; the combined list is then de-duplicated once,
    so the result keeps first-encounter order.

    Args:
        state: Object exposing ``get_module_findings(key, default)``; each
            result is treated as a dict (a ``None`` result is read as empty).

    Returns:
        list of query dicts with the keys ``type``, ``name``, ``version``,
        ``query_string`` and ``cve_id``. CVE queries are unique by CVE ID and
        all other queries by query string.
    """
    raw_queries_list = []

    wpscan_results = state.get_module_findings("wpscan_results", {}) or {}
    _add_wpscan_queries(raw_queries_list, wpscan_results.get("data"))

    nuclei_results = state.get_module_findings("nuclei_results", {}) or {}
    _add_nuclei_queries(raw_queries_list, nuclei_results.get("findings"))

    nmap_results = state.get_module_findings("nmap_results", {}) or {}
    _add_nmap_queries(raw_queries_list, nmap_results.get("open_ports"))

    wp_analyzer_findings = state.get_module_findings("wp_analyzer", {}) or {}
    _add_wp_analyzer_queries(raw_queries_list, wp_analyzer_findings)

    # One pass over the complete list gives the same result as de-duplicating
    # before and again after the wp_analyzer queries are added.
    return _deduplicate_queries(raw_queries_list)