Go RPC 远程过程调用 今天来学习 Go 语言的远程过程调用 RPC( Remote Procedure Call)。
在分布式计算,远程过程调用是一个计算机通信协议。该协议允许运行于一台计算机的程序调用另一个地址空间的子程序,而程序员就像调用本地程序一样,无需额外地为这个交互作用编程。RPC是一种服务器-客户端模式,经典实现是一个通过发送请求-接受回应进行信息交互的系统。
From WikiPedia
RPC 可以让客户端相对直接地访问服务端的函数,这里说的「相对直接」表示我们不需要在服务端自己写一些比如 web 服务的东西来提供接口,并且在两端手动做各种数据的编码、解码。
本文包括两部分,第一部分介绍 Golang 标准库的 net/rpc
,第二部分动手实现一个玩具版 PRC 框架来加深理解。
[TOC]
Part0. net/rpc
这一部分参考 《Go语言高级编程》4.1 RPC入门 。未尽之处可移步阅读原文。
Go 标准库的 net/rpc
实现了基本的 RPC,它使用一种 Go 语言特有的 Gob 编码方式,所以服务端、客户端都必须使用 Golang,不能跨语言调用。
对于服务端, net/rpc
要求用一个导出的结构体来表示 RPC 服务,这个结构体中所有符合特定要求的方法就是提供给客户端访问的:
1 2 3 type T struct {}func (t *T) MethodName (argType T1, replyType *T2) error
结构体是导出的。
方法是导出的。
方法有两个参数,都是导出的类型(或者内置类型)。
方法的第二个参数是指针。
方法的返回值是 error。
服务端通过 rpc.Dial
(对 TCP 服务)连接服务端,然后用使用 Call 调用 RPC 服务中的方法:
1 rpc.Call("T.MethodName" , argType T1, replyType *T2)
例如,用 net/rpc
实现一个 Hello World。
Hello World 服务端
首先构建一个 HelloService
来表示提供的服务:
1 2 3 4 5 6 7 8 9 10 type HelloService struct {}func (p *HelloService) Hello (request string , reply *string ) error { *reply = "Hello, " + request return nil }
接下来注册并开启 RPC 服务,我们可以基于 HTTP 服务:
1 2 3 4 5 6 7 8 9 10 11 12 13 func main () { rpc.RegisterName("HelloService" , new (HelloService)) rpc.HandleHTTP() err := http.ListenAndServe(":1234" , nil ) if err != nil { log.Fatal("Http Listen and Serve:" , err) } }
也可以使用 TCP 服务,替换上面的第 8~12 行代码:
1 2 3 4 5 6 7 8 9 10 11 12 listener, err := net.Listen("tcp" , ":1234" ) if err != nil { log.Fatal("ListenTCP error:" , err) } conn, err := listener.Accept() if err != nil { log.Fatal("Accept error:" , err) } rpc.ServeConn(conn)
注意,这里服务端只 Accept 一个请求,在客户端请求过后就会自动关闭。如果需要一直保持处理,可以把后半部分代码换成:
1 2 3 4 5 6 7 8 for { conn, err := listener.Accept() if err != nil { log.Fatal("Accept error:" , err) } go rpc.ServeConn(conn) }
客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 package mainimport ( "fmt" "log" "net/rpc" ) func main () { client, err := rpc.Dial("tcp" , "localhost:1234" ) if err != nil { log.Fatal("dialing:" , err) } var reply string err = client.Call("HelloService.Hello" , "world" , &reply) if err != nil { log.Fatal(err) } fmt.Println(reply) }
先启动服务端:
1 $ go run helloworld/server/server.go
在另一个终端调用客户端,即可得到结果:
1 2 $ go run helloworld/client/client.go Hello, world
更规范的 RPC 接口 之前的代码服务端、客户端的注册、调用 RPC 服务都是写死的。所有的工作都放到了一块,相当不利于维护,需要考虑重构 HelloService 服务和客户端实现。
服务端
首先,用一个 interface 抽象服务接口:
1 2 3 4 5 6 7 8 9 10 11 12 const HelloServiceName = "HelloService" type HelloServiceInterface interface { Hello(request string , reply *string ) error } func RegisterHelloService (svc HelloServiceInterface) error { return rpc.RegisterName(HelloServiceName, svc) }
在实例化服务时,注册用:
1 RegisterHelloService(new (HelloService))
其余的具体服务实现没有改变。
客户端
在客户端,考虑将 RPC 细节封装到一个客户端对象 HelloServiceClient
中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 type HelloServiceClient struct { *rpc.Client } var _ HelloServiceInterface = (*HelloServiceClient)(nil )func DialHelloService (network, address string ) (*HelloServiceClient, error) { c, err := rpc.Dial(network, address) if err != nil { return nil , err } return &HelloServiceClient{Client: c}, nil } func (p *HelloServiceClient) Hello (request string , reply *string ) error { return p.Client.Call(HelloServiceName + ".Hello" , request, reply) }
具体调用时,就不用去暴露处理 RPC 的细节了:
1 2 3 4 5 6 7 8 9 10 11 12 client, err := DialHelloService("tcp" , "localhost:1234" ) if err != nil { log.Fatal("dialing:" , err) } var reply string err = client.Hello("world" , &reply) if err != nil { log.Fatal(err) } fmt.Println(reply)
实例 运用上面的内容,做一个简单的计算器 RPC 服务。项目目录如下:
1 2 3 4 5 6 7 calc/ ├── calcrpc.go ├── client │ └── client.go └── server ├── calc.go └── server.go
首先写一个 calcrpc.go
定义服务端/客户端通用的 RPC 接口:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 package calcimport "net/rpc" const ServiceName = "CalcService" type ServiceInterface interface { CalcTwoNumber(request Calc, reply *float64 ) error GetOperators(request struct {}, reply *[]string ) error } func RegisterCalcService (svc ServiceInterface) error { return rpc.RegisterName(ServiceName, svc) } type Calc struct { Number1 float64 Number2 float64 Operator string }
然后写服务端实现,在 calc.go
中写一个常规的计算器抽象实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 package mainimport ( "errors" ) type Operation func (Number1, Number2 float64 ) float64 func Add (Number1, Number2 float64 ) float64 { return Number1 + Number2 } func Sub (Number1, Number2 float64 ) float64 { return Number1 - Number2 } func Mul (Number1, Number2 float64 ) float64 { return Number1 * Number2 } func Div (Number1, Number2 float64 ) float64 { return Number1 / Number2 } var Operators = map [string ]Operation { "+" : Add, "-" : Sub, "*" : Mul, "/" : Div, } func CreateOperation (operator string ) (Operation, error) { var oper Operation if oper, ok := Operators[operator]; ok { return oper, nil } return oper, errors.New("Illegal Operator" ) }
接下来是 RPC 服务的实现,在 server.go
中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 package mainimport ( "gorpctest/calc" "net/http" "net/rpc" ) type CalcService struct {}func (c *CalcService) CalcTwoNumber (request calc.Calc, reply *float64 ) error { oper, err := CreateOperation(request.Operator) if err != nil { return err } *reply = oper(request.Number1, request.Number2) return nil } func (c *CalcService) GetOperators (request struct {}, reply *[]string ) error { opers := make ([]string , 0 , len (Operators)) for key := range Operators { opers = append (opers, key) } *reply = opers return nil } func main () { calc.RegisterCalcService(new (CalcService)) rpc.HandleHTTP() http.ListenAndServe(":8080" , nil ) }
然后是客户端实现,在 client.go
中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 package mainimport ( "gorpctest/calc" "log" "net/rpc" ) type CalcClient struct { *rpc.Client } var _ calc.ServiceInterface = (*CalcClient)(nil )func DialCalcService (network, address string ) (*CalcClient, error) { c, err := rpc.DialHTTP(network, address) if err != nil { return nil , err } return &CalcClient{Client: c}, nil } func (c *CalcClient) CalcTwoNumber (request calc.Calc, reply *float64 ) error { return c.Client.Call(calc.ServiceName + ".CalcTwoNumber" , request, reply) } func (c *CalcClient) GetOperators (request struct {}, reply *[]string ) error { return c.Client.Call(calc.ServiceName + ".GetOperators" , request, reply) } func main () { client, err := DialCalcService("tcp" , "localhost:8080" ) if err != nil { log.Fatal("Err Dial Client:" , err) } var opers []string err = client.GetOperators(struct {}{}, &opers) if err != nil { log.Println(err) } log.Println(opers) testAdd := calc.Calc { Number1: 2.0 , Number2: 3.14 , Operator: "+" , } var result float64 client.CalcTwoNumber(testAdd, &result) log.Println(result) }
net/rpc/jsonrpc net/rpc
允许 RPC 数据打包时通过插件实现自定义的编码和解码:
1 2 3 4 5 6 rpc.ServeCodec(SomeServerCodec(conn)) conn, _ := net.Dial("tcp" , "localhost:1234" ) client := rpc.NewClientWithCodec(SomeClientCodec(conn))
net/rpc/jsonrpc
就是这样的一种实现,它使用 JSON 而不是 Gob 编码,可以用来做跨语言 RPC。在真实的使用中,net/rpc/jsonrpc
在内部封装了上面提到的编码、解码实现,提供大致上和 net/rpc
相同的 API。
服务端在之前的 Hello World 基础上,只需要改动 main 的最后一行代码(不算 }
)即可变为使用 JSON RPC:
1 2 go jsonrpc.ServeConn(conn)
jsonrpc.ServeConn
的实现是 rpc.ServeCodec(jsonrpc.NewServerCodec(conn))
在调用时,将 DialHelloService
中连接服务的代码改一改就可以使用了:
1 2 c, err := jsonrpc.Dial(network, address)
这里也可以用:
1 2 conn, _ := net.Dial("tcp" , "localhost:1234" ) client := rpc.NewClientWithCodec(jsonrpc.NewClientCodec(conn))
这样开的服务是基于 TCP 的。我们可以关闭服务端程序,运行 nc -l 1234
启动一个 TCP 服务,然后再次运行客户端程序,nc 会输出客户端请求的内容:
1 2 $ nc -l 1234 {"method" :"HelloService.Hello" ,"params" :["world" ],"id" :0}
可以看到请求体是 JSON 数据。反过来,模仿这个请求体,我们可以手动向正在运行的客户端发送模拟请求,查看响应体:
1 2 $ echo -e '{"method":"HelloService.Hello","params":["JSON-RPC"],"id":1}' | nc localhost 1234 {"id" :1,"result" :"Hello, JSON-RPC" ,"error" :null}
总结一下,请求、响应的结构体大概为:
1 2 3 4 5 6 7 8 9 10 11 type Request struct { Method string `json:"method"` Params *json.RawMessage `json:"params"` Id *json.RawMessage `json:"id"` } type Response struct { Id uint64 `json:"id"` Result *json.RawMessage `json:"result"` Error interface {} `json:"error"` }
(其实真正的实现中,客户端和服务端请求、响应定义是略有区别的)
使用其他语言,只要遵循这样的请求/响应结构,就可以和 Go 的 RPC 服务进行通信了。
JSON-RPC in HTTP 刚才的实现是基于 TCP 的,有时候不方便使用,我们可能更希望使用熟悉的 HTTP 协议。
net/rpc
的 RPC 服务是建立在抽象的 io.ReadWriteCloser
接口之上的(conn),所以略作改变,就可以将 RPC 架设在不同的通讯协议之上。这里我们将尝试将 net/rpc/jsonrpc
架设到 HTTP 服务上:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 func main () { RegisterHelloService(new (HelloService)) http.HandleFunc("/jsonrpc" , func (w http.ResponseWriter, r *http.Request) { var conn io.ReadWriteCloser = struct { io.Writer io.ReadCloser } { ReadCloser: r.Body, Writer: w, } rpc.ServeRequest(jsonrpc.NewServerCodec(conn)) }) http.ListenAndServe(":1234" , nil ) }
然后就可以通过 HTTP 很方便地从不同的语言中访问 RPC 服务了:
1 2 curl -X POST http://localhost:1234/jsonrpc --data '{"method":"HelloService.Hello","params":["world"],"id":0}' {"id" :0,"result" :"Hello, world" ,"error" :null}
但是,这里有个问题是,不方便使用 Go 写客户端,你需要自己去构建一个客户端实现,来完成请求的编码、发送以及响应的解码、绑定😂。或者,也可以使用一个 JSON-RPC 的库。
Part1. 简单 RPC 的实现
这一部分参考 《从0开始学习微服务框架》 P9~P14 RPC 。未尽之处可移步学习原视频。
为了加深理解,我们手写一个简单的 RPC 服务,从自定义协议到编码、解码,再到 RPC 服务端、客户端实现。
我们写一个 package rpc
来实现这东西:
1 2 3 4 5 6 /rpc ├── client.go ├── codec.go ├── server.go └── session.go (省略了测试文件)
网络通信 我们基于 TCP 通信,使用如下自定义的协议进行通信:
网络字节流
Header
Data
大小
uint32
(定长:4字节)
[]byte
(长度由Header指明)
说明
Data 的长度信息
具体数据
我们通过一个 Session
结构体实现这个基本的协议:
1 2 3 4 5 6 7 8 9 10 11 type Session struct { conn net.Conn } func NewSession (conn net.Conn) *Session { return &Session{conn: conn} }
之后的 RPC 通信就通过 Session
来对 TCP 连接进行数据读写操作:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 func (s *Session) Write (data []byte ) error { buf := make ([]byte , 4 +len (data)) binary.BigEndian.PutUint32(buf[:4 ], uint32 (len (data))) copy (buf[4 :], data) _, err := s.conn.Write(buf) return err } func (s *Session) Read () ([]byte , error) { header := make ([]byte , 4 ) if _, err := io.ReadFull(s.conn, header); err != nil { return nil , err } dataLen := binary.BigEndian.Uint32(header) data := make ([]byte , dataLen) if _, err := io.ReadFull(s.conn, data); err != nil { return nil , err } return data, nil }
编码解码 在 RPC 的过程中,我们需要按照一定的格式传递函数的参数与结果。我们可以定义如下 RPCData
来格式化 RPC 通信的内容:
1 2 3 4 5 6 7 type RPCData struct { Func string Args []interface {} }
在整个 RPC 中,所有网络通信都利用 Session
对 RPCData
编码成的 []byte
进行传输。要把 RPCData
在一端编码成字节,并在另一端解码会原本的 Go 数据类型,可以利用 encoding/gob
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 func encode (data RPCData) ([]byte , error) { var buf bytes.Buffer encoder := gob.NewEncoder(&buf) if err := encoder.Encode(data); err != nil { return nil , err } return buf.Bytes(), nil } func decode (data []byte ) (RPCData, error) { buf := bytes.NewBuffer(data) decoder := gob.NewDecoder(buf) var rpcData RPCData err := decoder.Decode(&rpcData) return rpcData, err }
有了网络通信的方案以及编码解码的方式,就可以开始构建 RPC 服务的服务端框架以及客户端实现了。
服务端 RPC 服务端的核心是,维护一个函数名到本地函数的映射。实现这个映射,并开启一个网络服务,就可以支持客户端通过给定函数名和参数即可调用服务端函数的操作了。
这里可以简单地把服务定义如下:
1 2 3 4 5 6 7 8 9 10 type Server struct { funcs map [string ]reflect.Value } func NewServer () *Server { return &Server{funcs: map [string ]reflect.Value{}} }
通过反射机制,来实现 funs 的映射:
1 2 3 4 5 6 7 8 9 10 11 12 func (s *Server) Register (name string , function interface {}) { if _, ok := s.funcs[name]; ok { return } fVal := reflect.ValueOf(function) s.funcs[name] = fVal }
接下来是开启网络服务,监听 TCP 连接,对访问进行服务:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 func (s *Server) ListenAndServe (address string ) error { listener, err := net.Listen("tcp" , address) if err != nil { return err } for { conn, err := listener.Accept() if err != nil { log.Println("accept error:" , err) continue } s.handleConn(conn) } }
具体对连接的处理在 handleConn
中完成。对 conn 创建一个 RPC 会话,解码请求体,得到客户端希望请求的函数和参数。调用本地函数完成工作,将返回值编码,返回给客户端:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 func (s *Server) handleConn (conn net.Conn) { srvSession := NewSession(conn) data, err := srvSession.Read() if err != nil { log.Println("session read error:" , err) return } requestRPCData, err := decode(data) if err != nil { log.Println("data decode error:" , err) return } f, ok := s.funcs[requestRPCData.Func] if !ok { log.Printf("unexpected rpc call: function %s not exist" , requestRPCData.Func) return } inArgs := make ([]reflect.Value, 0 , len (requestRPCData.Args)) for _, arg := range requestRPCData.Args { inArgs = append (inArgs, reflect.ValueOf(arg)) } returnValues := f.Call(inArgs) outArgs := make ([]interface {}, 0 , len (returnValues)) for _, ret := range returnValues { outArgs = append (outArgs, ret.Interface()) } replyRPCData := RPCData{ Func: requestRPCData.Func, Args: outArgs, } replyEncoded, err := encode(replyRPCData) if err != nil { log.Println("reply encode error:" , err) return } err = srvSession.Write(replyEncoded) if err != nil { log.Println("reply write error:" , err) } }
客户端 RPC 客户端的一个特点是,像调用本地函数一样去调用远程的函数。要调用的函数并不是在本地实现的,但我们希望让它像本地函数一样工作。反射机制可以提供这种“欺骗自己”的特性。
首先我们写出客户端结构,其实就是对一个网络连接的包装:
1 2 3 4 5 6 7 8 9 10 type Client struct { conn net.Conn } func NewClient (conn net.Conn) *Client { return &Client{conn: conn} }
然后实现一个 Call
方法,把原创的函数通过 RPC 带到本地来:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 func (c *Client) Call (name string , funcPtr interface {}) { fn := reflect.ValueOf(funcPtr).Elem() f := func (args []reflect.Value) []reflect .Value { inArgs := make ([]interface {}, 0 , len (args)) for _, arg := range args { inArgs = append (inArgs, arg.Interface()) } cliSession := NewSession(c.conn) requestRPCData := RPCData{ Func: name, Args: inArgs, } requestEncoded, err := encode(requestRPCData) if err != nil { panic (err) } if err := cliSession.Write(requestEncoded); err != nil { panic (err) } response, err := cliSession.Read() if err != nil { panic (err) } respRPCData, err := decode(response) if err != nil { panic (err) } outArgs := make ([]reflect.Value, 0 , len (respRPCData.Args)) for i, arg := range respRPCData.Args { if arg == nil { outArgs = append (outArgs, reflect.Zero(fn.Type().Out(i))) } else { outArgs = append (outArgs, reflect.ValueOf(arg)) } } return outArgs } v := reflect.MakeFunc(fn.Type(), f) fn.Set(v) }
这个函数接受两个参数,name
为 RPC 服务端提供的函数名,funcPtr
是要调用的函数的原型。该函数运行的结果是将一个「封装了 RPC 调用远程函数的函数」“赋给” funcPtr
,让 funcPtr
从一个空有其表的原型变成一个可调用的真实函数,调用它就等于通过 RPC 调用服务端相应的函数。
例如,我们在服务端实现并注册了函数:
1 2 3 func queryUser (uid int ) (User, error) { ... }
在客户端,我们就可以通过一个 queryUser 函数的原型来获得其能力:
1 2 3 var query func (int ) (User, error) // query 是 queryUser 的原型client.Call("queryUser" , &query) u, err := query(1 )
如果对反射不太熟悉,难以理解代码实现的话,这里可能有点迷。再来看一个具体调用的例子吧:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 package rpcimport ( "encoding/gob" "fmt" "net" "testing" ) type User struct { Name string Age int } func queryUser (uid int ) (User, error) { user := make (map [int ]User) user[0 ] = User{Name: "Foo" , Age: 12 } user[1 ] = User{Name: "Bar" , Age: 13 } user[2 ] = User{Name: "Joe" , Age: 14 } if u, ok := user[uid]; ok { return u, nil } return User{}, fmt.Errorf("user wiht id %d is not exist" , uid) } func TestRPC (t *testing.T) { gob.Register(User{}) addr := ":8080" srv := NewServer() srv.Register("queryUser" , queryUser) go srv.ListenAndServe(addr) conn, err := net.Dial("tcp" , addr) if err != nil { t.Error(err) } cli := NewClient(conn) var query func (int ) (User, error) cli.Call("queryUser" , &query) u, err := query(1 ) if err != nil { t.Error(err) } fmt.Println(u) }
TestRPC
中模拟了服务端以及客户端调用 RPC 服务。
至此,一个完整的玩具版 RPC 就完成了,自己来写这东西还是挺有意思。完整的代码我放到了这个 Gist 里 cdfmlr/toy-rpc-golang :
1 2 By("CDFMLR" , "2020-09-12" )