java 单机接口限流处理方案
262
2022-10-10
阅读sqlmap源代码,编写burpsuite插件--sqlmapapi
burpsuite插件编写---sql injection
0x00 概要
在安全测试过程中,大部分人会使用burpsuite的scanner模块进行测试,可以发现一些浅显的漏洞:比如xss、sql injection、csrf、xxe、Arbitrary file existence disclosure in Act、明文传输等。说到sql injection,测试人员都会有一种想法:是否存在一款自动化工具,可以将某一网站的所有链接都尝试一遍,尽可能地发现所有的sql injection。有了这种想法后,大家会去想解决方案,其中一种解决方案是编写burpsuite插件。
0x01 为什么sqlmapapi只能检测get请求是否存在注入?
# Start the client or the server.
# Exactly one branch runs: --server starts the REST-JSON API server,
# --client starts the interactive client, anything else prints usage.
if args.server is True:
    server(args.host, args.port, adapter=args.adapter, username=args.username, password=args.password)
elif args.client is True:
    client(args.host, args.port, username=args.username, password=args.password)
else:
    apiparser.print_help()
def client(host=RESTAPI_DEFAULT_ADDRESS, port=RESTAPI_DEFAULT_PORT, username=None, password=None):
    """
    REST-JSON API client

    Stores the credentials on DataStore, logs a few example curl commands
    for talking to the API server, then hands control to _client().

    host/port identify the REST-JSON API server to connect to;
    username/password are optional credentials kept on DataStore.
    """

    DataStore.username = username
    DataStore.password = password

    # NOTE(review): the URLs inside these literals were missing from this copy
    # of the code and have been restored from the upstream sqlmap source --
    # confirm against lib/utils/api.py of the sqlmap version being discussed.
    dbgMsg = "Example client access from command line:"
    dbgMsg += "\n\t$ taskid=$(curl http://%s:%d/task/new 2>1 | grep -o -I '[a-f0-9]\\{16\\}') && echo $taskid" % (host, port)
    dbgMsg += "\n\t$ curl -H \"Content-Type: application/json\" -X POST -d '{\"url\": \"http://testphp.vulnweb.com/artists.php?artist=1\"}' http://%s:%d/scan/$taskid/start" % (host, port)
    dbgMsg += "\n\t$ curl http://%s:%d/scan/$taskid/data" % (host, port)
    dbgMsg += "\n\t$ curl http://%s:%d/scan/$taskid/log" % (host, port)
    logger.debug(dbgMsg)

    addr = "http://%s:%d" % (host, port)
    logger.info("Starting REST-JSON API client to '%s'..." % addr)

    try:
        _client(addr)
    except Exception as ex:
        # Only non-HTTP failures and HTTP 401 are reported here; any other
        # HTTPError is deliberately left unreported by this handler.
        if not isinstance(ex, urllib2.HTTPError) or ex.code == httplib.UNAUTHORIZED:
            errMsg = "There has been a problem while connecting to the "
            errMsg += "REST-JSON API server at '%s' " % addr
            errMsg += "(%s)" % ex
            logger.critical(errMsg)
            return
我们查看的重点代码是
# Example curl commands logged by the sqlmapapi client.
# NOTE(review): the URLs inside these literals were missing from this copy of
# the code and have been restored from the upstream sqlmap source -- confirm
# against lib/utils/api.py of the sqlmap version being discussed.
dbgMsg = "Example client access from command line:"
dbgMsg += "\n\t$ taskid=$(curl http://%s:%d/task/new 2>1 | grep -o -I '[a-f0-9]\\{16\\}') && echo $taskid" % (host, port)
# Only a "url" key is posted when starting the scan in this example.
dbgMsg += "\n\t$ curl -H \"Content-Type: application/json\" -X POST -d '{\"url\": \"http://testphp.vulnweb.com/artists.php?artist=1\"}' http://%s:%d/scan/$taskid/start" % (host, port)
dbgMsg += "\n\t$ curl http://%s:%d/scan/$taskid/data" % (host, port)
dbgMsg += "\n\t$ curl http://%s:%d/scan/$taskid/log" % (host, port)
logger.debug(dbgMsg)
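在 dbgMsg += "\n\t$ curl -H \"Content-Type: application/json\" -X POST -d '{\"url\": \"http://testphp.vulnweb.com/artists.php?artist=1\"}' http://%s:%d/scan/$taskid/start" % (host, port) 这一行代码表示开启一个扫描任务:Content-Type: application/json 为请求头的部分;http://%s:%d/scan/$taskid/start 指明开启的具体是哪个任务;-X POST -d '{\"url\": \"http://testphp.vulnweb.com/artists.php?artist=1\"}' 为post请求参数,其中只有url,没有关于post请求的data参数,因此sqlmapapi只能进行get请求的sql injection 检测。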
0x02继承IHttpListener接口(方法一),存在缺陷
先上源代码:
from burp import IBurpExtender
from burp import IHttpListener
from java.io import PrintWriter
import re
import urllib
import urllib2
import time
import json
from threading import Thread
import requests


class BurpExtender(IBurpExtender, IHttpListener):
    """Burp extension that sends GET requests with a query string to sqlmapapi."""

    #
    # implement IBurpExtender
    #
    def registerExtenderCallbacks(self, callbacks):
        """Store Burp's callbacks/helpers and register as an HTTP listener."""
        # keep a reference to our callbacks object
        self._callbacks = callbacks
        # set our extension name
        callbacks.setExtensionName("fanyingjie")
        # obtain our output stream
        self._stdout = PrintWriter(callbacks.getStdout(), True)
        self._helpers = callbacks.getHelpers()
        # register ourselves as an HTTP listener
        callbacks.registerHttpListener(self)

    def processHttpMessage(self, toolFlag, messageIsRequest, messageInfo):
        """Scan every GET request whose URL contains a '?'; ignore the rest."""
        if not messageIsRequest:
            return
        request = self._helpers.analyzeRequest(messageInfo)
        method = request.getMethod()
        url = str(request.getUrl())
        if "?" in url and method == "GET":
            self._stdout.println("start")
            scanner = AutoSqli(target=url, stdout=self._stdout, method=method)
            # NOTE: run() is called directly, so the scan executes
            # synchronously inside this callback and the callback does not
            # return until the scan finishes or times out. This is the
            # known limitation of this first approach.
            scanner.run()


class AutoSqli(Thread):
    """Drive one sqlmapapi scan (new task -> start -> poll -> read data)."""

    # Seconds after construction before a still-running scan is stopped.
    SCAN_TIMEOUT = 500
    # Seconds to wait between two status polls.
    POLL_INTERVAL = 10

    def __init__(self, target, stdout, method, server="http://192.168.159.134:8775"):
        """
        target: URL to scan.
        stdout: object with a println() method used for all output.
        method: HTTP method of the intercepted request (stored only).
        server: base URL of the sqlmapapi server.
        """
        # Thread.__init__ must run before any Thread API is usable.
        Thread.__init__(self)
        self.server = server
        self.taskid = ''
        self.target = target
        self.method = method
        self._stdout = stdout
        self.start_time = time.time()

    def _request_json(self, path, payload=None):
        """GET (or POST `payload` as JSON) `path` on the server; return the decoded JSON."""
        url = self.server + path
        if payload is None:
            request = url
        else:
            request = urllib2.Request(url, data=json.dumps(payload),
                                      headers={'Content-Type': 'application/json'})
        response = urllib2.urlopen(request)
        try:
            return json.loads(response.read())
        finally:
            response.close()

    def task_new(self):
        """Create a task and remember its id; return True if an id was received."""
        self.taskid = self._request_json('/task/new')['taskid']
        self._stdout.println('Created new task: ' + self.taskid)
        return len(self.taskid) > 0

    def task_delete(self):
        """Delete the current task; return True when the server reports success."""
        if self._request_json('/task/' + self.taskid + '/delete')['success']:
            self._stdout.println('[%s] Deleted task' % (self.taskid))
            return True
        return False

    def scan_start(self):
        """Start scanning self.target under the current task; return True on success."""
        payload = {'url': self.target}
        reply = self._request_json('/scan/' + self.taskid + '/start', payload)
        self._stdout.println("start " + self.taskid)
        if len(str(reply['engineid'])) > 0 and reply['success']:
            return True
        return False

    def scan_status(self):
        """Return 'running' or 'terminated' as reported by the server, else 'error'."""
        status = self._request_json('/scan/' + self.taskid + '/status')['status']
        if status in ('running', 'terminated'):
            return status
        return "error"

    def scan_data(self):
        """Print and return whether the scan produced any data (True = injection found)."""
        data = self._request_json('/scan/' + self.taskid + '/data')['data']
        if len(data) == 0:
            self._stdout.println('not injection:\t' + self.target)
            return False
        self._stdout.println('injection:\t' + self.target)
        return True

    def scan_kill(self):
        """Ask the server to kill the current scan; return its 'success' value."""
        success = self._request_json('/scan/' + self.taskid + '/kill')['success']
        # Format the message before printing it, not println()'s return value.
        self._stdout.println("%s kill" % (self.taskid))
        return success

    def scan_stop(self):
        """Ask the server to stop the current scan; return its 'success' value."""
        success = self._request_json('/scan/' + self.taskid + '/stop')['success']
        self._stdout.println("%s stop" % (self.taskid))
        return success

    def run(self):
        """Run the whole scan; return False if it could not be started or failed."""
        try:
            if not self.task_new():
                return False
            if not self.scan_start():
                return False
            while True:
                # Poll once per iteration so both checks see the same status.
                if self.scan_status() != 'running':
                    break
                time.sleep(self.POLL_INTERVAL)
                if time.time() - self.start_time > self.SCAN_TIMEOUT:
                    self.scan_stop()
                    self.scan_kill()
                    break
            self.scan_data()
        except Exception as e:
            # Report the failure instead of silently discarding it.
            self._stdout.println('[%s] scan aborted: %s' % (self.taskid, e))
            return False
# Only requests (not responses) are checked for SQL injection.
if messageIsRequest:
    # analyzeRequest is a Burp helper; the URL and method are read from its result.
    a = self._helpers.analyzeRequest(messageInfo)
    method = a.getMethod()
    url = str(a.getUrl())
    # Only GET requests whose URL carries parameters are scanned.
    if ("?" in url) and (method == "GET"):
        self._stdout.println("start")
        t = AutoSqli(target=url, stdout=self._stdout, method=method)
        # NOTE: run() is called directly, so the scan executes synchronously
        # and this code does not continue until the scan has finished.
        t.run()
class AutoSqli(Thread):
    """Drive one sqlmapapi scan (new task -> start -> poll -> read data)."""

    # Seconds after construction before a still-running scan is stopped.
    SCAN_TIMEOUT = 500
    # Seconds to wait between two status polls.
    POLL_INTERVAL = 10

    def __init__(self, target, stdout, method, server="http://192.168.159.134:8775"):
        """
        target: URL to scan.
        stdout: object with a println() method used for all output.
        method: HTTP method of the intercepted request (stored only).
        server: base URL of the host running the sqlmapapi service.
        """
        # Thread.__init__ must run before any Thread API is usable.
        Thread.__init__(self)
        self.server = server
        self.taskid = ''
        self.target = target
        self.method = method
        self._stdout = stdout
        self.start_time = time.time()

    def _request_json(self, path, payload=None):
        """GET (or POST `payload` as JSON) `path` on the server; return the decoded JSON."""
        url = self.server + path
        if payload is None:
            request = url
        else:
            request = urllib2.Request(url, data=json.dumps(payload),
                                      headers={'Content-Type': 'application/json'})
        response = urllib2.urlopen(request)
        try:
            return json.loads(response.read())
        finally:
            response.close()

    def task_new(self):
        """Create a new task and store its taskid; return True if an id was received."""
        self.taskid = self._request_json('/task/new')['taskid']
        self._stdout.println('Created new task: ' + self.taskid)
        return len(self.taskid) > 0

    def task_delete(self):
        """Delete the task identified by taskid; return True when the server reports success."""
        if self._request_json('/task/' + self.taskid + '/delete')['success']:
            self._stdout.println('[%s] Deleted task' % (self.taskid))
            return True
        return False

    def scan_start(self):
        """Start a scan of self.target under the current task; return True on success."""
        payload = {'url': self.target}
        reply = self._request_json('/scan/' + self.taskid + '/start', payload)
        self._stdout.println("start " + self.taskid)
        if len(str(reply['engineid'])) > 0 and reply['success']:
            return True
        return False

    def scan_status(self):
        """Return 'running' or 'terminated' (scan finished) as reported, else 'error'."""
        status = self._request_json('/scan/' + self.taskid + '/status')['status']
        if status in ('running', 'terminated'):
            return status
        return "error"

    def scan_data(self):
        """Fetch the scan result; non-empty data means an injection was found."""
        data = self._request_json('/scan/' + self.taskid + '/data')['data']
        if len(data) == 0:
            self._stdout.println('not injection:\t' + self.target)
            return False
        self._stdout.println('injection:\t' + self.target)
        return True

    def scan_kill(self):
        """Ask the server to kill the current scan; return its 'success' value."""
        success = self._request_json('/scan/' + self.taskid + '/kill')['success']
        # Format the message before printing it, not println()'s return value.
        self._stdout.println("%s kill" % (self.taskid))
        return success

    def scan_stop(self):
        """Ask the server to stop the current scan; return its 'success' value."""
        success = self._request_json('/scan/' + self.taskid + '/stop')['success']
        self._stdout.println("%s stop" % (self.taskid))
        return success

    def run(self):
        """Run the whole scan; return False if it could not be started or failed."""
        try:
            if not self.task_new():
                return False
            if not self.scan_start():
                return False
            while True:
                # Poll once per iteration so both checks see the same status.
                if self.scan_status() != 'running':
                    break
                time.sleep(self.POLL_INTERVAL)
                if time.time() - self.start_time > self.SCAN_TIMEOUT:
                    self.scan_stop()
                    self.scan_kill()
                    break
            self.scan_data()
        except Exception as e:
            # Report the failure instead of silently discarding it.
            self._stdout.println('[%s] scan aborted: %s' % (self.taskid, e))
            return False
在下一篇文章中,我会记录一下第二种方法,可以在不影响测试效率的情况下,依然有效地检查出注入;并在设置一些sqlmapapi参数的前提下,比较一下sqlmapapi和burpsuite scanner模块哪个更有效率。
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~