很多学习和使用网络编程的人都有这样的体会:同步接口写起来简单但是并发低所以适用场景少、异步接口虽然性能高但是代码编写和维护都很麻烦。
协程就是解决这个问题的,把异步编程的难度降低到和同步相当,当然代码的可读性和可维护性也是和同步相当。
现在几乎所有的高级编程语言都有协程了。但是从性能上来说C++依然是一骑绝尘,上篇文章已经做过性能测试了。rust的我没测过但是我觉得应该和C++有一拼。
本文我将通过一个简单又实用的例子,介绍协程异步网络编程的基础知识,并且用C++和rust分别实现一遍,就算是完全不懂rust的人,跟着本文的例子,也能学会到很多rust的知识,依葫芦画瓢以后基本的网络应用都可以写,至于C++知识就更不用说了,我会一如既往地重点介绍。
文章内容:
异步网络编程的难点和重点aiso、tokio常用的功能介绍socks5代理的原理简介socks5的C++实现socks5的rust实现socks5的测试收发包数据说明目录
异步网络编程的难点和重点
异步编程需要很多回调函数,当回调多了以后代码的编写和维护会变得很麻烦,这种现象叫做回调地狱(callback hell)。
除了臭名昭著的回调问题,在收发数据时不能一次收发指定长度的数据。
比如说我想在某个socket收取200字节的数据,调用一次read/recv可能只收到几十个字节,然后需要将这几十个字节存起来,下次继续调用read,直到收到的数据大于等于200字节。大于200字节时,又要将多余的数据存起来。
写数据也一样,向某个socket里面写入一定字节的数据时,也可能需要多次调用send/write,每次记录下次需要发送数据的起始位置。
这些都是很麻烦的操作。一个好的网络库必须至少解决这样的问题:具备一次调用就能收取和发送指定长度数据的能力。
c++的asio和rust的tokio都是满足这样条件程序库。解决了收发的主要难题,程序就会简单许多。
异步编程框架还必须提供超时的功能,这个我打算在以后其它的文章里面单独介绍。
asio/tokio常用功能介绍
开启新的协程//asio:asio::co_spawn//tokiotokio::spawn以协程的方式在某个socket上面调用一次数据读取,读到的数据小于等于指定的buffer长度//asio:size_t n = co_await socket.async_read_some(asio::buffer(buff), use_awaitable);//tokio:let n = local_socket.read(&mut buff).await?;以协程方式在某个sokcet读取指定长度的数据(数据长度以buffer的长度来标识),asio和tokio可能会读取多次,但是调用者只需要调用一次,当buffer读满、或是socket关闭时返回。//asiosize_t n = co_await asio::async_read(socket, asio::buffer(buff), use_awaitable);//tokiolet n = socket.read_all(&mut buff).await?;以协程的方式向某个socket发送指定长度的数据,asio和tokio的实现可能会进行多次发送,但是调用者只需要调用一次。当指定的数据全部发送完成或是socket关闭时返回。// asio:co_await asio::async_write(socket, asio::buffer(buff), use_awaitable);//tokiosocket.write_all(&buff[..n]).await?;最常用的功能就这几个。asio看起来比tokio繁琐一些,因为asio支持多种异步接口,本文所使用的C++20的stackless coroutine仅仅只是其中一种。asio支持5种以上的异步功能,仅协程就有3种之多。而且它允许用户自己扩展新的异步功能,比如说你可以将use_awaitable改成你自己的use_xxx,然后就可以扩展一种。
asio提供的接口风格中可以将use_awaitable并设置为默认方式,有兴趣的可以自己参见asio的文档。去掉以后简程度和rust的tokio可以有一拼了。
socks5代理的原理简介
socks5代理是一种非常简单的代理,是学习网络编程的好素材,上面介绍的功能都会用到。而且因为socket代码需要在客户端和目标服务器之间转发数据,所以它每个「连接」都需要两个socket,从这点来说比普通的网络程序都要复杂。只要将它实现一遍,其它更简单的网络程序更是不在话下了。
socks5的逻辑分成两部分,握手阶段和透传阶段。
握手阶段:客户端连接socks5代理服务器,确定加密方式,并且把远程的目标服务器地址告诉socks5代理,socks5代理与远程的目标服务器建立连接。
透传阶段:连接建好以后,socks5代理服务器就直接充当一个传话筒的角色。客户端发送给它什么,他就原封不动地发送给远程的目标服务器;反之,从目标服务器收到的任何数据,socks5代理服务器也会原封不动地发送给客户端。从效果上来说,好像socks5代理不存在、而是客户端直接和远程的目标服务器通信一样。
socks5的协议非常之简单,在rfc1928中描述,照着这个协议就可以直接写出代码。
根据上面介绍的两个阶段,原理图如下:
socks5的C++实现
代码比较直观,为了文章的简单起见,我把合理性判断的代码全部都删掉了,想要看全部代码的直接去github上面看。
我在代码中用了format库,format库的主要功能已经被纳入C++20中,但是目前只有最新版本的visual studio支持。所以我还是采用第三方库。因为我用cmake可以自动下载,其实使用第三方库的方便程度比rust差不了多少。
CMakeLists.txt内容如下:cmake_minimum_required(VERSION 3.15)project(socks5)set(CMAKE_CXX_STANDARD 20)include(FetchContent)FetchContent_Declare(asioGIT_REPOSITORYGIT_TAG masterCONFIGURE_COMMAND ""BUILD_COMMAND "")FetchContent_Populate(asio)include_directories(${asio_SOURCE_DIR}/asio/include/)FetchContent_Declare(fmtGIT_REPOSITORY GIT_TAG master)FetchContent_MakeAvailable(fmt)include_directories(${fmt_SOURCE_DIR}/include/)add_executable(socks5 socks5.cpp)target_link_libraries(socks5 PRIVATE fmt::fmt)socks5.cpp的main函数如下:int main(){ try { asio::io_context io_context(1); // 想使用单线程这样传参 asio::signal_set signals(io_context, SIGINT, SIGTERM); signals.async_wait([&](auto, auto) { io_context.stop(); }); // listener是监听并接收新连接的协程 co_spawn(io_context, listener(), detached); io_context.run(); } catch (std::exception &e) { std::printf("Exception: %s\n", e.what()); }}listener的实现如下:awaitable<void> listener(){ auto executor = co_await this_coro::executor; tcp::acceptor acceptor(executor, {tcp::v4(), 10001}); // 绑定端口 for (;;) { tcp::socket socket = co_await acceptor.async_accept(use_awaitable); fmt::print("new conn, fd={}\n", socket.native_handle()); // socks5是握手的协程 co_spawn(executor, socks5(std::move(socket)), detached); }}socks5函数也是相对最麻烦的一个,但是如果去掉不重要的功能,实现如下:awaitable<void> socks5(tcp::socket local_socket){ try { uint8_t data[1024]; // 接收05 01 00 std::size_t n = co_await local_socket.async_read_some(buffer(data), use_awaitable); // 回复05 00 co_await asio::async_write(local_socket, buffer(std::array<uint8_t, 2>{0x05, 0x00}), use_awaitable); // 接收05 01 00 01/03 ...,获取客户端需要连接的目标地址 n = co_await local_socket.async_read_some(buffer(data), use_awaitable); if (data[1] != 0x01) { // 只实现了CMD=1,即connect fmt::print("only supported cmd=1, requested cmd={} is not supported\n", data[1]); data[1] = 0x07; // Command not supported co_await asio::async_write(local_socket, buffer(data, n), use_awaitable); co_return; } std::optional<std::string> addr; // 解析出来的地址保存在这里 uint16_t port = 0; switch (data[3]) { // ATYP case 0x01: // ipv4格式的地址: 05 01 00 01 ip[4] port[2] { addr = fmt::format("{}.{}.{}.{}", data[4], data[5], data[6], data[7]); port = uint16_t(data[8]) * 256 + data[9]; } break; case 0x03: // 域名: 05 01 00 03 host_len host[host_len] port[2] { const size_t domain_len = data[4]; addr = std::string((char *)(&data[0]) + 5, domain_len); port = uint16_t(data[n - 2]) * 256 + data[n - 1]; } break; default: // 其它的type暂时不支持 data[1] = 0x08; // X08 Address type not supported co_await asio::async_write(local_socket, buffer(data, n), use_awaitable); co_return; break; } // 解析域名 asio::ip::tcp::resolver resolver(local_socket.get_executor()); auto results = co_await resolver.async_resolve(addr.value(), std::to_string(port), use_awaitable); // 连接解析出来的域名 tcp::socket remote_socket(local_socket.get_executor()); co_await asio::async_connect(remote_socket, results, use_awaitable); // 连接成功,回包给client uint8_t resp[] = {0x05, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}; co_await asio::async_write(local_socket, buffer(resp), use_awaitable); // 握手完成,开始透传数据,copy是透传数据的协程 co_await (copy(local_socket, remote_socket) || copy(remote_socket, local_socket)); } catch (asio::system_error& e) { fmt::print("socks5 Exception: {}, {}\n", e.what(), e.code().value()); }}透传数据的协程copy函数实现如下:awaitable<void> copy(tcp::socket &from, tcp::socket &to){ uint8_t data[1024]; try { for (;;) // 从from收取数据,发送到to { size_t n = co_await from.async_read_some(buffer(data), use_awaitable); co_await asio::async_write(to, buffer(data, n), use_awaitable); } } catch (std::exception &e) { from.close(); to.close(); }}整个逻辑和看起来和同步的习惯几乎一样。这个例子并没有实现全部的socks5,而是实现了一个最小化可以完整运行的socks5,完整功能其实无非也就是照着协议文本多加几个if...else并没有难度,原理都差不多。
使用cmake编译这样:mkdir -p build; cd build; cmake ..; make,然后就得到了可执行文件socks5,直接运行即开启了socks5服务器。
socks5的rust实现
cargo new --bin rsocks5建立一个cargo项目,Cargo.toml加上这些:
[dependencies]tokio = { version = "1", features = ["full"] }byteorder = "1"log = "0.4.14"[dev-dependencies]env_logger = "0.8.3"我几个月以前用rust写过一遍比这个更复杂的功能,今天为了写这篇文章我又照着前面的c++的功能重新写了一遍rust部分,每个函数和上面的cpp代码一一对应,连名字都一样,我不一个个地介绍了。为了简短起见我把判断的代码也删掉了。main.rs内容如下:
use byteorder::{BigEndian, ByteOrder};use std::io::Result;use std::net::SocketAddr;use std::net::{IpAddr, Ipv4Addr};use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};use tokio::net::{TcpListener, TcpStream};async fn copy<T: AsyncRead + Unpin, U: AsyncWrite + Unpin>(mut sock1: T, mut sock2: U) -> Result<()> { let mut buff = [0; 1024]; loop { let n = sock1.read(&mut buff[..]).await?; if n == 0 { break; } sock2.write_all(&buff[..n]).await?; } return Ok(());}async fn socks5(mut local_socket: TcpStream) -> Result<()> { let mut buff = [0; 1024]; // 接收05 01 00 let n = local_socket.read(&mut buff).await?; // 回复05 00 local_socket.write_all(b"\x05\x00").await?; // 接收05 01 00 01/03 addr port let n = local_socket.read(&mut buff).await?; if buff[1] != 1 { // 只实现了cmd=1,即connect println!("cmd={} is not supported", buff[1]); buff[1] = b\x07; // command not supported local_socket.write_all(&mut buff[..n]).await?; return Ok(()); } let addr; // 解析出来的地址保存在这里 match buff[3] { 1 => { // ipv4格式的地址: 05 01 00 01 ip[4] port[2] let dest_addr = IpAddr::V4(Ipv4Addr::new(buff[4], buff[5], buff[6], buff[7])); let dest_port =BigEndian::read_u16(&buff[8..]); addr = SocketAddr::new(dest_addr, dest_port).to_string(); } 3 => { // 域名: 05 01 00 03 host_len host[host_len] port[2] let offset = 4 + 1 + (buff[4] as usize); let dest_port = BigEndian::read_u16(&buff[offset..]); let mut dest_addr = std::str::from_utf8(&buff[5..offset]).unwrap().to_string(); dest_addr.push_str(":"); dest_addr.push_str(&dest_port.to_string()); println!("dest addr: {:?}", &dest_addr); addr = dest_addr; } _ => { // 其它的type暂时不支持,有兴趣自己加 println!("address type={} not supported", buff[3]); let bytes = [b\x05, b\x08, b\x00, b\x01, b\x00, b\x00, b\x00, b\x00, b\x00, b\x00]; local_socket.write_all(&bytes[..]).await?; return Ok(()); } } let remote_socket = TcpStream::connect(addr).await?; let bytes = [b\x05, b\x00, b\x00, b\x01, b\x00, b\x00, b\x00, b\x00, b\x00, b\x00]; local_socket.write_all(&bytes[..]).await?; let (ri, wi) = local_socket.into_split(); let (ro, wo) = remote_socket.into_split(); tokio::spawn(async move { copy(ro, wi).await.unwrap(); }); tokio::spawn(async move { copy(ri, wo).await.unwrap(); }); return Ok(());}async fn listener() -> Result<()> { let addr = "127.0.0.1:10001".to_string().parse::<SocketAddr>().unwrap(); let tl: TcpListener = TcpListener::bind(&addr).await?; loop { let (socket, _) = tl.accept().await?; tokio::spawn(async move { socks5(socket).await.unwrap(); }); }}#[tokio::main]async fn main() { //tokio::spawn(async {listener().await.unwrap();}); listener().await.unwrap();}cargo build得到可执行文件rsocks5。或者用cargo run运行起来就行了。
socks5的测试
上面的socks5 server无论是rust的还是c++的,功能都一样,跑起来也简单,但是怎么测试呢?很简单,将浏览器的代理服务器设置成上面代码绑定的socks5://127.0.0.1:10001,如果能够毫无影响地上网,表示socks5代码功能OK。
firefox可以直接指定代码服务器,但是chrome貌似不可以,需要装插件。怎么设定我这里就不说了哈。
完整代码我依然放在github/franktea/treehouse仓库与文章名同名的目录中。
收发包数据说明
在上面的代码中,从客户端收取数据是我只调用了一次local_socket.async_read_some,就是这代码:
// 接收05 01 00 std::size_t n = co_await local_socket.async_read_some(buffer(data), use_awaitable);这个用法其实不严谨,因为async_read_some并不能保证一次性将客户端发送的数据收完。但是这个地方因为客户端一次性发送的数据只有很少的几个字节,所以从概率上来说一次性不能收完的可能性较小,从一个教学级别的demo来说是ok的。后面还有判断,如果一次性没收正确就将连接断掉,客户端会重连。
但是从生产级别的代码说,这样并不严谨。严谨的写法应该是先收取2个字节(前面说了,收取指定长度的数据用asio::async_read或者tokio的socket.read_all),因为第二个字节指定了后面的长度n,然后再根据这个n调用一次asio::async_read。
第二次从客户端收取域名端口的代码也一样:
// 接收05 01 00 01/03 ... n = co_await local_socket.async_read_some(buffer(data), use_awaitable);如果要严谨需要写得稍微繁琐一些,道理和上面的一样,只要自己心知肚明即可。