Hello 大家好!我又来了。
你是不是发现下载图片速度特别慢、难以忍受啊!对于这种问题 一般解决办法就是多进程了!一个进程速度慢!我就用十个进程,相当于十个人一起干。速度就会快很多啦!(为什么不说多线程?懂点Python的小伙伴都知道、GIL的存在 导致Python的多线程有点坑啊!)今天就教大家来做一个多进程的爬虫(其实吧、可以用来做一个超简化版的分布式爬虫)
其实吧!还有一种加速的方法叫做“异步”!不过这玩意儿我没怎么整明白就不出来误人子弟了!(因为爬虫大部分时间都是在等待response中!‘异步’则能让程序在等待response的时间去做的其他事情。)
学过Python基础的同学都知道、在多进程中,进程之间是不能相互通信的,这就有一个很坑爹的问题的出现了!多个进程怎么知道那那些需要爬取、哪些已经被爬取了!
这就涉及到一个东西!这玩意儿叫做队列!!队列!!队列!!其实吧正常来说应该给大家用队列来完成这个教程的, 比如 Tornado 的queue模块。(如果需要更为稳定健壮的队列,则请考虑使用Celery这一类的专用消息传递工具)
不过为了简化技术种类啊!(才不会告诉你们是我懒,嫌麻烦呢!)这次我们继续使用MongoDB。
好了!先来理一下思路:
每个进程需要知道那些URL爬取过了、哪些URL需要爬取!我们来给每个URL设置两种状态:
outstanding:等待爬取的URL
complete:爬取完成的URL
诶!等等我们好像忘了啥? 失败的URL的怎么办啊?我们在增加一种状态:
processing:正在进行的URL。
嗯!当一个所有初始的URL状态都为outstanding;当开始爬取的时候状态改为:processing;爬取完成状态改为:complete;失败的URL重置状态为:outstanding。为了能够处理URL进程被终止的情况、我们设置一个计时参数,当超过这个值时;我们则将状态重置为outstanding。
下面开整Go Go Go!
首先我们需要一个模块:datetime(这个模块比内置time模块要好使一点)不会装??不是吧! pip install datetime
还有上一篇博文我们已经使用过的pymongo
下面是队列的代码:
from datetime import datetime, timedelta from pymongo import MongoClient, errors class MogoQueue(): OUTSTANDING = 1 ##初始状态 PROCESSING = 2 ##正在下载状态 COMPLETE = 3 ##下载完成状态 def __init__(self, db, collection, timeout=300):##初始mongodb连接 self.client = MongoClient() self.Client = self.client[db] self.db = self.Client[collection] self.timeout = timeout def __bool__(self): """ 这个函数,我的理解是如果下面的表达为真,则整个类为真 至于有什么用,后面我会注明的(如果我的理解有误,请指点出来谢谢,我也是Python新手) $ne的意思是不匹配 """ record = self.db.find_one( {'status': {'$ne': self.COMPLETE}} ) return True if record else False def push(self, url, title): ##这个函数用来添加新的URL进队列 try: self.db.insert({'_id': url, 'status': self.OUTSTANDING, '主题': title}) print(url, '插入队列成功') except errors.DuplicateKeyError as e: ##报错则代表已经存在于队列之中了 print(url, '已经存在于队列中了') pass def push_imgurl(self, title, url): try: self.db.insert({'_id': title, 'statue': self.OUTSTANDING, 'url': url}) print('图片地址插入成功') except errors.DuplicateKeyError as e: print('地址已经存在了') pass def pop(self): """ 这个函数会查询队列中的所有状态为OUTSTANDING的值, 更改状态,(query后面是查询)(update后面是更新) 并返回_id(就是我们的URL),MongDB好使吧,^_^ 如果没有OUTSTANDING的值则调用repair()函数重置所有超时的状态为OUTSTANDING, $set是设置的意思,和MySQL的set语法一个意思 """ record = self.db.find_and_modify( query={'status': self.OUTSTANDING}, update={'$set': {'status': self.PROCESSING, 'timestamp': datetime.now()}} ) if record: return record['_id'] else: self.repair() raise KeyError def pop_title(self, url): record = self.db.find_one({'_id': url}) return record['主题'] def peek(self): """这个函数是取出状态为 OUTSTANDING的文档并返回_id(URL)""" record = self.db.find_one({'status': self.OUTSTANDING}) if record: return record['_id'] def complete(self, url): """这个函数是更新已完成的URL完成""" self.db.update({'_id': url}, {'$set': {'status': self.COMPLETE}}) def repair(self): """这个函数是重置状态$lt是比较""" record = self.db.find_and_modify( query={ 'timestamp': {'$lt': datetime.now() - timedelta(seconds=self.timeout)}, 'status': {'$ne': self.COMPLETE} }, update={'$set': {'status': self.OUTSTANDING}} ) if record: print('重置URL状态', record['_id']) def clear(self): """这个函数只有第一次才调用、后续不要调用、因为这是删库啊!""" self.db.drop()
好了,队列我们做好了,下面是获取所有页面的代码:
from Download import request from mongodb_queue import MogoQueue from bs4 import BeautifulSoup spider_queue = MogoQueue('meinvxiezhenji', 'crawl_queue') def start(url): response = request.get(url, 3) Soup = BeautifulSoup(response.text, 'lxml') all_a = Soup.find('div', class_='all').find_all('a') for a in all_a: title = a.get_text() url = a['href'] spider_queue.push(url, title) """上面这个调用就是把URL写入MongoDB的队列了""" if __name__ == "__main__": start('http://www.mzitu.com/all') """这一段儿就不解释了哦!超级简单的"""
下面就是多进程+多线程的下载代码了:
import os import time import threading import multiprocessing from mongodb_queue import MogoQueue from Download import request from bs4 import BeautifulSoup SLEEP_TIME = 1 def mzitu_crawler(max_threads=10): crawl_queue = MogoQueue('meinvxiezhenji', 'crawl_queue') ##这个是我们获取URL的队列 ##img_queue = MogoQueue('meinvxiezhenji', 'img_queue') def pageurl_crawler(): while True: try: url = crawl_queue.pop() print(url) except KeyError: print('队列没有数据') break else: img_urls = [] req = request.get(url, 3).text title = crawl_queue.pop_title(url) mkdir(title) os.chdir('D:\mzitu\\' + title) max_span = BeautifulSoup(req, 'lxml').find('div', class_='pagenavi').find_all('span')[-2].get_text() for page in range(1, int(max_span) + 1): page_url = url + '/' + str(page) img_url = BeautifulSoup(request.get(page_url, 3).text, 'lxml').find('div', class_='main-image').find('img')['src'] img_urls.append(img_url) save(img_url) crawl_queue.complete(url) ##设置为完成状态 ##img_queue.push_imgurl(title, img_urls) ##print('插入数据库成功') def save(img_url): name = img_url[-9:-4] print(u'开始保存:', img_url) img = request.get(img_url, 3) f = open(name + '.jpg', 'ab') f.write(img.content) f.close() def mkdir(path): path = path.strip() isExists = os.path.exists(os.path.join("D:\mzitu", path)) if not isExists: print(u'建了一个名字叫做', path, u'的文件夹!') os.makedirs(os.path.join("D:\mzitu", path)) return True else: print(u'名字叫做', path, u'的文件夹已经存在了!') return False threads = [] while threads or crawl_queue: """ 这儿crawl_queue用上了,就是我们__bool__函数的作用,为真则代表我们MongoDB队列里面还有数据 threads 或者 crawl_queue为真都代表我们还没下载完成,程序就会继续执行 """ for thread in threads: if not thread.is_alive(): ##is_alive是判断是否为空,不是空则在队列中删掉 threads.remove(thread) while len(threads) < max_threads or crawl_queue.peek(): ##线程池中的线程少于max_threads 或者 crawl_qeue时 thread = threading.Thread(target=pageurl_crawler) ##创建线程 thread.setDaemon(True) ##设置守护线程 thread.start() ##启动线程 threads.append(thread) ##添加进线程队列 time.sleep(SLEEP_TIME) def process_crawler(): process = [] num_cpus = multiprocessing.cpu_count() print('将会启动进程数为:', num_cpus) for i in range(num_cpus): p = multiprocessing.Process(target=mzitu_crawler) ##创建进程 p.start() ##启动进程 process.append(p) ##添加进进程队列 for p in process: p.join() ##等待进程队列里面的进程结束 if __name__ == "__main__": process_crawler()
好啦!一个多进程多线的爬虫就完成了,(其实你可以设置一下MongoDB,然后调整一下连接配置,在多台机器上跑哦!!嗯,就是超级简化版的分布式爬虫了,虽然很是简陋。)
本来还想下载图片那一块儿加上异步(毕竟下载图片是I\O等待最久的时间了,),可惜异步我也没怎么整明白,就不拿出来贻笑大方了。
另外,各位小哥儿可以参考上面代码,单独处理图片地址试试(就是多个进程直接下载图片)?
我测试了一下八分钟下载100套图
PS:请务必使用 第二篇博文中的下载模块,或者自己写一个自动更换代理的下载模块!!!不然寸步难行,分分钟被服务器BAN掉!
小白教程就到此结束了,后面我教大家玩玩Scrapy;目标 顶点小说网, 爬完全站的小说。
再后面带大家玩玩 抓新浪 汤不热、模拟登录 之类的。或许维护一个公共代理IP池之类的。
这个所有代码我放在这个位置了:https://github.com/thsheep/mzitu/
转载请注明:静觅 » 小白爬虫第四弹之爬虫快跑(多进程+多线程)