整合营销服务商

电脑端+手机端+微信端=数据同步管理

免费咨询热线:

python:大规模异步爬虫用asyncio实现异步爬虫

于异步IO这个概念,可能有些小猿们不是非常明白,那就先来看看异步IO是怎么回事儿。

为了大家能够更形象得理解这个概念,我们拿放羊来打个比方:

  • 下载请求开始,就是放羊出去吃草;
  • 下载任务完成,就是羊吃饱回羊圈。

同步放羊的过程就是这样的:

羊倌儿小同要放100只羊,他就先放一只羊出去吃草,等羊吃饱了回来在放第二只羊,等第二只羊吃饱了回来再放第三只羊出去吃草……这样放羊的羊倌儿实在是……

再看看异步放羊的过程:

羊倌儿小异也要放100只羊,他观察后发现,小同放羊的方法比较笨,他觉得草地一下能容下10只羊(带宽)吃草,所以它就一次放出去10只羊等它们回来,然后他还可以给羊剪剪羊毛。有的羊吃得快回来的早,他就把羊关到羊圈接着就再放出去几只,尽量保证草地上都有10只羊在吃草。

很明显,异步放羊的效率高多了。同样的,网络世界里也是异步的效率高。

到了这里,可能有小猿要问,为什么不用多线程、多进程实现爬虫呢? 没错,多线程和多进程也可以提高前面那个同步爬虫的抓取效率,但是异步IO提高的更多,也更适合爬虫这个场景。后面机会我们可以对比一下三者抓取的效率。

1. 异步的downloader

还记得我们之前使用requests实现的那个downloader吗?同步情况下,它很好用,但不适合异步,所以我们要先改造它。幸运的是,已经有aiohttp模块来支持异步http请求了,那么我们就用aiohttp来实现异步downloader。

async def fetch(session, url, headers=None, timeout=9):
 _headers = {
 'User-Agent': ('Mozilla/5.0 (compatible; MSIE 9.0; '
 'Windows NT 6.1; Win64; x64; Trident/5.0)'),
 }
 if headers:
 _headers = headers
 try:
 async with session.get(url, headers=_headers, timeout=timeout) as response:
 status = response.status
 html = await response.read()
 encoding = response.get_encoding()
 if encoding == 'gb2312':
 encoding = 'gbk'
 html = html.decode(encoding, errors='ignore')
 redirected_url = str(response.url)
 except Exception as e:
 msg = 'Failed download: {} | exception: {}, {}'.format(url, str(type(e)), str(e))
 print(msg)
 html = ''
 status = 0
 redirected_url = url
 return status, html, redirected_url
  • 这个异步的downloader,我们称之为fetch(),它有两个必须参数:
  • seesion: 这是一个aiohttp.ClientSession的对象,这个对象的初始化在crawler里面完成,每次调用fetch()时,作为参数传递。

url:这是需要下载的网址。

实现中使用了异步上下文管理器(async with),编码的判断我们还是用cchardet来实现。

有了异步下载器,我们的异步爬虫就可以写起来啦~

2. 异步新闻爬虫

跟同步爬虫一样,我们还是把整个爬虫定义为一个类,它的主要成员有:

  • self.urlpool 网址池
  • self.loop 异步的事件循环
  • self.seesion aiohttp.ClientSession的对象,用于异步下载
  • self.db 基于aiomysql的异步数据库连接
  • self._workers 当前并发下载(放出去的羊)的数量

通过这几个主要成员来达到异步控制、异步下载、异步存储(数据库)的目的,其它成员作为辅助。爬虫类的相关方法,参加下面的完整实现代码:

#!/usr/bin/env python3
# File: news-crawler-async.py
# Author: veelion
import traceback
import time
import asyncio
import aiohttp
import urllib.parse as urlparse
import farmhash
import lzma
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
import sanicdb
from urlpool import UrlPool
import functions as fn
import config
class NewsCrawlerAsync:
 def __init__(self, name):
 self._workers = 0
 self._workers_max = 30
 self.logger = fn.init_file_logger(name+ '.log')
 self.urlpool = UrlPool(name)
 self.loop = asyncio.get_event_loop()
 self.session = aiohttp.ClientSession(loop=self.loop)
 self.db = sanicdb.SanicDB(
 config.db_host,
 config.db_db,
 config.db_user,
 config.db_password,
 loop=self.loop
 )
 async def load_hubs(self,):
 sql = 'select url from crawler_hub'
 data = await self.db.query(sql)
 self.hub_hosts = set()
 hubs = []
 for d in data:
 host = urlparse.urlparse(d['url']).netloc
 self.hub_hosts.add(host)
 hubs.append(d['url'])
 self.urlpool.set_hubs(hubs, 300)
 async def save_to_db(self, url, html):
 urlhash = farmhash.hash64(url)
 sql = 'select url from crawler_html where urlhash=%s'
 d = await self.db.get(sql, urlhash)
 if d:
 if d['url'] != url:
 msg = 'farmhash collision: %s <=> %s' % (url, d['url'])
 self.logger.error(msg)
 return True
 if isinstance(html, str):
 html = html.encode('utf8')
 html_lzma = lzma.compress(html)
 sql = ('insert into crawler_html(urlhash, url, html_lzma) '
 'values(%s, %s, %s)')
 good = False
 try:
 await self.db.execute(sql, urlhash, url, html_lzma)
 good = True
 except Exception as e:
 if e.args[0] == 1062:
 # Duplicate entry
 good = True
 pass
 else:
 traceback.print_exc()
 raise e
 return good
 def filter_good(self, urls):
 goodlinks = []
 for url in urls:
 host = urlparse.urlparse(url).netloc
 if host in self.hub_hosts:
 goodlinks.append(url)
 return goodlinks
 async def process(self, url, ishub):
 status, html, redirected_url = await fn.fetch(self.session, url)
 self.urlpool.set_status(url, status)
 if redirected_url != url:
 self.urlpool.set_status(redirected_url, status)
 # 提取hub网页中的链接, 新闻网页中也有“相关新闻”的链接,按需提取
 if status != 200:
 return
 if ishub:
 newlinks = fn.extract_links_re(redirected_url, html)
 goodlinks = self.filter_good(newlinks)
 print("%s/%s, goodlinks/newlinks" % (len(goodlinks), len(newlinks)))
 self.urlpool.addmany(goodlinks)
 else:
 await self.save_to_db(redirected_url, html)
 self._workers -= 1
 async def loop_crawl(self,):
 await self.load_hubs()
 last_rating_time = time.time()
 counter = 0
 while 1:
 tasks = self.urlpool.pop(self._workers_max)
 if not tasks:
 print('no url to crawl, sleep')
 await asyncio.sleep(3)
 continue
 for url, ishub in tasks.items():
 self._workers += 1
 counter += 1
 print('crawl:', url)
 asyncio.ensure_future(self.process(url, ishub))
 gap = time.time() - last_rating_time
 if gap > 5:
 rate = counter / gap
 print('\tloop_crawl() rate:%s, counter: %s, workers: %s' % (round(rate, 2), counter, self._workers))
 last_rating_time = time.time()
 counter = 0
 if self._workers > self._workers_max:
 print('====== got workers_max, sleep 3 sec to next worker =====')
 await asyncio.sleep(3)
 def run(self):
 try:
 self.loop.run_until_complete(self.loop_crawl())
 except KeyboardInterrupt:
 print('stopped by yourself!')
 del self.urlpool
 pass
if __name__ == '__main__':
 nc = NewsCrawlerAsync('yrx-async')
 nc.run()

爬虫的主流程是在方法loop_crawl()里面实现的。它的主体是一个while循环,每次从self.urlpool里面获取定量的爬虫作为下载任务(从羊圈里面选出一批羊),通过ensure_future()开始异步下载(把这些羊都放出去)。而process()这个方法的流程是下载网页并存储、提取新的url,这就类似羊吃草、下崽等。

通过self._workers和self._workers_max来控制并发量。不能一直并发,给本地CPU、网络带宽带来压力,同样也会给目标服务器带来压力。

至此,我们实现了同步和异步两个新闻爬虫,分别实现了NewsCrawlerSync和NewsCrawlerAsync两个爬虫类,他们的结构几乎完全一样,只是抓取流程一个是顺序的,一个是并发的。小猿们可以通过对比两个类的实现,来更好的理解异步的流程。

爬虫知识点

1. uvloop模块

uvloop这个模块是用Cython编写建立在libuv库之上,它是asyncio内置事件循环的替代,使用它仅仅是多两行代码而已:

import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

uvloop使得asyncio很快,比odejs、gevent和其它Python异步框架的快至少2倍,接近于Go语言的性能。

uvloop作者的性能测试

这是uvloop作者的性能对比测试。

目前,uvloop不支持Windows系统和Python 3.5 及其以上版本,这在它源码的setup.py文件中可以看到:

if sys.platform in ('win32', 'cygwin', 'cli'):
 raise RuntimeError('uvloop does not support Windows at the moment')
vi = sys.version_info
if vi < (3, 5):
 raise RuntimeError('uvloop requires Python 3.5 or greater')

所以,使用Windows的小猿们要运行异步爬虫,就要把uvloop那两行注释掉哦。

思考题

1. 给同步的downloader()或异步的fetch()添加功能

或许有些小猿还没见过这样的html代码,它出现在<head>里面:

<meta http-equiv="refresh" content="5; url=https://example.com/">

它的意思是,告诉浏览器在5秒之后跳转到另外一个url:https://example.com/。

那么问题来了,请给downloader(fetch())添加代码,让它支持这个跳转。

2. 如何控制hub的刷新频率,及时发现最新新闻

这是我们写新闻爬虫要考虑的一个很重要的问题,我们实现的新闻爬虫中并没有实现这个机制,小猿们来思考一下,并对手实现实现。

ℹ️Crawlee 是Apify SDK的继承者。用TypeScript完全重写,以获得更好的开发者体验,并具有更强大的抗阻塞功能。界面与 Apify SDK 几乎相同,因此升级轻而易举。阅读升级指南以了解更改。

Crawlee 涵盖了端到端的爬行和抓取,并帮助您构建可靠的抓取工具。快速地。

即使使用默认配置,您的爬虫也会像人类一样在现代机器人保护的雷达下飞行。Crawlee 为您提供了在 Web 上抓取链接、抓取数据并将其存储到磁盘或云中的工具,同时保持可配置以满足您的项目需求。

Crawlee 以crawleeNPM 包的形式提供。

安装

我们建议您访问Crawlee 文档中的介绍教程以获取更多信息。

Crawlee 需要Node.js 16 或更高版本

使用 Crawlee CLI

试用 Crawlee 的最快方法是使用Crawlee CLI并选择Getting started example。CLI 将安装所有必要的依赖项并添加样板代码供您使用。

npx crawlee create my-crawler
cd my-crawler npm start

手动安装

如果您更喜欢将 Crawlee 添加到您自己的项目中,请尝试以下示例。因为它使用PlaywrightCrawler我们还需要安装Playwright。它没有与 Crawlee 捆绑以减少安装大小。

npm install crawlee playwright
import { PlaywrightCrawler, Dataset } from 'crawlee';

// PlaywrightCrawler crawls the web using a headless
// browser controlled by the Playwright library.
const crawler = new PlaywrightCrawler({
    // Use the requestHandler to process each of the crawled pages.
    async requestHandler({ request, page, enqueueLinks, log }) {
        const title = await page.title();
        log.info(`Title of ${request.loadedUrl} is '${title}'`);

        // Save results as JSON to ./storage/datasets/default
        await Dataset.pushData({ title, url: request.loadedUrl });

        // Extract links from the current page
        // and add them to the crawling queue.
        await enqueueLinks();
    },
    // Uncomment this option to see the browser window.
    // headless: false,
});

// Add first URL to the queue and start the crawl.
await crawler.run(['https://crawlee.dev']);

默认情况下,Crawlee 将数据存储到./storage当前工作目录中。您可以通过 Crawlee 配置覆盖此目录。详见配置指南请求存储结果存储

特征

  • 用于HTTP 和无头浏览器抓取的单一界面
  • 用于抓取 URL 的持久队列(广度和深度优先)
  • 表格数据和文件的可插拔存储
  • 使用可用系统资源自动扩展
  • 集成代理轮换和会话管理
  • 使用钩子可定制的生命周期
  • CLI引导您的项目
  • 可配置的路由错误处理重试
  • 准备部署的Dockerfile
  • 用TypeScript用泛型编写

HTTP 爬取

  • 零配置HTTP2 支持,即使是代理
  • 自动生成类似浏览器的标题
  • 复制浏览器TLS 指纹
  • 集成快速HTML 解析器。Cheerio 和 JSDOM
  • 是的,您也可以抓取JSON API

真正的浏览器爬取

  • JavaScript渲染截图
  • 无头有头支持
  • 零配置生成类人指纹
  • 自动浏览器管理
  • 使用相同界面的PlaywrightPuppeteer
  • Chrome , Firefox , Webkit等等

网友留言问怎么用python抓取小说,今天小编就给大家分享一下用python抓取起点中文网的免费小说教程,用到的库有urllib2、BeautifulSoup,下面就来看看吧!(关注并私信我python,给你发价值万元的python学习教程。