协程时代的网络编程(c++asio&&rusttokio)

很多学习和使用网络编程的人都有这样的体会:同步接口写起来简单但是并发低所以适用场景少、异步接口虽然性能高但是代码编写和维护都很麻烦。

协程就是解决这个问题的,把异步编程的难度降低到和同步相当,当然代码的可读性和可维护性也是和同步相当。

现在几乎所有的高级编程语言都有协程了。但是从性能上来说C++依然是一骑绝尘,上篇文章已经做过性能测试了。rust的我没测过但是我觉得应该和C++有一拼。

本文我将通过一个简单又实用的例子,介绍协程异步网络编程的基础知识,并且用C++和rust分别实现一遍,就算是完全不懂rust的人,跟着本文的例子,也能学会到很多rust的知识,依葫芦画瓢以后基本的网络应用都可以写,至于C++知识就更不用说了,我会一如既往地重点介绍。

文章内容:

异步网络编程的难点和重点aiso、tokio常用的功能介绍socks5代理的原理简介socks5的C++实现socks5的rust实现socks5的测试收发包数据说明

异步网络编程的难点和重点

异步编程需要很多回调函数,当回调多了以后代码的编写和维护会变得很麻烦,这种现象叫做回调地狱(callback hell)。

除了臭名昭著的回调问题,在收发数据时不能一次收发指定长度的数据。

比如说我想在某个socket收取200字节的数据,调用一次read/recv可能只收到几十个字节,然后需要将这几十个字节存起来,下次继续调用read,直到收到的数据大于等于200字节。大于200字节时,又要将多余的数据存起来。

写数据也一样,向某个socket里面写入一定字节的数据时,也可能需要多次调用send/write,每次记录下次需要发送数据的起始位置。

这些都是很麻烦的操作。一个好的网络库必须至少解决这样的问题:具备一次调用就能收取和发送指定长度数据的能力。

c++的asio和rust的tokio都是满足这样条件程序库。解决了收发的主要难题,程序就会简单许多。

异步编程框架还必须提供超时的功能,这个我打算在以后其它的文章里面单独介绍。

asio/tokio常用功能介绍

开启新的协程//asio:asio::co_spawn//tokiotokio::spawn以协程的方式在某个socket上面调用一次数据读取,读到的数据小于等于指定的buffer长度//asio:size_t n = co_await socket.async_read_some(asio::buffer(buff), use_awaitable);//tokio:let n = local_socket.read(&mut buff).await?;以协程方式在某个sokcet读取指定长度的数据(数据长度以buffer的长度来标识),asio和tokio可能会读取多次,但是调用者只需要调用一次,当buffer读满、或是socket关闭时返回。//asiosize_t n = co_await asio::async_read(socket, asio::buffer(buff), use_awaitable);//tokiolet n = socket.read_all(&mut buff).await?;以协程的方式向某个socket发送指定长度的数据,asio和tokio的实现可能会进行多次发送,但是调用者只需要调用一次。当指定的数据全部发送完成或是socket关闭时返回。// asio:co_await asio::async_write(socket, asio::buffer(buff), use_awaitable);//tokiosocket.write_all(&buff[..n]).await?;

最常用的功能就这几个。asio看起来比tokio繁琐一些,因为asio支持多种异步接口,本文所使用的C++20的stackless coroutine仅仅只是其中一种。asio支持5种以上的异步功能,仅协程就有3种之多。而且它允许用户自己扩展新的异步功能,比如说你可以将use_awaitable改成你自己的use_xxx,然后就可以扩展一种。

asio提供的接口风格中可以将use_awaitable并设置为默认方式,有兴趣的可以自己参见asio的文档。去掉以后简程度和rust的tokio可以有一拼了。

socks5代理的原理简介

socks5代理是一种非常简单的代理,是学习网络编程的好素材,上面介绍的功能都会用到。而且因为socket代码需要在客户端和目标服务器之间转发数据,所以它每个「连接」都需要两个socket,从这点来说比普通的网络程序都要复杂。只要将它实现一遍,其它更简单的网络程序更是不在话下了。

socks5的逻辑分成两部分,握手阶段和透传阶段。

握手阶段:客户端连接socks5代理服务器,确定加密方式,并且把远程的目标服务器地址告诉socks5代理,socks5代理与远程的目标服务器建立连接。

透传阶段:连接建好以后,socks5代理服务器就直接充当一个传话筒的角色。客户端发送给它什么,他就原封不动地发送给远程的目标服务器;反之,从目标服务器收到的任何数据,socks5代理服务器也会原封不动地发送给客户端。从效果上来说,好像socks5代理不存在、而是客户端直接和远程的目标服务器通信一样。

socks5的协议非常之简单,在rfc1928中描述,照着这个协议就可以直接写出代码。

根据上面介绍的两个阶段,原理图如下:

socks5的C++实现

代码比较直观,为了文章的简单起见,我把合理性判断的代码全部都删掉了,想要看全部代码的直接去github上面看。

我在代码中用了format库,format库的主要功能已经被纳入C++20中,但是目前只有最新版本的visual studio支持。所以我还是采用第三方库。因为我用cmake可以自动下载,其实使用第三方库的方便程度比rust差不了多少。

CMakeLists.txt内容如下:cmake_minimum_required(VERSION 3.15)project(socks5)set(CMAKE_CXX_STANDARD 20)include(FetchContent)FetchContent_Declare(asioGIT_REPOSITORYGIT_TAG masterCONFIGURE_COMMAND ""BUILD_COMMAND "")FetchContent_Populate(asio)include_directories(${asio_SOURCE_DIR}/asio/include/)FetchContent_Declare(fmtGIT_REPOSITORY GIT_TAG master)FetchContent_MakeAvailable(fmt)include_directories(${fmt_SOURCE_DIR}/include/)add_executable(socks5 socks5.cpp)target_link_libraries(socks5 PRIVATE fmt::fmt)socks5.cpp的main函数如下:int main(){    try    {        asio::io_context io_context(1); // 想使用单线程这样传参        asio::signal_set signals(io_context, SIGINT, SIGTERM);        signals.async_wait([&](auto, auto)                           { io_context.stop(); });        // listener是监听并接收新连接的协程        co_spawn(io_context, listener(), detached);        io_context.run();    }    catch (std::exception &e)    {        std::printf("Exception: %s\n", e.what());    }}listener的实现如下:awaitable<void> listener(){    auto executor = co_await this_coro::executor;    tcp::acceptor acceptor(executor, {tcp::v4(), 10001}); // 绑定端口    for (;;)    {        tcp::socket socket = co_await acceptor.async_accept(use_awaitable);        fmt::print("new conn, fd={}\n", socket.native_handle());        // socks5是握手的协程        co_spawn(executor, socks5(std::move(socket)), detached);    }}socks5函数也是相对最麻烦的一个,但是如果去掉不重要的功能,实现如下:awaitable<void> socks5(tcp::socket local_socket){    try    {        uint8_t data[1024];        // 接收05 01 00        std::size_t n = co_await local_socket.async_read_some(buffer(data), use_awaitable);        // 回复05 00        co_await asio::async_write(local_socket, buffer(std::array<uint8_t, 2>{0x05, 0x00}), use_awaitable);        // 接收05 01 00 01/03 ...,获取客户端需要连接的目标地址        n = co_await local_socket.async_read_some(buffer(data), use_awaitable);        if (data[1] != 0x01)        { // 只实现了CMD=1,即connect            fmt::print("only supported cmd=1, requested cmd={} is not supported\n", data[1]);            data[1] = 0x07; // Command not supported            co_await asio::async_write(local_socket, buffer(data, n), use_awaitable);            co_return;        }        std::optional<std::string> addr; // 解析出来的地址保存在这里        uint16_t port = 0;        switch (data[3])        {          // ATYP        case 0x01: // ipv4格式的地址: 05 01 00 01 ip[4] port[2]        {            addr = fmt::format("{}.{}.{}.{}", data[4], data[5], data[6], data[7]);            port = uint16_t(data[8]) * 256 + data[9];        }        break;        case 0x03: // 域名: 05 01 00 03 host_len host[host_len] port[2]        {            const size_t domain_len = data[4];            addr = std::string((char *)(&data[0]) + 5, domain_len);            port = uint16_t(data[n - 2]) * 256 + data[n - 1];        }        break;        default: // 其它的type暂时不支持            data[1] = 0x08; // X08 Address type not supported            co_await asio::async_write(local_socket, buffer(data, n), use_awaitable);            co_return;            break;        }        // 解析域名        asio::ip::tcp::resolver resolver(local_socket.get_executor());        auto results = co_await resolver.async_resolve(addr.value(), std::to_string(port), use_awaitable);        // 连接解析出来的域名        tcp::socket remote_socket(local_socket.get_executor());        co_await asio::async_connect(remote_socket, results, use_awaitable);        // 连接成功,回包给client        uint8_t resp[] = {0x05, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00};        co_await asio::async_write(local_socket, buffer(resp), use_awaitable);        // 握手完成,开始透传数据,copy是透传数据的协程        co_await (copy(local_socket, remote_socket) || copy(remote_socket, local_socket));    }    catch (asio::system_error& e)    {        fmt::print("socks5 Exception: {}, {}\n", e.what(), e.code().value());    }}透传数据的协程copy函数实现如下:awaitable<void> copy(tcp::socket &from, tcp::socket &to){    uint8_t data[1024];    try    {        for (;;) // 从from收取数据,发送到to        {            size_t n = co_await from.async_read_some(buffer(data), use_awaitable);            co_await asio::async_write(to, buffer(data, n), use_awaitable);        }    }    catch (std::exception &e)    {        from.close();        to.close();    }}

整个逻辑和看起来和同步的习惯几乎一样。这个例子并没有实现全部的socks5,而是实现了一个最小化可以完整运行的socks5,完整功能其实无非也就是照着协议文本多加几个if...else并没有难度,原理都差不多。

使用cmake编译这样:mkdir -p build; cd build; cmake ..; make,然后就得到了可执行文件socks5,直接运行即开启了socks5服务器。

socks5的rust实现

cargo new --bin rsocks5建立一个cargo项目,Cargo.toml加上这些:

[dependencies]tokio = { version = "1", features = ["full"] }byteorder = "1"log = "0.4.14"[dev-dependencies]env_logger = "0.8.3"

我几个月以前用rust写过一遍比这个更复杂的功能,今天为了写这篇文章我又照着前面的c++的功能重新写了一遍rust部分,每个函数和上面的cpp代码一一对应,连名字都一样,我不一个个地介绍了。为了简短起见我把判断的代码也删掉了。main.rs内容如下:

use byteorder::{BigEndian, ByteOrder};use std::io::Result;use std::net::SocketAddr;use std::net::{IpAddr, Ipv4Addr};use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};use tokio::net::{TcpListener, TcpStream};async fn copy<T: AsyncRead + Unpin, U: AsyncWrite + Unpin>(mut sock1: T, mut sock2: U) -> Result<()> {    let mut buff = [0; 1024];    loop {        let n = sock1.read(&mut buff[..]).await?;        if n == 0 {            break;        }        sock2.write_all(&buff[..n]).await?;    }    return Ok(());}async fn socks5(mut local_socket: TcpStream) -> Result<()> {    let mut buff = [0; 1024];    // 接收05 01 00    let n = local_socket.read(&mut buff).await?;    // 回复05 00    local_socket.write_all(b"\x05\x00").await?;    // 接收05 01 00 01/03 addr port    let n = local_socket.read(&mut buff).await?;    if buff[1] != 1 { // 只实现了cmd=1,即connect        println!("cmd={} is not supported", buff[1]);        buff[1] = b\x07; // command not supported        local_socket.write_all(&mut buff[..n]).await?;        return Ok(());    }    let addr; // 解析出来的地址保存在这里    match buff[3] {        1 => { // ipv4格式的地址: 05 01 00 01 ip[4] port[2]            let dest_addr = IpAddr::V4(Ipv4Addr::new(buff[4], buff[5], buff[6], buff[7]));            let dest_port =BigEndian::read_u16(&buff[8..]);            addr = SocketAddr::new(dest_addr, dest_port).to_string();        }        3 => { // 域名: 05 01 00 03 host_len host[host_len] port[2]            let offset = 4 + 1 + (buff[4] as usize);            let dest_port = BigEndian::read_u16(&buff[offset..]);            let mut dest_addr = std::str::from_utf8(&buff[5..offset]).unwrap().to_string();            dest_addr.push_str(":");            dest_addr.push_str(&dest_port.to_string());            println!("dest addr: {:?}", &dest_addr);            addr = dest_addr;        }        _ => { // 其它的type暂时不支持,有兴趣自己加            println!("address type={} not supported", buff[3]);            let bytes = [b\x05, b\x08, b\x00, b\x01, b\x00, b\x00, b\x00, b\x00, b\x00, b\x00];            local_socket.write_all(&bytes[..]).await?;            return Ok(());        }    }    let remote_socket = TcpStream::connect(addr).await?;    let bytes = [b\x05, b\x00, b\x00, b\x01, b\x00, b\x00, b\x00, b\x00, b\x00, b\x00];    local_socket.write_all(&bytes[..]).await?;    let (ri, wi) = local_socket.into_split();    let (ro, wo) = remote_socket.into_split();    tokio::spawn(async move {        copy(ro, wi).await.unwrap();    });    tokio::spawn(async move {        copy(ri, wo).await.unwrap();    });    return Ok(());}async fn listener() -> Result<()> {    let addr = "127.0.0.1:10001".to_string().parse::<SocketAddr>().unwrap();    let tl: TcpListener = TcpListener::bind(&addr).await?;    loop {        let (socket, _) = tl.accept().await?;        tokio::spawn(async move {            socks5(socket).await.unwrap();        });    }}#[tokio::main]async fn main() {    //tokio::spawn(async {listener().await.unwrap();});    listener().await.unwrap();}

cargo build得到可执行文件rsocks5。或者用cargo run运行起来就行了。

socks5的测试

上面的socks5 server无论是rust的还是c++的,功能都一样,跑起来也简单,但是怎么测试呢?很简单,将浏览器的代理服务器设置成上面代码绑定的socks5://127.0.0.1:10001,如果能够毫无影响地上网,表示socks5代码功能OK。

firefox可以直接指定代码服务器,但是chrome貌似不可以,需要装插件。怎么设定我这里就不说了哈。

完整代码我依然放在github/franktea/treehouse仓库与文章名同名的目录中。

收发包数据说明

在上面的代码中,从客户端收取数据是我只调用了一次local_socket.async_read_some,就是这代码:

        // 接收05 01 00        std::size_t n = co_await local_socket.async_read_some(buffer(data), use_awaitable);

这个用法其实不严谨,因为async_read_some并不能保证一次性将客户端发送的数据收完。但是这个地方因为客户端一次性发送的数据只有很少的几个字节,所以从概率上来说一次性不能收完的可能性较小,从一个教学级别的demo来说是ok的。后面还有判断,如果一次性没收正确就将连接断掉,客户端会重连。

但是从生产级别的代码说,这样并不严谨。严谨的写法应该是先收取2个字节(前面说了,收取指定长度的数据用asio::async_read或者tokio的socket.read_all),因为第二个字节指定了后面的长度n,然后再根据这个n调用一次asio::async_read。

第二次从客户端收取域名端口的代码也一样:

        // 接收05 01 00 01/03 ...        n = co_await local_socket.async_read_some(buffer(data), use_awaitable);

如果要严谨需要写得稍微繁琐一些,道理和上面的一样,只要自己心知肚明即可。