Golang从零到一开发实现RPC框架

内容提要

RPC 框架是分布式领域核心组件,也是微服务的基础。今天尝试从零撸一个 RPC 框架,剖析其核心原理及代码实现,后续还会逐步迭代追加微服务治理等功能,将之前文章覆盖的熔断、限流、负载均衡、注册发现等功能融合进来,打造一个五脏俱全的 RPC 框架。本文主要内容包括:

RPC 实现原理

RPC 协议设计

RPC 服务端实现

RPC 客户端实现

实现原理

RPC (Remote Procedure Call)全称是远程过程调用,相对于本地方法调用,在同一内存空间可以直接通过方法栈实现调用,远程调用则跨了不同的服务终端,并不能直接调用。

RPC框架 要解决的就是远程方法调用的问题,并且实现调用远程服务像调用本地服务一样简单,框架内部封装实现了网络调用的细节(透明化远程调用),其核心过程原理如下图所示。

这个版本可以称为 “P2P RPC” ,而生产环境部署往往会将服务提供者(Server)部署多个实例(集群部署),那么客户端就需要具备发现服务端的能力和负载均衡的支持,所以有了服务注册发现和负载均衡。

再然后,为了保障 RPC 调用的可靠性和稳定性,增加了服务监控和服务容错治理的能力,考虑性能提升的异步化能力以及考虑可扩展性的插件化管理,这些完善构成了更完整的微服务 RPC 框架。

RPC 协议实现

协议设计

协议设计算是 RPC 最重要的一部分了,它主要解决服务端与客户端通信的问题。一般来说通讯要解决如下问题:

1. 网络传输协议基于 TCP、UDP 还是 HTTP,UDP 要自己解决可靠性传输问题,而 HTTP 又太重,包含很多没必要的头信息,所以一般 RPC 框架会优先选择 TCP 协议。

(当然也有大名鼎鼎的 gRPC 基于 HTTP2)

2. 序列化协议

网络传输数据必须是二进制数据,而执行过程是编程语言的对象方法,那么就涉及到如何将对象序列化成可传输消息(二进制),并可反序列化还原。常见的通用型协议如 XML、 JSON、Protobuf、Thrift 等,也有语言绑定的如 Python 原生支持的 pickle 协议, Java 实现的 Serializbale 接口及 Hessian 协议,Golang 原生支持的 Gob 协议等。

3. 消息编码协议

它是一种客户端和服务端的调用约定,比如请求和参数如何组织,Header 放置什么内容。这部分每个框架设计均不同,有时也称这一层为狭义的 RPC 协议层。

另外客户端发起调用一般来说要知道调用的具体类方法(请求标识符)以及入参(Payload),而网络传输的是二进制字节流,如何能从这些字节中找出哪些是方法名,哪些是参数?进一步如果客户端不断的发送消息,如何将每一条消息分割?(解决 TCP 粘包问题)

采用定长消息很容易解决,但事先并不能确定要固定多长,所以这种方式并不可行。消息加分隔符可以实现,但要确保分隔符不会与正文冲突。而最常用的实现方案就是用定长的头标识出不定长的体,比如用 int32 (定长 4 字节)标识后面的内容长度,这样就能较优雅实现消息分割了。

(注:这个方案中如果消息体的长度大于 2^32 会发生溢出而导致解析失败,可以换更长类型,但理论上总会有溢出风险,设计使用时应该限制避免传输过大数据体)

协议实现

网络传输协议,这里使用 TCP 协议即可,没有太多争议,可预留接口支持。

序列化协议,这里使用 Golang 专有的 Gob 协议,保留接口后期可以扩展支持 JSON、Protobuf 等协议。

type Codec interface {    Encode(i interface{}) ([]byte, error)    Decode(data []byte, i interface{}) error}type GobCodec struct{}func (c GobCodec) Encode(i interface{}) ([]byte, error) {    var buffer bytes.Buffer    encoder := gob.NewEncoder(&buffer)    if err := encoder.Encode(i); err != nil {        return nil, err     }       return buffer.Bytes(), nil }func (c GobCodec) Decode(data []byte, i interface{}) error {    buffer := bytes.NewBuffer(data)    decoder := gob.NewDecoder(buffer)    return decoder.Decode(i)}

codec/codec.go

RPC 消息格式编码设计如下,协议消息头定义定长 5 字节(byte),依次放置魔术数(用于校验),协议版本,消息类型(区分请求/响应),压缩类型,序列化协议类型,每个占 1 个字节(8 个 bit)。可扩展追加 消息 ID 以及 元数据 等信息用于做服务治理。const (    HEADER_LEN = 5)const (    magicNumber byte = 0x06)type MsgType byteconst (    Request MsgType = iota    Response)type CompressType byteconst (    None CompressType = iota    Gzip)type SerializeType byteconst (    Gob SerializeType = iota    JSON)type Header [HEADER_LEN]bytefunc (h *Header) CheckMagicNumber() bool {    return h[0] == magicNumber}func (h *Header) Version() byte {    return h[1]}func (h *Header) SetVersion(version byte) {    h[1] = version}//省略 MsgType,CompressType,SerializeType

protocol/header.go

定义协议消息格式,除了协议头,还包括调用的服务类名、方法名以及参数(Payload)。type RPCMsg struct {    *Header    ServiceClass  string    ServiceMethod string    Payload       []byte}func NewRPCMsg() *RPCMsg {    header := Header([HEADER_LEN]byte{})    header[0] = magicNumber    return &RPCMsg{        Header: &header,    }}

protocol/msg.go

实现传输

定义好协议后,要解决的问题就是如何通过网络(IO)发送和接收,实现通信的目的。

func (msg *RPCMsg) Send(writer io.Writer) error {    //send header    _, err := writer.Write(msg.Header[:])    if err != nil {        return err    }    //写入消息体总长度,方便一次性解析    dataLen := SPLIT_LEN + len(msg.ServiceClass) + SPLIT_LEN + len(msg.ServiceMethod) + SPLIT_LEN + len(msg.Payload)    err = binary.Write(writer, binary.BigEndian, uint32(dataLen)) //4    if err != nil {        return err    }    //write service.class len    err = binary.Write(writer, binary.BigEndian, uint32(len(msg.ServiceClass)))    if err != nil {        return err    }    //write service.class content    err = binary.Write(writer, binary.BigEndian, util.StringToByte(msg.ServiceClass))    if err != nil {        return err    }    //省略 service.method,payload }

protocol/msg.go

其中类名、方法名、payload 均为不定长部分,要想顺利解析就需要一一对应的长度字段标识不定长的长度,也就是 SPLIT_LEN 代表各部分长度,是 int32 类型(32 bit),正好相当于 4 个 byte,所以 SPLIT_LEN 为 4。

另外要注意网络传输一般使用大端字节序。先理解字节序即为字节(byte)的组成顺序,分为大端序(最高有效位放低地址)和小端序(最低有效位放低地址)。CPU 一般采用小端序读写,而 TCP 网络传输采用大端序则更方便。对应这里的 binary.BigEndian 代码实现大端序。

消息读取后反解析,按发送顺序依次还原 Header、类名、方法名、Payload,不定长部分都有对应的长度保存,因此可以顺利解析到所有数据。

func Read(r io.Reader) (*RPCMsg, error) {    msg := NewRPCMsg()    err := msg.Decode(r)    if err != nil {        return nil, err    }    return msg, nil}func (msg *RPCMsg) Decode(r io.Reader) error {    //read header    _, err := io.ReadFull(r, msg.Header[:])    if !msg.Header.CheckMagicNumber() { //magicNumber        return fmt.Errorf("magic number error: %v", msg.Header[0])    }    //total body len    headerByte := make([]byte, 4)    _, err = io.ReadFull(r, headerByte)    if err != nil {        return err    }    bodyLen := binary.BigEndian.Uint32(headerByte)    //一次将整个body读取,再依次拆解    data := make([]byte, bodyLen)    _, err = io.ReadFull(r, data)    //service.class len    start := 0    end := start + SPLIT_LEN    classLen := binary.BigEndian.Uint32(data[start:end]) //0,4    //service.class    start = end    end = start + int(classLen)    msg.ServiceClass = util.ByteToString(data[start:end]) //4,x    //省略 method,payload}

protocol/msg.go

解决了最复杂的协议部分,下面依次来看服务端和客户端的实现。

服务端实现

服务端实现主要包括服务启停(端口监听)、服务注册、响应连接和处理请求几部分。

定义服务接口

服务接口提供服务启停和处理方法注册的能力。

type Server interface {    Register(string, interface{})    Run()    Close()}

provider/server.go

服务启停

实现服务启停,关键在于通过 ip 和端口开启监听,这里通过 Listener 封装 net 包开启 tcp Listen。

type RPCServer struct {    listener Listener}func NewRPCServer(ip string, port int) *RPCServer {    return &RPCServer{        listener: NewRPCListener(ip, port),    }   }func (svr *RPCServer) Run() {    go svr.listener.Run()}func (svr *RPCServer) Close() {    if svr.listener != nil {        svr.listener.Close()    }}

provider/server.go

type Listener interface {    Run()    SetHandler(string, Handler)    Close()}type RPCListener struct {    ServiceIp   string    ServicePort int    Handlers    map[string]Handler    nl          net.Listener}func NewRPCListener(serviceIp string, servicePort int) *RPCListener {    return &RPCListener{ServiceIp: serviceIp,        ServicePort: servicePort,        Handlers:    make(map[string]Handler)}}func (l *RPCListener) Run() {    addr := fmt.Sprintf("%s:%d", l.ServiceIp, l.ServicePort)    nl, err := net.Listen(config.NET_TRANS_PROTOCOL, addr)//tcp    if err != nil {        panic(err)    }       l.nl = nl    for {        conn, err := l.nl.Accept()        if err != nil {            continue        }          go l.handleConn(conn)    }}func (l *RPCListener) Close() {if l.nl != nil {     l.nl.Close()}}

provider/listener.go

这里通过为每个连接创建一个协程处理请求,得益于 Golang 的协程优势,Thread-Per-Message 模式来满足并发请求更容易实现(Java 线程成本太大,一般采用线程池实现 Worker Thread 模式)。

服务注册

服务注册就是在内存中维护一个映射关系,map (key=服务名,value=对象实例),通过 interface{} 泛化,可以反射还原。func (svr *RPCServer) Register(class interface{}) {    name := reflect.Indirect(reflect.ValueOf(class)).Type().Name()    svr.RegisterName(name, class)}func (svr *RPCServer) RegisterName(name string, class interface{}) {    handler := &RPCServerHandler{class: reflect.ValueOf(class)}    svr.listener.SetHandler(name, handler)    log.Printf("%s registered success!\n", name)}func (l *RPCListener) SetHandler(name string, handler Handler) {    if _, ok := l.Handlers[name]; ok {        log.Printf("%s is registered!\n", name)        return    }    l.Handlers[name] = handler}

provider/server.go

(1)由于服务启动初始化时进行所有服务注册,所以用 map 没考虑并发,否则如果有动态注册就需要考虑并发问题了。(2)这里没有注册到服务注册中心,设计考虑将以应用服务(系统)为单位进行注册,而具体的服务接口通过应用内存映射。这种注册粒度大,优点就是减少对注册中心的依赖和注册实例数量,提高服务发现资源利用率。(dubbo 3 重要改进就是将接口级服务发现切换为应用级服务发现)

响应连接请求

整个过程依次涉及从网络连接读取数据,反序列化获得请求结构体 (RPCMsg),根据注册类和方法找到目标函数并执行,将执行结果序列化后封装成 RPCMsg 通过网络发送,整个过程是同步 io 模型。

func (l *RPCListener) handleConn(conn net.Conn) {    defer catchPanic()    for {        msg, err := l.receiveData(conn)        if err != nil || msg == nil {            return        }        coder := global.Codecs[msg.Header.SerializeType()]        if coder == nil {            return        }        inArgs := make([]interface{}, 0)        err = coder.Decode(msg.Payload, &inArgs)        if err != nil {            return        }        handler, ok := l.Handlers[msg.ServiceClass]        if !ok {            return        }        result, err := handler.Handle(msg.ServiceMethod, inArgs)        encodeRes, err := coder.Encode(result)         if err != nil {            return        }        err = l.sendData(conn, encodeRes)        if err != nil {            return        }    }}

provider/listener.go

其中实际执行本地方法过程如下:func (handler *RPCServerHandler) Handle(method string, params []interface{}) ([]interface{}, error) {    args := make([]reflect.Value, len(params))    for i := range params {        args[i] = reflect.ValueOf(params[i])    }     reflectMethod := handler.class.MethodByName(method)        result := reflectMethod.Call(args)    resArgs := make([]interface{}, len(result))    for i := 0; i < len(result); i++ {        resArgs[i] = result[i].Interface()    }     var err error    if _, ok := result[len(result)-1].Interface().(error); ok {        err = result[len(result)-1].Interface().(error)    }    return resArgs, err}

provider/handler.go

收发网络请求通过调用之前封装的 Protocol 来完成。func (l *RPCListener) receiveData(conn net.Conn) (*protocol.RPCMsg, error) {    msg, err := protocol.Read(conn)    if err != nil {        if err != io.EOF { //close            return nil, err        }    }    return msg, nil}func (l *RPCListener) sendData(conn net.Conn, payload []byte) error {    resMsg := protocol.NewRPCMsg()    resMsg.SetVersion(config.Protocol_MsgVersion)    resMsg.SetMsgType(protocol.Response)    resMsg.SetCompressType(protocol.None)    resMsg.SetSerializeType(protocol.Gob)    resMsg.Payload = payload    return  resMsg.Send(conn)}

provider/listener.go

测试服务端

通过环境变量注入 ip 和 port,开启服务监听,依次注册几个服务。

func main() {    flag.Parse()    if ip == "" || port == 0 {        panic("init ip and port error")    }    srv := provider.NewRPCServer(ip, port)    srv.RegisterName("User", &UserHandler{})    srv.RegisterName("Test", &TestHandler{})    gob.Register(User{})    go srv.Run()    quit := make(chan os.Signal)    signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP, syscall.SIGQUIT)    <-quit    srv.Close()}

server.go

这里注册两个结构体 User 和 Test,特别注意:只有可导出的类方法(首字母大写)才能被客户端调用执行,否则会找不到对应类方法而失败。此外 User 作为接口值实现传输必须注册才行(gob.Register(User{}))。

type TestHandler struct{}func (t *TestHandler) Hello() string {    return "hello world"}type User struct {    ID   int    `json:"id"`    Name string `json:"name"`    Age  int    `json:"age"`}var userList = map[int]User{    1: User{1, "hero", 11},    2: User{2, "kavin", 12},}type UserHandler struct{}func (u *UserHandler) GetUserById(id int) (User, error) {    if u, ok := userList[id]; ok {        return u, nil    }    return User{}, fmt.Errorf("id %d not found", id)}

server.go

客户端实现

客户端发起 RPC 调用,就像调本地服务一样,所以需要定义一个 stub,该 stub 同请求服务端方法签名一致,然后通过代理实现网络请求和解析。

var Hello func() stringr, err := cli.Call(ctx, "UserService.Test.Hello", &Hello)var GetUserById func(id int) (User, error)_, err := cli.Call(ctx, "UserService.User.GetUserById", &GetUserById)u, err := GetUserById(2)

定义客户端

定义客户端接口,其中 Invoke 代理执行 RPC 请求。

type Client interface {    Connect(string) error    Invoke(context.Context, *Service, interface{}, ...interface{}) (interface{}, error)    Close()}

consumer/client.go

定义连接参数,设置重试次数、超时时间、序列化协议、压缩类型等。type Option struct {    Retries           int    ConnectionTimeout time.Duration    SerializeType     protocol.SerializeType    CompressType      protocol.CompressType}var DefaultOption = Option{    Retries:           3,    ConnectionTimeout: 5 * time.Second,    SerializeType:     protocol.Gob,    CompressType:      protocol.None,}type RPCClient struct {    conn   net.Conn    option Option}func NewClient(option Option) Client {    return &RPCClient{option: option}}

consumer/client.go

执行请求

实现网络连接、关闭以及执行部分。

func (cli *RPCClient) Connect(addr string) error {    conn, err := net.DialTimeout(config.NET_TRANS_PROTOCOL, addr, cli.option.ConnectionTimeout)    if err != nil {        return err    }    cli.conn = conn    return nil}func (cli *RPCClient) Invoke(ctx context.Context, service *Service, stub interface{}, params ...interface{}) (interface{}, error) {    cli.makeCall(service, stub)    return cli.wrapCall(ctx, stub, params...)}func (cli *RPCClient) Close() {    if cli.conn != nil {        cli.conn.Close()    }}

consumer/client.go

执行代理过程,主要依赖反射实现。这里 cli.makeCall() 主要是通过反射来生成代理函数,在代理函数中完成网络连接、请求数据序列化、网络传输、响应返回数据解析的工作,然后通过 cli.wrapCall() 发起实际调用。func (cli *RPCClient) makeCall(service *Service, methodPtr interface{}) {    container := reflect.ValueOf(methodPtr).Elem()     coder := global.Codecs[cli.option.SerializeType]    handler := func(req []reflect.Value) []reflect.Value {          numOut := container.Type().NumOut()        errorHandler := func(err error) []reflect.Value {            outArgs := make([]reflect.Value, numOut)            for i := 0; i < len(outArgs)-1; i++ {                outArgs[i] = reflect.Zero(container.Type().Out(i))            }            outArgs[len(outArgs)-1] = reflect.ValueOf(&err).Elem()            return outArgs        }        inArgs := make([]interface{}, 0, len(req))        for _, arg := range req {            inArgs = append(inArgs, arg.Interface())        }        payload, err := coder.Encode(inArgs) //[]byte        if err != nil {            log.Printf("encode err:%v\n", err)            return errorHandler(err)        }        msg := protocol.NewRPCMsg()        msg.SetVersion(config.Protocol_MsgVersion)        msg.SetMsgType(protocol.Request)        msg.SetCompressType(cli.option.CompressType)        msg.SetSerializeType(cli.option.SerializeType)        msg.ServiceClass = service.Class        msg.ServiceMethod = service.Method        msg.Payload = payload        err = msg.Send(cli.conn)        if err != nil {            log.Printf("send err:%v\n", err)            return errorHandler(err)        }        respMsg, err := protocol.Read(cli.conn)        if err != nil {            return errorHandler(err)        }        respDecode := make([]interface{}, 0)        err = coder.Decode(respMsg.Payload, &respDecode)        if err != nil {            log.Printf("decode err:%v\n", err)            return errorHandler(err)        }        if len(respDecode) == 0 {            respDecode = make([]interface{}, numOut)        }        outArgs := make([]reflect.Value, numOut)        for i := 0; i < numOut; i++ {            if i != numOut {                if respDecode[i] == nil {                    outArgs[i] = reflect.Zero(container.Type().Out(i))                } else {                    outArgs[i] = reflect.ValueOf(respDecode[i])                }            } else {                outArgs[i] = reflect.Zero(container.Type().Out(i))            }        }        return outArgs    }    container.Set(reflect.MakeFunc(container.Type(), handler))}

consumer/client.go

wrapCall 执行实际函数调用。func (cli *RPCClient) wrapCall(ctx context.Context, stub interface{}, params ...interface{}) (interface{}, error) {    f := reflect.ValueOf(stub).Elem()    if len(params) != f.Type().NumIn() {        return nil, errors.New(fmt.Sprintf("params not adapted: %d-%d", len(params), f.Type().NumIn()))    }    in := make([]reflect.Value, len(params))    for idx, param := range params {        in[idx] = reflect.ValueOf(param)    }    result := f.Call(in)    return result, nil}

consumer/client.go

代理实现

到目前为止,客户端主要实现逻辑有了,但是客户端在发起调用前是需要先连接到服务端,然后执行调用,还有长连接管理、超时、重试甚至鉴权等功能没有实现,因此需要有一个代理类完成以上动作。

type RPCClientProxy struct {    option Option}func (cp *RPCClientProxy) Call(ctx context.Context, servicePath string, stub interface{}, params ...interface{}) (interface{}, error) {    service, err := NewService(servicePath)    if err != nil {        return nil, err    }           client := NewClient(cp.option)    addr := service.SelectAddr()    err = client.Connect(addr) //TODO 长连接管理    if err != nil {        return nil, err    }    retries := cp.option.Retries    for retries > 0 {        retries--        return client.Invoke(ctx, service, stub, params...)    }    return nil, errors.New("error")}

consumer/client_proxy.go

这里通过服务路径拆分依次获取类名、方法名、服务 AppId,然后根据AppId 查找服务注册中心获取到服务端(服务提供者)地址。由于篇幅限制,这部分将在下一篇实现(包括注册发现、负载均衡、长连接管理等),测试方便这里直接写死服务端地址。type Service struct {    AppId  string    Class  string    Method string    Addrs  []string}//demo: UserService.user.GetUserfunc NewService(servicePath string) (*Service, error) {    arr := strings.Split(servicePath, ".")    service := &Service{}    if len(arr) != 3 {         return service, errors.New("service path inlegal")    }       service.AppId = arr[0]    service.Class = arr[1]    service.Method = arr[2]    return service, nil }func (service *Service) SelectAddr() string {    return "ip:8811"}

consumer/service.go

测试客户端

客户端通过 stub 发起调用,执行过程看到发起了远程执行并从服务端获取到了结果。

func main() {    gob.Register(User{})    cli := consumer.NewClientProxy(consumer.DefaultOption)    ctx := context.Background()    var GetUserById func(id int) (User, error)    cli.Call(ctx, "UserService.User.GetUserById", &GetUserById)    u, err := GetUserById(2)    log.Println("result:", u, err)    var Hello func() string    r, err := cli.Call(ctx, "UserService.Test.Hello", &Hello)    log.Println("result:", r, err)}

client.go

总结与补充

至此实现了简单的“P2P RPC”,后续可以迭代加入注册发现能力、长连接管理、异步调用、插件化扩展、负载均衡、认证授权、容错治理等能力,希望大家多多支持。

推荐阅读

Go:gRPC-Gateway 完全指南

福利我为大家整理了一份从入门到进阶的Go学习资料礼包,包含学习建议:入门看什么,进阶看什么。关注 「polarisxu」,回复 ebook 获取;还可以回复「进群」,和数万 Gopher 交流学习。