爬虫笔记:使用代理池和线程池提高数据采集效率

前言

爬虫和反爬虫是一对矛和盾,反爬虫很常见的一个方法就是封IP,一个IP短时间内频繁访问,可以做限流或者是加入黑名单,我之前的后台开发相关博客也有涉及这一块。

不过今天说的是爬虫,所以应对的方法就是用代理池,每次请求都用不同的IP就行,再加上UA模拟,完全是正常用户的行为,可以避开限流和黑名单反爬。

然后爬虫是一种IO密集型程序,如果全程单线程执行那会很慢,因此可以用多线程来提高数据采集效率,不过自己管理多线程太麻烦,所以我选择了线程池~

代理池

一个完善的代理池,应该可以实现以下功能

批量采集代理(或者通过接口导入我们购买的代理,不过偶尔用一用还是免费的就好)采集到之后自动验证代理有效性将有效代理存储起来提供获取随机代理的接口提供管理(删除、增加)代理的接口

自己造轮子太麻烦了,用Python的初衷不就是”人生苦短,我用Python“吗,并且社区也没让我们失望,开源好用的Python代理池项目有很多,这里我选了一个在GitHub上有14k+ Stars的项目来用,名字叫ProxyPool。

经过试用还不错!

当然还有其他很多线程池项目,我没测试,有兴趣的同学可以看看参考资料的第一个链接。

部署运行

项目地址:https://github.com/jhao104/proxy_pool

官方文档提供了两种部署方式,包括下载代码运行和docker,既然有docker那肯定选最方便的docker啦!

不过官方的docker命令还不够方便,因为这个代理池还需要依赖Redis服务,这里我写了一个docker-compose配置来用:

version: "3" services: redis: image: redis expose: - 6379 web: restart: always image: jhao104/proxy_pool environment: - DB_CONN=redis://redis:6379/0 ports: - "5010:5010" depends_on: - redis

找个文件夹保存一下,然后执行命令启动docker容器

docker-compose up

这里我配置的端口是5010跟官网一样,有需要的同学可以自己修改~

项目启动起来之后,浏览器访问:5010,可以得到所有接口,各个接口顾名思义很容易理解。

{ "url": [ { "desc": "get a proxy", "params": "type: https|", "url": "/get" }, { "desc": "get and delete a proxy", "params": "", "url": "/pop" }, { "desc": "delete an unable proxy", "params": "proxy: e.g. 127.0.0.1:8080", "url": "/delete" }, { "desc": "get all proxy from proxy pool", "params": "type: https|", "url": "/all" }, { "desc": "return proxy count", "params": "", "url": "/count" } ] }

代码中使用

由于这个代理池提供了HTTP接口,理论上可以支持任何语言使用

这里我用Python来写

获取随机代理

我写了两个方法,封装了获取随机代理和删除代理的操作

import requests PROXY_POOL_URL = :5010 def get_proxy(): proxy = requests.get(f"{PROXY_POOL_URL}/get/").json().get("proxy") return {http: proxy, https: proxy} def delete_proxy(proxy): requests.get(f"{PROXY_POOL_URL}/delete/?proxy={proxy}")

获取随机Header

使用fake_useragent这个库来生成随机的UserAgent,模拟不同的用户浏览器请求

from fake_useragent import UserAgent def get_header(): return { "User-Agent": ua.random }

网络请求封装

因为我们没有买收费代理,所以使用的是代理池自动采集的免费代理,众所周知免费代理的质量不好保证,所以我写了重试功能,失败次数超过最大重试次数之后就删除这个代理,换个代理重新来~

最大重试次数可以配置MAX_RETRY_COUNT变量

MAX_RETRY_COUNT = 5 def request_get(url) -> Tuple[Response, str]: retry_count = 1 proxy = get_proxy() while retry_count <= MAX_RETRY_COUNT: logger.debug(f第{retry_count}次请求 - 网址 {url} - 代理 {proxy.get("http")}) try: resp = requests.get(url, proxies=proxy, headers=get_header(), timeout=15) return resp, proxy.get(http) except Exception: logger.error(f请求失败 - 网址 {url}) retry_count += 1 # 删除代理池中代理 logger.warning(f全部{MAX_RETRY_COUNT}次请求都失败 - 删除代理 {proxy.get("http")}) delete_proxy(proxy.get(http)) return request_get(url)

这个函数返回的是一个(Response, str)类型的元组,考虑到不同请求拿到的数据格式可能不一样,所以没有用resp.json()或者resp.text形式,可以调用这个函数拿到数据后自行处理。

同时还会返回一个str类型的代理服务器地址,是ip:port形式。

调用方法就是这种形式:resp, proxy = request_get(url)

因为我封装的这个request_get函数只是最基础的获取数据,但拿到的数据不一定是正确可用的,比如触发了限流或者黑名单,拿到的数据就是空的,这时候在调用这个函数拿到数据后可以加一次判断,假如这个代理IP已经被封禁了,可以调用delete_proxy方法删除该代理。

线程池

爬虫是一种IO密集型程序,如果全程单线程执行那会很慢,因此可以用多线程来提高数据采集效率,不过自己管理多线程太麻烦,所以我选择了线程池~

线程池是一组预先实例化的空闲线程,准备好接受工作。为每个要异步执行的任务创建一个新的线程对象是很昂贵的。使用线程池,你可以将任务添加到任务队列,线程池为任务分配一个可用线程。线程池有助于避免创建或销毁不必要的线程。

之前我用过threadpool这个pip包实现线程池,感觉还不错,但是拿来爬虫有几率出现不明原因的假死,不知道哪里出问题了,后面看网上资料说这个threadpool更适合CPU密集形的操作…

PS:我看了threadpool的源码实现,牛哇421行代码就实现了线程池的功能~ 然后他是基于threading模块实现的,可以的

这次我改用Python标准库自带的线程池实现,事实上,Python里有两种“池”

multiprocessing.Poolmultiprocessing.pool.Threadpool

这两种的异同:

multiprocessing.pool.ThreadPool 的行为方式与 multiprocessing.Pool 相同。不同之处在于 multiprocessing.pool.Threadpool 使用线程来运行 worker 的逻辑,而 multiprocessing.Pool 使用工作进程。

但这俩我暂时也不用,因为有更好的选择。

从Python3.2开始,标准库为我们提供了concurrent.futures模块,它提供了ThreadPoolExecutor和ProcessPoolExecutor两个类,实现了对threading和multiprocessing的进一步抽象(这里主要关注线程池),不仅可以帮我们自动调度线程,还可以做到:

主线程可以获取某一个线程(或者任务的)的状态,以及返回值。当一个线程完成的时候,主线程能够立即知道。让多线程和多进程的编码接口一致。

所以来看看代码吧

代码

简单用法

def crawl_data(page): ... from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED pool = ThreadPoolExecutor(8) logger.info(线程池启动) tasks = [pool.submit(crawl_data, page) for page in range(1, 100)] wait(tasks, return_when=ALL_COMPLETED) logger.info(线程池结束)

上面代码解析:

crawl_data函数是爬虫函数,具体代码省略ThreadPoolExecutor(8)表示创建线程池,同时8个线程并行然后用列表生成器,pool.submit方法用来把任务添加到线程池wait函数用来等待线程池执行结束。

除了pool.submit方法之外,还支持map方法批量添加任务

使用方法如下:

pool = ThreadPoolExecutor(8) pool.map(crawl_data, range(1,100))

map方法的第二个参数是要传给任务的参数列表,所以就是列表里有多少个参数,就创建多少个任务~

经过测试非常稳,哈哈哈,还是标准库的东西好用~

参考资料

https://suyin-blog.club/2021/2G4HXBY/#proxy-pool-%E6%8E%A8%E8%8D%90python threadpool 的前世今生:https://zhangchenchen.github.io/2017/05/18/python-thread-pool/https://www.delftstack.com/zh/howto/python/python-threadpool-differences/[python] ThreadPoolExecutor线程池:https://www.jianshu.com/p/b9b3d66aa0be