使用协程快速获得一个代理池

使用协程快速获得一个代理池

前言

在执行 I/O 密集型任务的时候,程序会因为等待 I/O 而阻塞。比如我们使用 requests 库来进行网络爬虫请求的话,如果网站响应速度过慢,程序会一直等待网站响应,最终导致其爬取效率十分低下。本文以爬取 IP 代理池为例,演示 Python 中如何利用异步协程来加速网络爬虫。

协程

协程(Coroutine),又称微线程,纤程,协程是一种用户态的轻量级线程。

协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存,在调度回来的时候,恢复先前保存的寄存器上下文和栈。因此协程能保留上一次调用时的状态,即所有局部状态的一个特定组合。

协程本质上是个单进程,协程相对于多进程来说,无需进程间上下文切换的开销,无需原子操作锁定及同步的开销,编程模型也非常简单。

我们可以使用协程来实现异步操作,比如在网络爬虫场景下,在发出一个请求之后,需要等待一定的时间才能得到响应。其实在这个等待过程中,程序可以干许多其他的事情,等到响应返回之后再切换回来继续处理,这样可以充分利用 CPU 和其他资源,这就是异步协程的优势。

Python 中的协程

从 Python 3.4 开始,Python 中加入了协程的概念,这个版本的协程是通过生成器对象来实现的,在 Python 3.5 中增加了 asyncio 库和 async、await 关键字,使得协程的实现更加方便。

asyncio 库

首先我们先来看一个不使用协程的程序,代码如下:

import time ​ ​ def job(t): print(Start job {}.format(t)) time.sleep(t)# 等待 t 秒 print(Job {0} takes {0}s.format(t)) ​ ​ def main(): [job(t) for t in range(1, 3)] ​ ​ start = time.time() main() print("total time: {}".format(time.time() - start))

运行结果:

Start job 1 Job 1 takes 1s Start job 2 Job 2 takes 2s total time: 3.0757

从运行结果可以看出,我们的 job 是按顺序执行的。必须执行完 job 1 才能开始执行 job 2, job 1 需要 1 秒的执行时间,job 2 需要 2 秒的执行时间,所以总时间是 3 秒多。

如果我们使用协程的方式,job 1 在等待 time.sleep(t) 执行结束的时候(可以看做是等待一个网页的下载成功),是可以切换到 job 2 执行的。

我们再来看一下使用协程改造后的代码:

import time import asyncio ​ ​ async def job(t):# 使用 async 关键字将一个函数定义为协程 print(Start job {}.format(t)) await asyncio.sleep(t)# 等待 t 秒, 期间切换执行其他任务 print(Job {0} takes {0}s.format(t)) ​ ​ async def main(loop):# 使用 async 关键字将一个函数定义为协程 tasks = [loop.create_task(job(t)) for t in range(1, 3)]# 创建任务, 不立即执行 await asyncio.wait(tasks)# 执行并等待所有任务完成 ​ ​ start = time.time() loop = asyncio.get_event_loop()# 建立 loop loop.run_until_complete(main(loop))# 执行 loop loop.close()# 关闭 loop print("total time: {}".format(time.time() - start))

运行结果:

Start job 1 Start job 2 Job 1 takes 1s Job 2 takes 2s total time: 2.91113

从运行结果可以看出,我们没有等待 job 1 执行结束再开始执行 job 2,而是 job 1 触发 await 的时候切换到了 job 2 。 这时 job 1 和 job 2 同时在执行 await asyncio.sleep(t),所以最终程序的执行时间取决于执行时间最长的那个 job,也就是 job 2 的执行时间:2 秒。

aiohttp 库

在对 asyncio 库做了简单了解之后,我们来看一下如何通过协程来改造我们的爬虫程序。

安装 aiohttp 库:

pip install aiohttp

我们先来看一下使用 reqeusts 库实现一个网页的爬取:

import time ​ import requests ​ def fetch(url): r = requests.get(url) return r.url ​ ​ def main(): results = [fetch() for _ in range(2)] print(results) ​ ​ start = time.time() main() print("total time: {}".format(time.time() - start))

运行结果:

[ ] total time: 1.78857

使用 requests 库,访问两次 ,共耗时 1.5 秒

我们用 aiohttp 库来改造上面的代码:

import time import asyncio ​ import aiohttp ​ ​ async def fetch(session, url): response = await session.get(url)# await 等待网络 IO 并切换协程 return str(response.url) ​ ​ async def main(loop): async with aiohttp.ClientSession() as session: tasks = [ loop.create_task(fetch(session, )) for _ in range(2) ] done, pending = await asyncio.wait(tasks)# 执行并等待所有任务完成 results = [r.result() for r in done]# 获取所有返回结果 print(results) ​ ​ start = time.time() loop = asyncio.get_event_loop()# 建立 事件循环 loop.run_until_complete(main(loop))# 在 事件循环 中执行协程 loop.close()# 关闭 事件循环 print("total time: {}".format(time.time() - start))

运行结果:

[, ] total time: 0.

使用 aiohttp 的代码执行时间较使用 reqeusts 的代码有大幅的提升。

上例中,我们使用官方推荐的方式创建 session,并通过 session 执行 get 操作。aiohttp 官方建议一个 application 中共享使用一个 session,不要为每个请求都创建 session。

使用 asyncio 和 aiohttp 快速获得一个代理池

通过爬虫解析免费的代理发布网站页面,来生成代理池。

import os import re import time import asyncio ​ import aiohttp ​ HEADERS = { User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/12.0.3 Safari/605.1.15 } ​ OUTPUT_FILE = proxies.txt# 代理池输出文件 SITES = [http://www.live-socks.net, ]# 代理发布网站 CHECK_URL = LOCAL_PROXY = None# 在本地发起请求时的代理 ​ ​ # http get 协程 async def fetch(session, url, proxy=None): proxy_headers = HEADERS if proxy else None try: async with session.get( url, headers=HEADERS, proxy=proxy, proxy_headers=proxy_headers, timeout=aiohttp.ClientTimeout(total=5)) as response: if response.status == 200: return await response.text() else: return except: return ​ ​ # 从代理发布网站获取代理发布页面链接 async def get_page_links(loop, session): tasks = [loop.create_task(fetch(session, url, proxy=LOCAL_PROXY)) for url in SITES]# 创建协程任务 done, _ = await asyncio.wait(tasks)# 执行并等待所有任务完成 htmls = [f.result() for f in done]# 获取所有返回结果 ​ # 解析出 html 页面中的代理发布链接 def parse(html): return re.findall(r<h3[\s\S]*?<a.*?(http.*?\.html).*?</a>, html) ​ results = map(parse, htmls)# 逐个解析 html 页面 ​ return [y for x in results for y in x] ​ ​ # 从代理发布页面获取代理 IP async def get_proxies(loop, session, page_links): tasks = [loop.create_task(fetch(session, url, proxy=LOCAL_PROXY)) for url in page_links]# 创建协程任务 done, _ = await asyncio.wait(tasks)# 执行并等待所有任务完成 htmls = [f.result() for f in done]# 获取所有返回结果 ​ # 解析出 html 页面中的代理 IP def parse(html): return re.findall(r\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:\d{1,5}, html) ​ results = map(parse, htmls)# 逐个解析 html 页面 ​ return list(set([y for x in results for y in x])) ​ ​ # 验证代理 IP async def check_proxy(session, proxy): html = await fetch(session, CHECK_URL, proxy=proxy) ​ # 如果返回通过代理 IP 访问的页面,则说明代理 IP 有效 return proxy if html else ​ ​ # 通过协程批量验证代理 IP,每次同时发起 200 个验证请求 async def check_proxies(loop, session, proxies): checked_proxies = [] for i in range(0, len(proxies), 200): _proxies = [proxy.strip() if proxy.strip().startswith(http://) else http:// + proxy.strip() for proxy in proxies[i:i + 200]] tasks = [loop.create_task(check_proxy(session, proxy)) for proxy in _proxies] done, _ = await asyncio.wait(tasks)# 执行并等待所有任务完成 checked = [f.result() for f in done]# 获取所有返回结果 checked_proxies += [p for p in checked if p]# 获取不为空的返回值,即验证成功的代理 IP ​ return checked_proxies ​ ​ # 将代理 IP 逐行保存到文件 def save_proxies(proxies): # 创建新文件,如果文件已存在,则清空文件内容 with open(OUTPUT_FILE, w) as f: f.write() ​ # 通过追加写模式,逐行写入文件 with open(OUTPUT_FILE, a) as f: for proxy in proxies: f.write(proxy + \n) ​ ​ async def main(loop): async with aiohttp.ClientSession() as session: page_links = await get_page_links(loop, session)# 获得代理发布页面链接 # 从代理发布页面获得代理 IP proxies = await get_proxies(loop, session, page_links) print(total proxy: {}.format(len(proxies)))# 解析出的代理 IP 总量 proxies = await check_proxies(loop, session, proxies)# 验证代理 IP ​ print(total checked proxy: {}.format(len(proxies)))# 验证后的代理 IP 总量 save_proxies(proxies)# 保存代理 IP 到文件 ​ ​ start = time.time() loop = asyncio.get_event_loop()# 建立 事件循环 loop.run_until_complete(main(loop))# 在 事件循环 中执行协程 loop.close()# 关闭 事件循环 total_time = time.time() - start print(ftotal time: {total_time})

运行结果:

total proxy: 15675 total checked proxy: 4503 total time: 487.98

更加高效的爬虫

在爬虫程序中,通常有网络请求任务、页面解析任务、数据清洗任务和数据入库任务。

网络请求任务、数据入库任务属于 IO 密集型任务,在 Python 中通常使用多线程模型来提高这类任务的性能,现在还可以通过 aiohttp,Motor(MongoDB 的异步 Python 驱动)等异步框架将性能进一步提升。

页面解析任务、数据清洗任务这类 CPU 密集型的任务我们该如何来提高性能?在 Python 中针对 CPU 密集型任务可以通过 multiprocessing 模块来提升性能,通过 multiprocessing 模块可以使程序运行在多核 CPU 中,增加 CPU 的利用率以提升计算性能。

给代理池爬虫示例增加多核计算支持:

import os import re import time import asyncio from multiprocessing import Pool ​ import aiohttp ​ HEADERS = { User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/12.0.3 Safari/605.1.15 } ​ OUTPUT_FILE = proxies.txt# 代理池输出文件 SITES = [http://www.live-socks.net, ]# 代理发布网站 CHECK_URL = LOCAL_PROXY = :1087# •在本地发起请求时的代理 ​ ​ # http get 协程 async def fetch(session, url, proxy=None): proxy_headers = HEADERS if proxy else None try: async with session.get( url, headers=HEADERS, proxy=proxy, proxy_headers=proxy_headers, timeout=aiohttp.ClientTimeout(total=5)) as response: if response.status == 200: return await response.text() else: return except: return ​ # 解析出 html 页面中的代理发布链接 ​ ​ def parse_page_link(html): return re.findall(r<h3[\s\S]*?<a.*?(http.*?\.html).*?</a>, html) ​ # 从代理发布网站获取代理发布页面链接 ​ ​ async def get_page_links(loop, session): tasks = [loop.create_task(fetch(session, url, proxy=LOCAL_PROXY)) for url in SITES]# 创建协程任务 done, _ = await asyncio.wait(tasks)# 执行并等待所有任务完成 htmls = [f.result() for f in done]# 获取所有返回结果 ​ # 利用多核 CPU 的计算能力提升页面解析性能 with Pool(processes=os.cpu_count() * 2) as pool: results = pool.map(parse_page_link, htmls) ​ return [y for x in results for y in x] ​ # 解析出 html 页面中的代理 IP ​ ​ def parse_proxy(html): return re.findall(r\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:\d{1,5}, html) ​ # 从代理发布页面获取代理 IP ​ ​ async def get_proxies(loop, session, page_links): tasks = [loop.create_task(fetch(session, url, proxy=LOCAL_PROXY)) for url in page_links]# 创建协程任务 done, _ = await asyncio.wait(tasks)# 执行并等待所有任务完成 htmls = [f.result() for f in done]# 获取所有返回结果 ​ # 利用多核 CPU 的计算能力提升页面解析性能 with Pool(processes=os.cpu_count() * 2) as pool: results = pool.map(parse_proxy, htmls) ​ return list(set([y for x in results for y in x])) ​ ​ # 验证代理 IP async def check_proxy(session, proxy): html = await fetch(session, CHECK_URL, proxy=proxy) ​ # 如果返回通过代理 IP 访问的页面,则说明代理 IP 有效 return proxy if html else ​ ​ # 通过协程批量验证代理 IP,每次同时发起 200 个验证请求 async def check_proxies(loop, session, proxies): checked_proxies = [] for i in range(0, len(proxies), 200): _proxies = [proxy.strip() if proxy.strip().startswith(http://) else http:// + proxy.strip() for proxy in proxies[i:i + 200]] tasks = [loop.create_task(check_proxy(session, proxy)) for proxy in _proxies] done, _ = await asyncio.wait(tasks)# 执行并等待所有任务完成 checked = [f.result() for f in done]# 获取所有返回结果 checked_proxies += [p for p in checked if p]# 获取不为空的返回值,即验证成功的代理 IP ​ return checked_proxies ​ ​ # 将代理 IP 逐行保存到文件 def save_proxies(proxies): # 创建新文件,如果文件已存在,则清空文件内容 with open(OUTPUT_FILE, w) as f: f.write() ​ # 通过追加写模式,逐行写入文件 with open(OUTPUT_FILE, a) as f: for proxy in proxies: f.write(proxy + \n) ​ ​ async def main(loop): async with aiohttp.ClientSession() as session: page_links = await get_page_links(loop, session)# 获得代理发布页面链接 # 从代理发布页面获得代理 IP proxies = await get_proxies(loop, session, page_links) print(total proxy: {}.format(len(proxies)))# 解析出的代理 IP 总量 proxies = await check_proxies(loop, session, proxies)# 验证代理 IP ​ print(total checked proxy: {}.format(len(proxies)))# 验证后的代理 IP 总量 save_proxies(proxies)# 保存代理 IP 到文件 ​ ​ start = time.time() loop = asyncio.get_event_loop()# 建立 事件循环 loop.run_until_complete(main(loop))# 在 事件循环 中执行协程 loop.close()# 关闭 事件循环 total_time = time.time() - start print(ftotal time: {total_time})

进程间的调度及上下文切换是非常消耗资源的。上面例子中解析任务比较简单,解析量也非常少,增加多核计算支持后,性能几乎没有提升还有可能降低。在实际爬虫项目中需要根据实际情况来衡量和选择。