用Python打造在线盈利的项目-代理服务

前言

这是Python学习系列文章的中篇,当你学完这个系列的文章之后,我希望你能学到以下知识点:

学会从静态UML图入手来初步整理开源项目各个模块的依赖关系学会从数据流来进一步分析各个模块是如何合作的学会socks5协议的tcp与udp部分学会异步io的系统是怎么写的学会状态机是什么,以及状态机用于解决什么问题根据用户场景来设计接口或者命令学会对称加密中有一大类是流加密

内容目录

1.回顾过去,借助已有开源框架实现RequestRelayer服务1.1 情景回顾1.2 重新认识RequestRelayer2.RequestRelayer模块分解2.1 都有哪些模块组成2.2 模块之间的依赖关系3.RequestRelayer工作流3.1 tcp类型的加密中转服务3.2 udp类型的加密中转服务4.为RequestRelayer添加新功能4.1 用户场景分析4.2 为每个用户场景添加对应的功能

1. 回顾过去,借助已有开源框架实现RequestRelayer服务

1.1 情景回顾

经过一段时间的忙碌,我还有事儿没完成,继续分享一下之前我在这个Live(用Python打造在线盈利的项目)中没有完成的事情。这篇文章将讲解使用Python实现RequestRelayer的细节,如下图红色矩形模块所示。

想要在短期内上线一款服务,我们一般会借助一些开源的项目,这个RequestRelayer也不例外,因此我们会在这个项目的基础上为RequestRelayer添加功能。在这里我就用RequestRelayer代替这个项目的名字,这个项目很优秀,如果大家感兴趣,那么就去看看他们每日的输出。为了定制符合RequestRelayer的功能,我们就有必要熟悉和理解我们选择的开源项目的源代码。因此本篇文章将对我们选择的开源项目进行源码分析,了解它的模块组成部分,工作流,数据流之后,我们将在合适的位置为这个开源的项目添加适合我们业务场景的功能。

1.2 重新认识RequestRelayer

RequestRelayer共有2部分子系统,分别是下图黄色部分的local与红色部分的server。local相当于上图的RequestRelayer Client,server相当于上图的RequestRelayer,不过切记,local与server共同组成了RequestRelayer。local运行在用户的PC机上(命令行版本),server运行在远程的vps上(也就是一台运行linux OS的PC机)。

有上图可知,我们需要学习local与server两个子系统,以便我们日后能够为具体的业务实现自己的功能。

2. RequestRelayer模块分解

2.1 都有哪些模块组成

好,在分析别人的项目的源码之前,我们一般需要将别人的项目中涉及重要功能的static class diagram绘制出来,好消息是我已经绘制了,请看下图:

这个图简要的将项目中各个功能模块的从属关系展现出来,是我们了解别人项目源码的第一步。通过这个图,我们要逼迫自己做到以下几点来理解这个开源项目:

这个项目由哪几个功能模块组成,每个模块都提供了什么功能

由上图我们可以知道组成这个开源项目的功能模块不多,对我们而言是好事,因为我们可以在短期内学完这个开源项目。我们再来看看每个功能模块都提供了哪些功能:

local: 完成与socks5客户端握手,建立连接,传输3个阶段;加密转发来自socks5客户端的信息到server模块,常见的socks5客户端有chrome浏览器。local是需要用户填写好配置文件后,在PC端用Python启动。

server: 解密来自local的信息,转发到target server,并接收加密target server返回的信息,再将加密的信息返回给local。

daemon: 守护者,linux里面有一种进程,是运行在后端,你没法看到它的用户界面的。而这个模块的功能就是使得local和server运行在后端,用户没法直接干预它们。

shell: 外壳,主要用来检测Python版本,解析配置文件,打印local和server的使用说明以及异常处理。

manager: 多用户管理模块,这个模块主要用于管理多个用户使用加密中转服务(包括添加用户,移除用户,查看当前活跃用户等接口),我们将在这里添加一些接口来挂起流量用超的用户以及每个月恢复挂起的用户。

eventloop: 事件循环,这个模块非常重要,因为它实现了异步IO功能,通过它我们可以添加多个socket,并且监听每个socket的状态,然后根据状态去处理对应的事件。

asyncdns: 异步dns,这个模块主要的功能是将域名异步的解析成ip,Python中提供了域名解析接口,但却是同步的。

tcprelay: tcp转发器,这个模块基于tcp连接,创建了一个socket,这个socket用于接收信息,并且创建TCPRelayHandler。

TCPRelayHandler: tcp转发处理,这个模块创建了2个socket,分别是_local_sock和_server_socket,同时这里面维护了一个状态机以及针对每一个状态的处理函数。

udprelay: udp转发器,这个模块基于udp连接

lru_cache: least recently used缓存

encrypt: 提供加密解密方法,用于加/解密信息

2.2 模块之间的依赖关系

local和server均为上层模块,需要借助很多下层模块来提供完整的加密中转服务。由于local和server的工作方式有一些区别,同时它们都会依赖相同的下层模块,因此我们需要从local和server方面分析各功能模块的依赖关系。通过上图我们知道,local和server是程序的入口点,因此我们应该从local和server开始分析这个开源项目的模块依赖关系。

local依赖的模块

local需要将socks5客户端的信息加密转发到server,因此需要用到通信功能,在上图模块中能代表通信功能的模块有tcprelay,TCPRelayHandler,udprelay;此外需要监听每个通信socket的状态,因此也需要eventloop来协助完成;在通信过程中还会涉及加密功能,因此会依赖encrypt模块;在通信过程中需要缓存一些ip信息以及socket信息,为什么要缓存,原因是省时间,因此我们需要用到lru_cache模块;在和其它计算机通信之前,我们有时需要解析这台计算机的域名,最终获得该计算机的ip进行通信连接,因此我们需要用到asyncdns模块;local启动的时候还需要读取配置文件和检查Python的版本,这个时候我们需要依赖shell模块;倘如我们需要使local能够像守护者进程一样启动,那么我们就需要借助daemon模块。我相信写到这里你的思维已经凌乱了,那我们按照以下方式再捋一捋思路:

根据上图,local做的事情主要是,读取配置文件(包含了用户的本地端口,本地ip,远程端口,远程ip,加密方法等,与shell模块有关),根据配置文件的选项来决定是否通过daemon的方式运行local(与daemon模块有关),创建1个tcprelay,1个udprelay,1个asyncdns,1个eventloop,并把tcprelay,udprelay,asyncdns这些实例加入到eventloop中,由eventloop监听它们的网络读写状态,这个时候local就运行起来了,eventloop进入死循环状态,不停的监听加入到它的socket,其中TCPRelayHandler会根据socks5客户端发起请求由tcprelay创建,encrypt也随着TCPRelayHandler的创建而创建,lru_cache也将随着udprelay,asyncdns的创建而创建。最后我们发现,local只是把所有模块的集聚地,它负责把所有模块初始化,最后由eventloop接管所有网络读写状态。

server依赖的模块

server需要接收并解密local发送过来的信息,并转发给target server,再将target server返回的信息进行加密并返回给local,与local不同的是,server需要在线添加或移除用户的通信节点,因此除了与local所依赖的模块一样之外,还必须引入manager模块,如下图所示。同样,当server启动之后,eventloop将接管所有网络读写状态。

3. RequestRelayer工作流

分析完开源项目的源码之后,接着就要想想这个开源项目是如何工作的,以及我们要在现有的工作流上添加新的功能,下图展示了RequestRelayer的工作流。

上图分为3大部分,分别是SS Controller,local,server,我们来一一解释。

SS Controller

这个是web网站,使用Python开发的,之后我会单独写一篇文章,来告诉大家如何开发这个网站,目前我们只需要知道它是用来管理server,以及供用户注册购买自助开通账号的网页版应用,server会暴露一些api供SS Controller调用。

local

这个是RequestRelayer的一部分,上图有很多个local,每一个local是运行在不同的用户PC机上,图中有n个用户,每个用户运行自己的local,socks5客户端会与local通信。注意每一个用户都有自己的eventloop。

server

这个是RequestRelayer的一部分,上图只有一个server,这个server会为每一个用户开通与之对应的通信节点,所有这些通信节点会由唯一一个eventloop轮询监听它们的网络读写状态,图中有n个用户,每一个用户与左边的local一一对应。

好了,我们来把这三部分串起来:准备一台有公网IP的PC,使用Python运行server,这时我们将暴露一些restful api可供SS Controller使用了,最常用的api分别是add_port(为某个用户开通服务),remove_port(移除到期用户所使用的服务),detach_port(挂起流量用超的用户所对应的服务节点),attach_port(恢复服务期限内流量用超用户所对应的服务节点)。部署好server之后我们还需要一个具有公网的PC,部署SS Controller,这个SS Controller通过对server暴露的restful api排列组合,实现上图ssManager模块来操作server。当用户通过SS Controller付费自助开通server上的服务之后,用户需要在他自己的PC机上,为local配置好(本地ip,本地端口,远程ip,远程端口,密码,加密方式),并启动local。最后用户打开socks5客户端,比如chrome浏览器,填写本地ip和本地端口,进而使用server上已开通的服务。上图左边部分与右边部分之间通信是完全加密的(对传输的信息解密,就需要让local与server知道密码和加密方式)。

以上便是感性的认识,有点类似3个代表思想,下面我将结合上图刨根问底将这个工作流所涉及的节点标示出来。请看下图:

用户访问网站,以Paypal的方式购买服务,此时会调用ssManager模块,该模块会通过udp连接,对server发出加密请求,让server为该用户开通加密中转服务,请求会被eventloop监听到,然后由manager接收,并且对接收到的请求进行解密,再解析请求的命令,调用开通服务的接口,完成之后,server会添加一个绿色的节点,比如用户1的连接sock,这个绿色的节点同时提供2种类型的连接通道,一种是tcp类型的通道,由tcprelay提供服务,另外一种是udp类型的通道,由udprelay提供服务。因此我们需要分2条线来逐一分析每种类型通信过程中所涉及的模块。

当用户自助开通加密中转服务之后,紧接着用户需要使用该服务,用户可以选择使用tcp类型的服务,也可以选择udp类型的服务,下面将针对每一种类型的服务所涉及的模块以及工作流一一分析。

3.1 tcp类型的加密中转服务

上图就是tcp类型的加密中转服务的工作流,由四个子系统组成,分别是chrome,local,server,target server(比如google server)。local与chrome运行在用户的PC端,使用tcprelay来监听来自chrome的连接;server运行在具有公网IP的PC机上,等待来自上图local的连接;target server代表其它公司的server,比如说google的web server或者facebook的web server,chrome决定server访问哪个target server;比如在chrome里输入http://www.google.com,那么target server就是google的web server。这个过程的详细步骤如下所描述。

1、chrome发起请求,通知tcprelay创建TCPRelayHandler,并将TCPRelayHandler中的_local_sock加入到eventloop,这一步主要是建立TCP连接,连接chrome与TCPRelayHandler._local_sock,这一步过后,local.TCPRelayHandler将进入INIT状态

local.TCPRelay._server_socket的处理逻辑如下def handle_event(self, sock, fd, event): conn = self._server_socket.accept() TCPRelayHandler(self, self._fd_to_handlers, self._eventloop, conn[0], self._config, self._dns_resolver, self._is_local)

2、chrome向TCPRelayHandler._local_sock发起请求,请求内容为socks5定义的第一阶段内容(认证协商)。此时会由chrome发出所有可接受的认证方法到TCPRelayHandler._local_sock

local.TCPRelayHandler._local_sock的处理逻辑如下#chrome会发送以下格式的请求 +----+----------+----------+ |VER | NMETHODS | METHODS| +----+----------+----------+ | 1|1 | 1 to 255 | +----+----------+----------+ #处理逻辑如下 def _handle_stage_init(self, data): _check_auth_method(self, data) def _check_auth_method(self, data): # VER, NMETHODS, and at least 1 METHODS if len(data) < 3: logging.warning(method selection header too short) raise BadSocksHeader socks_version = common.ord(data[0]) nmethods = common.ord(data[1]) if socks_version != 5: logging.warning(unsupported SOCKS protocol version+ str(socks_version)) raise BadSocksHeader if nmethods < 1 or len(data) != nmethods + 2: logging.warning(NMETHODS and number of METHODS mismatch) raise BadSocksHeader noauth_exist = False for method in data[2:]: if common.ord(method) == METHOD_NOAUTH: noauth_exist = True break if not noauth_exist: logging.warning(none of SOCKS METHOD\s requested by client is supported) raise NoAcceptableMethods

3、TCPRelayHandler._local_sock将支持的认证方式返回给chrome,完成socks5第一阶段。完成这一步之后local.TCPRelayHandler将进入ADDR状态

local.TCPRelayHandler._local_sock的处理逻辑如下#在此处将**socks5**定义的方法X00 NO AUTHENTICATION REQUIRED返回,同时切换到ADDR状态 def _handle_stage_init(self, data): self._write_to_sock(b\x05\00, self._local_sock) self._stage = STAGE_ADDR #返回的格式如下 +----+--------+ |VER | METHOD | +----+--------+ | 1| 1| +----+--------+

4、 chrome接着向TCPRelayHandler._local_sock发起建立连接请求,这个阶段属于socks5的第2个阶段(建立连接)

local.TCPRelayHandler._local_sock的处理逻辑如下#建立连接请求格式如下 +----+-----+-------+------+----------+----------+-------+ |VER | CMD |RSV| ATYP | DST.ADDR | DST.PORT | DATA| +----+-----+-------+------+----------+----------+-------+ | 1|1| 1 |1 | Variable |2 | m | +----+-----+-------+------+----------+----------+-------+ 比如访问www.google.com的报文如下 VERCMDRSVATYP 域名长度域名 80端口 0x05 0x01 0x00 0x03 0x0a bgoogle.com0x00 0x50 #在此处处理请求的数据 CMD_CONNECT = 0x01 def _handle_stage_addr(self, data): if self._is_local: cmd = common.ord(data[1]) if cmd == CMD_CONNECT: # just trim VER CMD RSV data = data[3:] #执行这一步之后的data的格式如下 +------+----------+----------+------+ | ATYP | DST.ADDR | DST.PORT | DATA | +------+----------+----------+------+ |1 |n |2 |m | +------+----------+----------+------+

5、TCPRelayHandler._local_sock响应来自chrome发起的建立连接请求,完成socks5第2阶段(建立连接)。完成这一步后将进入DNS状态

local.TCPRelayHandler._local_sock的处理逻辑如下def _handle_stage_addr(self, data): self._stage = STAGE_DNS # forward address to remote # 响应格式如下 +----+-----+-------+------+----------+----------+ |VER | REP |RSV| ATYP | BND.ADDR | BND.PORT | +----+-----+-------+------+----------+----------+ | 1|1| X00 |1 | Variable |2 | +----+-----+-------+------+----------+----------+ self._write_to_sock((b\x05\x00\x00\x01 b\x00\x00\x00\x00\x10\x10), self._local_sock) data_to_send = self._encryptor.encrypt(data) self._data_to_write_to_remote.append(data_to_send)

6、local根据用户配置的远程服务器地址发起异步域名解析并直接返回,这里的远程服务器地址就是指上图运行着server的PC

local.DNSResolver._sock的处理逻辑如下def _handle_stage_addr(self, data): # notice here may go into _handle_dns_resolved directly # self._chosen_server[0]来自json配置文件,也就是上图server的域名或者IP self._dns_resolver.resolve(self._chosen_server[0], self._handle_dns_resolved)

7、 得到server所运行的PC的公网IP

local.DNSResolver._sock的处理逻辑如下def _handle_dns_resolved(self, result, error): ip = result[1] remote_addr = ip #self._chosen_server[1]来自json配置的server监听的远程端口 remote_port = self._chosen_server[1]

8、 local中的TCPRelayHandler根据用户配置的远程端口以及第7步得到的IP,创建_remote_sock并向上图server.tcprelay建立TCP连接,并将_remote_sock加入到eventloop进行监听,完成这一步了就进入了local的TCPRelayHandler就进入了CONNECTING状态。同时server新建一个TCPRelayHandler,把TCPRelayHandler._local_sock加入到eventloop中,紧接着进入INIT状态

local.DNSResolver._sock的处理逻辑如下 def _handle_dns_resolved(self, result, error): remote_sock = self._create_remote_socket(remote_addr, remote_port) remote_sock.connect((remote_addr, remote_port)) self._loop.add(remote_sock, eventloop.POLL_ERR | eventloop.POLL_OUT, self._server) self._stage = STAGE_CONNECTING #打开上行通道读(_local_sock)和写(_remote_sock)的阀门,让数据流通 #由于remote_sock.connect是异步操作,那么就需要有CONNECTING状态 self._update_stream(STREAM_UP, WAIT_STATUS_READWRITING) #打开下行通道读(_remote_sock)的阀门,让数据只允许读进来,不允许写(_local_sock) self._update_stream(STREAM_DOWN, WAIT_STATUS_READING) server.TCPRelay._server_socket的处理逻辑如下# eventloop会触发这个函数,这个函数属于TCPRelay def handle_event(self, sock, fd, event): # handle events and dispatch to handlers conn = self._server_socket.accept() TCPRelayHandler(self, self._fd_to_handlers, self._eventloop, conn[0], self._config, self._dns_resolver, self._is_local)

9、 local._remote_sock将_data_to_write_to_remote中缓冲的已加密的数据发送给server._local_sock,并且进入STREAM状态。在server端,server._local_sock接收加密的数据,并解密数据,对数据解析,并停止对server._local_sock读取数据,紧接着server.TCPRelayHandler进入DNS状态

local.TCPRelayHandler._remote_sock的处理逻辑如下 def handle_event(self, sock, event): # order is important if sock == self._remote_sock: if event & eventloop.POLL_OUT: self._on_remote_write() def _on_remote_write(self): # handle remote writable event self._stage = STAGE_STREAM data = b.join(self._data_to_write_to_remote) self._data_to_write_to_remote = [] self._write_to_sock(data, self._remote_sock) server.TCPRelayHandler._local_sock的处理逻辑如下 def _on_local_read(self): data = self._local_sock.recv(BUF_SIZE) data = self._encryptor.decrypt(data) if (not is_local and self._stage == STAGE_INIT): self._handle_stage_addr(data) def _handle_stage_addr(self, data): header_result = parse_header(data) addrtype, remote_addr, remote_port, header_length = header_result # pause reading self._update_stream(STREAM_UP, WAIT_STATUS_WRITING) self._stage = STAGE_DNS if len(data) > header_length: self._data_to_write_to_remote.append(data[header_length:]) # notice here may go into _handle_dns_resolved directly self._dns_resolver.resolve(remote_addr, self._handle_dns_resolved)

10、 注意了,这一步发生在server端,异步dns,并停止从TCPRelayHandler._local_sock读取数据

server.DNSResolver._sock的处理逻辑如下 def handle_event(self, sock, event): self._on_local_read() def _on_local_read(self): if (not is_local and self._stage == STAGE_INIT): self._handle_stage_addr(data) def _handle_stage_addr(self, data): self._stage = STAGE_DNS self._dns_resolver.resolve(remote_addr, self._handle_dns_resolved)

11、 TCPRelayHandler通过target server的IP以及第10步中获得的端口向target server发起TCP连接,并将TCPRelayHandler中的_remote_sock加入eventloop中,此时可以读取TCPRelayHandler._local_sock中数据,完成这一步之后server.TCPRelayHandler将进入CONNECTING状态

server.DNSResolver._sock的处理逻辑如下 def _handle_dns_resolved(self, result, error): # else do connect remote_sock = self._create_remote_socket(remote_addr, remote_port) remote_sock.connect((remote_addr, remote_port)) self._loop.add(remote_sock, eventloop.POLL_ERR | eventloop.POLL_OUT, self._server) self._stage = STAGE_CONNECTING self._update_stream(STREAM_UP, WAIT_STATUS_READWRITING) self._update_stream(STREAM_DOWN, WAIT_STATUS_READING)

12、 若target server响应之后,server端将发送数据给target server。这一步使得server.TCPRelayHandler进入STREAM状态

server.TCPRelayHandler._remote_sock的处理逻辑如下def handle_event(self, sock, event): self._on_remote_write() def _on_remote_write(self): # handle remote writable event self._stage = STAGE_STREAM if self._data_to_write_to_remote: data = b.join(self._data_to_write_to_remote) self._data_to_write_to_remote = [] self._write_to_sock(data, self._remote_sock) else: self._update_stream(STREAM_UP, WAIT_STATUS_READING)

13、 通过第9步之后,local的TCPRelayHandler就进入了STREAM状态,在这个状态下,local接收到chrome发送过来的数据,然后进行简单的加密转发。chrome发送出来的数据将由local._local_sock接收

local.TCPRelayHandler._local_sock的处理逻辑如下 def handle_event(self, sock, event): self._on_local_read() def _on_local_read(self): data = self._local_sock.recv(BUF_SIZE)

14、 local.TCPRelayHandler._local_sock将接收到的数据加密,并通过local.TCPRelayHandler._remote_sock转发给server端

local.TCPRelayHandler._local_sock与local.TCPRelayHandler._remote_sock的处理逻辑如下 def _on_local_read(self): data = self._local_sock.recv(BUF_SIZE) self._handle_stage_stream(data) def _handle_stage_stream(self, data): data = self._encryptor.encrypt(data) self._write_to_sock(data, self._remote_sock) server.TCPRelayHandler._local_sock的处理逻辑如下 def _on_local_read(self): data = self._local_sock.recv(BUF_SIZE) if not is_local: data = self._encryptor.decrypt(data) self._handle_stage_stream(data) def _handle_stage_stream(self, data): self._write_to_sock(data, self._remote_sock)

15、 这一步发生在server端与target server端。在这个过程中,对第15步中获得的数据解密,并通过_remote_sock将解密之后的数据发送给target server

server.TCPRelayHandler._remote_sock的处理逻辑如下 def _on_local_read(self): if not is_local: data = self._encryptor.decrypt(data) self._handle_stage_stream(data) def _handle_stage_stream(self, data): self._write_to_sock(data, self._remote_sock)

16、 target server将请求的数据返回给server端,由server端的_remote_sock接收

server.TCPRelayHandler._remote_sock的处理逻辑如下 def _on_remote_read(self): data = self._remote_sock.recv(BUF_SIZE)

17、 将target server返回的数据进行加密,通过server中TCPRelayHandler的_local_sock,返回给local中TCPRelayHandler的_remote_sock

local.TCPRelayHandler._remote_sock的处理逻辑如下 def _on_remote_read(self): data = self._remote_sock.recv(BUF_SIZE) server.TCPRelayHandler._local_sock的处理逻辑如下 def _on_remote_read(self): data = self._encryptor.encrypt(data) self._write_to_sock(data, self._local_sock)

18、 local中TCPRelayHandler对server返回的数据进行解密,最终返回给chrome

local.TCPRelayHandler._remote_sock的处理逻辑如下def _on_remote_read(self): data = self._encryptor.decrypt(data) self._write_to_sock(data, self._local_sock)

注意:local与server共同完成了以上18步,它们各自的TCPRelayHandler均有自己的状态,互相独立,但是又有一点联系,因为server.TCPRelay.TCPRelayHandler需要由local.TCPRelay.TCPRelayHandler来驱动(这一步发生在第9步,local.TCPRelay.TCPRelayHandler将进入STREAM状态,同时将数据发送给server,而server.TCPRelay.TCPRelayHandler也将开始自己的状态切换)。为了让TCPRelayHandler区别对待每一个状态,我们需要定义以下几个不同的状态。每个状态均可被local与server共用,只不过server端不需要INIT状态。那么我们来看看这些状态都有哪些?

STAGE_INIT = 0 STAGE_ADDR = 1 STAGE_UDP_ASSOC = 2 STAGE_DNS = 3 STAGE_CONNECTING = 4 STAGE_STREAM = 5 STAGE_DESTROYED = -1

以上就是所有状态了,需要注意的是server不考虑状态INIT(下图红色节点所示),并且直接从ADDR这个状态开始,而local有INIT这个状态,并且从INIT这个状态开始,如下图所示:

ADDR,DNS,CONNECTING,STREAM,这些状态只要处理过程出现错误,就会转移到DESTROY状态。既然要切换状态,那么肯定要有一个驱动状态变化的源头,在这里驱动状态变化的源头就是eventloop。它的执行步骤如下:

eventloop->TCPRelay.handle_event ->TCPRelayHandler(图中的1,执行这一步后进入INIT状态) ->TCPRelayHandler.handle_event ->_handle_stage_init(图中的2,执行这一步后进入ADDR状态,如果是server,那么这一步是不会执行的,直接进入下一步) ->_handle_stage_addr(图中的3,执行这一步之后进入DNS状态) ->_handle_dns_resolved(图中的4,执行这一步之后进入CONNECTING状态) ->_on_remote_write(图中的5,执行这一步之后进入STREAM状态)

以上便是相关的代码。那么问题来了:eventloop是如何获得发生连接的TCPRelay?而取到TCPRelay之后,又是如何获得活跃的TCPRelayHandler?eventloop与TCPRelay有什么样的关系?TCPRelay与TCPRelayHandler又有什么样的关系?带着这4个问题我们来看看下图:

1、 每个TCPRelayHandler都会有2个socket,分别是_local_sock与_remote_sock。每个TCPRelay都会关联多个TCPRelayHandler,这个关联关系通过_fd_to_handlers字典建立联系。其中的fd就是file descriptor的简称,每一个socket都会有唯一的fd。也就是说每个TCPRelayHandler都会被作为值,分2次添加到_fd_to_handlers,而键分别是_local_sock与_remote_sock的fd,伪代码如下:

tcprelay = TCPRelay() #创建10个TCPRelayHandler,一共有10对_local_sock与_remote_sock #每对_local_sock与_remote_sock拥有相同的TCPRelayHandler #每一个socket都有唯一的fd,因此tcprelay._fd_to_handlers的键不会重复 for i in range(10): handler = TCPRelay.Create_TCPRelayHandler() tcprelay._fd_to_handlers[handler._local_sock.fd] = handler tcprelay._fd_to_handlers[handler._remote_sock.fd] = handler #注意_fd_to_handlers是TCPRelay的成员变量

2、 local和server均有一个eventloop存在(有且只有一个),而eventloop里维护了一个字典成员变量self._fdmap = {} # (f, handler),这个变量维护了eventloop与TCPRelay的关系,请看如下伪代码:

#遍历所有TCPRelay的_server_socket,遍历所有TCPRelayHandler的_remote_sock & _local_sock for sock in [TCPRelay1 TCPRelay2 ... ] [TCPRelayHandler1 TCPRelayHandler2 ...]: fd = sock.fd f = sock #根据sock获得对应的TCPRelay实例,这里有2种情况 #1.如果sock == _server_socket,那么handler = TCPRelay #2.如果sock == _remote_sock || sock == _local_sock,那么handler = TCPRelayHandler._server #而self与self._server是指向同一个TCPRelay实例 handler = get_TCPRelay_by_sock(sock) self._fdmap[fd]=(f,handler)

3、 了解eventloop与TCPRelay的关系以及TCPRelay与TCPRelayHandler的关系之后,让我们来了解一下eventloop是如何驱动TCPRelayHandler状态变化的。

######################################################### eventloop启动时,仅仅会监听TCPRelay._server_socket,此时没有任何的TCPRelayHandler。当chrome向local发送连接建立请求时,此时eventloop监听到TCPRelay._server_socket可读,那么eventloop将根据以下伪代码获得TCPRelay._server_socket对应的TCPRelay实例,并调用TCPRelay.handle_event函数将执行的上下文切换到TCPRelay。TCPRelay中会创建一个TCPRelayHandler实例(此时这个实例会进入INIT状态)。把TCPRelayHandler._local_sock添加到eventloop中,为TCPRelayHandler._local_sock与TCPRelay建立映射,使得eventloop可以通过TCPRelayHandler._local_sock获得创建这个TCPRelayHandler的TCPRelay,进而通过TCPRelay._fd_to_handlers获取TCPRelayHandler,从而可以将程序执行的上下文切换到TCPRelayHandler,并调用TCPRelayHandler.handle_event来根据当前状态选择执行TCPRelayHandler所定义的函数,最终使得某个TCPRelayHandler进入最终的STREAM状态。 sock = self._server_socket fd = sock.fd sock_handler = eventloop._fdmap[fd] handler = sock_handler[1] handler.handle_event(sock, fd, event) #以下是对应代码的分析 #每一个TCPRelay负责监听一个端口,而负责监听这个端口的socket是self._server_socket #因此需要定义一个TCPRelay的类 #这个类定义了一个方法叫handle_events,并且会由eventloop中的一个dict变量_fdmap建立_server_socket->handle_events映射 #如果eventloop发现_server_socket可读,说明发送端调用connect函数向_server_socket发送连接请求,那么eventloop就会通过_fdmap获取handle_events #紧接着eventloop会调用handle_events,通过accept接收来自发送端的连接请求,并得到conn连接(注意TCP的3次握手发生在connect与accept之间),进入以下代码的第1步,创建TCPRelayHandler #在创建TCPRelayHandler过程中,self._local_sock记录了发送端的socket(来自conn[0]),此时self._local_sock将用于连接发送端 #注意,得到self._local_sock之后,我们也必须把它加入eventloop中,用_fdmap建立_local_sock->TCPRelay.handle_events映射 #也就是说一个TCPRelay会管理很多个TCPRelayHandler,而每个TCPRelayHandler都会维护一个_local_sock与_remote_sock,eventloop会为_local_sock->TCPRelay.handle_events与_remote_sock->TCPRelay.handle_events建立映射 #那么问题来了,当来自sock != self._server_socket时,我如何知道选择哪一个TCPRelayHandler? #这个时候就要借助TCPRelay里的dict变量_fd_to_handlers,根据sock对应的fd来获取TCPRelayHandler #然后将处理转发给TCPRelayHandler的handle_event #我们来总结一下这个过程:eventloop监听到可读写的socket,然后根据_fdmap取得TCPRelay实例,并调用TCPRelay.handle_event, #如果这个socket是_server_socket那么就在TCPRelay中完成调用,如果socket不是_server_socket,那么就根据通过socket从_fd_to_handlers #取出TCPRelayHandler实例,并调用TCPRelayHandler.handle_event方法 #每次调用TCPRelayHandler.handle_event的方法时会根据不同的情况调用2个函数_on_local_read与_on_remote_write来切换状态 # ######################################################### # define TCPRelay class class TCPRelay(object): def handle_event(self, sock, fd, event): # handle events and dispatch to handlers # eventloop监听到与TCPRelay对应的socket变化了,调用此函数 if sock == self._server_socket: conn = self._server_socket.accept() ######################################################### # 1.创建TCPRelayHandler,让处理进入INIT状态 ######################################################### TCPRelayHandler(self, self._fd_to_handlers, self._eventloop, conn[0], self._config, self._dns_resolver, self._is_local) else: # handler is TCPRelayHandler instance handler = self._fd_to_handlers.get(fd, None) # 将处理转移给TCPRelayHandler的handle_event函数,2,3,4,5步都在这个函数里完成,接着看下面的代码 handler.handle_event(sock, event) #define TCPRelayHandler class class TCPRelayHandler(object): def handle_event(self, sock, event): # handle all events in this handler and dispatch them to methods # order is important if sock == self._remote_sock: if event & eventloop.POLL_OUT: ######################################################### # 5.在这里切换到STREAM状态 ######################################################### self._on_remote_write() elif sock == self._local_sock: if event & (eventloop.POLL_IN | eventloop.POLL_HUP): self._on_local_read() def _on_local_read(self): # handle all local read events and dispatch them to methods for # each stage is_local = self._is_local data = None if self._stage == STAGE_STREAM: self._handle_stage_stream(data) return elif is_local and self._stage == STAGE_INIT: ######################################################### # 2.socks5协议中的协商认证方式,处理完成以后将切换到ADDR状态 ######################################################### self._handle_stage_init(data) elif (is_local and self._stage == STAGE_ADDR) or \ (not is_local and self._stage == STAGE_INIT): ######################################################### # 3. 完成socks5协议中的建立连接阶段,并且停止读取信息同时切换到DNS状态,紧接着进行域名解析,最终由eventloop监测dns socket变化,调用_handle_dns_resolved切换到CONNECTING状态 ######################################################### self._handle_stage_addr(data) ######################################################### # 4.切换到CONNECTING状态 ######################################################### def _handle_dns_resolved(self, result, error): ip = result[1] self._stage = STAGE_CONNECTING remote_addr = ip if self._is_local: remote_port = self._chosen_server[1] else: remote_port = self._remote_address[1] remote_sock = self._create_remote_socket(remote_addr,remote_port) remote_sock.connect((remote_addr, remote_port)) self._loop.add(remote_sock, eventloop.POLL_ERR | eventloop.POLL_OUT, self._server)

3.2 udp类型的加密中转服务

udp类型的加密中转服务分为2部分:1.TCPRelayHandler处理socks5协议(只有在local端才会处理);2.UDPRelay转发(local端与server端都会处理)。因此我们需要根据这2部分来理解udp类型的加密中转服务。

1、TCPRelayHandler处理socks5协议(只有在local端才会处理) 注意这个阶段的逻辑只会在local端才会执行,server端肯定不会执行。因此请把以下发生的场景切换到local上下文。

socks5的认证协商阶段

def _handle_stage_init(self, data): #+----+----------+----------+ #|VER | NMETHODS | METHODS| #+----+----------+----------+ #| 1|1 | 1 to 255 | #+----+----------+----------+ self._check_auth_method(data) self._write_to_sock(b\x05\00, self._local_sock) self._stage = STAGE_ADDR

socks5的UDP assosiate阶段

def _handle_stage_addr(self, data): #+----+-----+-------+------+----------+----------+ #|VER | CMD |RSV| ATYP | DST.ADDR | DST.PORT | #+----+-----+-------+------+----------+----------+ #| 1|1| 1 |1 | Variable |2 | #+----+-----+-------+------+----------+----------+ #DST.ADDR | DST.PORT都没有被用到 cmd = common.ord(data[1]) if cmd == CMD_UDP_ASSOCIATE: logging.debug(UDP associate) if self._local_sock.family == socket.AF_INET6: header = b\x05\x00\x00\x04 else: header = b\x05\x00\x00\x01 addr, port = self._local_sock.getsockname()[:2] addr_to_send = socket.inet_pton(self._local_sock.family, addr) port_to_send = struct.pack(>H, port) #+----+-----+-------+------+----------+----------+ #|VER | REP |RSV| ATYP | BND.ADDR | BND.PORT | #+----+-----+-------+------+----------+----------+ #| 1|1| 1 |1 | Variable |2 | #+----+-----+-------+------+----------+----------+ #|<--------header--------->|addr_to_send|port_to_send| #注意socks5客户端会建立一个UDP类型的socket并且使用|addr_to_send|port_to_send|来向local发送消息 #同时注意local启动的时候会在port_to_send上建立TCP与UDP的socket #因此socks5客户端拿到|addr_to_send|port_to_send|之后,就可以向local的UDP socket发送数据了 self._write_to_sock(header + addr_to_send + port_to_send, self._local_sock) self._stage = STAGE_UDP_ASSOC # just wait for the client to disconnect return

注意这个阶段将会把TCPRelayHandler切换到STAGE_UDP_ASSOC状态,在这个状态下将不会接收任何TCP连接发来的请求,久而久之LRUCache就会把这个状态下的TCPRelayHandler销毁。

2、UDPRelay转发(local端与server端都会处理)

这一阶段都会发生在local与server端,如下图所示。在这个图中主要分为上行传输与下行传输。在逻辑上,socks5客户端,local.UDPRelay,server.UDPRelay,udp目标服务器都有自己的udpsock,并且它们的udpsock是一一对应传输数据的,只不过实际实现的时候需要借助_server_sock来帮忙转发数据。

有了上图为基础,我们把整个过程分为以下2个阶段,分别是上行传输与下行传输。

上行传输

上行传输是socks5客户端将数据转发给udp目标服务器,如下图所示

传输过程为:sock5客户端.udpsock1将数据发送给local.UDPRelay._server_sock,local.UDPRelay._server_sock获取数据,对数据进行加密,并且由local.UDPRelay.udpsock1将数据转发给server.UDPRelay._server_sock,server.UDPRelay._server_sock获取数据,对数据进行解密,并由server.UDPRelay.udpsock1将数据转发给udp目标服务器.udpsock1。这个过程涉及的代码如下:

def _handle_server(self): server = self._server_socket #1.local端获取socks5发送过来的数据,格式如下: # +----+------+------+----------+----------+----------+ # |RSV | FRAG | ATYP | DST.ADDR | DST.PORT | DATA | # +----+------+------+----------+----------+----------+ #~~~~~~~~~~~~~~~~~~~ #4.server端接收到local端发送过来的数据,格式如下: # +------+----------+----------+----------+ # | ATYP | DST.ADDR | DST.PORT | DATA | # +------+----------+----------+----------+ data, r_addr = server.recvfrom(BUF_SIZE) key = None iv = None if self._is_local: frag = common.ord(data[2]) if frag != 0: logging.warn(UDP drop a message since frag is not 0) return else: #local端将RSV,FRAG剔除,剔除之后数据格式如下 # +------+----------+----------+----------+ # | ATYP | DST.ADDR | DST.PORT | DATA | # +------+----------+----------+----------+ data = data[3:] else: data, key, iv = encrypt.dencrypt_all(self._password, self._method, data) header_result = parse_header(data) addrtype, dest_addr, dest_port, header_length = header_result if self._is_local: server_addr, server_port = self._get_a_server() else: server_addr, server_port = dest_addr, dest_port addrs = self._dns_cache.get(server_addr, None) if addrs is None: addrs = socket.getaddrinfo(server_addr, server_port, 0, socket.SOCK_DGRAM, socket.SOL_UDP) self._dns_cache[server_addr] = addrs af, socktype, proto, canonname, sa = addrs[0] key = client_key(r_addr, af) #2.local端,为socks5客户端所对应的udpsock建立映射关系 #~~~~~~~~~~~~ #5.server端,为local端所对应的udpsock建立映射关系 client = self._cache.get(key, None) if not client: client = socket.socket(af, socktype, proto) client.setblocking(False) self._cache[key] = client self._client_fd_to_server_addr[client.fileno()] = r_addr self._sockets.add(client.fileno()) self._eventloop.add(client, eventloop.POLL_IN, self) if self._is_local: key, iv, m = encrypt.gen_key_iv(self._password, self._method) data = encrypt.encrypt_all_m(key, iv, m, self._method, data) data = data[header_length:] #3.local端,使用udpsock将数据发送到server端 #~~~~~~~~~~~~~~ #6.server端,使用udpsock将数据发送到udp目标服务器 client.sendto(data, (server_addr, server_port))

下行传输

下行传输是udp目标服务器向socks5客户端传输数据。如下图所示:

对应的代码如下:

def _handle_client(self, sock): #1.server端,接收来自udp目标服务器发来的数据 #~~~~~~~~~~~~ #4.local端,接收来自server端发来的数据 data, r_addr = sock.recvfrom(BUF_SIZE) if not self._is_local: data = pack_addr(r_addr[0]) + struct.pack(>H, r_addr[1]) + data response = encrypt.encrypt_all(self._password, self._method, 1, data) else: data = encrypt.encrypt_all(self._password, self._method, 0, data) header_result = parse_header(data) addrtype, dest_addr, dest_port, header_length = header_result response = b\x00\x00\x00 + data #2.server端,获取与local对应的udp地址和端口 #~~~~~~~~~~~~~~ #5.local端,获取与socks5客户端对应的udp地址和端口 client_addr = self._client_fd_to_server_addr.get(sock.fileno()) #3.server端,使用_server_socket将数据发送到local端 #~~~~~~~~~~~~~~ #6.local端,使用_server_socket将数据发送到socks5客户端 self._server_socket.sendto(response, client_addr)

4. 为RequestRelayer添加新功能

4.1 用户场景分析

在为RequestRelayer添加新的功能之前,我们需要分析用户的使用场景有哪些。只有了解了用户的使用场景,我们才能清楚的知道至少需要为RequestRelayer添加哪些功能,而不是盲目的为RequestRelayer添加功能。为了操作多台RequestRelayer服务器,我们需要借助于web应用。原因是我们迫切需要在任何时候,任何设备上都可以以管理员的身份登录到web应用对每一个RequestRelayer服务器进行操作。因此我们要实现自己的一个web应用(也就是我们之前反复提到的SS Controller)来管理所有的RequestRelayer,而我们应该根据用户的使用场景来思考哪些功能是SS Controller应该提供的。以下便是一些用户使用场景,可以供大家参考:

某台运行RequestRelayer服务的机器宕机了,之前为每一个用户开通的服务将无法使用了,过了一段时间这台机器恢复正常了,并重新启动了RequestRelayer服务,作为管理员,我希望能重新恢复该RequestRelayer服务的所有用户。另外一个用户场景是将某台RequestRelayer服务器上已开通的用户服务转移到另外一台RequestRelayer服务器上。因此SS Controller需要提供以下接口,用于向指定服务器,向RequestRelayer服务请求,在指定端口上开通加密转发服务,并且指定密码。 ssInfo=(server_ip,server_port,user_pwd) def addAccountOnSS(ssInfo): 在SS Controller中注册的用户或者管理员能够自助开通RequestRelayer服务,因此我们需要一个接口,指定服务器IP地址以及用户密码 ssInfo=(server_ip,user_pwd) def createAccountOnSS(ssInfo): 在SS Controller中,管理员可以关闭某个用户已开通的RequestRelayer服务 ssInfo=(server_ip,server_port) def remove_account_on_ss(ssInfo): 用户把流量用超了,此时RequestRelayer将暂时停止为该用户服务,直到下一个月的时候,如果用户还在服务期限内,那么将会调用以下接口来让RequestRelayer恢复该用户的服务 ssInfo=(server_ip,server_port) coresponse to ss attach interface def restore_account_on_ss(ssInfo): 返回某个RequestRelayer服务的所有用户 ssInfo=server_ip def get_account_list_on_ss(ssInfo): 返回某个RequestRelayer服务的所有用户的剩余流量 ssInfo=server_ip def auto_retrieve_all_available_transfer_on_ss(ssInfo): 获取某个RequestRelayer上某个用户当前剩余的流量 ssInfo=(server_ip,server_port) def get_current_available_transfer_on_ss(ssInfo): 更新某个RequestRelayer上某个用户的剩余流量 ssInfo=(server_ip,server_port,available_transfer) def update_current_available_transfer_on_ss(ssInfo): 某个RequestRelayer crash了,然后需要重启这个crash的RequestRelayer,重启之后,需要重新为这个crash的RequestRelayer重新恢复crash之前所服务的所有用户 ssInfo=(server_ip,port_password_dict) def auto_restart_on_ss(ssInfo):

4.2 为每个用户场景添加对应的功能

根据以上列出的场景,我们需要相应的为RequestRelayer添加对应的接口来对接以上的应用场景。以下便是RequestRelayer需要添加的新的命令。

1-获取RequestRelayer上正在服务的所有用户2-为用户在RequestRelayer上创建该用户的服务3-恢复RequestRelayer crash之前所服务的所有用户4-获取RequestRelayer上所有用户的剩余流量5-获取RequestRelayer上某个用户的剩余流量6-更新RequestRelayer上某个用户的剩余流量

针对以上几个命令我们需要在manager添加对应的if-else判断

def handle_event(self, sock, fd, event): if sock == self._control_socket and event == eventloop.POLL_IN: encryptedData, self._control_client_addr = sock.recvfrom(BUF_SIZE) data = encrypt.encrypt_all(self._config[manager_password], self._config[manager_method], 0, encryptedData) parsed = self._parse_command(data) if parsed: command, config = parsed a_config = self._config.copy() if config: # let the command override the configuration file a_config.update(config) if server_port not in a_config: logging.error(can not find server_port in config) else: if command == add: self.add_port(a_config) self._send_control_data(bok) elif command == remove: self.remove_port(a_config) self._send_control_data(bok) elif command == attach: self.attach_port(a_config) self._send_control_data(bok) elif command == ping: self._send_control_data(bpong) elif command == 1: logging.info("1 command") port_list = {} for k,v in self._relays.items(): port_list[k] = v[2] self.send_data(port_list) elif command == 2: logging.info("2 command") a_config[server_port]=dispatch_port_mgr.dispatch_port() self.add_port(a_config) self._send_control_data(str(a_config[server_port])) elif command == 3: logging.info("3 command") ports_arr = a_config[valid_portpwds] del a_config[valid_portpwds] for port, password_transfer in ports_arr.items(): my_config = a_config.copy() my_config[server_port] = int(port) my_config[password] = password_transfer[0] self._available_transfer[my_config[server_port]]=long(password_transfer[1]) self.add_port(my_config) self._send_control_data(bok) elif command == 4: logging.info("4 command") r={} for k, v in self._available_transfer.items(): r[k] = v self.send_data(r) elif command == 5: logging.info("5 transfer") self._send_control_data(str(self._available_transfer[a_config[server_port]])) elif command == 6: logging.info("6 command") self._available_transfer[a_config[server_port]] = a_config[available_transfer] self._send_control_data(bok) else: logging.error(unknown command %s, command)