OpCode使用N-gram归一化实践

网友投稿 289 2022-10-07


OpCode使用N-gram归一化实践

本文记录基于静态OpCode的聚类与数据分析思路;在OpCode的基础上,还可以拓展其它特征,例如利用minhash识别共享代码(重要)。

|--> 1. 利用IDA反汇编样本集-提取OpCode(带有注释和垃圾指令)。

IDA批量反汇编(python脚本):

# Batch-disassemble every sample below ``normalPath`` with IDA and keep a
# timestamped log of the run. The code runs unchanged on Python 2 and 3
# (every print call passes exactly one string).
import sys
import os
import datetime
import subprocess

# Command issued per sample:
#   <idalPath> -c -A -S<idcPath> <inputfile>
# Linux alternative kept from the original script:
# idalPath = "//usr//local//src//ida-pro-6.4//idal"
idalPath = "E:\\Tools\\IDA_Old_Version\\1\\1\\idaw.exe"
# idcPath = "//usr//local//src//ida-pro-6.4//idc//analysis_fullname.idc"
idcPath = "E:\\Tools\\IDA_Old_Version\\1\\1\\idc\\analysis_fullname.idc"
# PATH = './/resource//vxheaven//class//virus.win//compress//compress/'
normalPath = "E:\\TestVirusAsm"
# unpackPath = ""
# One log file per run, named after the start time.
logName = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
logPath = ".log"


def genAsm(filepath, total):
    """Run IDA on one sample and return the updated submission counter.

    The command line is built from the module-level ``idalPath`` and
    ``idcPath`` and passed as an argument list, so paths containing spaces
    or shell metacharacters are handed to IDA verbatim.

    :param filepath: path of the sample to disassemble.
    :param total: number of samples submitted so far.
    :returns: ``total + 1`` (the counter counts attempts, not successes).
    """
    command = [idalPath, '-c', '-A', '-S' + idcPath, filepath]
    try:
        subprocess.call(command)
    except OSError as err:
        # IDA could not be started at all (wrong idalPath, permissions...).
        # Keep going so one failure does not abort the whole batch.
        print('[-] Cannot run IDA: ' + str(err))
    return total + 1


def traveseFile(path, initClean=False):
    """Walk ``path`` and either clean it or disassemble its samples.

    :param path: root directory to walk recursively.
    :param initClean: when true, only pass every file through
        ``cleanFile`` (which deletes stale ``.asm``/``.idb`` files) and do
        not start IDA.
    """
    for parent, dirnames, filenames in os.walk(path):
        if initClean:
            log('Cleaning', '', '[-]')
            for filename in filenames:
                cleanFile(filename, os.path.join(parent, filename))
            continue

        log('Entering', parent)
        # NOTE(review): the "origin" count looks at '.dump' files while
        # cleanFile only lets '.bin' files through -- confirm which
        # extension the samples really use.
        log('origin', str(countFile(parent, 'dump')))
        total = 0
        for filename in filenames:
            filepath = os.path.join(parent, filename)
            if cleanFile(filename, filepath):
                continue
            log('asming', filename)
            total = genAsm(filepath, total)
        log('genasm', str(countFile(parent, 'asm')))


def countFile(dirpath, suffix=''):
    """Return how many entries of ``dirpath`` end in ``.<suffix>``."""
    return len([x for x in os.listdir(dirpath)
                if x.split('.')[-1] == suffix])


def cleanFile(filename, filepath):
    """Decide whether a file must be skipped, deleting stale IDA output.

    :param filename: bare file name, used to read the extension.
    :param filepath: full path, used when the file has to be deleted.
    :returns: ``False`` only for ``.bin`` files (the samples to process);
        ``True`` for everything else. ``.asm`` and ``.idb`` files are
        removed before returning.
    """
    filetype = filename.split('.')[-1]
    if filetype in ('asm', 'idb'):
        os.remove(filepath)
        # Two spaces reproduce the output of the original
        # ``print '[-] Clean ', filename`` statement.
        print('[-] Clean  ' + filename)
        return True
    # Only unpacked '.bin' samples are handed to IDA.
    return filetype != 'bin'


def getNowTime():
    """Return the current local time formatted as ``MM-DD-HH:MM:SS``."""
    return datetime.datetime.now().strftime('%m-%d-%H:%M:%S')


def log(action, content, prefix='[+]', suffix='', subpath=''):
    """Append one line to this run's log file.

    The line is ``<prefix><time> <action> <content><suffix>`` and is written
    to ``<logPath>/<subpath>/<logName>``; the directory is created on
    demand.
    """
    logDir = os.path.join(logPath, subpath)
    if not os.path.exists(logDir):
        try:
            os.makedirs(logDir)
        except OSError:
            print('[-] Mkdir error')
    logpath = os.path.join(logDir, logName)
    with open(logpath, 'a') as logfile:
        logfile.write(''.join([prefix, getNowTime(), ' ', action, ' ',
                               content, suffix, '\n']))


if __name__ == '__main__':
    log('Starting', '', '********', '********')
    # normal file: first remove old output, then disassemble
    traveseFile(normalPath, True)
    traveseFile(normalPath)
    # unpack file
    # traveseFile(unpackPath, True)
    # traveseFile(unpackPath)

上面脚本中的analysis_fullname.idc

// analysis_fullname.idc
// Run by IDA in batch mode: waits for the auto-analysis, then writes
// "<input file>.asm" and "<input file>.idb" next to the analysed sample.
#include <idc.idc>

static main()
{
  // turn on coagulation of data in the final pass of analysis
  SetShortPrm(INF_AF2, GetShortPrm(INF_AF2) | AF2_DODATA);

  // do not export anything before the auto-analysis has finished
  Wait();

  // output names keep the full input name, e.g. sample.bin -> sample.bin.asm
  auto file = GetInputFilePath();
  auto asmfile = file + ".asm";
  auto idbfile = file + ".idb";

  WriteTxt(asmfile, 0, BADADDR);   // create the assembler file
  SaveBase(idbfile, 0);            // save the idb database
  Exit(0);                         // exit to OS, error code 0 - success
}

从.asm文件中提取OpCode指令(python脚本)– 过滤注释和垃圾数据

# Extract the opcode sequence from every IDA ``.asm`` listing below a
# directory. Listings that IDA loaded as a raw binary are not parsed; the
# original sample is dumped as one hex byte per line instead.
# The code runs unchanged on Python 2 and 3.
import os

# from wingenasm import log
# BASEPATH = './/resource//vxheaven//class//virus.win//'
BASEPATH = 'E:\\TestVirusAsm\\123'
# BASEPATH = './/resource//vxheaven//class//virus.dos//'
# PATH = '..//resource//vxheaven//vl//virus.win/'


def checkDir(dirpath):
    """Create ``dirpath`` (including parents) when it does not exist yet."""
    if not os.path.exists(dirpath):
        try:
            os.makedirs(dirpath)
        except OSError:
            print('[-] Mkdir error')


def checkFile(filepath):
    """Delete ``filepath`` when it exists so that output starts fresh."""
    if os.path.exists(filepath):
        try:
            os.remove(filepath)
        except OSError:
            print('[-] Delete error')


def getOpCode(content, filename):
    """Write the mnemonics found in an ``.asm`` listing, one per line.

    Only lines whose first space-delimited token starts with two tabs are
    considered. Tokens starting with ``.``, ``;`` or ``/`` (directives and
    comments) and mnemonics that are not purely alphabetic are dropped.
    The result goes to ``<BASEPATH>/opcode/<filename>``; no file is created
    when the listing yields no opcode.

    :param content: the lines of the listing.
    :param filename: name of the output file.
    """
    opfiledir = os.path.join(BASEPATH, 'opcode')
    checkDir(opfiledir)
    opfilepath = os.path.join(opfiledir, filename)
    checkFile(opfilepath)

    opcodes = []
    for line in content:
        prefix = line.split(' ')[0]
        if not (len(prefix) > 2 and prefix[0:2] == '\t\t'):
            continue
        prefix = prefix.strip()
        if not prefix or prefix[0] in '.;/':
            continue
        opcode = prefix.split('\t')[0]
        if opcode.isalpha():
            opcodes.append(opcode)

    # One write instead of reopening the file for every single opcode.
    if opcodes:
        with open(opfilepath, 'w') as opfile:
            opfile.write('\n'.join(opcodes) + '\n')
    print("getOpcode Success!")


def isOpCodeFile(lines):
    """Return ``False`` when the listing is marked as a plain binary file."""
    return not any('; Format : Binary file' in line for line in lines)


def getByteCode(parent, filename):
    """Dump the sample behind ``filename`` as one hex byte per line.

    The sample is expected next to the listing, under the same name minus
    its last four characters (the ``.asm`` suffix). Output goes to
    ``<BASEPATH>/bytecode/<sample name>``; no file is created for an empty
    sample.

    :param parent: directory that holds the listing and the sample.
    :param filename: name of the ``.asm`` listing.
    """
    rawname = filename[0:-4]
    desfiledir = os.path.join(BASEPATH, 'bytecode')
    checkDir(desfiledir)
    desfilepath = os.path.join(desfiledir, rawname)
    checkFile(desfilepath)

    rawpath = os.path.join(parent, rawname)
    with open(rawpath, 'rb') as rawfile:
        # bytearray yields integers on both Python 2 and Python 3.
        data = bytearray(rawfile.read())
    if data:
        with open(desfilepath, 'w') as bytefile:
            bytefile.write(''.join('%02x\n' % byte for byte in data))


def checkFileType(filename, type='asm'):
    """Return whether the text after the last dot of ``filename`` is ``type``."""
    return filename.split('.')[-1] == type


def traveseFile(path):
    """Process every ``.asm`` file found below ``path``."""
    for parent, dirnames, filenames in os.walk(path):
        for filename in filenames:
            if not checkFileType(filename, 'asm'):
                continue
            filepath = os.path.join(parent, filename)
            print(filepath)
            with open(filepath) as asmfile:
                lines = asmfile.readlines()
            if isOpCodeFile(lines):
                print('opcode')
                getOpCode(lines, filename)
            else:
                print('Binary')
                getByteCode(parent, filename)


if __name__ == '__main__':
    # benign
    benignpath = os.path.join('E:\\TestVirusAsm')
    traveseFile(benignpath)

|--> 2. 利用N-gram生成OpCode"特征袋"。 n = 3的效果最佳

# Turn every opcode file (one mnemonic per line) into its list of opcode
# n-grams, one comma-separated gram per line.
# The code runs unchanged on Python 2 and 3.
import os

# from wingenasm import log
# BASEPATH = './/resource//vxheaven//class//'
BASEPATH = 'E:\\TestVirusAsm\\123\\begin'
# BASEPATH = './/resource//vxheaven//class//virus.dos//'
# PATH = '..//resource//vxheaven//vl//virus.win/'

__GRAM_SIZE__ = 2
__GRAM_TYPE__ = '2-gram'
# genGram uses the "1" pair below: window size and output directory name.
__GRAM_SIZE1__ = 3
__GRAM_TYPE1__ = '3-gram'


def checkDir(dirpath):
    """Create ``dirpath`` (including parents) when it does not exist yet."""
    if not os.path.exists(dirpath):
        try:
            os.makedirs(dirpath)
        except OSError:
            print('[-] Mkdir error')


def checkFile(filepath):
    """Delete ``filepath`` when it exists so that output starts fresh."""
    if os.path.exists(filepath):
        try:
            os.remove(filepath)
        except OSError:
            print('[-] Delete error')


def genGram(content, filename):
    """Write the ``__GRAM_SIZE1__``-grams of an opcode sequence.

    Each output line joins ``__GRAM_SIZE1__`` consecutive opcodes with
    commas, e.g. ``mov,push,call``. Only complete windows are emitted: the
    sliding window stops at the last position that still holds a full gram,
    so no shortened grams appear at the end of the file. Blank input lines
    are ignored. The result goes to
    ``<BASEPATH>/<__GRAM_TYPE1__>/<filename>``; nothing is written when the
    sequence is shorter than one gram.

    :param content: lines of the opcode file, one mnemonic per line.
    :param filename: name of the output file.
    """
    desfiledir = os.path.join(BASEPATH, __GRAM_TYPE1__)
    checkDir(desfiledir)
    desfilepath = os.path.join(desfiledir, filename)
    checkFile(desfilepath)

    opcodes = [line.strip() for line in content if line.strip()]
    last_start = len(opcodes) - __GRAM_SIZE1__
    grams = [','.join(opcodes[i:i + __GRAM_SIZE1__])
             for i in range(last_start + 1)]
    if not grams:
        print('[-] Too few opcodes, skipped ' + filename)
        return
    with open(desfilepath, 'w') as desfile:
        desfile.write('\n'.join(grams) + '\n')


def traveseFile(path):
    """Generate the gram file for every file found below ``path``."""
    for parent, dirnames, filenames in os.walk(path):
        for filename in filenames:
            filepath = os.path.join(parent, filename)
            print(filepath)
            with open(filepath) as asmfile:
                lines = asmfile.readlines()
            genGram(lines, filename)


if __name__ == '__main__':
    # opcode files of the benign set
    benignpath = os.path.join('E:\\TestVirusAsm\\123\\opcode')
    traveseFile(benignpath)

归一化处理,求tf和df

# Compute normalised term frequencies (per file and over the whole corpus)
# and document frequencies for a directory of n-gram files.
# The code runs unchanged on Python 2 and 3.
from __future__ import division

import os
import copy
import math
from collections import Counter

# from wingenasm import log
# virus
# BASEPATH = './/resource//vxheaven//class//opcode//'
# benign
# BASEPATH = './/resource//benign//'
# classfier
BASEPATH = 'E:\\TestVirusAsm\\123\\begin'
# BASEPATH = './/resource//vxheaven//class//virus.dos//'
# PATH = '..//resource//vxheaven//vl//virus.win/'

# NOTE(review): this script reads '2-gram' and writes '2-gram-tf' /
# '2-gram-totaltf', while the gram generator shown earlier in the article
# writes its output to '3-gram' -- make the directory names agree before
# chaining the two scripts.
__GRAM_SIZE__ = 2
__GRAM_TYPE__ = '2-gram-tf'
__GRAM_SIZE1__ = 3
__GRAM_TYPE1__ = '3-gram-tf'


def checkDir(dirpath):
    """Create ``dirpath`` (including parents) when it does not exist yet."""
    if not os.path.exists(dirpath):
        try:
            os.makedirs(dirpath)
        except OSError:
            print('[-] Mkdir error')


def checkFile(filepath):
    """Delete ``filepath`` when it exists so that output starts fresh."""
    if os.path.exists(filepath):
        try:
            os.remove(filepath)
        except OSError:
            print('[-] Delete error')


def genSingleTF(content, filename):
    """Write the term frequencies of one gram file.

    Output goes to ``<BASEPATH>/<__GRAM_TYPE__>/<filename>``, one line per
    distinct term with the fields joined by ``----``:
    term, count, number of distinct terms, count / highest count.
    Empty input is skipped without creating a file.

    :param content: lines of the gram file, one term per line.
    :param filename: name of the output file.
    """
    desfiledir = os.path.join(BASEPATH, __GRAM_TYPE__)
    checkDir(desfiledir)
    desfilepath = os.path.join(desfiledir, filename)
    checkFile(desfilepath)

    freq = Counter(line.strip() for line in content)
    if not freq:
        # max() below would raise on an empty file.
        print('[-] Empty input, skipped ' + filename)
        return
    maxterm = max(freq.values())
    total = len(freq)
    with open(desfilepath, 'w') as desfile:
        for key, count in freq.items():
            desfile.write('----'.join(
                [key, str(count), str(total), str(count / maxterm)]) + '\n')


def getTotalTF(content, tf, df):
    """Fold one document into the corpus-wide counters, in place.

    :param content: lines of the gram file, one term per line.
    :param tf: dict term -> total occurrences; every occurrence adds 1.
    :param df: dict term -> number of documents containing the term; each
        distinct term of this document adds exactly 1.
    """
    terms = [line.strip() for line in content]
    for term in terms:
        tf[term] = tf.get(term, 0) + 1
    # A set counts the document once per term without copying ``df``.
    for term in set(terms):
        df[term] = df.get(term, 0) + 1


def traveseFile(path):
    """Compute per-file and corpus-wide statistics for the files in ``path``.

    Every file gets its own frequency file via ``genSingleTF``. The corpus
    summary is written to ``<BASEPATH>/2-gram-totaltf``, one line per term
    with the fields joined by ``----``:
    term, total count, number of distinct terms, count / highest count,
    document frequency, number of documents, document frequency / highest
    document frequency, log(number of documents / document frequency).
    Nothing is written when no term was found.
    """
    totaltf = dict()
    totaldf = dict()
    totaldocument = 0
    for parent, dirnames, filenames in os.walk(path):
        totaldocument += len(filenames)
        for filename in filenames:
            filepath = os.path.join(parent, filename)
            print(filepath)
            with open(filepath) as asmfile:
                lines = asmfile.readlines()
            genSingleTF(lines, filename)
            getTotalTF(lines, totaltf, totaldf)

    if not totaltf:
        # max() below would raise when the directory holds no terms.
        print('[-] No terms found in ' + path)
        return

    desfilepath = os.path.join(BASEPATH, '2-gram-totaltf')
    maxterm = max(totaltf.values())
    maxdocument = max(totaldf.values())
    totalterm = len(totaltf)
    with open(desfilepath, 'w') as desfile:
        for key in totaltf:
            docfreq = totaldf.get(key, 0)
            fields = [
                key,
                str(totaltf[key]),
                str(totalterm),
                str(totaltf[key] / maxterm),
                str(docfreq),
                str(totaldocument),
                str(docfreq / maxdocument),
                str(math.log(totaldocument / totaldf.get(key, 1))),
            ]
            desfile.write('----'.join(fields) + '\n')
    print("Success tf!")


if __name__ == '__main__':
    # 2-gram of benign
    grampath = os.path.join('E:\\TestVirusAsm\\123\\begin', '2-gram')
    traveseFile(grampath)

|--> 3. 利用Jaccard计算"特征袋"的相似阈值,应用场景如下:

|----> 3.1 已知:同家族样本,如将2018年的100个样本与2019年的100个样本做直观对比分析:相似度(阈值)越高,意味着版本迭代改动越小;相似度(阈值)越低,意味着组件框架变动越大。

|----> 3.2 已知:非同家族样本,如100个DarkHotel-100个Ramsay-100个xxx-100个xxx,都是来自于韩国地区APT,可以直观分析APT家族代码重叠情况,用来关联他们组件库联系和代码共用。


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:构建数据相似系统(系统相似性)
下一篇:mybatis中注解与xml配置的对应关系和对比分析
相关文章

 发表评论

暂时没有评论,来抢沙发吧~