目录
内容提要
RPC 框架是分布式领域核心组件,也是微服务的基础。今天尝试从零撸一个 RPC 框架,剖析其核心原理及代码实现,后续还会逐步迭代追加微服务治理等功能,将之前文章覆盖的熔断、限流、负载均衡、注册发现等功能融合进来,打造一个五脏俱全的 RPC 框架。本文主要内容包括:
RPC 实现原理RPC 协议设计
RPC 服务端实现
RPC 客户端实现
实现原理
RPC (Remote Procedure Call)全称是远程过程调用,相对于本地方法调用,在同一内存空间可以直接通过方法栈实现调用,远程调用则跨了不同的服务终端,并不能直接调用。
RPC框架 要解决的就是远程方法调用的问题,并且实现调用远程服务像调用本地服务一样简单,框架内部封装实现了网络调用的细节(透明化远程调用),其核心过程原理如下图所示。
这个版本可以称为 “P2P RPC” ,而生产环境部署往往会将服务提供者(Server)部署多个实例(集群部署),那么客户端就需要具备发现服务端的能力和负载均衡的支持,所以有了服务注册发现和负载均衡。
再然后,为了保障 RPC 调用的可靠性和稳定性,增加了服务监控和服务容错治理的能力,考虑性能提升的异步化能力以及考虑可扩展性的插件化管理,这些完善构成了更完整的微服务 RPC 框架。
RPC 协议实现
协议设计
协议设计算是 RPC 最重要的一部分了,它主要解决服务端与客户端通信的问题。一般来说通讯要解决如下问题:
1. 网络传输协议基于 TCP、UDP 还是 HTTP,UDP 要自己解决可靠性传输问题,而 HTTP 又太重,包含很多没必要的头信息,所以一般 RPC 框架会优先选择 TCP 协议。
(当然也有大名鼎鼎的 gRPC 基于 HTTP2)
2. 序列化协议
网络传输数据必须是二进制数据,而执行过程是编程语言的对象方法,那么就涉及到如何将对象序列化成可传输消息(二进制),并可反序列化还原。常见的通用型协议如 XML、 JSON、Protobuf、Thrift 等,也有语言绑定的如 Python 原生支持的 pickle 协议, Java 实现的 Serializbale 接口及 Hessian 协议,Golang 原生支持的 Gob 协议等。
3. 消息编码协议
它是一种客户端和服务端的调用约定,比如请求和参数如何组织,Header 放置什么内容。这部分每个框架设计均不同,有时也称这一层为狭义的 RPC 协议层。
另外客户端发起调用一般来说要知道调用的具体类方法(请求标识符)以及入参(Payload),而网络传输的是二进制字节流,如何能从这些字节中找出哪些是方法名,哪些是参数?进一步如果客户端不断的发送消息,如何将每一条消息分割?(解决 TCP 粘包问题)
采用定长消息很容易解决,但事先并不能确定要固定多长,所以这种方式并不可行。消息加分隔符可以实现,但要确保分隔符不会与正文冲突。而最常用的实现方案就是用定长的头标识出不定长的体,比如用 int32 (定长 4 字节)标识后面的内容长度,这样就能较优雅实现消息分割了。
(注:这个方案中如果消息体的长度大于 2^32 会发生溢出而导致解析失败,可以换更长类型,但理论上总会有溢出风险,设计使用时应该限制避免传输过大数据体)
协议实现
网络传输协议,这里使用 TCP 协议即可,没有太多争议,可预留接口支持。
序列化协议,这里使用 Golang 专有的 Gob 协议,保留接口后期可以扩展支持 JSON、Protobuf 等协议。
type Codec interface { Encode(i interface{}) ([]byte, error) Decode(data []byte, i interface{}) error}type GobCodec struct{}func (c GobCodec) Encode(i interface{}) ([]byte, error) { var buffer bytes.Buffer encoder := gob.NewEncoder(&buffer) if err := encoder.Encode(i); err != nil { return nil, err } return buffer.Bytes(), nil }func (c GobCodec) Decode(data []byte, i interface{}) error { buffer := bytes.NewBuffer(data) decoder := gob.NewDecoder(buffer) return decoder.Decode(i)}codec/codec.go
RPC 消息格式编码设计如下,协议消息头定义定长 5 字节(byte),依次放置魔术数(用于校验),协议版本,消息类型(区分请求/响应),压缩类型,序列化协议类型,每个占 1 个字节(8 个 bit)。可扩展追加 消息 ID 以及 元数据 等信息用于做服务治理。const ( HEADER_LEN = 5)const ( magicNumber byte = 0x06)type MsgType byteconst ( Request MsgType = iota Response)type CompressType byteconst ( None CompressType = iota Gzip)type SerializeType byteconst ( Gob SerializeType = iota JSON)type Header [HEADER_LEN]bytefunc (h *Header) CheckMagicNumber() bool { return h[0] == magicNumber}func (h *Header) Version() byte { return h[1]}func (h *Header) SetVersion(version byte) { h[1] = version}//省略 MsgType,CompressType,SerializeTypeprotocol/header.go
定义协议消息格式,除了协议头,还包括调用的服务类名、方法名以及参数(Payload)。type RPCMsg struct { *Header ServiceClass string ServiceMethod string Payload []byte}func NewRPCMsg() *RPCMsg { header := Header([HEADER_LEN]byte{}) header[0] = magicNumber return &RPCMsg{ Header: &header, }}protocol/msg.go
实现传输
定义好协议后,要解决的问题就是如何通过网络(IO)发送和接收,实现通信的目的。
func (msg *RPCMsg) Send(writer io.Writer) error { //send header _, err := writer.Write(msg.Header[:]) if err != nil { return err } //写入消息体总长度,方便一次性解析 dataLen := SPLIT_LEN + len(msg.ServiceClass) + SPLIT_LEN + len(msg.ServiceMethod) + SPLIT_LEN + len(msg.Payload) err = binary.Write(writer, binary.BigEndian, uint32(dataLen)) //4 if err != nil { return err } //write service.class len err = binary.Write(writer, binary.BigEndian, uint32(len(msg.ServiceClass))) if err != nil { return err } //write service.class content err = binary.Write(writer, binary.BigEndian, util.StringToByte(msg.ServiceClass)) if err != nil { return err } //省略 service.method,payload }protocol/msg.go
其中类名、方法名、payload 均为不定长部分,要想顺利解析就需要一一对应的长度字段标识不定长的长度,也就是 SPLIT_LEN 代表各部分长度,是 int32 类型(32 bit),正好相当于 4 个 byte,所以 SPLIT_LEN 为 4。另外要注意网络传输一般使用大端字节序。先理解字节序即为字节(byte)的组成顺序,分为大端序(最高有效位放低地址)和小端序(最低有效位放低地址)。CPU 一般采用小端序读写,而 TCP 网络传输采用大端序则更方便。对应这里的 binary.BigEndian 代码实现大端序。
消息读取后反解析,按发送顺序依次还原 Header、类名、方法名、Payload,不定长部分都有对应的长度保存,因此可以顺利解析到所有数据。func Read(r io.Reader) (*RPCMsg, error) { msg := NewRPCMsg() err := msg.Decode(r) if err != nil { return nil, err } return msg, nil}func (msg *RPCMsg) Decode(r io.Reader) error { //read header _, err := io.ReadFull(r, msg.Header[:]) if !msg.Header.CheckMagicNumber() { //magicNumber return fmt.Errorf("magic number error: %v", msg.Header[0]) } //total body len headerByte := make([]byte, 4) _, err = io.ReadFull(r, headerByte) if err != nil { return err } bodyLen := binary.BigEndian.Uint32(headerByte) //一次将整个body读取,再依次拆解 data := make([]byte, bodyLen) _, err = io.ReadFull(r, data) //service.class len start := 0 end := start + SPLIT_LEN classLen := binary.BigEndian.Uint32(data[start:end]) //0,4 //service.class start = end end = start + int(classLen) msg.ServiceClass = util.ByteToString(data[start:end]) //4,x //省略 method,payload}
protocol/msg.go解决了最复杂的协议部分,下面依次来看服务端和客户端的实现。
服务端实现
服务端实现主要包括服务启停(端口监听)、服务注册、响应连接和处理请求几部分。
定义服务接口
服务接口提供服务启停和处理方法注册的能力。
type Server interface { Register(string, interface{}) Run() Close()}provider/server.go
服务启停
实现服务启停,关键在于通过 ip 和端口开启监听,这里通过 Listener 封装 net 包开启 tcp Listen。
type RPCServer struct { listener Listener}func NewRPCServer(ip string, port int) *RPCServer { return &RPCServer{ listener: NewRPCListener(ip, port), } }func (svr *RPCServer) Run() { go svr.listener.Run()}func (svr *RPCServer) Close() { if svr.listener != nil { svr.listener.Close() }}provider/server.go
type Listener interface { Run() SetHandler(string, Handler) Close()}type RPCListener struct { ServiceIp string ServicePort int Handlers map[string]Handler nl net.Listener}func NewRPCListener(serviceIp string, servicePort int) *RPCListener { return &RPCListener{ServiceIp: serviceIp, ServicePort: servicePort, Handlers: make(map[string]Handler)}}func (l *RPCListener) Run() { addr := fmt.Sprintf("%s:%d", l.ServiceIp, l.ServicePort) nl, err := net.Listen(config.NET_TRANS_PROTOCOL, addr)//tcp if err != nil { panic(err) } l.nl = nl for { conn, err := l.nl.Accept() if err != nil { continue } go l.handleConn(conn) }}func (l *RPCListener) Close() {if l.nl != nil { l.nl.Close()}}provider/listener.go
这里通过为每个连接创建一个协程处理请求,得益于 Golang 的协程优势,Thread-Per-Message 模式来满足并发请求更容易实现(Java 线程成本太大,一般采用线程池实现 Worker Thread 模式)。
服务注册
服务注册就是在内存中维护一个映射关系,map (key=服务名,value=对象实例),通过 interface{} 泛化,可以反射还原。func (svr *RPCServer) Register(class interface{}) { name := reflect.Indirect(reflect.ValueOf(class)).Type().Name() svr.RegisterName(name, class)}func (svr *RPCServer) RegisterName(name string, class interface{}) { handler := &RPCServerHandler{class: reflect.ValueOf(class)} svr.listener.SetHandler(name, handler) log.Printf("%s registered success!\n", name)}func (l *RPCListener) SetHandler(name string, handler Handler) { if _, ok := l.Handlers[name]; ok { log.Printf("%s is registered!\n", name) return } l.Handlers[name] = handler}provider/server.go
(1)由于服务启动初始化时进行所有服务注册,所以用 map 没考虑并发,否则如果有动态注册就需要考虑并发问题了。(2)这里没有注册到服务注册中心,设计考虑将以应用服务(系统)为单位进行注册,而具体的服务接口通过应用内存映射。这种注册粒度大,优点就是减少对注册中心的依赖和注册实例数量,提高服务发现资源利用率。(dubbo 3 重要改进就是将接口级服务发现切换为应用级服务发现)响应连接请求
整个过程依次涉及从网络连接读取数据,反序列化获得请求结构体 (RPCMsg),根据注册类和方法找到目标函数并执行,将执行结果序列化后封装成 RPCMsg 通过网络发送,整个过程是同步 io 模型。
func (l *RPCListener) handleConn(conn net.Conn) { defer catchPanic() for { msg, err := l.receiveData(conn) if err != nil || msg == nil { return } coder := global.Codecs[msg.Header.SerializeType()] if coder == nil { return } inArgs := make([]interface{}, 0) err = coder.Decode(msg.Payload, &inArgs) if err != nil { return } handler, ok := l.Handlers[msg.ServiceClass] if !ok { return } result, err := handler.Handle(msg.ServiceMethod, inArgs) encodeRes, err := coder.Encode(result) if err != nil { return } err = l.sendData(conn, encodeRes) if err != nil { return } }}provider/listener.go
其中实际执行本地方法过程如下:func (handler *RPCServerHandler) Handle(method string, params []interface{}) ([]interface{}, error) { args := make([]reflect.Value, len(params)) for i := range params { args[i] = reflect.ValueOf(params[i]) } reflectMethod := handler.class.MethodByName(method) result := reflectMethod.Call(args) resArgs := make([]interface{}, len(result)) for i := 0; i < len(result); i++ { resArgs[i] = result[i].Interface() } var err error if _, ok := result[len(result)-1].Interface().(error); ok { err = result[len(result)-1].Interface().(error) } return resArgs, err}provider/handler.go
收发网络请求通过调用之前封装的 Protocol 来完成。func (l *RPCListener) receiveData(conn net.Conn) (*protocol.RPCMsg, error) { msg, err := protocol.Read(conn) if err != nil { if err != io.EOF { //close return nil, err } } return msg, nil}func (l *RPCListener) sendData(conn net.Conn, payload []byte) error { resMsg := protocol.NewRPCMsg() resMsg.SetVersion(config.Protocol_MsgVersion) resMsg.SetMsgType(protocol.Response) resMsg.SetCompressType(protocol.None) resMsg.SetSerializeType(protocol.Gob) resMsg.Payload = payload return resMsg.Send(conn)}provider/listener.go
测试服务端
通过环境变量注入 ip 和 port,开启服务监听,依次注册几个服务。
func main() { flag.Parse() if ip == "" || port == 0 { panic("init ip and port error") } srv := provider.NewRPCServer(ip, port) srv.RegisterName("User", &UserHandler{}) srv.RegisterName("Test", &TestHandler{}) gob.Register(User{}) go srv.Run() quit := make(chan os.Signal) signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP, syscall.SIGQUIT) <-quit srv.Close()}server.go
这里注册两个结构体 User 和 Test,特别注意:只有可导出的类方法(首字母大写)才能被客户端调用执行,否则会找不到对应类方法而失败。此外 User 作为接口值实现传输必须注册才行(gob.Register(User{}))。
type TestHandler struct{}func (t *TestHandler) Hello() string { return "hello world"}type User struct { ID int `json:"id"` Name string `json:"name"` Age int `json:"age"`}var userList = map[int]User{ 1: User{1, "hero", 11}, 2: User{2, "kavin", 12},}type UserHandler struct{}func (u *UserHandler) GetUserById(id int) (User, error) { if u, ok := userList[id]; ok { return u, nil } return User{}, fmt.Errorf("id %d not found", id)}server.go
客户端实现
客户端发起 RPC 调用,就像调本地服务一样,所以需要定义一个 stub,该 stub 同请求服务端方法签名一致,然后通过代理实现网络请求和解析。
var Hello func() stringr, err := cli.Call(ctx, "UserService.Test.Hello", &Hello)var GetUserById func(id int) (User, error)_, err := cli.Call(ctx, "UserService.User.GetUserById", &GetUserById)u, err := GetUserById(2)定义客户端
定义客户端接口,其中 Invoke 代理执行 RPC 请求。
type Client interface { Connect(string) error Invoke(context.Context, *Service, interface{}, ...interface{}) (interface{}, error) Close()}consumer/client.go
定义连接参数,设置重试次数、超时时间、序列化协议、压缩类型等。type Option struct { Retries int ConnectionTimeout time.Duration SerializeType protocol.SerializeType CompressType protocol.CompressType}var DefaultOption = Option{ Retries: 3, ConnectionTimeout: 5 * time.Second, SerializeType: protocol.Gob, CompressType: protocol.None,}type RPCClient struct { conn net.Conn option Option}func NewClient(option Option) Client { return &RPCClient{option: option}}consumer/client.go
执行请求
实现网络连接、关闭以及执行部分。
func (cli *RPCClient) Connect(addr string) error { conn, err := net.DialTimeout(config.NET_TRANS_PROTOCOL, addr, cli.option.ConnectionTimeout) if err != nil { return err } cli.conn = conn return nil}func (cli *RPCClient) Invoke(ctx context.Context, service *Service, stub interface{}, params ...interface{}) (interface{}, error) { cli.makeCall(service, stub) return cli.wrapCall(ctx, stub, params...)}func (cli *RPCClient) Close() { if cli.conn != nil { cli.conn.Close() }}consumer/client.go
执行代理过程,主要依赖反射实现。这里 cli.makeCall() 主要是通过反射来生成代理函数,在代理函数中完成网络连接、请求数据序列化、网络传输、响应返回数据解析的工作,然后通过 cli.wrapCall() 发起实际调用。func (cli *RPCClient) makeCall(service *Service, methodPtr interface{}) { container := reflect.ValueOf(methodPtr).Elem() coder := global.Codecs[cli.option.SerializeType] handler := func(req []reflect.Value) []reflect.Value { numOut := container.Type().NumOut() errorHandler := func(err error) []reflect.Value { outArgs := make([]reflect.Value, numOut) for i := 0; i < len(outArgs)-1; i++ { outArgs[i] = reflect.Zero(container.Type().Out(i)) } outArgs[len(outArgs)-1] = reflect.ValueOf(&err).Elem() return outArgs } inArgs := make([]interface{}, 0, len(req)) for _, arg := range req { inArgs = append(inArgs, arg.Interface()) } payload, err := coder.Encode(inArgs) //[]byte if err != nil { log.Printf("encode err:%v\n", err) return errorHandler(err) } msg := protocol.NewRPCMsg() msg.SetVersion(config.Protocol_MsgVersion) msg.SetMsgType(protocol.Request) msg.SetCompressType(cli.option.CompressType) msg.SetSerializeType(cli.option.SerializeType) msg.ServiceClass = service.Class msg.ServiceMethod = service.Method msg.Payload = payload err = msg.Send(cli.conn) if err != nil { log.Printf("send err:%v\n", err) return errorHandler(err) } respMsg, err := protocol.Read(cli.conn) if err != nil { return errorHandler(err) } respDecode := make([]interface{}, 0) err = coder.Decode(respMsg.Payload, &respDecode) if err != nil { log.Printf("decode err:%v\n", err) return errorHandler(err) } if len(respDecode) == 0 { respDecode = make([]interface{}, numOut) } outArgs := make([]reflect.Value, numOut) for i := 0; i < numOut; i++ { if i != numOut { if respDecode[i] == nil { outArgs[i] = reflect.Zero(container.Type().Out(i)) } else { outArgs[i] = reflect.ValueOf(respDecode[i]) } } else { outArgs[i] = reflect.Zero(container.Type().Out(i)) } } return outArgs } container.Set(reflect.MakeFunc(container.Type(), handler))}consumer/client.go
wrapCall 执行实际函数调用。func (cli *RPCClient) wrapCall(ctx context.Context, stub interface{}, params ...interface{}) (interface{}, error) { f := reflect.ValueOf(stub).Elem() if len(params) != f.Type().NumIn() { return nil, errors.New(fmt.Sprintf("params not adapted: %d-%d", len(params), f.Type().NumIn())) } in := make([]reflect.Value, len(params)) for idx, param := range params { in[idx] = reflect.ValueOf(param) } result := f.Call(in) return result, nil}consumer/client.go
代理实现
到目前为止,客户端主要实现逻辑有了,但是客户端在发起调用前是需要先连接到服务端,然后执行调用,还有长连接管理、超时、重试甚至鉴权等功能没有实现,因此需要有一个代理类完成以上动作。
type RPCClientProxy struct { option Option}func (cp *RPCClientProxy) Call(ctx context.Context, servicePath string, stub interface{}, params ...interface{}) (interface{}, error) { service, err := NewService(servicePath) if err != nil { return nil, err } client := NewClient(cp.option) addr := service.SelectAddr() err = client.Connect(addr) //TODO 长连接管理 if err != nil { return nil, err } retries := cp.option.Retries for retries > 0 { retries-- return client.Invoke(ctx, service, stub, params...) } return nil, errors.New("error")}consumer/client_proxy.go
这里通过服务路径拆分依次获取类名、方法名、服务 AppId,然后根据AppId 查找服务注册中心获取到服务端(服务提供者)地址。由于篇幅限制,这部分将在下一篇实现(包括注册发现、负载均衡、长连接管理等),测试方便这里直接写死服务端地址。type Service struct { AppId string Class string Method string Addrs []string}//demo: UserService.user.GetUserfunc NewService(servicePath string) (*Service, error) { arr := strings.Split(servicePath, ".") service := &Service{} if len(arr) != 3 { return service, errors.New("service path inlegal") } service.AppId = arr[0] service.Class = arr[1] service.Method = arr[2] return service, nil }func (service *Service) SelectAddr() string { return "ip:8811"}consumer/service.go
测试客户端
客户端通过 stub 发起调用,执行过程看到发起了远程执行并从服务端获取到了结果。
func main() { gob.Register(User{}) cli := consumer.NewClientProxy(consumer.DefaultOption) ctx := context.Background() var GetUserById func(id int) (User, error) cli.Call(ctx, "UserService.User.GetUserById", &GetUserById) u, err := GetUserById(2) log.Println("result:", u, err) var Hello func() string r, err := cli.Call(ctx, "UserService.Test.Hello", &Hello) log.Println("result:", r, err)}client.go
总结与补充
至此实现了简单的“P2P RPC”,后续可以迭代加入注册发现能力、长连接管理、异步调用、插件化扩展、负载均衡、认证授权、容错治理等能力,希望大家多多支持。
推荐阅读
Go:gRPC-Gateway 完全指南
福利我为大家整理了一份从入门到进阶的Go学习资料礼包,包含学习建议:入门看什么,进阶看什么。关注 「polarisxu」,回复 ebook 获取;还可以回复「进群」,和数万 Gopher 交流学习。