mirror of
https://gh.wpcy.net/https://github.com/JulesJujuu/wpaudit.git
synced 2026-04-17 08:42:18 +08:00
251 lines
13 KiB
Python
251 lines
13 KiB
Python
import os
|
|
import requests
|
|
import json
|
|
from .utils import make_api_request # Assuming a utility for API requests might exist or be created
|
|
|
|
class VulnerabilityManager:
    """
    Manages fetching and providing vulnerability data from various sources.

    The only source implemented here is the WPScan API (v3). All lookups are
    best-effort: on any API problem the public methods return an empty
    result instead of raising.
    """

    def __init__(self, config, state):
        """
        Store the configuration and scan state and read the WPScan API token.

        Args:
            config: Mapping-like object exposing ``get(key, default)``. The
                keys read by this class are ``wpscan_api_token``,
                ``default_user_agent`` and ``requests_timeout``.
            state: Scan state object; this class calls ``add_scan_note`` and
                ``add_scan_warning`` on it.
        """
        self.config = config
        self.state = state
        self.wpscan_api_token = self.config.get("wpscan_api_token", None)
        self.wpscan_api_base_url = "https://wpscan.com/api/v3"

    def _make_wpscan_api_request(self, endpoint, params=None):
        """
        Helper to make requests to the WPScan API.

        Args:
            endpoint: Path below the API base URL, e.g. ``plugins/akismet``.
            params: Optional query parameters passed to ``requests.get``.

        Returns:
            The decoded JSON payload, or ``None`` when the token is missing,
            the request fails, or the body is empty / not valid JSON.
        """
        if not self.wpscan_api_token:
            print(" [!] WPScan API token not configured. Cannot fetch vulnerability data from WPScan DB.")
            self.state.add_scan_note("WPScan API token not configured; vulnerability correlation from WPScan DB skipped.")
            return None

        headers = {
            "Authorization": f"Token token={self.wpscan_api_token}",
            "User-Agent": self.config.get("default_user_agent", "WPAudit"),
        }
        full_url = f"{self.wpscan_api_base_url}/{endpoint}"

        try:
            response = requests.get(
                full_url,
                headers=headers,
                params=params,
                timeout=self.config.get("requests_timeout", 15),
            )
            response.raise_for_status()  # Raise an exception for HTTP errors (4xx or 5xx)

            if not response.content:
                print(f" [?] WPScan API returned an empty response for {endpoint}.")
                return None

            try:
                return response.json()
            except ValueError:
                # ValueError is the common base of json.JSONDecodeError and of
                # the decode errors raised by requests (including when it is
                # backed by simplejson), so this catches every variant.
                print(f" [-] WPScan API returned non-JSON response for {endpoint}. Content: {response.text[:200]}")
                return None

        except requests.exceptions.HTTPError as e:
            status_code = getattr(e.response, "status_code", None)
            if status_code == 401:
                print(f" [-] WPScan API Error: Unauthorized (401). Check your API token for endpoint {endpoint}.")
                self.state.add_scan_warning(f"WPScan API Unauthorized for {endpoint}. Check token.")
            elif status_code == 404:
                print(f" [i] WPScan API: Endpoint or resource not found (404) for {endpoint} with params {params}.")
            elif status_code == 429:
                print(f" [!] WPScan API Error: Too Many Requests (429) for endpoint {endpoint}. Rate limit likely exceeded.")
                self.state.add_scan_warning("WPScan API rate limit exceeded.")
            else:
                print(f" [-] WPScan API HTTP Error for {endpoint}: {e}")
            return None
        except requests.exceptions.RequestException as e:
            print(f" [-] WPScan API Request Error for {endpoint}: {e}")
            return None
        except Exception as e:
            # Deliberate catch-all at the API boundary: a failed lookup must
            # not abort the surrounding scan.
            print(f" [-] Unexpected error during WPScan API request for {endpoint}: {e}")
            return None

    @staticmethod
    def _format_vulnerability(vuln):
        """Reduce one raw WPScan vulnerability dict to the fields reported by this class."""
        return {
            "title": vuln.get("title"),
            "references": vuln.get("references", {}),
            "fixed_in": vuln.get("fixed_in"),
            "published_date": vuln.get("published_date"),
            "cve": vuln.get("cve"),  # If available
            "source": "WPScan API",
        }

    @staticmethod
    def _is_fixed_in_version(installed_version, fixed_in):
        """
        Return True when ``installed_version`` is at or past ``fixed_in``.

        Returns False whenever the comparison cannot be made (``packaging``
        not installed, unparsable or non-string version), so that an
        undecidable vulnerability is still reported rather than hidden.
        """
        try:
            from packaging.version import parse as parse_version
        except ImportError:
            return False
        try:
            return parse_version(installed_version) >= parse_version(fixed_in)
        except (ValueError, TypeError):
            # packaging's InvalidVersion derives from ValueError; TypeError
            # covers non-string version values.
            return False

    @staticmethod
    def _select_relevant(raw_vulnerabilities, version=None):
        """
        Format the vulnerabilities that may still affect ``version``.

        Entries whose ``fixed_in`` is at or below ``version`` are dropped.
        With no ``version`` (or no ``fixed_in``) every entry is kept.
        ``None`` input and non-dict entries are tolerated and skipped.
        """
        relevant = []
        for vuln in raw_vulnerabilities or []:
            if not isinstance(vuln, dict):
                continue
            fixed_in = vuln.get("fixed_in")
            if version and fixed_in and VulnerabilityManager._is_fixed_in_version(version, fixed_in):
                continue
            relevant.append(VulnerabilityManager._format_vulnerability(vuln))
        return relevant

    @staticmethod
    def _parse_core_response(data, version):
        """
        Extract formatted vulnerabilities from a core-version API payload.

        The WPScan API v3 documentation shows the payload nested under the
        dotted version string (``{"5.8.3": {"vulnerabilities": [...]}}``), so
        that key is tried first; a flat ``{"vulnerabilities": [...]}`` payload
        is still accepted as a fallback.
        """
        if not data or not isinstance(data, dict):
            return []
        release = data.get(version)
        if not isinstance(release, dict):
            release = data
        return [
            VulnerabilityManager._format_vulnerability(vuln)
            for vuln in (release.get("vulnerabilities") or [])
            if isinstance(vuln, dict)
        ]

    def get_core_vulnerabilities(self, version):
        """
        Fetches vulnerabilities for a given WordPress core version.
        Example WPScan API endpoint: /wordpresses/{slug} where slug is version like '592' for 5.9.2

        Args:
            version: Dotted WordPress version string, e.g. ``"6.7.2"``.

        Returns:
            A list of formatted vulnerability dicts; empty when ``version``
            is falsy, the request fails, or nothing is listed.
        """
        if not version:
            return []

        # Convert version like "6.7.2" to "672" for WPScan API slug
        version_slug = version.replace(".", "")
        endpoint = f"wordpresses/{version_slug}"

        print(f" Querying WPScan API for WordPress core version: {version} (slug: {version_slug})")
        data = self._make_wpscan_api_request(endpoint)

        if data and isinstance(data, list):  # Should not happen for this specific endpoint
            print(f" [?] WPScan API returned a list for core version {version_slug}, expected a dict. Data: {str(data)[:200]}")
            return []

        return self._parse_core_response(data, version)

    def _get_component_vulnerabilities(self, component_type, slug, version=None):
        """
        Shared implementation of the plugin and theme lookups.

        Args:
            component_type: ``"plugin"`` or ``"theme"``; used both in the
                progress message and, pluralised, as the endpoint prefix.
            slug: Component slug; the response is expected to be keyed by it.
            version: Optional installed version used to drop fixed issues.

        Returns:
            ``{"metadata": {...}, "vulnerabilities": [...]}``, with both
            values empty when nothing usable came back.
        """
        if not slug:
            return {"metadata": {}, "vulnerabilities": []}

        print(f" Querying WPScan API for {component_type}: {slug}")
        response_data = self._make_wpscan_api_request(f"{component_type}s/{slug}")

        # The actual component data is nested under its slug as a key in the response
        component = response_data.get(slug) if isinstance(response_data, dict) else None
        if not component or not isinstance(component, dict):
            return {"metadata": {}, "vulnerabilities": []}

        metadata = {
            "slug": slug,
            "latest_version": component.get("latest_version"),
            "last_updated": component.get("last_updated"),
            "description": component.get("description"),  # May not be available directly
            "homepage": component.get("homepage"),  # May not be available directly
            "authors": component.get("authors", []),  # May not be available directly
            "popularity": component.get("popularity"),
            "source": "WPScan API",
        }
        return {
            "metadata": metadata,
            "vulnerabilities": self._select_relevant(component.get("vulnerabilities"), version),
        }

    def get_plugin_vulnerabilities(self, plugin_slug, version=None):
        """
        Fetches vulnerabilities and metadata for a given plugin slug and optional version.
        WPScan API: /plugins/{slug}
        Returns a dict: {"metadata": {}, "vulnerabilities": []}

        When ``version`` is given, vulnerabilities whose ``fixed_in`` is at
        or below it are omitted.
        """
        return self._get_component_vulnerabilities("plugin", plugin_slug, version)

    def get_theme_vulnerabilities(self, theme_slug, version=None):
        """
        Fetches vulnerabilities and metadata for a given theme slug and optional version.
        WPScan API: /themes/{slug}
        Returns a dict: {"metadata": {}, "vulnerabilities": []}

        When ``version`` is given, vulnerabilities whose ``fixed_in`` is at
        or below it are omitted.
        """
        return self._get_component_vulnerabilities("theme", theme_slug, version)
|
|
|
|
# Example Usage (for testing purposes, not part of the class)
# Runs live WPScan API lookups when WPSCAN_API_TOKEN is set in the
# environment; otherwise prints a notice and does nothing.
if __name__ == '__main__':
    # Mock config and state for testing
    mock_config = {
        "wpscan_api_token": os.environ.get("WPSCAN_API_TOKEN"),  # Load from env for testing
        "requests_timeout": 10,
        "default_user_agent": "WPAuditTestClient",
    }
    # print is a builtin, so it is not bound as a method: notes and warnings
    # are simply echoed to stdout.
    mock_state = type('MockState', (), {'add_scan_note': print, 'add_scan_warning': print})()

    if not mock_config["wpscan_api_token"]:
        print("WPSCAN_API_TOKEN environment variable not set. Skipping live API test.")
    else:
        vm = VulnerabilityManager(mock_config, mock_state)

        # Test core vulnerabilities
        core_version_to_test = "5.8.3"  # An older version likely to have vulns
        print(f"\nTesting Core Vulnerabilities for WP {core_version_to_test}...")
        core_vulns = vm.get_core_vulnerabilities(core_version_to_test)
        if core_vulns:
            print(f"Found {len(core_vulns)} vulnerabilities for WP {core_version_to_test}:")
            for v in core_vulns[:2]:
                print(f" - {v['title']}")
        else:
            print(f"No vulnerabilities found or error for WP {core_version_to_test}.")

        # Test plugin vulnerabilities (example: a plugin known to have vulns)
        plugin_slug_to_test = "akismet"  # A very common plugin
        plugin_version_to_test = "4.1"  # An older version
        print(f"\nTesting Plugin Vulnerabilities for {plugin_slug_to_test} v{plugin_version_to_test}...")
        # The plugin/theme lookups return {"metadata": ..., "vulnerabilities": [...]},
        # so the list has to be pulled out before counting or slicing it.
        plugin_vulns = vm.get_plugin_vulnerabilities(plugin_slug_to_test, plugin_version_to_test)["vulnerabilities"]
        if plugin_vulns:
            print(f"Found {len(plugin_vulns)} vulnerabilities for {plugin_slug_to_test} (potentially affecting v{plugin_version_to_test}):")
            for v in plugin_vulns[:2]:
                # 'fixed_in' is always present (possibly None), hence `or`.
                print(f" - {v['title']} (Fixed in: {v.get('fixed_in') or 'N/A'})")
        else:
            print(f"No vulnerabilities found or error for {plugin_slug_to_test}.")

        # Test theme vulnerabilities (example: twentyfifteen)
        theme_slug_to_test = "twentyfifteen"
        theme_version_to_test = "1.8"  # An older version
        print(f"\nTesting Theme Vulnerabilities for {theme_slug_to_test} v{theme_version_to_test}...")
        theme_vulns = vm.get_theme_vulnerabilities(theme_slug_to_test, theme_version_to_test)["vulnerabilities"]
        if theme_vulns:
            print(f"Found {len(theme_vulns)} vulnerabilities for {theme_slug_to_test} (potentially affecting v{theme_version_to_test}):")
            for v in theme_vulns[:2]:
                print(f" - {v['title']} (Fixed in: {v.get('fixed_in') or 'N/A'})")
        else:
            print(f"No vulnerabilities found or error for {theme_slug_to_test}.")
|