于异步IO这个概念,可能有些小猿们不是非常明白,那就先来看看异步IO是怎么回事儿。
为了大家能够更形象得理解这个概念,我们拿放羊来打个比方:
同步放羊的过程就是这样的:
羊倌儿小同要放100只羊,他就先放一只羊出去吃草,等羊吃饱了回来在放第二只羊,等第二只羊吃饱了回来再放第三只羊出去吃草……这样放羊的羊倌儿实在是……
再看看异步放羊的过程:
羊倌儿小异也要放100只羊,他观察后发现,小同放羊的方法比较笨,他觉得草地一下能容下10只羊(带宽)吃草,所以它就一次放出去10只羊等它们回来,然后他还可以给羊剪剪羊毛。有的羊吃得快回来的早,他就把羊关到羊圈接着就再放出去几只,尽量保证草地上都有10只羊在吃草。
很明显,异步放羊的效率高多了。同样的,网络世界里也是异步的效率高。
到了这里,可能有小猿要问,为什么不用多线程、多进程实现爬虫呢? 没错,多线程和多进程也可以提高前面那个同步爬虫的抓取效率,但是异步IO提高的更多,也更适合爬虫这个场景。后面机会我们可以对比一下三者抓取的效率。
还记得我们之前使用requests实现的那个downloader吗?同步情况下,它很好用,但不适合异步,所以我们要先改造它。幸运的是,已经有aiohttp模块来支持异步http请求了,那么我们就用aiohttp来实现异步downloader。
async def fetch(session, url, headers=None, timeout=9): _headers = { 'User-Agent': ('Mozilla/5.0 (compatible; MSIE 9.0; ' 'Windows NT 6.1; Win64; x64; Trident/5.0)'), } if headers: _headers = headers try: async with session.get(url, headers=_headers, timeout=timeout) as response: status = response.status html = await response.read() encoding = response.get_encoding() if encoding == 'gb2312': encoding = 'gbk' html = html.decode(encoding, errors='ignore') redirected_url = str(response.url) except Exception as e: msg = 'Failed download: {} | exception: {}, {}'.format(url, str(type(e)), str(e)) print(msg) html = '' status = 0 redirected_url = url return status, html, redirected_url
实现中使用了异步上下文管理器(async with),编码的判断我们还是用cchardet来实现。
有了异步下载器,我们的异步爬虫就可以写起来啦~
跟同步爬虫一样,我们还是把整个爬虫定义为一个类,它的主要成员有:
通过这几个主要成员来达到异步控制、异步下载、异步存储(数据库)的目的,其它成员作为辅助。爬虫类的相关方法,参加下面的完整实现代码:
#!/usr/bin/env python3 # File: news-crawler-async.py # Author: veelion import traceback import time import asyncio import aiohttp import urllib.parse as urlparse import farmhash import lzma import uvloop asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) import sanicdb from urlpool import UrlPool import functions as fn import config class NewsCrawlerAsync: def __init__(self, name): self._workers = 0 self._workers_max = 30 self.logger = fn.init_file_logger(name+ '.log') self.urlpool = UrlPool(name) self.loop = asyncio.get_event_loop() self.session = aiohttp.ClientSession(loop=self.loop) self.db = sanicdb.SanicDB( config.db_host, config.db_db, config.db_user, config.db_password, loop=self.loop ) async def load_hubs(self,): sql = 'select url from crawler_hub' data = await self.db.query(sql) self.hub_hosts = set() hubs = [] for d in data: host = urlparse.urlparse(d['url']).netloc self.hub_hosts.add(host) hubs.append(d['url']) self.urlpool.set_hubs(hubs, 300) async def save_to_db(self, url, html): urlhash = farmhash.hash64(url) sql = 'select url from crawler_html where urlhash=%s' d = await self.db.get(sql, urlhash) if d: if d['url'] != url: msg = 'farmhash collision: %s <=> %s' % (url, d['url']) self.logger.error(msg) return True if isinstance(html, str): html = html.encode('utf8') html_lzma = lzma.compress(html) sql = ('insert into crawler_html(urlhash, url, html_lzma) ' 'values(%s, %s, %s)') good = False try: await self.db.execute(sql, urlhash, url, html_lzma) good = True except Exception as e: if e.args[0] == 1062: # Duplicate entry good = True pass else: traceback.print_exc() raise e return good def filter_good(self, urls): goodlinks = [] for url in urls: host = urlparse.urlparse(url).netloc if host in self.hub_hosts: goodlinks.append(url) return goodlinks async def process(self, url, ishub): status, html, redirected_url = await fn.fetch(self.session, url) self.urlpool.set_status(url, status) if redirected_url != url: self.urlpool.set_status(redirected_url, status) # 提取hub网页中的链接, 新闻网页中也有“相关新闻”的链接,按需提取 if status != 200: return if ishub: newlinks = fn.extract_links_re(redirected_url, html) goodlinks = self.filter_good(newlinks) print("%s/%s, goodlinks/newlinks" % (len(goodlinks), len(newlinks))) self.urlpool.addmany(goodlinks) else: await self.save_to_db(redirected_url, html) self._workers -= 1 async def loop_crawl(self,): await self.load_hubs() last_rating_time = time.time() counter = 0 while 1: tasks = self.urlpool.pop(self._workers_max) if not tasks: print('no url to crawl, sleep') await asyncio.sleep(3) continue for url, ishub in tasks.items(): self._workers += 1 counter += 1 print('crawl:', url) asyncio.ensure_future(self.process(url, ishub)) gap = time.time() - last_rating_time if gap > 5: rate = counter / gap print('\tloop_crawl() rate:%s, counter: %s, workers: %s' % (round(rate, 2), counter, self._workers)) last_rating_time = time.time() counter = 0 if self._workers > self._workers_max: print('====== got workers_max, sleep 3 sec to next worker =====') await asyncio.sleep(3) def run(self): try: self.loop.run_until_complete(self.loop_crawl()) except KeyboardInterrupt: print('stopped by yourself!') del self.urlpool pass if __name__ == '__main__': nc = NewsCrawlerAsync('yrx-async') nc.run()
爬虫的主流程是在方法loop_crawl()里面实现的。它的主体是一个while循环,每次从self.urlpool里面获取定量的爬虫作为下载任务(从羊圈里面选出一批羊),通过ensure_future()开始异步下载(把这些羊都放出去)。而process()这个方法的流程是下载网页并存储、提取新的url,这就类似羊吃草、下崽等。
通过self._workers和self._workers_max来控制并发量。不能一直并发,给本地CPU、网络带宽带来压力,同样也会给目标服务器带来压力。
至此,我们实现了同步和异步两个新闻爬虫,分别实现了NewsCrawlerSync和NewsCrawlerAsync两个爬虫类,他们的结构几乎完全一样,只是抓取流程一个是顺序的,一个是并发的。小猿们可以通过对比两个类的实现,来更好的理解异步的流程。
爬虫知识点
uvloop这个模块是用Cython编写建立在libuv库之上,它是asyncio内置事件循环的替代,使用它仅仅是多两行代码而已:
import uvloop asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
uvloop使得asyncio很快,比odejs、gevent和其它Python异步框架的快至少2倍,接近于Go语言的性能。
uvloop作者的性能测试
这是uvloop作者的性能对比测试。
目前,uvloop不支持Windows系统和Python 3.5 及其以上版本,这在它源码的setup.py文件中可以看到:
if sys.platform in ('win32', 'cygwin', 'cli'): raise RuntimeError('uvloop does not support Windows at the moment') vi = sys.version_info if vi < (3, 5): raise RuntimeError('uvloop requires Python 3.5 or greater')
所以,使用Windows的小猿们要运行异步爬虫,就要把uvloop那两行注释掉哦。
思考题
或许有些小猿还没见过这样的html代码,它出现在<head>里面:
<meta http-equiv="refresh" content="5; url=https://example.com/">
它的意思是,告诉浏览器在5秒之后跳转到另外一个url:https://example.com/。
那么问题来了,请给downloader(fetch())添加代码,让它支持这个跳转。
这是我们写新闻爬虫要考虑的一个很重要的问题,我们实现的新闻爬虫中并没有实现这个机制,小猿们来思考一下,并对手实现实现。
ℹ️Crawlee 是Apify SDK的继承者。用TypeScript完全重写,以获得更好的开发者体验,并具有更强大的抗阻塞功能。界面与 Apify SDK 几乎相同,因此升级轻而易举。阅读升级指南以了解更改。
Crawlee 涵盖了端到端的爬行和抓取,并帮助您构建可靠的抓取工具。快速地。
即使使用默认配置,您的爬虫也会像人类一样在现代机器人保护的雷达下飞行。Crawlee 为您提供了在 Web 上抓取链接、抓取数据并将其存储到磁盘或云中的工具,同时保持可配置以满足您的项目需求。
Crawlee 以crawleeNPM 包的形式提供。
我们建议您访问Crawlee 文档中的介绍教程以获取更多信息。
Crawlee 需要Node.js 16 或更高版本。
试用 Crawlee 的最快方法是使用Crawlee CLI并选择Getting started example。CLI 将安装所有必要的依赖项并添加样板代码供您使用。
npx crawlee create my-crawler
cd my-crawler npm start
如果您更喜欢将 Crawlee 添加到您自己的项目中,请尝试以下示例。因为它使用PlaywrightCrawler我们还需要安装Playwright。它没有与 Crawlee 捆绑以减少安装大小。
npm install crawlee playwright
import { PlaywrightCrawler, Dataset } from 'crawlee';
// PlaywrightCrawler crawls the web using a headless
// browser controlled by the Playwright library.
const crawler = new PlaywrightCrawler({
// Use the requestHandler to process each of the crawled pages.
async requestHandler({ request, page, enqueueLinks, log }) {
const title = await page.title();
log.info(`Title of ${request.loadedUrl} is '${title}'`);
// Save results as JSON to ./storage/datasets/default
await Dataset.pushData({ title, url: request.loadedUrl });
// Extract links from the current page
// and add them to the crawling queue.
await enqueueLinks();
},
// Uncomment this option to see the browser window.
// headless: false,
});
// Add first URL to the queue and start the crawl.
await crawler.run(['https://crawlee.dev']);
默认情况下,Crawlee 将数据存储到./storage当前工作目录中。您可以通过 Crawlee 配置覆盖此目录。详见配置指南、请求存储和结果存储。
网友留言问怎么用python抓取小说,今天小编就给大家分享一下用python抓取起点中文网的免费小说教程,用到的库有urllib2、BeautifulSoup,下面就来看看吧!(关注并私信我python,给你发价值万元的python学习教程。
*请认真填写需求信息,我们会在24小时内与您取得联系。