代理池架构浅析

代理池一般是以一个web服务的形式出现的,方便我们管理和调用代理,本身就是一个获取各个网站上代理的工具。当然了,付费的代理池总会更好用一些,自己写一个代理池,其实也是可能有时候我们发现一些能用的免费代理,拿来测试用用还是不错的。

参考崔庆才大神对代理池的要求:

1 多站抓取 异步检测(提高检测效率) 2 定时筛选 持续更新 3 提供接口 易于提取

崔大佬根据别人的架构自己写了一个代理池,地址

Python3WebSpider/ProxyPool​github.com/Python3WebSpider/ProxyPool

本文将介绍代理池源码一些难懂的地方,我尽量以最简单最直接的方式来拆分解释,也推荐大家自己写一个,尽管最后写的没人家好,我也觉得这是值得的,毕竟一个好的代理池是需要后期维护的,如果一味拿别人的来用,后面的代理失效了,自己就很被动了。

我将上面这个代理池的某些抓取逻辑改进了一些,还有修改redis一些在我本机不可用的操作(可能是我的redis版本比较新,建议大家查看redis的官方文档查看相关api)然后做成了自己代理池,地址

wuzhenbin/spider-proxy-pool​github.com/wuzhenbin/spider-proxy-pool

代理池的架构其实还是很好理解的, 获取器+检查器+api输出,获取器帮我们拿到代理ip存到redis,检查器从redis中拿出代理进行检查,对每一个代理进行打分数,少于0分的代理就剔除,api就将代理队列里面随机拿出一个出来供我们使用。来总结下这个代理池的关键功能.

1 定时检测代理池的可用性 循环往复 2 代理池的数量达到一个阈值的话就会停止爬取 3 代理池每一个固定周期就会主动进行爬取一次 4 提供一个接口 调用的时候随机返回一个好用的代理ip

首先来完成获取器部分,为了方便拓展代理获取网站,一般的思路是做一个元类来进行调用,简单的模型如下

class ProxyMetaclass(type): def __new__(cls, name, bases, attrs): count = 0 attrs[__CrawlName__] = [] # 遍历类的所有属性筛选出crawl_开头的函数 for k, v in attrs.items(): if crawl_ in k: attrs[__CrawlName__].append(k) return type.__new__(cls, name, bases, attrs) class Crawler(object, metaclass=ProxyMetaclass): def get_raw_proxies(self, site_name): return [item for item in eval("self.{}()".format(site_name))] def crawl_xila(self): pass def crawl_66(self,page_count=4): pass def crawl_ip3366(self): pass if __name__ == __main__: crawler = ProxyGetter() for site_name in self.crawler.__CrawlName__: proxies = self.crawler.get_raw_proxies(site_name)

关于元类的用法这里不多做解释,在这里简单说下使用场景,因为我们后期需要拓展更多的爬取代理的网站,这里已经有我定义的 这3个网站的代理,爬取逻辑完成之后如果要一个个去调用就很蛋疼了,借助元类,可以帮我们把ProxyGetter类存在的子类进行调用,这里对函数名要求的就是要以crawl_开头。

代理池的数据存储主要用到的是redis的sorted-set,即是有序集合,Redis 有序集合和集合一样也是string类型元素的集合,且不允许重复的成员。不同的是每个元素都会关联一个double类型的分数。redis正是通过分数来为集合中的成员进行从小到大的排序。有序集合的成员是唯一的,但分数(score)却可以重复。在这里,分数可以让我们对每一个代理进行分类,我们初始化每一个获取到的代理给他10分,通过检查器检查后能用的话就将该代理改成100分,不能用就减一分,即是9分,等到下次检查继续作这样的判断,一旦低于0分,我们就将该代理从代理池中删除,检查器是循环执行的,执行周期设置成20秒,获取器因为是固定站点获取,所以周期设置成300秒以上,跟代理网站的更新速度差不多就行了。那检查器如何进行的呢?

import asyncio import aiohttp import time import sys try: from aiohttp import ClientError except: from aiohttp import ClientProxyConnectionError as ProxyConnectionError from app.db import RedisClient from app.setting import * class Tester(object): def __init__(self): self.redis = RedisClient() async def test_single_proxy(self, proxy): """ 测试单个代理 :param proxy: :return: """ conn = aiohttp.TCPConnector(verify_ssl=False) async with aiohttp.ClientSession(connector=conn) as session: try: # 将bytes对象解码成字符串,默认使用utf-8进行解码。防止数据库提取的proxy是bytes格式。 if isinstance(proxy, bytes): proxy = proxy.decode(utf-8) real_proxy = http:// + proxy print(正在测试, proxy) async with session.get(TEST_URL, proxy=real_proxy, timeout=15, allow_redirects=False) as response: if response.status in VALID_STATUS_CODES: self.redis.max(proxy) print(代理可用, proxy) else: self.redis.decrease(proxy) print(请求响应码不合法 , response.status, IP, proxy) except (ClientError, aiohttp.client_exceptions.ClientConnectorError, asyncio.TimeoutError, AttributeError): self.redis.decrease(proxy) print(代理请求失败, proxy) def run(self): """ 测试主函数 :return: """ print(测试器开始运行) try: count = self.redis.count() print(当前剩余, count, 个代理) for i in range(0, count, BATCH_TEST_SIZE): start = i stop = min(i + BATCH_TEST_SIZE, count) print(正在测试第, start + 1, -, stop, 个代理) test_proxies = self.redis.batch(start, stop) loop = asyncio.get_event_loop() tasks = [self.test_single_proxy(proxy) for proxy in test_proxies] loop.run_until_complete(asyncio.wait(tasks)) sys.stdout.flush() time.sleep(5) except Exception as e: print(测试器发生错误, e.args)

检查器是通过异步协程进行检查的,并发量设置为10,也就是说,在redis数据库中拿出所有的代理数据,每10个1组进行异步请求目标站点,一般拿百度做测试就行了,异步协程主要以aiohttp跟asyncio配合,把异步代码做成任务传进时间循环里执行即可。

api的提供使用的flask这个框架,关于flask的东西就多了,不过在这个项目我们只需要几个简单的接口即可满足我们的需求

from flask import Flask, g from .db import RedisClient __all__ = [app] app = Flask(__name__) def get_conn(): if not hasattr(g, redis): g.redis = RedisClient() return g.redis @app.route(/) def index(): return <h2>Welcome to Proxy Pool System</h2> @app.route(/random) def get_proxy(): """ Get a proxy :return: 随机代理 """ conn = get_conn() return conn.random() @app.route(/count) def get_counts(): """ Get the count of proxies :return: 代理池总量 """ conn = get_conn() return str(conn.count()) if __name__ == __main__: app.run()

在flask中,有一个专门用来存储用户信息的g对象,g的全称的为global,我们利用这个g对象来保存redis对象。上面主要做了一个 获取随机一个代理和一个代理池数量统计的接口,其实主要都是redis的数据库操作。

数据库的操作一般统一在一个文件中进行

import redis from app.error import PoolEmptyError from app.setting import REDIS_HOST, REDIS_PORT, REDIS_PASSWORD, REDIS_KEY from app.setting import MAX_SCORE, MIN_SCORE, INITIAL_SCORE from random import choice import re class RedisClient(object): def __init__(self, host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD): """ 初始化 :param host: Redis 地址 :param port: Redis 端口 :param password: Redis密码 """ self.db = redis.StrictRedis(host=host, port=port, password=password, decode_responses=True) def add(self, proxy, score=INITIAL_SCORE): """ 添加代理,设置分数为最高 :param proxy: 代理 :param score: 分数 :return: 添加结果 """ if not re.match(\d+\.\d+\.\d+\.\d+\:\d+, proxy): print(代理不符合规范, proxy, 丢弃) return # 获取该代理在代理序列中的分数值 获取不到即是不存在 if not self.db.zscore(REDIS_KEY, proxy): return self.db.zadd(REDIS_KEY,{proxy: score}) def random(self): """ 随机获取有效代理,首先尝试获取最高分数代理,如果不存在,在前100个取,否则异常 :return: 随机代理 """ result = self.db.zrangebyscore(REDIS_KEY, MAX_SCORE, MAX_SCORE) if len(result): return choice(result) else: result = self.db.zrevrange(REDIS_KEY, 0, 100) if len(result): return choice(result) else: raise PoolEmptyError def decrease(self, proxy): """ 代理值减一分,小于最小值则删除 :param proxy: 代理 :return: 修改后的代理分数 """ score = self.db.zscore(REDIS_KEY, proxy) if score and score > MIN_SCORE: print(代理, proxy, 当前分数, score, 减1) return self.db.zincrby(REDIS_KEY, -1, proxy) else: print(代理, proxy, 当前分数, score, 移除) return self.db.zrem(REDIS_KEY, proxy) def max(self, proxy): """ 将代理设置为MAX_SCORE :param proxy: 代理 :return: 设置结果 """ print(代理, proxy, 可用,设置为, MAX_SCORE) return self.db.zadd(REDIS_KEY,{proxy: MAX_SCORE}) def count(self): return self.db.zcard(REDIS_KEY) def batch(self, start, stop): """ 批量获取 :param start: 开始索引 :param stop: 结束索引 :return: 代理列表 """ return self.db.zrevrange(REDIS_KEY, start, stop - 1) if __name__ == __main__: conn = RedisClient()

本项目主要用到redis数据类型是有序序列sorted set。

以上就是获取器 检查器 以及api形成的一个代理池架构,获取器跟检查器是通过调度器循环进行的。最后我们可以写个测试代码测试下接口

import requests from requests.exceptions import RequestException base_headers = { User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.71 Safari/537.36, Accept-Encoding: gzip, deflate, sdch, Accept-Language: en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7 } def get_response(url,params={},options={}): headers = dict(base_headers, **options) try: response = requests.get(url,params=params,headers=headers) if response.status_code == 200: return response.text return None except RequestException as e: print(err: %s,%s % e,url) def main(): url = :5555/random html = get_response(url) print(html) if __name__ == __main__: main()

这里主要就是为了获取代理这个接口,我们在其他程序使用代理也是通过调用这个接口实现的。以上的逻辑在理解之后就可以自行修改一些参数来满足自己的需求了,后期只需要维护代理的爬取逻辑,尽量多找些不同的网站,减少某些代理网站的压力。