使用 Python 爬虫爬取网络

网友投稿 284 2022-09-02


使用 Python 爬虫爬取网络

Mechanize库浏览页面

#!/usr/bin/python
# coding=utf-8
import mechanize

# NOTE(review): the URL literals of this listing were lost when the article
# was extracted (two scripts were also fused together); the values below are
# placeholders -- replace them with the pages you want to fetch.
VIEW_URL = 'http://example.com/'
PROXY_TEST_URL = 'http://example.com/'


def viewPage(url):
    """Open url with a plain mechanize browser and print the page source."""
    browser = mechanize.Browser()
    page = browser.open(url)
    print(page.read())


def testProxy(url, proxy):
    """Open url through proxy and print the page source.

    proxy is passed unchanged to Browser.set_proxies(), e.g.
    {'http': 'host:port'}.
    """
    browser = mechanize.Browser()
    browser.set_proxies(proxy)
    page = browser.open(url)
    print(page.read())


viewPage(VIEW_URL)

hideMeProxy = {'http': '139.196.202.164:9001'}
testProxy(PROXY_TEST_URL, hideMeProxy)

#!/usr/bin/python
# coding=utf-8
import mechanize


def testUserAgent(url, userAgent):
    """Open url with custom request headers and print the page source.

    userAgent is a list of (header-name, value) tuples; it is assigned to the
    browser's addheaders attribute unchanged.
    """
    browser = mechanize.Browser()
    browser.addheaders = userAgent
    page = browser.open(url)
    print(page.read())


# NOTE(review): the original URL literal was lost when the article was
# extracted; this value is a placeholder.
url = 'http://example.com/'
userAgent = [('User-agent', 'Mozilla/5.0 (X11; U; Linux 2.4.2-2 i586; en-US; m18) Gecko/20010131 Netscape6/6.01')]
testUserAgent(url, userAgent)

把代码集成在Python类的AnonBrowser中

#!/usr/bin/python
# coding=utf-8
import mechanize
import cookielib
import random
import time  # needed by anonymize(sleep=True); the original listing omitted it


class anonBrowser(mechanize.Browser):
    """mechanize.Browser that can rotate its user agent, proxy and cookie jar.

    proxies:     list of proxy addresses; one is picked at random by
                 change_proxy().
    user_agents: list of user-agent strings; four built-in defaults are
                 appended to it.
    """

    def __init__(self, proxies=None, user_agents=None):
        mechanize.Browser.__init__(self)
        self.set_handle_robots(False)
        # None instead of [] as default so instances never share one list.
        self.proxies = proxies if proxies is not None else []
        extra_agents = user_agents if user_agents is not None else []
        self.user_agents = extra_agents + ['Mozilla/4.0 ', 'FireFox/6.01',
                                           'ExactSearch', 'Nokia7110/1.0']
        self.cookie_jar = cookielib.LWPCookieJar()
        self.set_cookiejar(self.cookie_jar)
        self.anonymize()

    def clear_cookies(self):
        """Replace the current cookie jar with a fresh, empty one."""
        self.cookie_jar = cookielib.LWPCookieJar()
        self.set_cookiejar(self.cookie_jar)

    def change_user_agent(self):
        """Set the User-agent header to a random entry of self.user_agents."""
        agent = random.choice(self.user_agents)
        self.addheaders = [('User-agent', agent)]

    def change_proxy(self):
        """Route HTTP traffic through a random proxy, if any are configured."""
        if self.proxies:
            # NOTE(review): the dict key was stripped from the published
            # listing; 'http' is reconstructed -- confirm.
            self.set_proxies({'http': random.choice(self.proxies)})

    def anonymize(self, sleep=False):
        """Clear cookies and pick a new user agent and proxy.

        When sleep is true, additionally pause for 60 seconds.
        """
        self.clear_cookies()
        self.change_user_agent()
        self.change_proxy()
        if sleep:
            time.sleep(60)

测试每次是否使用不同的cookie访问:

#!/usr/bin/python
# coding=utf-8
from anonBrowser import *

# NOTE(review): the original URL literal was lost when the article was
# extracted; this value is a placeholder.
TARGET_URL = 'http://example.com/'

# anonBrowser.change_user_agent() wraps each entry in a ('User-agent', value)
# tuple itself, so user_agents must hold plain strings, not header tuples.
ab = anonBrowser(proxies=[], user_agents=['superSecretBroswer'])

for attempt in range(1, 5):
    # Re-anonymize before every request, then show the cookies received.
    ab.anonymize()
    print('[*] Fetching page')
    response = ab.open(TARGET_URL)
    for cookie in ab.cookie_jar:
        print(cookie)

用BeautifulSoup解析Href链接:

#!/usr/bin/python
# coding=utf-8
from anonBrowser import *
from BeautifulSoup import BeautifulSoup
import os
import optparse
import re

# Compiled once; captures the value of every double-quoted href attribute.
HREF_RE = re.compile('href="(.*?)"')


def printLinks(url):
    """Fetch url anonymously and print its links, found in two ways.

    First with a regular expression over the raw HTML, then with
    BeautifulSoup over the parsed <a> tags. A failure in one pass is
    reported and does not prevent the other from running.
    """
    ab = anonBrowser()
    ab.anonymize()
    html = ab.open(url).read()

    # Pass 1: regular expression.
    try:
        print('[+] Printing Links From Regex.')
        for link in HREF_RE.findall(html):
            print(link)
    except Exception as e:
        print('[-] Regex parsing failed: ' + str(e))

    # Pass 2: BeautifulSoup.
    try:
        print('\n[+] Printing Links From BeautifulSoup.')
        soup = BeautifulSoup(html)
        for tag in soup.findAll(name='a'):
            href = tag.get('href')
            if href is not None:
                print(href)
    except Exception as e:
        print('[-] BeautifulSoup parsing failed: ' + str(e))


def main():
    """Parse the -u option and print the links of the given page."""
    parser = optparse.OptionParser('[*]Usage: python linkParser.py -u ')
    parser.add_option('-u', dest='tgtURL', type='string',
                      help='specify target url')
    (options, args) = parser.parse_args()
    url = options.tgtURL
    if url is None:
        print(parser.usage)
        exit(0)
    printLinks(url)


if __name__ == '__main__':
    main()

用BeautifulSoup镜像(下载)页面中的图片

#!/usr/bin/python
# coding=utf-8
from anonBrowser import *
from BeautifulSoup import BeautifulSoup
import os
import optparse

# NOTE(review): the argument of the original lstrip() call was stripped from
# the published listing; 'http://' is reconstructed -- confirm.
URL_PREFIX = 'http://'


def mirrorImages(url, dir):
    """Download every <img> referenced by the page at url into directory dir.

    Each file is named after the image's src with the leading 'http://'
    removed and '/' replaced by '_'.
    """
    ab = anonBrowser()
    ab.anonymize()
    html = ab.open(url)
    soup = BeautifulSoup(html)
    for image in soup.findAll('img'):
        src = image['src']
        # str.lstrip() removes any leading characters from the given set, not
        # a prefix, and would eat the start of host names such as "tpb.com";
        # remove the scheme explicitly instead.
        name = src[len(URL_PREFIX):] if src.startswith(URL_PREFIX) else src
        filename = os.path.join(dir, name.replace('/', '_'))
        print('[+] Saving ' + str(filename))
        data = ab.open(src).read()
        # Step back in the browser history to the page being mirrored.
        ab.back()
        with open(filename, 'wb') as save:
            save.write(data)


def main():
    """Parse the -u and -d options and mirror the page's images."""
    parser = optparse.OptionParser('[*]Usage: python imageMirror.py -u -d ')
    parser.add_option('-u', dest='tgtURL', type='string',
                      help='specify target url')
    parser.add_option('-d', dest='dir', type='string',
                      help='specify destination directory')
    (options, args) = parser.parse_args()
    url = options.tgtURL
    dir = options.dir
    if url is None or dir is None:
        print(parser.usage)
        exit(0)
    try:
        mirrorImages(url, dir)
    except Exception as e:
        print('[-] Error Mirroring Images.')
        print('[-] ' + str(e))


if __name__ == '__main__':
    main()

用Python与谷歌API交互

#!/usr/bin/python
# coding=utf-8
import urllib
from anonBrowser import *

# NOTE(review): the search endpoint literal was lost when the article was
# extracted; this value is a placeholder. It must end with the query
# parameter so the encoded search term can be appended.
SEARCH_URL = 'http://example.com/search?q='


def google(search_term):
    """URL-encode search_term, request the search endpoint, print the raw reply."""
    ab = anonBrowser()
    search_term = urllib.quote_plus(search_term)
    response = ab.open(SEARCH_URL + search_term)
    print(response.read())


google('Boundock Saint')

接着就对Json格式的数据进行处理,添加json库的load()函数对Json数据进行加载即可

#!/usr/bin/python
# coding=utf-8
import urllib
from anonBrowser import *
import json

# NOTE(review): the search endpoint literal was lost when the article was
# extracted; this value is a placeholder. It must end with the query
# parameter so the encoded search term can be appended.
SEARCH_URL = 'http://example.com/search?q='


def google(search_term):
    """URL-encode search_term, query the endpoint and print the decoded JSON."""
    ab = anonBrowser()
    search_term = urllib.quote_plus(search_term)
    response = ab.open(SEARCH_URL + search_term)
    # json.load() reads straight from the file-like response object.
    objects = json.load(response)
    print(objects)


google('Boundock Saint')

编写Google_Result类,用于保存从Json数据中解析出的标题、摘要和链接

#!/usr/bin/python
# coding=utf-8
import urllib
from anonBrowser import *
import json
import optparse

# NOTE(review): the search endpoint literal was lost when the article was
# extracted; this value is a placeholder. It must end with the query
# parameter so the encoded keyword can be appended.
SEARCH_URL = 'http://example.com/search?q='


class Google_Result:
    """One search hit: its title, snippet text and URL."""

    def __init__(self, title, text, url):
        self.title = title
        self.text = text
        self.url = url

    def __repr__(self):
        return self.title


def google(search_term):
    """Search for search_term and return the hits as Google_Result objects.

    Reads the 'items' list of the JSON reply and, for each item, its 'link',
    'title' and 'snippet' fields; each hit is also printed.
    """
    ab = anonBrowser()
    search_term = urllib.quote_plus(search_term)
    response = ab.open(SEARCH_URL + search_term)
    objects = json.load(response)
    results = []
    for item in objects['items']:
        url = item['link']
        title = item['title']
        text = item['snippet']
        print(url)
        print(title)
        print(text)
        results.append(Google_Result(title, text, url))
    return results


def main():
    """Parse the -k option and print the search results for that keyword."""
    parser = optparse.OptionParser('[*]Usage: python anonGoogle.py -k ')
    parser.add_option('-k', dest='keyword', type='string',
                      help='specify google keyword')
    (options, args) = parser.parse_args()
    keyword = options.keyword
    if keyword is None:
        print(parser.usage)
        exit(0)
    results = google(keyword)
    print(results)


if __name__ == '__main__':
    main()

用Python解析Tweets个人主页

#!/usr/bin/python
# coding=utf-8
import json
import urllib
from anonBrowser import *

# NOTE(review): the Twitter search endpoint literal was lost when the article
# was extracted; this value is a placeholder. It must end with the query
# parameter so the encoded query can be appended.
TWITTER_SEARCH_URL = 'http://example.com/search.json?q='


class reconPerson:
    """A person of interest: name, job and known social-media accounts."""

    def __init__(self, first_name, last_name, job='', social_media=None):
        self.first_name = first_name
        self.last_name = last_name
        self.job = job
        # None instead of {} as default so instances never share one dict.
        self.social_media = social_media if social_media is not None else {}

    def __repr__(self):
        return self.first_name + ' ' + self.last_name + ' has job ' + self.job

    def get_social(self, media_name):
        """Return the account stored under media_name, or None if absent."""
        return self.social_media.get(media_name)

    def query_twitter(self, query):
        """Run a Twitter search and return a list of simplified tweets.

        Each returned dict has the keys 'from_user', 'geo' and 'tweet', taken
        from the 'from_user_name', 'geo' and 'text' fields of every entry in
        the reply's 'results' list.
        """
        query = urllib.quote_plus(query)
        browser = anonBrowser()
        response = browser.open(TWITTER_SEARCH_URL + query)
        json_objects = json.load(response)
        results = []
        for result in json_objects['results']:
            results.append({
                'from_user': result['from_user_name'],
                'geo': result['geo'],
                'tweet': result['text'],
            })
        return results


ap = reconPerson('Boondock', 'Saint')
print(ap.query_twitter('from:th3j35t3r since:2010-01-01 include:retweets'))

从推文中提取地理位置信息

#!/usr/bin/python
# coding=utf-8
import json
import urllib
import optparse
from anonBrowser import *

# NOTE(review): the Twitter search endpoint literal was lost when the article
# was extracted; this value is a placeholder. It must end with the query
# parameter so the encoded query can be appended.
TWITTER_SEARCH_URL = 'http://example.com/search.json?q='


def get_tweets(handle):
    """Return the tweets of handle as dicts with 'from_user', 'geo', 'tweet'."""
    query = urllib.quote_plus('from:' + handle +
                              ' since:2009-01-01 include:retweets')
    browser = anonBrowser()
    browser.anonymize()
    response = browser.open(TWITTER_SEARCH_URL + query)
    json_objects = json.load(response)
    tweets = []
    for result in json_objects['results']:
        tweets.append({
            'from_user': result['from_user_name'],
            'geo': result['geo'],
            'tweet': result['text'],
        })
    return tweets


def load_cities(cityFile):
    """Read one city name per line from cityFile and return them lower-cased."""
    cities = []
    with open(cityFile) as handle:
        for line in handle:
            city = line.strip('\n').strip('\r').lower()
            # A blank line would be a substring of every tweet; skip it.
            if city:
                cities.append(city)
    return cities


def twitter_locate(tweets, cities):
    """Collect locations from tweets.

    Returns the non-None 'geo' values of the tweets followed by every entry
    of cities that occurs in the lower-cased text of any tweet, and prints
    how many of each were found.
    """
    locations = []
    locCnt = 0
    cityCnt = 0
    texts = []
    for tweet in tweets:
        if tweet['geo'] is not None:
            locations.append(tweet['geo'])
            locCnt += 1
        texts.append(tweet['tweet'].lower())
    tweetsText = ''.join(texts)
    for city in cities:
        if city in tweetsText:
            locations.append(city)
            cityCnt += 1
    print("[+] Found " + str(locCnt) + " locations via Twitter API and " +
          str(cityCnt) + " locations from text search.")
    return locations


def main():
    """Parse -u (handle) and optional -c (city file), then print locations."""
    parser = optparse.OptionParser('[*]Usage: python twitterGeo.py -u [-c ]')
    parser.add_option('-u', dest='handle', type='string',
                      help='specify twitter handle')
    parser.add_option('-c', dest='cityFile', type='string',
                      help='specify file containing cities to search')
    (options, args) = parser.parse_args()
    handle = options.handle
    cityFile = options.cityFile
    if handle is None:
        print(parser.usage)
        exit(0)
    cities = []
    if cityFile is not None:
        cities = load_cities(cityFile)
    tweets = get_tweets(handle)
    locations = twitter_locate(tweets, cities)
    print("[+] Locations: " + str(locations))


if __name__ == '__main__':
    main()

用正则表达式解析Twitter用户的兴趣爱好

#!/usr/bin/python
# coding=utf-8
import json
import re
import urllib
import urllib2
import optparse
from anonBrowser import *

# NOTE(review): the Twitter search endpoint literal was lost when the article
# was extracted; this value is a placeholder. It must end with the query
# parameter so the encoded query can be appended.
TWITTER_SEARCH_URL = 'http://example.com/search.json?q='

# NOTE(review): the link pattern was stripped from the published listing.
# The code below reads two capture groups (link[0], link[1]), so a
# two-alternative pattern is reconstructed here -- confirm.
LINK_RE = re.compile(r'(http.*?)\Z|(http.*?) ')
USER_RE = re.compile(r'(@\w+)')
HASHTAG_RE = re.compile(r'(#\w+)')


def get_tweets(handle):
    """Return the tweets of handle as dicts with 'from_user', 'geo', 'tweet'."""
    query = urllib.quote_plus('from:' + handle +
                              ' since:2009-01-01 include:retweets')
    browser = anonBrowser()
    browser.anonymize()
    response = browser.open(TWITTER_SEARCH_URL + query)
    json_objects = json.load(response)
    tweets = []
    for result in json_objects['results']:
        tweets.append({
            'from_user': result['from_user_name'],
            'geo': result['geo'],
            'tweet': result['text'],
        })
    return tweets


def find_interests(tweets):
    """Extract links, mentioned users and hashtags from tweets.

    Returns a dict with the sorted lists 'links', 'users' and 'hashtags'.
    Every link is opened and the URL of the response is stored; links that
    cannot be opened are skipped.
    """
    interests = {'links': [], 'users': [], 'hashtags': []}
    for tweet in tweets:
        text = tweet['tweet']
        for groups in LINK_RE.findall(text):
            link = groups[0] or groups[1]
            if not link:
                continue
            try:
                response = urllib2.urlopen(link)
            except Exception:
                # Best effort: an unreachable link is simply left out.
                continue
            interests['links'].append(response.url)
        interests['users'] += USER_RE.findall(text)
        interests['hashtags'] += HASHTAG_RE.findall(text)
    interests['users'].sort()
    interests['hashtags'].sort()
    interests['links'].sort()
    return interests


def main():
    """Parse the -u option and print the interests found for that handle."""
    parser = optparse.OptionParser('[*]Usage: python twitterInterests.py -u ')
    parser.add_option('-u', dest='handle', type='string',
                      help='specify twitter handle')
    (options, args) = parser.parse_args()
    handle = options.handle
    if handle is None:
        print(parser.usage)
        exit(0)
    tweets = get_tweets(handle)
    interests = find_interests(tweets)
    print('\n[+] Links.')
    for link in set(interests['links']):
        print(' [+] ' + str(link))
    print('\n[+] Users.')
    for user in set(interests['users']):
        print(' [+] ' + str(user))
    print('\n[+] HashTags.')
    for hashtag in set(interests['hashtags']):
        print(' [+] ' + str(hashtag))


if __name__ == '__main__':
    main()

编写reconPerson类,封装所有抓取的地理位置、兴趣爱好以及Twitter页面的代码:

#!/usr/bin/python
# coding=utf-8
import urllib
from anonBrowser import *
import json
import re
import urllib2

# NOTE(review): the Twitter search endpoint literal was lost when the article
# was extracted; this value is a placeholder. It must end with the query
# parameter so the encoded query can be appended.
TWITTER_SEARCH_URL = 'http://example.com/search.json?q='

# NOTE(review): the link pattern was stripped from the published listing.
# find_interests() reads two capture groups (link[0], link[1]), so a
# two-alternative pattern is reconstructed here -- confirm.
LINK_RE = re.compile(r'(http.*?)\Z|(http.*?) ')
USER_RE = re.compile(r'(@\w+)')
HASHTAG_RE = re.compile(r'(#\w+)')


class reconPerson:
    """Tweets of one Twitter handle plus helpers to mine them.

    The tweets are fetched once, when the object is created, and kept in
    self.tweets.
    """

    def __init__(self, handle):
        self.handle = handle
        self.tweets = self.get_tweets()

    def get_tweets(self):
        """Fetch the handle's tweets as dicts with 'from_user', 'geo', 'tweet'."""
        query = urllib.quote_plus('from:' + self.handle +
                                  ' since:2009-01-01 include:retweets')
        browser = anonBrowser()
        browser.anonymize()
        response = browser.open(TWITTER_SEARCH_URL + query)
        json_objects = json.load(response)
        tweets = []
        for result in json_objects['results']:
            tweets.append({
                'from_user': result['from_user_name'],
                'geo': result['geo'],
                'tweet': result['text'],
            })
        return tweets

    def find_interests(self):
        """Return sorted 'links', 'users' and 'hashtags' found in the tweets.

        Every link is opened and the URL of the response is stored; links
        that cannot be opened are skipped.
        """
        interests = {'links': [], 'users': [], 'hashtags': []}
        for tweet in self.tweets:
            text = tweet['tweet']
            for groups in LINK_RE.findall(text):
                link = groups[0] or groups[1]
                if not link:
                    continue
                try:
                    response = urllib2.urlopen(link)
                except Exception:
                    # Best effort: an unreachable link is simply left out.
                    continue
                interests['links'].append(response.url)
            interests['users'] += USER_RE.findall(text)
            interests['hashtags'] += HASHTAG_RE.findall(text)
        interests['users'].sort()
        interests['hashtags'].sort()
        interests['links'].sort()
        return interests

    def twitter_locate(self, cityFile):
        """Return locations found in the tweets.

        The result holds the non-None 'geo' values of the tweets followed by
        every city from cityFile (one name per line, may be None for no file)
        that occurs in the lower-cased text of any tweet.
        """
        cities = []
        if cityFile is not None:
            with open(cityFile) as handle:
                for line in handle:
                    city = line.strip('\n').strip('\r').lower()
                    # A blank line would be a substring of every tweet.
                    if city:
                        cities.append(city)
        locations = []
        texts = []
        for tweet in self.tweets:
            if tweet['geo'] is not None:
                locations.append(tweet['geo'])
            texts.append(tweet['tweet'].lower())
        tweetsText = ''.join(texts)
        for city in cities:
            if city in tweetsText:
                locations.append(city)
        return locations

使用Smtplib给目标对象发邮件

#!/usr/bin/python
# coding=utf-8
import smtplib
from email.mime.text import MIMEText


def sendMail(user, pwd, to, subject, text):
    """Send a plain-text mail through smtp.gmail.com:587 using STARTTLS.

    user/pwd are the login credentials (user is also the From address), to is
    the recipient. Progress is printed; any failure is reported on stdout
    together with its cause instead of being raised.
    """
    msg = MIMEText(text)
    msg['From'] = user
    msg['To'] = to
    msg['Subject'] = subject
    smtpServer = None
    try:
        smtpServer = smtplib.SMTP('smtp.gmail.com', 587)
        print("[+] Connecting To Mail Server.")
        smtpServer.ehlo()
        print("[+] Starting Encrypted Session.")
        smtpServer.starttls()
        smtpServer.ehlo()
        print("[+] Logging Into Mail Server.")
        smtpServer.login(user, pwd)
        print("[+] Sending Mail.")
        smtpServer.sendmail(user, to, msg.as_string())
        print("[+] Mail Sent Successfully.")
    except Exception as e:
        print("[-] Sending Mail Failed.")
        print("[-] " + str(e))
    finally:
        # Release the socket even when a step above failed.
        if smtpServer is not None:
            smtpServer.close()


user = 'username'
pwd = 'password'
sendMail(user, pwd, 'target@tgt.tgt', 'Re: Important', 'Test Message')

用smtplib进行网络钓鱼

#!/usr/bin/python
# coding=utf-8
import smtplib
import optparse
from email.mime.text import MIMEText
from twitterClass import *
from random import choice

# NOTE(review): the link literal at the end of the message was lost when the
# article was extracted; this value is a placeholder.
MESSAGE_LINK = 'http://example.com/'


def sendMail(user, pwd, to, subject, text):
    """Send a plain-text mail through smtp.gmail.com:587 using STARTTLS.

    Progress is printed; any failure is reported on stdout together with its
    cause instead of being raised.
    """
    msg = MIMEText(text)
    msg['From'] = user
    msg['To'] = to
    msg['Subject'] = subject
    smtpServer = None
    try:
        smtpServer = smtplib.SMTP('smtp.gmail.com', 587)
        print("[+] Connecting To Mail Server.")
        smtpServer.ehlo()
        print("[+] Starting Encrypted Session.")
        smtpServer.starttls()
        smtpServer.ehlo()
        print("[+] Logging Into Mail Server.")
        smtpServer.login(user, pwd)
        print("[+] Sending Mail.")
        smtpServer.sendmail(user, to, msg.as_string())
        print("[+] Mail Sent Successfully.")
    except Exception as e:
        print("[-] Sending Mail Failed.")
        print("[-] " + str(e))
    finally:
        # Release the socket even when a step above failed.
        if smtpServer is not None:
            smtpServer.close()


def main():
    """Build a message from a handle's tweets and mail it to the target.

    Options: -u twitter handle, -t target address, -l and -p the mail login.
    """
    parser = optparse.OptionParser('[*]Usage: python sendSam.py -u -t ' + '-l -p ')
    parser.add_option('-u', dest='handle', type='string',
                      help='specify twitter handle')
    parser.add_option('-t', dest='tgt', type='string',
                      help='specify target email')
    parser.add_option('-l', dest='user', type='string',
                      help='specify gmail login')
    parser.add_option('-p', dest='pwd', type='string',
                      help='specify gmail password')
    (options, args) = parser.parse_args()
    handle = options.handle
    tgt = options.tgt
    user = options.user
    pwd = options.pwd
    if handle is None or tgt is None or user is None or pwd is None:
        print(parser.usage)
        exit(0)

    print("[+] Fetching tweets from: " + str(handle))
    # reconPerson fetches the tweets in its constructor; the original called
    # get_tweets() a second time and discarded the result.
    spamTgt = reconPerson(handle)
    print("[+] Fetching interests from: " + str(handle))
    interests = spamTgt.find_interests()
    print("[+] Fetching location information from: " + str(handle))
    location = spamTgt.twitter_locate('mlb-cities.txt')

    spamMsg = "Dear " + tgt + ","
    # These are lists, so "!= None" was always true and choice() raised
    # IndexError on an empty one; test for emptiness instead.
    if location:
        # A location may be a geo value rather than a city string.
        spamMsg += " Its me from " + str(choice(location)) + "."
    if interests['users']:
        spamMsg += " " + choice(interests['users']) + " said to say hello."
    if interests['hashtags']:
        spamMsg += " Did you see all the fuss about " + choice(interests['hashtags']) + "?"
    if interests['links']:
        spamMsg += " I really liked your link to: " + choice(interests['links']) + "."
    spamMsg += " Check out my link to " + MESSAGE_LINK
    print("[+] Sending Msg: " + spamMsg)
    sendMail(user, pwd, tgt, 'Re: Important', spamMsg)


if __name__ == '__main__':
    main()


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:使用 Python 分析网络流量(使用筷子就餐会不会传染乙肝病毒)
下一篇:Java11 中基于嵌套关系的访问控制优化问题
相关文章

 发表评论

暂时没有评论,来抢沙发吧~