# wpaudit/core/vulnerability_manager.py
# 2025-05-22 12:16:58 +05:00
#
# 251 lines
# 13 KiB
# Python

import os
import requests
import json
from .utils import make_api_request # Assuming a utility for API requests might exist or be created
class VulnerabilityManager:
    """
    Manages fetching and providing vulnerability data from various sources.

    The only source implemented here is the WPScan API (v3). Lookups are
    best-effort: request failures are reported via ``print`` and the scan
    state, and an empty result is returned instead of raising.
    """

    def __init__(self, config, state):
        """
        Store the collaborators and read the WPScan API token.

        Args:
            config: Object exposing ``get(key, default)``. The keys read by
                this class are ``wpscan_api_token``, ``default_user_agent``
                and ``requests_timeout``.
            state: Scan state object exposing ``add_scan_note(msg)`` and
                ``add_scan_warning(msg)``.
        """
        self.config = config
        self.state = state
        self.wpscan_api_token = self.config.get("wpscan_api_token", None)
        self.wpscan_api_base_url = "https://wpscan.com/api/v3"

    def _make_wpscan_api_request(self, endpoint, params=None):
        """
        Helper to make requests to the WPScan API.

        Args:
            endpoint: Path below the API base URL, e.g. ``"plugins/akismet"``.
            params: Optional query parameters passed through to ``requests``.

        Returns:
            The decoded JSON body, or ``None`` when the token is missing, the
            request fails, the body is empty, or the body is not valid JSON.
        """
        if not self.wpscan_api_token:
            print(" [!] WPScan API token not configured. Cannot fetch vulnerability data from WPScan DB.")
            self.state.add_scan_note("WPScan API token not configured; vulnerability correlation from WPScan DB skipped.")
            return None

        headers = {
            "Authorization": f"Token token={self.wpscan_api_token}",
            "User-Agent": self.config.get("default_user_agent", "WPAudit"),
        }
        # NOTE: uses requests.get directly; could be refactored onto a shared
        # API-request utility later.
        full_url = f"{self.wpscan_api_base_url}/{endpoint}"
        try:
            response = requests.get(
                full_url,
                headers=headers,
                params=params,
                timeout=self.config.get("requests_timeout", 15),
            )
            response.raise_for_status()  # Raise an exception for HTTP errors (4xx or 5xx)
            if not response.content:
                print(f" [?] WPScan API returned an empty response for {endpoint}.")
                return None
            try:
                return response.json()
            except ValueError:
                # ValueError is the common base of json.JSONDecodeError and the
                # simplejson-backed error older ``requests`` versions raise.
                print(f" [-] WPScan API returned non-JSON response for {endpoint}. Content: {response.text[:200]}")
                return None
        except requests.exceptions.HTTPError as e:
            # Guard against an HTTPError that carries no response object.
            status_code = e.response.status_code if e.response is not None else None
            if status_code == 401:
                print(f" [-] WPScan API Error: Unauthorized (401). Check your API token for endpoint {endpoint}.")
                self.state.add_scan_warning(f"WPScan API Unauthorized for {endpoint}. Check token.")
            elif status_code == 404:
                print(f" [i] WPScan API: Endpoint or resource not found (404) for {endpoint} with params {params}.")
            elif status_code == 429:
                print(f" [!] WPScan API Error: Too Many Requests (429) for endpoint {endpoint}. Rate limit likely exceeded.")
                self.state.add_scan_warning("WPScan API rate limit exceeded.")
            else:
                print(f" [-] WPScan API HTTP Error for {endpoint}: {e}")
            return None
        except requests.exceptions.RequestException as e:
            print(f" [-] WPScan API Request Error for {endpoint}: {e}")
            return None
        except Exception as e:
            # Deliberate catch-all: a failed lookup must not abort the scan.
            print(f" [-] Unexpected error during WPScan API request for {endpoint}: {e}")
            return None

    @staticmethod
    def _format_vulnerability(vuln):
        """Reduce a raw WPScan vulnerability dict to the fields reported by this class."""
        return {
            "title": vuln.get("title"),
            "references": vuln.get("references", {}),
            "fixed_in": vuln.get("fixed_in"),
            "published_date": vuln.get("published_date"),
            "cve": vuln.get("cve"),  # If available
            "source": "WPScan API",
        }

    @staticmethod
    def _is_fixed_for_version(installed_version, fixed_in):
        """
        Return True when ``installed_version`` is at or beyond ``fixed_in``.

        Best-effort: if ``packaging`` is unavailable or either value cannot be
        parsed as a version, return False so the vulnerability is still
        reported rather than silently dropped.
        """
        try:
            from packaging.version import InvalidVersion, parse as parse_version
        except ImportError:
            return False
        try:
            return parse_version(installed_version) >= parse_version(fixed_in)
        except (InvalidVersion, TypeError):
            return False

    @classmethod
    def _select_relevant_vulnerabilities(cls, raw_vulnerabilities, installed_version=None):
        """
        Format the vulnerabilities that may still affect ``installed_version``.

        An entry is dropped only when an installed version is given, the entry
        has a ``fixed_in`` value, and the installed version is at or beyond
        it. A missing/``None`` list and non-dict entries are tolerated.
        """
        relevant = []
        for vuln in raw_vulnerabilities or []:
            if not isinstance(vuln, dict):
                continue
            fixed_in = vuln.get("fixed_in")
            if installed_version and fixed_in and cls._is_fixed_for_version(installed_version, fixed_in):
                continue
            relevant.append(cls._format_vulnerability(vuln))
        return relevant

    def _get_component_data(self, component_type, slug, version=None):
        """
        Shared implementation for plugin and theme lookups.

        Args:
            component_type: ``"plugin"`` or ``"theme"``; pluralised to form
                the endpoint (``plugins/{slug}`` / ``themes/{slug}``).
            slug: Component slug; also the key the payload is nested under.
            version: Optional installed version used to drop fixed issues.

        Returns:
            ``{"metadata": {...}, "vulnerabilities": [...]}``; both parts are
            empty when the slug is falsy or no usable data came back.
        """
        if not slug:
            return {"metadata": {}, "vulnerabilities": []}

        endpoint = f"{component_type}s/{slug}"
        print(f" Querying WPScan API for {component_type}: {slug}")
        response_data = self._make_wpscan_api_request(endpoint)

        # The response is a dictionary keyed by the slug; the value holds
        # 'latest_version', 'last_updated', 'vulnerabilities', etc.
        component_data = response_data.get(slug) if isinstance(response_data, dict) else None
        if not component_data or not isinstance(component_data, dict):
            return {"metadata": {}, "vulnerabilities": []}

        metadata = {
            "slug": slug,
            "latest_version": component_data.get("latest_version"),
            "last_updated": component_data.get("last_updated"),
            "description": component_data.get("description"),  # May not be available directly
            "homepage": component_data.get("homepage"),  # May not be available directly
            "authors": component_data.get("authors", []),  # May not be available directly
            "popularity": component_data.get("popularity"),
            "source": "WPScan API",
        }
        vulnerabilities = self._select_relevant_vulnerabilities(
            component_data.get("vulnerabilities"), version
        )
        return {"metadata": metadata, "vulnerabilities": vulnerabilities}

    def get_core_vulnerabilities(self, version):
        """
        Fetches vulnerabilities for a given WordPress core version.

        WPScan API endpoint: /wordpresses/{slug}, where the slug is the
        version with dots removed (e.g. '592' for 5.9.2).

        Args:
            version: Dotted WordPress version string, e.g. ``"5.9.2"``.

        Returns:
            A list of formatted vulnerability dicts; empty when ``version``
            is falsy, the request fails, or nothing is listed.
        """
        if not version:
            return []

        version = str(version)
        # Convert version like "6.7.2" to "672" for WPScan API slug
        version_slug = version.replace(".", "")
        endpoint = f"wordpresses/{version_slug}"
        print(f" Querying WPScan API for WordPress core version: {version} (slug: {version_slug})")
        data = self._make_wpscan_api_request(endpoint)

        if isinstance(data, list):  # Should not happen for this specific endpoint
            if data:
                print(f" [?] WPScan API returned a list for core version {version_slug}, expected a dict. Data: {str(data)[:200]}")
            return []
        if not data or not isinstance(data, dict):
            return []

        # Per the WPScan API v3 documentation the payload is nested under the
        # dotted version string (like plugin/theme payloads under their slug).
        # Fall back to the top level in case a flat payload is returned.
        version_data = data.get(version)
        if not isinstance(version_data, dict):
            version_data = data

        # The endpoint is version-specific, so no fixed_in filtering is applied.
        return [
            self._format_vulnerability(vuln)
            for vuln in version_data.get("vulnerabilities") or []
            if isinstance(vuln, dict)
        ]

    def get_plugin_vulnerabilities(self, plugin_slug, version=None):
        """
        Fetches vulnerabilities and metadata for a given plugin slug and optional version.

        WPScan API: /plugins/{slug}

        Args:
            plugin_slug: Plugin slug, e.g. ``"akismet"``.
            version: Optional installed version. When given, vulnerabilities
                whose ``fixed_in`` is at or below it are omitted.

        Returns:
            A dict: {"metadata": {}, "vulnerabilities": []}
        """
        return self._get_component_data("plugin", plugin_slug, version)

    def get_theme_vulnerabilities(self, theme_slug, version=None):
        """
        Fetches vulnerabilities and metadata for a given theme slug and optional version.

        WPScan API: /themes/{slug}

        Args:
            theme_slug: Theme slug, e.g. ``"twentyfifteen"``.
            version: Optional installed version. When given, vulnerabilities
                whose ``fixed_in`` is at or below it are omitted.

        Returns:
            A dict: {"metadata": {}, "vulnerabilities": []}
        """
        return self._get_component_data("theme", theme_slug, version)
# Example usage: manual smoke test against the live WPScan API (not part of the class).
# Runs only when the module is executed directly and WPSCAN_API_TOKEN is set.
if __name__ == '__main__':
    # Mock config and state for testing
    mock_config = {
        "wpscan_api_token": os.environ.get("WPSCAN_API_TOKEN"),  # Load from env for testing
        "requests_timeout": 10,
        "default_user_agent": "WPAuditTestClient",
    }
    # Minimal stand-in for the scan state: notes and warnings are just printed.
    mock_state = type('MockState', (), {'add_scan_note': print, 'add_scan_warning': print})()

    if not mock_config["wpscan_api_token"]:
        print("WPSCAN_API_TOKEN environment variable not set. Skipping live API test.")
    else:
        vm = VulnerabilityManager(mock_config, mock_state)

        # Test core vulnerabilities
        core_version_to_test = "5.8.3"  # An older version likely to have vulns
        print(f"\nTesting Core Vulnerabilities for WP {core_version_to_test}...")
        core_vulns = vm.get_core_vulnerabilities(core_version_to_test)
        if core_vulns:
            print(f"Found {len(core_vulns)} vulnerabilities for WP {core_version_to_test}:")
            for v in core_vulns[:2]:
                print(f" - {v['title']}")
        else:
            print(f"No vulnerabilities found or error for WP {core_version_to_test}.")

        # Test plugin vulnerabilities (example: a plugin known to have vulns)
        plugin_slug_to_test = "akismet"  # A very common plugin
        plugin_version_to_test = "4.1"  # An older version
        print(f"\nTesting Plugin Vulnerabilities for {plugin_slug_to_test} v{plugin_version_to_test}...")
        # The plugin/theme getters return {"metadata": ..., "vulnerabilities": [...]},
        # so the list has to be pulled out before counting or slicing.
        plugin_result = vm.get_plugin_vulnerabilities(plugin_slug_to_test, plugin_version_to_test)
        plugin_vulns = plugin_result.get("vulnerabilities", [])
        if plugin_vulns:
            print(f"Found {len(plugin_vulns)} vulnerabilities for {plugin_slug_to_test} (potentially affecting v{plugin_version_to_test}):")
            for v in plugin_vulns[:2]:
                # 'fixed_in' is always present but may be None, hence ``or``.
                print(f" - {v['title']} (Fixed in: {v.get('fixed_in') or 'N/A'})")
        else:
            print(f"No vulnerabilities found or error for {plugin_slug_to_test}.")

        # Test theme vulnerabilities (example: twentyfifteen)
        theme_slug_to_test = "twentyfifteen"
        theme_version_to_test = "1.8"  # An older version
        print(f"\nTesting Theme Vulnerabilities for {theme_slug_to_test} v{theme_version_to_test}...")
        theme_result = vm.get_theme_vulnerabilities(theme_slug_to_test, theme_version_to_test)
        theme_vulns = theme_result.get("vulnerabilities", [])
        if theme_vulns:
            print(f"Found {len(theme_vulns)} vulnerabilities for {theme_slug_to_test} (potentially affecting v{theme_version_to_test}):")
            for v in theme_vulns[:2]:
                print(f" - {v['title']} (Fixed in: {v.get('fixed_in') or 'N/A'})")
        else:
            print(f"No vulnerabilities found or error for {theme_slug_to_test}.")