java 单机接口限流处理方案
349
2022-10-10
阅读sqlmap源代码,编写burpsuite插件--sqlmapapi(二)(burpsuite sqlmap插件)
burpsuite插件编写---sql injection
0x00 概要
在安全测试过程中,大部分人会使用burpsuite的scanner模块进行测试,可以发现一些浅显的漏洞:比如xss、sql injection、cf、xxe、Arbitrary file existence disclosure in Act、明文传输等。说到sql injection,测试人员都会有一种想法是否存在一款自动化工具,可以将某一网站的所有链接都去尝试一边,尽可能的发现所有的sql injection。有了这种想法后大家会去想解决方案,有一种解决方案是编写burpsuite插件。本文接上一篇文章,上一文记录到继承IHttpListener接口编写插件,影响测试效率,本文将介绍另一种方式,将sqlmapapi 和burpsuite进行对比
0x01 继承IScannerCheck接口(方法2)
先上插件代码:
> from burp import IBurpExtender > from burp import IScannerCheck > from java.io import PrintWriter > import re > import urllib > import urllib2 > import time > import json > from threading import Thread > import requests > > > > > class BurpExtender(IBurpExtender, IScannerCheck): > > # > #implement IBurpExtender > # > def registerExtenderCallbacks(self, callbacks): > # keep a reference to our callbacks object > self._callbacks = callbacks > > # set our extension name > callbacks.setExtensionName("fanyingjie") > > # obtain our output stream > self._stdout = PrintWriter(callbacks.getStdout(), True) > > self._helpers = callbacks.getHelpers() > > # register ourselves as an > callbacks.registerScannerCheck(self) > > > def doActiveScan(self, baseRequestResponse, insertionPoint): > pass > def doPassiveScan(self, baseRequestResponse): > a=self._helpers.analyzeRequest(baseRequestResponse) > method=a.getMethod() > url=str(a.getUrl()) > if(("?" in url) and (method=="GET")): > self._stdout.println("start") > t=AutoSqli(target=url,stdout=self._stdout,method=method) > t.run() > > def consolidateDuplicateIssues(self, existingIssue, newIssue): > pass > > > > class AutoSqli(Thread): > def __init__(self,target,stdout,method): > self.server="http://192.168.159.134:8775" > self.taskid = '' > self.target=target > self.method=method > self._stdout=stdout > self.start_time = time.time() > > def task_new(self): > self.taskid = json.loads(urllib2.urlopen(self.server + '/task/new').read())['taskid'] > self._stdout.println('Created new task: ' + self.taskid ) > if len(self.taskid) > 0: > return True > return False > > def task_delete(self): > if json.loads(urllib2.urlopen(self.server + '/task/' + self.taskid + '/delete').read())['success']: > self._stdout.println('[%s] Deleted task' % (self.taskid)) > return True > return False > > > def scan_start(self): > headers = {'Content-Type': 'application/json'} > payload = {'url':self.target} > url = self.server + '/scan/' + self.taskid + '/start' > #t = json.loads(requests.post(url, data=json.dumps(payload), headers=headers).text) > > req=urllib2.Request(url,data=json.dumps(payload),headers=headers) > t=json.loads(urllib2.urlopen(req).read()) > self._stdout.println("start "+ self.taskid) > > if len(str(t['engineid'])) > 0 and t['success']: > return True > return False > > def scan_status(self): > status = json.loads(urllib2.urlopen(self.server + '/scan/' + self.taskid + '/status').read())['status'] > if status == 'running': > return 'running' > if status == 'terminated': > return 'terminated' > return "error" > > def scan_data(self): > > data = json.loads(urllib2.urlopen(self.server + '/scan/' + self.taskid + '/data').read())['data'] > if len(data) == 0: > self._stdout.println('not injection:\t' + self.target) > return False > else: > self._stdout.println('injection:\t' + self.target) > return True > > def scan_kill(self): > json.loads(rurllib2.urlopen(self.server + '/scan/' + self.taskid + '/kill').read())['success'] > self._stdout.println("%s kill")%(self.taskid) > > > def scan_stop(self): > json.loads(urllib2.urlopen(self.server + '/scan/' + self.taskid + '/stop').read())['success'] > self._stdout.println("%s stop")%(self.taskid) > > def run(self): > try: > if not self.task_new(): > return False > if not self.scan_start(): > return False > while True: > if self.scan_status() == 'running': > time.sleep(10) > elif self.scan_status() == 'terminated': > break > else: > break > #print self.target + ":\t" + str(time.time() - self.start_time) > if time.time() - self.start_time > 500: > self.scan_stop() > self.scan_kill() > break > self.scan_data() > #self.task_delete() > > except Exception as e: > pass >
使用burp scanner模块和上面写的插件对进行sql注入测试,burp scanner模块可以测试出存在注入,但是插件无法测试出;因此插件进行改写,修改默认配置,在插件中添加以下代码,我只修改了level和risk
def optionSet(self): headers = {'Content-Type': 'application/json'} payload={"level":"5","risk":"3"} url = self.server + '/option/' + self.taskid + '/set' req = urllib2.Request(url, data=json.dumps(payload), headers=headers) t = json.loads(urllib2.urlopen(req).read()) self._stdout.println("set option " + self.taskid)
然后在新建扫描后,调用配置设置函数,修改后的代码:
from burp import IBurpExtender from burp import IScannerCheck from java.io import PrintWriter import re import urllib import urllib2 import time import json from threading import Thread import requests class BurpExtender(IBurpExtender, IScannerCheck): # # implement IBurpExtender # def registerExtenderCallbacks(self, callbacks): # keep a reference to our callbacks object self._callbacks = callbacks # set our extension name callbacks.setExtensionName("fanyingjie") # obtain our output stream self._stdout = PrintWriter(callbacks.getStdout(), True) self._helpers = callbacks.getHelpers() # register ourselves as an callbacks.registerScannerCheck(self) def doActiveScan(self, baseRequestResponse, insertionPoint): pass def doPassiveScan(self, baseRequestResponse): a = self._helpers.analyzeRequest(baseRequestResponse) method = a.getMethod() url = str(a.getUrl()) if (("?" in url) and (method == "GET")): self._stdout.println("start") t = AutoSqli(target=url, stdout=self._stdout, method=method) t.run() def consolidateDuplicateIssues(self, existingIssue, newIssue): pass class AutoSqli(Thread): def __init__(self, target, stdout, method): self.server = "http://172.16.173.136:8775" self.taskid = '' self.target = target self.method = method self._stdout = stdout self.start_time = time.time() def task_new(self): self.taskid = json.loads(urllib2.urlopen(self.server + '/task/new').read())['taskid'] self._stdout.println('Created new task: ' + self.taskid) if len(self.taskid) > 0: return True return False def task_delete(self): if json.loads(urllib2.urlopen(self.server + '/task/' + self.taskid + '/delete').read())['success']: self._stdout.println('[%s] Deleted task' % (self.taskid)) return True return False def optionSet(self): headers = {'Content-Type': 'application/json'} payload={"level":"5","risk":"3"} url = self.server + '/option/' + self.taskid + '/set' req = urllib2.Request(url, data=json.dumps(payload), headers=headers) t = json.loads(urllib2.urlopen(req).read()) self._stdout.println("set option " + self.taskid) def scan_start(self): headers = {'Content-Type': 'application/json'} payload = {'url': self.target} url = self.server + '/scan/' + self.taskid + '/start' # t = json.loads(requests.post(url, data=json.dumps(payload), headers=headers).text) req = urllib2.Request(url, data=json.dumps(payload), headers=headers) t = json.loads(urllib2.urlopen(req).read()) self._stdout.println("start " + self.taskid) if len(str(t['engineid'])) > 0 and t['success']: return True return False def scan_status(self): status = json.loads(urllib2.urlopen(self.server + '/scan/' + self.taskid + '/status').read())['status'] if status == 'running': return 'running' if status == 'terminated': return 'terminated' return "error" def scan_data(self): data = json.loads(urllib2.urlopen(self.server + '/scan/' + self.taskid + '/data').read())['data'] if len(data) == 0: self._stdout.println('not injection:\t' + self.target) return False else: self._stdout.println('injection:\t' + self.target) return True def scan_kill(self): json.loads(rurllib2.urlopen(self.server + '/scan/' + self.taskid + '/kill').read())['success'] self._stdout.println("%s kill") % (self.taskid) def scan_stop(self): json.loads(urllib2.urlopen(self.server + '/scan/' + self.taskid + '/stop').read())['success'] self._stdout.println("%s stop") % (self.taskid) def run(self): try: if not self.task_new(): return False self.optionSet() if not self.scan_start(): return False while True: if self.scan_status() == 'running': time.sleep(10) elif self.scan_status() == 'terminated': break else: break # print self.target + ":\t" + str(time.time() - self.start_time) if time.time() - self.start_time > 500: self.scan_stop() self.scan_kill() break self.scan_data() # self.task_delete() except Exception as e: pass
再次对上面代码进行测试,可以测试出存在注入插件的输出:
startCreated new task: 42f685742c94a985set option 42f685742c94a985start 42f685742c94a985injection: scanner模块:通过修改sqlmapapi默认设置可以提高注入检测的准确率,但是修改默认配置后sqlmapapi不见的比burpsuite scanner模块更厉害,而且sqlmapapi只能检测get请求类型,且不能检测登录后的get请求,因此看来在安全测试过程中,如果目的只是发现网站是否存在sql注入,还是使用burpsuite scanner 模块比较好用;在知道存在sql注入后,可以使用sqlmap来进行后续的操作。
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~