作者:掌控安全-暗箭
在之前就分享了一个【代理池工具撰写】——搭建简单代理池
目录
刚开始写的第一代代理池 单纯为了练手,只具备批量爬取代理的功能。
后来我开始写第二代,批量爬取加检测代理否可用,将可用代理输出,不可用代理pass。这是根据我自身需求写的,短短几十行代码清晰可辨。
很明显,这有很多不足。如果请求的代理原本是一个可用代理,但因为首次请求出现意外,导致请求失败,那么一个明明可以为我们所用的代理,就这样被我们错过了。
不具备轮询变动的功能,不具备配合工具来回跳动ip的功能……
为了满足大家的需求,我开始参考网上大佬们的思路,潜心研究,通宵达旦,废寝忘食,夜以继日…………经过几十次调试 终于写出了第三代【豪华版】代理池。
【第三代代理池优点】:
1.轮询变动IP2.适者生存,不适者淘汰3.异步处理,无需等待4.sqlmap搭档神器,IP跳来跳去5.无需配置数据库,WEB AIP轻松解决实现代理池轮询变动-从第一个字母开始敲起
源码及Redis安装包统一放在附件链接:https://pan.baidu.com/s/1ItbE3uc6c5W1ImXHs535UQ
提取码:zkaq
一、代理池的实现条件
(1).我们需要安装几个python中的库,安装起来很简单,pip3 install “库名”。
以下是需要用到的库,其中包含系统自带库,大家可根据自身情况选择安装。
(若缺少相应的模板,在脚本执行时会有报错提示的,可根据提示补充安装)
Redis,redis-dump Pyquery urllib random asyncio aiohttp botocore multiprocessing(2).安装Redis-x64-3.0.504.msi,若不安装则脚本运行时会报错:
Error 10061 connecting to 127.0.0.1:6379.
由于目标计算机积极拒绝,无法连接
报错原因:Redis服务没有启动
安装教程:github下载,下载速度极慢,需要搭梯子。
考虑到本篇文章不能涉及翻墙,所以我将我搭梯子下载好的Redis-x64-3.0.504.msi放到附件中,供大家安装。
二、代理思路
(1).模块创建
本次代理池全面升级,为了实现功能的多样性,以及保证脚本的稳定性,可读性。我将创建6个模块脚本,实现从代理池运行——》爬取代理——》存储——》检测——》接口——》调用等功能。
(2).实现代理维护使用代理赋值法,将批量获取的免费代理统一赋初始值为10,并存入数据库中,通过检测模块向代理服务器发送请求,若首次请求成功,则将该代理初始值提升至100,若首次请求失败则将初始值减1,若代理值减为0,则将代理从代理池中移除。
(3.)代理调用通过WEB API接口,拿到随机可用代理,根据我们为代理赋加的值,优先获取最高值代理(值越高越稳定),若无最高值代理,则根据值的大小进行排名,优先输出排名最靠前的代理,供我们使用。
三、代理编写
说实话……我真不能一点一点告诉你们每句代码的意思,因为代码太多了,
所以保姆式教程在本篇文章中就不适用了。
还望大家理解,但我一定能保证大家看得懂本篇文章,并且在看完本篇文章后
,能够获得如何编写自己代理池的思路。
小白同学就要受点委屈了~不过我相信本篇文章对小白的提升一定是巨大的!!!
存储模块编写:
(1).将爬取的代理存储到Redis数据库中,通过定义一个类来操作Redis的有序集合。
(2).我们前面说过要为代理赋初始值,并且根据赋值大小进行排序,
所以我们不得不调用Redis有序集合来满足我们的要求。
什么是Redis有序集合: Redis 有序集合和集合一样也是string类型元素的集合,且不允许重复的成员。不同的是每个元素都会关联一个double类型的分数。redis正是通过分数来为集合中的成员进行从小到大的排序。这里的double类型分数就是我们为代理赋的值。 现在我们要定义一个类,一些方法 和一些常量来实现代理的存储。创建db.py为存储模块:下面内容需要我们对Redis库的函数有所了解,要不然是看不懂的。
python # coding=gbk #存储模块首先来定义一些常量:
python MAX_SCORE = 100 #最大值 MIN_SCORE = 0 #最小值 INITIAL_SCORE = 10 #初始值 REDIS_HOST = localhost #Redis连接IP REDIS_PORT = 6379 #Redis连接端口 REDIS_PASSWORD = None #连接密码,大家根据自己需求选择 REDIS_KEY = proxies #有序集合键名,获取代理存储使用的有序集合调用库并创建类和方法:
python import redis #实现Redis的连接及使用 from random import choice #返回一个列表,元组或字符串的随机项 class RedisClient(object): def __init__(self, host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD): """ 初始化 :param host: Redis 地址 :param port: Redis 端口 :param password: Redis密码 """ self.db = redis.StrictRedis(host=host, port=port, password=password, decode_responses=True)这里有个小细节:self.db是定义一个实例,用来连接数据库的,
里面有一个decode_responses参数值为True 这是因为当我们要求返回键值时
若不加此条件则会返回——b’Value’ ,返回结果会有一个跟屁虫b,
b代表为byte数据类型,所以当添加decode_responses=True时,
返回结果就不会有跟屁虫了——’Valie’
python def add(self, proxy, score=INITIAL_SCORE): """ 添加代理,设置分数为最高 :param proxy: 代理 :param score: 分数 :return: 添加结果 """ if not self.db.zscore(REDIS_KEY, proxy): return self.db.zadd(REDIS_KEY, score, proxy)add()实现为批量获取的代理附加初始值10。
def random(self): """ 随机获取有效代理,首先尝试获取最高分数代理,如果不存在,按照排名获取,否则异常 :return: 随机代理 """ result = self.db.zrangebyscore(REDIS_KEY, MAX_SCORE, MAX_SCORE) if len(result): return choice(result) else: result = self.db.zrevrange(REDIS_KEY, 0, 100) if len(result): return choice(result) else: raise PoolEmptyError第一个result包含所有值为100的代理(最高效代理)
第二个result就是矬子里拔将军,但也很不错的。
def decrease(self, proxy): """ 代理值减一分,小于最小值则删除 :param proxy: 代理 :return: 修改后的代理分数 """ score = self.db.zscore(REDIS_KEY, proxy) if score and score > MIN_SCORE: print(代理, proxy, 当前分数, score, 减1) return self.db.zincrby(REDIS_KEY, proxy, -1) else: print(代理, proxy, 当前分数, score, 移除) return self.db.zrem(REDIS_KEY, proxy)Redis基础语法不讲了,实现无用代理移除功能
def exists(self, proxy): """ 判断是否存在 :param proxy: 代理 :return: 是否存在 """ return not self.db.zscore(REDIS_KEY, proxy) == None #简略写法,当代理池无代理时 返回not def max(self, proxy): """ 将代理设置为MAX_SCORE :param proxy: 代理 :return: 设置结果 """ print(代理, proxy, 可用,设置为, MAX_SCORE) return self.db.zadd(REDIS_KEY, MAX_SCORE, proxy) def count(self): """ 获取数量 :return: 数量 """ return self.db.zcard(REDIS_KEY) def all(self): """ 获取全部代理 :return: 全部代理列表 """ return self.db.zrangebyscore(REDIS_KEY, MIN_SCORE, MAX_SCORE)创建crawler.py为获取模块:
从各大网站批量爬去代理,比较简单 不做过多说明 以前文章有涉及过爬虫的原理。
调用库及函数
from pyquery import PyQuery as pq #解析 import urllib.request #请求创建一个类
class ProxyMetaclass(type): def __new__(cls, name, bases, attrs): count = 0 attrs[__CrawlFunc__] = [] for k, v in attrs.items(): if crawl_ in k: attrs[__CrawlFunc__].append(k) count += 1 attrs[__CrawlFuncCount__] = count return type.__new__(cls, name, bases, attrs) class Crawler(object, metaclass=ProxyMetaclass): def get_proxies(self, callback): #callback = crawl_daili66 就是下面定义的获取代理的方法名称 proxies = [] for proxy in eval("self.{}()".format(callback)): print(成功获取到代理, proxy) proxies.append(proxy) return proxies其实这里借助于元类来实现【kk……对于元类我了解的也不深】。
这是attrs字典形式 K为键 V为值。当中包含我们定义的方法名称 如图:
解释代码:
定义了一个 ProxyMetaclass,Crawl 类将它设置为元类,元类中实现了 new() 方法,
这个方法有固定的几个参数,
其中第四个参数 attrs 中包含了类的一些属性,
这其中就包含了类中方法的一些信息,我们可以遍历 attrs 这个变量即可获取类的所有方法信息。
所以在这里我们在 new() 方法中遍历了 attrs 的这个属性,
就像遍历一个字典一样,键名对应的就是方法的名称,
接下来判断其开头是否是 crawl_,
如果是,则将其加入到 CrawlFunc 属性中,
这样我们就成功将所有以 crawl 开头的方法定义成了一个属性,
就成功动态地获取到所有以 crawl 开头的方法列表了。
下一步定义方法:主要是用爬虫来爬取代理:
这里我们要记住,为了实现多方法爬取不同代理网站的想法,
我们必须统一规定被定义的方法以”crawl_”开头
def crawl_daili66(self,page_count=4): start_url = {}.html urls = [start_url.format (page) for page in range(1, page_count + 1)] for url in urls: print(Crawling, url) req = urllib.request.Request(url=url) res = urllib.request.urlopen(req) html = res.read() if html: doc = pq(html) trs = doc(.containerbox table tr:gt(0)).items() for tr in trs: ip = tr.find(td:nth-child(1)).text() port = tr.find(td:nth-child(2)).text() yield : .join([ip,port]) def crawl_proxyXH(self): start_url = "{}.html" urls = [start_url.format(page) for page in range(1,10)] headers = {User-Agent:Mozilla/5.0 (Windows NT 6.1; WOW64; rv:23.0) Gecko/ Firefox/23.0} for url in urls: req = urllib.request.Request(url=url,headers=headers) res = urllib.request.urlopen(req) html = res.read() if html: doc = pq(html) trs = doc(.layui-row table tr:gt(0)).items() for tr in trs: ip = tr.find(td:nth-child(1)).text() #tr节点下第一个td子节点的文本内容 port = tr.find(td:nth-child(2)).text() yield : .join([ip,port])别看上面代码这么长,耐心看一眼就能看懂。
唯一要点一下的一处是,这里的tr:gt(0)容易理解错,这里使用gt(0),并不是说包含了从第一个tr节点到最后一个tr节点的所有节点,而是包含首个tr节点的下一个兄弟节点及其后的所有tr节点。
换句话说,就是除了第一个tr节点外 其他的tr节点都包含其中
我找了两个比较好用且访问速度比较快的网站,之所以抛弃”小幻代理”是因为容易出现访问延迟的现象,影响脚本的稳定性。
第二个代理网站需要设置请求头的,要不然会禁止访问。
爬虫方法是如何运行的见下图,以66ip为例:
逐层锁定,遍历tr节点 筛选子节点 都是爬虫基础,不过多解释,不懂可以私下问我。
我们还需要创建一个getter.py的模块,用来动态调用所有以crawl_开头的方法,然后抓取代理,存储到数据库中。
getter.py模块:
因为涉及到获取和存储 所以直接从先前两个模块中调用已经创建好的两个类
from db import RedisClient from crawler import Crawler限制代理池的最大容存量为10000
POOL_UPPER_THRESHOLD = 10000定义Getter类,创建实例,为了调用其中类的函数
class Getter(): def __init__(self): self.redis = RedisClient() self.crawler = Crawler() def is_over_threshold(self): """ 调用RedisClient中count()函数 判断是否达到了代理池限制 """ if self.redis.count() >= POOL_UPPER_THRESHOLD: return True else: return False def run(self): print(获取器开始执行) if not self.is_over_threshold(): for callback_label in range(self.crawler.__CrawlFuncCount__): #从列表获取所有包含crawl_的方法 callback = self.crawler.__CrawlFunc__[callback_label] proxies = self.crawler.get_proxies(callback) #承接获取模块中的函数,获取代理 for proxy in proxies: self.redis.add(proxy) #存储到redis数据库中定义了 is_over_threshold() 方法判断代理池是否已经达到了容量阈值,
它就是调用了 RedisClient 的 count() 方法获取代理的数量,
然后加以判断,如果数量达到阈值则返回 True,否则 False。
如果不想加这个限制可以将此方法永久返回 True。
接下来定义了 run() 方法,首先判断了代理池是否达到阈值,
然后在这里就调用了 Crawler 类的 CrawlFunc 属性,
获取到所有以 crawl 开头的方法列表,
依次通过 get_proxies() 方法调用,得到各个方法抓取到的代理,
然后再利用 RedisClient 的 add() 方法加入数据库
创建tester.py为检测模块:
在上述db.py crawler.py getter.py三个模块中,我们已经能够成功获取代理并且将其放入数据库中。然后就需要一个检测模块来对所有的代理进行一轮轮的检测,检测可用就设置为 100,不可用就分数减 1,这样就可以实时改变每个代理的可用情况,在获取有效代理的时候只需要获取分数高的代理即可。
由于代理的数量非常多,为了提高代理的检测效率,我们在这里使用异步请求库 Aiohttp 来进行检测。
为什么要用Aiohttp呢,来回想一下,我们在请求单个网址的时候通常习惯使用requests来请求,而Requests 作为一个同步请求库,我们在发出一个请求之后需要等待网页加载完成之后才能继续执行程序。
也就是这个过程会阻塞在等待响应这个过程,如果服务器响应非常慢,比如一个请求等待十几秒,那么我们使用 Requests 完成一个请求就会需要十几秒的时间,中间其实就是一个等待响应的过程,程序也不会继续往下执行。
而Aiohttp异步请求库便完美的解决了这个问题。
在请求发出之后,程序可以继续接下去执行去做其他的事情,当响应到达时,会通知程序再去处理这个响应,这样程序就没有被阻塞,充分把时间和资源利用起来,大大提高效率。
所以在这里我们的代理检测使用异步请求库 Aiohttp,实现示例如下:
VALID_STATUS_CODES = [200] TEST_URL = BATCH_TEST_SIZE = 100设置好状态码200为目标服务器已经处理了请求。
BATCH_TEST_SIZE设置好一次检测的最大代理量,这里一次最多检测100个代理TEST_URL:使用该网站进行检测,可以设置为一个不会封 IP 的网站。
百度就很不错哦~
from db import RedisClient import asyncio #用来编写 并发 代码的库 import aiohttp import timeinit() 方法中建立了一个 RedisClient 对象,供类中其他方法使用
class Tester(object): def __init__(self): self.redis = RedisClient()接下来定义了一个 test_single_proxy() 方法,用来检测单个代理的可用情况,其参数就是被检测的代理。
async def test_single_proxy(self, proxy): conn = aiohttp.TCPConnector(verify_ssl=False) #用于使用TCP处理HTTP和HTTPS的连接器 async with aiohttp.ClientSession(connector=conn) as session: try: if isinstance(proxy, bytes): proxy = proxy.decode(utf-8) real_proxy = http:// + proxy print(正在测试, proxy) async with session.get(TEST_URL, proxy=real_proxy, timeout=15) as response: #调用get()请求代理 if response.status in VALID_STATUS_CODES: self.redis.max(proxy) print(代理可用, proxy) else: self.redis.decrease(proxy) print(请求响应码不合法, proxy) except : self.redis.decrease(proxy) print(代理请求失败, proxy)注意这个方法前面加了 async 关键词,代表这个方法是异步的,
方法内部首先创建了 Aiohttp 的 ClientSession 对象,
此对象类似于 Requests 的 Session 对象,可以直接调用该对象的 get() 方法来访问页面。在这里代理的设置方式是通过 proxy 参数传递给 get() 方法,请求方法前面也需要加上 async 关键词标明是异步请求,这也是 Aiohttp 使用时的常见写法。
def run(self): print(测试器开始运行) try: proxies = self.redis.all() # 所有的代理 loop = asyncio.get_event_loop() #事件循环的获取 # 批量测试 for i in range(0, len(proxies), BATCH_TEST_SIZE): test_proxies = proxies[i:i + BATCH_TEST_SIZE] tasks = [self.test_single_proxy(proxy) for proxy in test_proxies] loop.run_until_complete(asyncio.wait(tasks)) time.sleep(5) except Exception as e: print(测试器发生错误, e.args)Aiohttp语法我不了解,这部分直接扒书上的代码用。
创建api.py接口模块:
提到接口大家想到的肯定就是API,那么这里为什么不使用其他数据库,比如上面的Redis数据库,而是调用WEB API来实现接口模块的运行呢?
(1).数据库密码泄露风险(2).为了远程连接代理池(3).便于同步更新
这样获取代理只需要请求一下接口即可,以上的几个缺点弊端可以解决。
我们在这里使用一个比较轻量级的库 Flask 来实现这个接口模块,实现示例如下:
from flask import Flask, g #调用flask库 from db import RedisClient #调用类这是个小知识点all是个变量列表,我们看到all等于[‘app’]意思就是说 ,在本模块中,若不引用该模块,则只允许执行app函数-> Flask(name)
__all__ = [app] app = Flask(__name__)初始化:所有的Flask都必须创建程序实例,
web服务器使用wsgi协议,把客户端所有的请求都转发给这个程序实例程序实例是Flask的对象,一般情况下用如下方法实例化
Flask类只有一个必须指定的参数,即程序主模块或者包的名字,name是系统变量,该变量指的是本py文件的文件名
def get_conn(): if not hasattr(g, redis): g.redis = RedisClient() return g.redis @app.route(/) def index(): return <h2>Welcome to Proxy Pool System</h2> @app.route(/random) def get_proxy(): conn = get_conn() return conn.random() @app.route(/count) def get_counts(): conn = get_conn() return str(conn.count())客户端发送url给web服务器,web服务器将url转发给flask程序实例,程序实例
需要知道对于每一个url请求启动那一部分代码,所以保存了一个url和python函数的映射关系。
处理url和函数之间关系的程序,称为路由
在flask中,定义路由最简便的方式,是使用程序实例的app.route装饰器,把装饰的函数注册为路由
if __name__ == __main__: app.run()最后一个模块 我们称之为调度模块。
创建run.py调度模块:
这个模块其实就是调用以上所定义的三个模块,将以上三个模块通过多进程的形式运行起来,示例如下:
TESTER_CYCLE = 20 GETTER_CYCLE = 20 TESTER_ENABLED = True GETTER_ENABLED = True API_ENABLED = True在这里还有三个常量,TESTER_ENABLED、GETTER_ENABLED、API_ENABLED 都是布尔类型,True 或者 False。
标明了测试模块、获取模块、接口模块的开关,如果为 True,则代表模块开启。
import time from multiprocessing import Process from api import app #调用接口模块 from getter import Getter #调用获取模块(2) crawler.py与getter皆为获取模块 from tester import Tester #调用检测模块三个调度方法结构也非常清晰,比如 schedule_tester() 方法,这是用来调度测试模块的方法,
首先声明一个 Tester 对象,然后进入死循环不断循环调用其 run() 方法,执行完一轮之后就休眠一段时间,休眠结束之后重新再执行。
在这里休眠时间也定义为一个常量,如 20 秒,这样就会每隔 20 秒进行一次代理检测。
class Scheduler(): def schedule_tester(self, cycle=TESTER_CYCLE): tester = Tester() while True: print(测试器开始运行) tester.run() time.sleep(cycle) #每隔20秒从数据库获取一次代理 def schedule_getter(self, cycle=GETTER_CYCLE): getter = Getter() while True: print(开始抓取代理) getter.run() time.sleep(cycle) def schedule_api(self): app.run(127.0.0.1,5000) #这里要看分配,我这儿分配的是5000端口也就是AIP_PORT,这个端口怎么看,大家可以直接执行aip.py模块启动入口是 run() 方法,其分别判断了三个模块的开关,如果开启的话,就新建一个 Process 进程,设置好启动目标,然后调用 start() 方法运行,这样三个进程就可以并行执行,互不干扰。
def run(self): print(代理池开始运行) if TESTER_ENABLED: tester_process = Process(target=self.schedule_tester) tester_process.start() if GETTER_ENABLED: getter_process = Process(target=self.schedule_getter) getter_process.start() if API_ENABLED: api_process = Process(target=self.schedule_api) api_process.start()运行的时候要这样写:
if __name__==__main__: Scheduler().run()否则就会出现一个报错提示
四、代理池的运行
(1).首先要保证Redis服务开启,若没开启的话请下载并安装Redis-x64-3.0.504.msi安装包已添加到附件中
点开直接无脑‘下一步‘,安装成功后,找到.exe文件所在目录,双击执行即可。
Port:6379别动就行,脚本里面已经提前写好了。
(2).回到run.py脚本 直接执行(保证所有模块均在同一目录下)
(3).执行效果:如图