由于爬虫工作往往有大量数据需要爬取,便需要大量的备用IP更换,这时就需要用到代理IP池。将大量可以用于更换的代理IP汇聚要一起,便于管理和调用,IP池就这样产生了。IP池有一下特征:它里面的IP是持续补充的,会有源源不断的新的IP被加入到池子中。它里面的IP是有生命周期的,一但失效就会被清除出 IP池;它里面的IP是可以被任意取出,方便爬虫用户使用的。
私信小编01即可获取大量Python学习资料
免费ip其实是不适合搭建代理池的,因为数量上面不具备优势,而且很耗时,大家需要用时间来一一排查,要做就要做好,建议大家还是选择专业一点的提供商。
代理池主要分四个部分:IP抓取,IP有效性检测,IP存储,webAPI提供。示意图如下:
数据库选择redis,它的有序集合可以给插入值赋于一个"分数"用于排序存取值等等,我们可以利用分数绑定ip的可用性进行提取及检测管理。Redis有序集合的基本操作
爬取模块该模块主要是采用爬虫去付费api或者代理网站来获取ip,加入数据库并设置初始分数50分。在这里我采用了aiohttp异步采集,提高效率。所有爬虫基于父类开发。
class BaseCrawler(object): urls = [] # new_loop = asyncio.new_event_loop() # asyncio.set_event_loop(new_loop) LOOP = asyncio.get_event_loop() asyncio.set_event_loop(LOOP) @retry(stop_max_attempt_number=3, retry_on_result=lambda x: x is None) async def _get_page(self, url): async with aiohttp.ClientSession() as session: try: async with session.get( url,timeout=10 ) as resp: # print(dir(resp.content),resp.content) return await resp.text() except: return def crawl(self): """ crawl main method """ for url in self.urls: app_logger.info(ffetching {url}) # html = self.LOOP.run_until_complete(asyncio.gather(self._get_page(url))) html = self.LOOP.run_until_complete(self._get_page(url)) # print(html, html) for proxy in self.parse(html): # app_logger.info(ffetched proxy {proxy.string()} from {url}) yield proxy注:一般采取付费的代理平台,IP稳定寿命长。
测试模块先明确策略,代理可用则设置为100分,不可用则减一定分值,分值减为0后视为完全不可用且从数据库中删除,当需要使用代理时从redis中优先取出100分ip中的随机一个,若无100分的则取最高分中的一个。检测也采用异步模式,最开始制定测试网址(默认为百度)。
class Tester(object): """ tester for testing proxies in queue """ def __init__(self): self.redis = RedisClient() self.loop = asyncio.get_event_loop() async def test(self, proxy): """ test single proxy :param proxy: :return: """ async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=False)) as session: try: app_logger.debug(ftesting {proxy}) async with session.get(TEST_URL, proxy=fhttp://{proxy}, timeout=TEST_TIMEOUT, allow_redirects=False) as response: if response.status in TEST_VALID_STATUS: self.redis.set_max(proxy) app_logger.debug(fproxy {proxy} is valid, set max score) else: self.redis.decrease(proxy) app_logger.debug(fproxy {proxy} is invalid, decrease score) except EXCEPTIONS: self.redis.decrease(proxy) app_logger.debug(fproxy {proxy} is invalid, decrease score) def run(self): """ test main method :return: """ # event loop of aiohttp app_logger.info(stating tester...) count = self.redis.count() app_logger.debug(f{count} proxies to test) for i in range(0, count, TEST_BATCH): # start end end offset start, end = i, min(i + TEST_BATCH, count) app_logger.debug(ftesting proxies from {start} to {end} indices) proxies = self.redis.batch(start, end) tasks = [self.test(proxy) for proxy in proxies] # run tasks using event loop self.loop.run_until_complete(asyncio.wait(tasks)) API接口模块该模块以Flask搭建简易web服务,提供随机获取有效IP及总IP数的接口。
__all__ = [app] app = Flask(__name__) def get_conn(): """ get redis client object :return: """ if not hasattr(g, redis): g.redis = RedisClient() return g.redis @app.route(/) def index(): """ get home page, you can define your own templates :return: """ return <h2>Welcome to Proxy Pool System</h2> @app.route(/random) def get_proxy(): """ get a random proxy :return: get a random proxy """ conn = get_conn() return conn.random() @app.route(/count) def get_count(): """ get the count of proxies :return: count, int """ conn = get_conn() return str(conn.count())以上为代理池的基本功能,另外需要再创建一个调度器协调控制各功能模块,检测与获取IP需要定时进行,web服务在后台运行。定时采用Python的定时管理库apscheduler,同时通过multiprocessing管理各功能模块多进程运行。
class Scheduler(object): """Scheduler""" def run_tester(self, cycle=CYCLE_TESTER): """ 定时启动检测器 :param cycle: :return: """ if not ENABLE_TESTER: app_logger.info(tester not enabled, exit) return tester = Tester() sch.add_job(tester.run, interval, seconds=cycle, id=run_tester) sch.start() def run_getter(self, cycle=CYCLE_GETTER): """ 定时开启爬虫补充代理 :param cycle: :return: """ if not ENABLE_GETTER: app_logger.info(getter not enabled, exit) return getter = Getproxies() sch.add_job(getter.run, interval, seconds=cycle, id=run_getter) sch.start() def run_server(self): """web服务端开启""" if not ENABLE_SERVER: app_logger.info(server not enabled, exit) return app.run(host=API_HOST, port=API_PORT, threaded=API_THREADED) def run(self): """调度器启动""" global tester_process, getter_process, server_process try: app_logger.info(starting proxypool...) if ENABLE_TESTER: tester_process = multiprocessing.Process(target=self.run_tester) app_logger.info(fstarting tester, pid {tester_process.pid}...) tester_process.start() if ENABLE_GETTER: getter_process = multiprocessing.Process(target=self.run_getter) # print(dir(getter_process),getter_process) app_logger.info(fstarting getter, pid{getter_process.pid}...) getter_process.start() if ENABLE_SERVER: server_process = multiprocessing.Process(target=self.run_server) app_logger.info(fstarting server, pid{server_process.pid}...) server_process.start() tester_process.join() getter_process.join() server_process.join() except KeyboardInterrupt: app_logger.info(received keyboard interrupt signal) tester_process.terminate() getter_process.terminate() server_process.terminate() finally: # must call join method before calling is_alive tester_process.join() getter_process.join() server_process.join() app_logger.info(ftester is {"alive" if tester_process.is_alive() else "dead"}) app_logger.info(fgetter is {"alive" if getter_process.is_alive() else "dead"}) app_logger.info(fserver is {"alive" if server_process.is_alive() else "dead"}) app_logger.info(proxy terminated)