mirror of
https://gh.wpcy.net/https://github.com/JulesJujuu/wpaudit.git
synced 2026-04-17 08:42:18 +08:00
273 lines
16 KiB
Python
273 lines
16 KiB
Python
# Module for Advanced WordPress User Enumeration Techniques
|
|
import requests # Retained for context
|
|
from urllib.parse import quote, urljoin, urlparse, parse_qs # Added parse_qs
|
|
from .utils import make_request
|
|
import re
|
|
import json
|
|
import copy # Added for deepcopy
|
|
|
|
from bs4 import BeautifulSoup
|
|
# Removed duplicate: from core.utils import make_request
|
|
|
|
def _find_first_post_url(target_url, config):
|
|
"""Helper to find a likely first post URL for oEmbed checks."""
|
|
print(" Attempting to find a valid post URL for oEmbed check...")
|
|
try:
|
|
response = make_request(target_url, config, method="GET", timeout=7)
|
|
if response and response.text:
|
|
soup = BeautifulSoup(response.text, 'html.parser')
|
|
# Look for oEmbed link first
|
|
oembed_link = soup.find('link', attrs={'type': 'application/json+oembed', 'href': True})
|
|
if oembed_link and oembed_link['href']:
|
|
# Extract the URL from the oEmbed href attribute's 'url' query parameter
|
|
parsed_oembed_href = urlparse(oembed_link['href'])
|
|
query_params = parse_qs(parsed_oembed_href.query) # Corrected: use imported parse_qs
|
|
if 'url' in query_params and query_params['url']:
|
|
post_url = query_params['url'][0]
|
|
print(f" Found post URL via oEmbed discovery: {post_url}")
|
|
return post_url
|
|
|
|
# Fallback: Look for common article links
|
|
# This is a very basic heuristic, might need refinement
|
|
for tag_name in ['article', 'div', 'main']: # Common containers for posts
|
|
container = soup.find(tag_name, class_=re.compile(r'(post|entry|article|content)'))
|
|
if container:
|
|
link_tag = container.find('a', href=True)
|
|
if link_tag and link_tag['href'].startswith(('http://', 'https://', '/')):
|
|
post_url = urljoin(target_url, link_tag['href'])
|
|
# Basic validation: ensure it's not just the homepage or an image
|
|
if urlparse(post_url).path not in ['/', ''] and not post_url.endswith(('.png', '.jpg', '.jpeg', '.gif')):
|
|
print(f" Found potential post URL via page parsing: {post_url}")
|
|
return post_url
|
|
|
|
# Fallback to a common default if nothing else found
|
|
common_post_slug = "/hello-world/"
|
|
print(f" Could not find a specific post URL, trying default: {common_post_slug}")
|
|
return urljoin(target_url, common_post_slug)
|
|
|
|
except Exception as e:
|
|
print(f" Error finding post URL: {e}")
|
|
return None
|
|
|
|
|
|
def analyze_advanced_user_enum(state, config, target_url):
|
|
"""
|
|
Performs advanced user enumeration techniques:
|
|
- Author archive scanning (/?author=N)
|
|
- oEmbed API user disclosure
|
|
- JSON REST API /wp/v2/users endpoint
|
|
Login error analysis remains a placeholder.
|
|
"""
|
|
module_key = "wp_analyzer"
|
|
findings_key = "advanced_user_enum"
|
|
|
|
_DEFAULT_ADV_USER_ENUM_FINDINGS = {
|
|
"status": "Not Run",
|
|
"details": "Performing advanced user enumeration techniques.",
|
|
"author_archive_users": [],
|
|
"oembed_disclosed_authors": [],
|
|
"rest_api_users": [],
|
|
"login_error_users": [], # Placeholder
|
|
"all_discovered_usernames_combined": []
|
|
}
|
|
|
|
all_wp_analyzer_findings_raw = state.get_module_findings(module_key, {})
|
|
if not isinstance(all_wp_analyzer_findings_raw, dict):
|
|
all_wp_analyzer_findings = {}
|
|
else:
|
|
all_wp_analyzer_findings = all_wp_analyzer_findings_raw
|
|
|
|
existing_findings = all_wp_analyzer_findings.get(findings_key, {})
|
|
|
|
findings = copy.deepcopy(_DEFAULT_ADV_USER_ENUM_FINDINGS)
|
|
|
|
if isinstance(existing_findings, dict):
|
|
for key, value in existing_findings.items():
|
|
if key in findings: # Only update if key is part of the default structure
|
|
if isinstance(findings[key], list) and isinstance(value, list):
|
|
# For lists, extend while maintaining uniqueness if desired, or simply overwrite/extend
|
|
# Here, we'll extend and let downstream logic handle uniqueness if needed for combined list
|
|
findings[key].extend(v for v in value if v not in findings[key])
|
|
elif isinstance(findings[key], dict) and isinstance(value, dict):
|
|
findings[key].update(value)
|
|
else:
|
|
findings[key] = value # Overwrite for simple types
|
|
|
|
findings["status"] = "Running" # Always set status for the current run
|
|
|
|
all_wp_analyzer_findings[findings_key] = findings
|
|
state.update_module_findings(module_key, all_wp_analyzer_findings) # Save initial state
|
|
|
|
all_found_usernames = set(findings.get("all_discovered_usernames_combined", [])) # Initialize from potentially existing state
|
|
|
|
# 1. Author Archive Enumeration (/?author=N)
|
|
print(" [i] Attempting user enumeration via author archives (/?author=N)...")
|
|
author_archive_found_users_slugs = []
|
|
max_author_id = config.get("wp_analyzer", {}).get("max_author_enum_id", 15)
|
|
print(f" Checking author IDs from 1 to {max_author_id}")
|
|
base_url_parsed = urlparse(target_url)
|
|
base_site_url = f"{base_url_parsed.scheme}://{base_url_parsed.netloc}"
|
|
|
|
for i in range(1, max_author_id + 1):
|
|
author_url = urljoin(target_url, f'?author={i}')
|
|
# print(f" Checking {author_url}") # Can be verbose
|
|
try:
|
|
response = make_request(author_url, config, method="GET", allow_redirects=True, timeout=7)
|
|
if response:
|
|
final_url = response.url
|
|
if final_url != base_site_url and final_url != target_url.rstrip('/') + '/':
|
|
match = re.search(r'/author/([^/]+)/?', final_url)
|
|
if match:
|
|
username_slug = match.group(1)
|
|
if username_slug not in author_archive_found_users_slugs:
|
|
author_archive_found_users_slugs.append(username_slug)
|
|
all_found_usernames.add(username_slug)
|
|
print(f" [+] Author Archive: Found username '{username_slug}' (ID: {i}) via redirect to {final_url}")
|
|
elif response.status_code == 200 and final_url == author_url:
|
|
print(f" [?] Author Archive: ID {i} resolved directly ({author_url}), might indicate user but no slug extracted.")
|
|
# else: print(f" [-] Author Archive: Request failed for author ID {i}.")
|
|
except requests.exceptions.RequestException: # More generic exception
|
|
# print(f" [-] Author Archive: Error checking author ID {i}: {e}")
|
|
if i > 5 and not author_archive_found_users_slugs:
|
|
print(" [-] Author Archive: Stopping enumeration due to early request errors and no users found.")
|
|
break
|
|
findings["author_archive_users"] = author_archive_found_users_slugs
|
|
if author_archive_found_users_slugs:
|
|
print(f" [+] Author Archive: Found {len(author_archive_found_users_slugs)} potential username(s).")
|
|
|
|
|
|
# 2. oEmbed User Enumeration
|
|
print(" [i] Attempting user enumeration via oEmbed API...")
|
|
oembed_users = []
|
|
first_post_url_for_oembed = _find_first_post_url(target_url, config)
|
|
if first_post_url_for_oembed:
|
|
oembed_api_url = urljoin(target_url, f'/wp-json/oembed/1.0/embed?url={quote(first_post_url_for_oembed)}')
|
|
print(f" Checking oEmbed endpoint: {oembed_api_url}")
|
|
try:
|
|
response = make_request(oembed_api_url, config, method="GET", timeout=7)
|
|
if response and response.status_code == 200 and response.text:
|
|
try:
|
|
data = json.loads(response.text)
|
|
author_name = data.get("author_name")
|
|
author_url_str = data.get("author_url")
|
|
if author_name:
|
|
# Try to extract username from author_url if it follows common pattern
|
|
username_from_url = None
|
|
if author_url_str:
|
|
url_match = re.search(r'/author/([^/]+)/?', author_url_str)
|
|
if url_match:
|
|
username_from_url = url_match.group(1)
|
|
|
|
user_info = {"display_name": author_name}
|
|
if username_from_url:
|
|
user_info["username_slug"] = username_from_url
|
|
all_found_usernames.add(username_from_url)
|
|
|
|
oembed_users.append(user_info)
|
|
print(f" [+] oEmbed: Found author '{author_name}'" + (f" (slug: '{username_from_url}')" if username_from_url else ""))
|
|
except json.JSONDecodeError:
|
|
print(" [-] oEmbed: Failed to parse JSON response.")
|
|
elif response:
|
|
print(f" [-] oEmbed: Request failed or non-200 status: {response.status_code}")
|
|
# else: print(" [-] oEmbed: Request failed.")
|
|
except requests.exceptions.RequestException as e:
|
|
print(f" [-] oEmbed: Error checking endpoint: {e}")
|
|
else:
|
|
print(" [i] oEmbed: Could not determine a post URL to test, skipping oEmbed check.")
|
|
findings["oembed_disclosed_authors"] = oembed_users
|
|
if oembed_users:
|
|
print(f" [+] oEmbed: Found {len(oembed_users)} author(s)/username(s).")
|
|
|
|
|
|
# 3. JSON REST API User Enumeration (/wp-json/wp/v2/users)
|
|
print(" [i] Attempting user enumeration via JSON REST API (/wp-json/wp/v2/users)...")
|
|
rest_api_found_users = []
|
|
# First, check if the main /users endpoint is accessible
|
|
users_api_url = urljoin(target_url, '/wp-json/wp/v2/users')
|
|
try:
|
|
response = make_request(users_api_url, config, method="GET", timeout=7)
|
|
if response and response.status_code == 200 and response.text:
|
|
print(f" [+] REST API: /wp-json/wp/v2/users endpoint is accessible. Parsing users...")
|
|
try:
|
|
users_data = json.loads(response.text)
|
|
if isinstance(users_data, list):
|
|
for user_entry in users_data:
|
|
if isinstance(user_entry, dict) and "slug" in user_entry and "name" in user_entry:
|
|
username_slug = user_entry["slug"]
|
|
display_name = user_entry["name"]
|
|
user_id = user_entry.get("id", "N/A")
|
|
rest_api_found_users.append({"id": user_id, "slug": username_slug, "name": display_name})
|
|
all_found_usernames.add(username_slug)
|
|
print(f" [+] REST API: Found user: ID={user_id}, Slug='{username_slug}', Name='{display_name}'")
|
|
except json.JSONDecodeError:
|
|
print(" [-] REST API: Failed to parse JSON from /wp/v2/users.")
|
|
elif response and response.status_code == 401: # Unauthorized
|
|
print(" [i] REST API: /wp-json/wp/v2/users endpoint requires authentication (401). Trying individual ID enumeration...")
|
|
elif response and response.status_code == 403: # Forbidden
|
|
print(" [i] REST API: /wp-json/wp/v2/users endpoint is forbidden (403). Trying individual ID enumeration...")
|
|
elif response: # Other status codes
|
|
print(f" [-] REST API: /wp-json/wp/v2/users endpoint returned status {response.status_code}. Trying individual ID enumeration...")
|
|
# else: print(f" [-] REST API: Request to /wp-json/wp/v2/users failed.")
|
|
|
|
# If listing failed or was restricted, try enumerating by ID
|
|
if not rest_api_found_users or (response and response.status_code in [401, 403]):
|
|
max_rest_api_user_id = config.get("wp_analyzer", {}).get("max_rest_api_user_enum_id", 15)
|
|
print(f" Attempting REST API user enumeration by ID (1 to {max_rest_api_user_id})...")
|
|
for i in range(1, max_rest_api_user_id + 1):
|
|
user_id_url = urljoin(target_url, f'/wp-json/wp/v2/users/{i}')
|
|
try:
|
|
id_response = make_request(user_id_url, config, method="GET", timeout=5)
|
|
if id_response and id_response.status_code == 200 and id_response.text:
|
|
user_data = json.loads(id_response.text)
|
|
if isinstance(user_data, dict) and "slug" in user_data and "name" in user_data:
|
|
username_slug = user_data["slug"]
|
|
display_name = user_data["name"]
|
|
# Avoid duplicates if already found by full listing
|
|
if not any(u['id'] == i for u in rest_api_found_users):
|
|
rest_api_found_users.append({"id": i, "slug": username_slug, "name": display_name})
|
|
all_found_usernames.add(username_slug)
|
|
print(f" [+] REST API (ID): Found user: ID={i}, Slug='{username_slug}', Name='{display_name}'")
|
|
except (requests.exceptions.RequestException, json.JSONDecodeError):
|
|
continue # Silently continue on errors for individual ID checks
|
|
except requests.exceptions.RequestException as e:
|
|
print(f" [-] REST API: Error accessing /wp-json/wp/v2/users: {e}")
|
|
findings["rest_api_users"] = rest_api_found_users
|
|
if rest_api_found_users:
|
|
print(f" [+] REST API: Found {len(rest_api_found_users)} user(s)/username(s).")
|
|
|
|
|
|
# Finalize details and remediation
|
|
findings["status"] = "Completed"
|
|
details_parts = []
|
|
if findings["author_archive_users"]:
|
|
details_parts.append(f"{len(findings['author_archive_users'])} user(s) via author archives.")
|
|
if findings["oembed_disclosed_authors"]:
|
|
details_parts.append(f"{len(findings['oembed_disclosed_authors'])} author(s) via oEmbed.")
|
|
if findings["rest_api_users"]:
|
|
details_parts.append(f"{len(findings['rest_api_users'])} user(s) via REST API.")
|
|
|
|
if not details_parts:
|
|
findings["details"] = "No users explicitly enumerated via advanced techniques. Login error analysis not implemented."
|
|
else:
|
|
findings["details"] = " ".join(details_parts) + " Login error analysis not implemented."
|
|
|
|
unique_usernames_list = sorted(list(all_found_usernames))
|
|
if unique_usernames_list:
|
|
findings["all_discovered_usernames_combined"] = unique_usernames_list # Add combined list
|
|
state.add_remediation_suggestion("user_enum_advanced_techniques", {
|
|
"source": "WP Analyzer (Advanced User Enum)",
|
|
"description": f"Usernames potentially enumerated via various advanced techniques: {', '.join(unique_usernames_list)}. Methods tried: Author Archives, oEmbed, REST API.",
|
|
"severity": "Medium", # Severity can be context-dependent
|
|
"remediation": "Review WordPress configurations to restrict user information exposure via REST API and oEmbed if not needed for public functionality. Consider disabling author archives or using plugins to prevent enumeration if this is a concern. Ensure strong, unique passwords for all users."
|
|
})
|
|
# Update the global user list in state if such a concept exists
|
|
# current_known_users = state.get_full_state().get("discovered_entities", {}).get("usernames", [])
|
|
# for un in unique_usernames_list:
|
|
# if un not in current_known_users: current_known_users.append(un)
|
|
# state.update_discovered_entities("usernames", current_known_users)
|
|
|
|
|
|
print(f" [+] Advanced user enumeration finished. Combined unique usernames found: {len(unique_usernames_list)}.")
|
|
all_wp_analyzer_findings = state.get_module_findings(module_key, {}) # Re-fetch
|
|
all_wp_analyzer_findings[findings_key] = findings
|
|
state.update_module_findings(module_key, all_wp_analyzer_findings)
|