RPC

Remote Procedure Call

本地函数放到服务器运行,会出现若干问题:

  1. 我怎么知道是哪个函数?Call Id
    本地函数调用,可以直接用指针找到函数;但是远程过程调用不行。
    因此我们需要分别在Client和Server维护一个“函数 <-> Call Id”的映射来确定所调用的函数。
  2. Client如何将参数传送到Server?序列化与反序列化
    本地函数调用,参数会压入栈;然而在远程过程调用中,Client与Server是不同的进程、处理器、操作系统、大小端,而且链表、对象这样的数据内存不分配在一处,加上网络传输必须要有容错机制,不能通过内存传递参数。
    因此我们需要使用网络传输,Client要将参数转换为字节流,传输到Server后,再反序列化还原为参数。
    这里还会涉及到数据格式的问题,JSON(性能不高)、XML、Protobuf、Thrift都是数据格式。
  3. 不使用内存,如何传输?网络传输
    网络传输层需要将Call Id与字节流传输给Server,因此RPC基于传输层TCP协议,gRPC基于HTTP2协议(同样基于TCP)。

早期的RPC不使用HTTP,是因为当时HTTP不能建立长连接,并且HTTP头部过长且不能压缩。HTTP2解决了上述问题。

一个HTTP请求

http://localhost:8080/add?a=1&b=2
"Content-Type": "application/json"

这个请求指定了方法add、协议http、数据格式JSON

gRPC

Protobuf

Protocol Buffer,性能优于XML、JSON。

  • 压缩性能、序列化、传输速度快
  • 向后兼容(不破坏旧接口)、加密性好(二进制)
  • Protobuf需要专门的解析器;只有通过proto文件才能了解数据结构
message HelloReq {
message InnerReq {
string name = 1;
string url = 2;
}
Gender gender = 1;
map<string, string> map = 2;
google.protobuf.Timestamp createTime = 3;
repeated InnerReq req = 4;
}
// -I 路径; --go_out 生成go代码; plugins=grpc:. 使用grpc拓展,使用grpc拓展生成接口代码,放在当前目录下
protoc -I . <filename>.proto --go_out=plugins=grpc:.

proto2go

service Greeter {
rpc SayHello (HelloReq) returns (HelloResp);
}

Server

type GreeterServer interface {
SayHello(context.Context, *HelloReq) (*HelloReply, error)
}

func RegisterGreeterServer(s *gprc.Server, srv GreeterServer) {
s.RegisterService(&_Greeter_serviceDesc, srv)
}

Client

type greeterClient struct {
cc grpc.ClientConnInterface
}

func NewGreeterClient(cc grpc.ClientConnInterface) GreeterClient {
// 返回一个实现了Interface所有方法的结构体
return &greeterClient{cc}
}

type GreeterClient interface {
SayHello(ctx content.Context, in *HelloReq, opts ...grpc.CallOption) (*HelloReply, error)
}

func (c *greeterClient) SayHello(ctx context.Context, in *HelloReq, opts ...grpc.CallOption) (*HelloReply, error) {
out := new(HelloReply)
err := c.cc.Invoke(ctx, "/Greeter/SayHello", in, out, opts...)
if err != nil {
return out, nil
}
}

Stream

Simple RPC

Client和Server都建立短连接。

Server-side streaming RPC

Client发送1次请求,Server返回一段连续的Stream。
例如,Client发送一个股票代码,Server连续发送实时的K线数据。

service Greeter {
rpc GetStream(StreamReq) returns (stream StreamResp)
}

Client-side streaming RPC

与Server-side相反。
例如,Server向Client请求当前室温,物联网终端Client不断向Server发送实时室温。

service Greeter {
rpc GetStream(stream StreamReq) returns (StreamResp)
}

Bidirectional streaming RPC

Client与Server都可以向对方发送数据流,即实时交互。例如Chat Bot。

service Greeter {
rpc GetStream(stream StreamReq) returns (stream StreamResp)
}

MetaData

gRPC和HTTP一样,可以携带一些MetaData

:authority [localhost:port]
content-type [application/grpc]
user-agent [grpc-gp/version]

data [your_data]

Interceptor

interceptorCust := func(ctx context.Context, req Interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp Interface{}, err error) {
fmt.Println("接收到新请求: ", req)
start := time.Now()

res, err := handler(ctx, req)

fmt.Println("请求完成,耗时: ", time.Since(start))
return res, err
}
opt := grpc.UnaryInterceptor(interceptorCust)
g := grpc.NewServer(opt)

Validation

plugin: protoc-gen-validate

message Person {
// id > 999
uint64 id = 1 [(validate.rules).uint64.gt = 999];
// email validation
string email = 2 [(validate.rules).string.email = true];
// custom validation
string name = 3 [(validate.rules).string = {
pattern: "^[0-9]&",
max_bytes: 256,
}];
// not null
Location home = 4 [(validate.rules).message.required = ture];
}

message Location {
// multi-args validation
double lat = 1 [(validate.rules).double = { gte: -90, lte: 90 }];
}
protoc --validate_out="lang=go:."
p := new(Person)
// throw error automatically
err := p.Validate()
if err != nil {
panic(err)
}

搭配拦截器

func interceptor(ctx, req, info, handler) (resp, err) {
if r, ok := req.(Validator); ok {
if err := r.Validate(); err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
}
return handler(ctx, req)
}