Grpc-01-概述

协议-http2 知道了 grpc 是基于 http2,优化了通信效率,整体流程如下图所示,细节后续在慢慢增加

grpc客户端跟grpc服务器端整个交互过程大概经历了哪些阶段

带着问题看世界:

  1. 基本的使用是怎么样的
  2. 使用的基本原理是什么

基本使用

可参考 grpc-go 源码例子 examples/helloworld/helloworld/

proto

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package helloworld;

// The greeting service definition.
// The greeting service definition.
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {}
rpc SayHello1 (HelloRequest) returns (stream HelloReply) {}
rpc SayHello2 (stream HelloRequest) returns (HelloReply) {}
rpc SayHello3 (stream HelloRequest) returns (stream HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
string name = 1;
}

// The response message containing the greetings
message HelloReply {
string message = 1;
}

消息除了rpc 是否存在其他的定义方法

  • 单一请求
  • 客户端流式请求
  • 服务端流式请求
  • 双向流失请求

服务端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//1. 监听网络端口
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
//2. 初始化Grpc服务端
s := grpc.NewServer()
//3. 注册服务端接口
pb.RegisterGreeterServer(s, &server{})
log.Printf("server listening at %v", lis.Addr())
//4. 监听请求
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}

可以看出一共经历了四步

  1. 监听TCP连接
  2. 初始化grpc服务端
  3. 注册服务端业务
  4. grpc处理TCP请求

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//1. 建立TCP连接
conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
//2. 初始化客户端
c := pb.NewGreeterClient(conn)

// 3. 发送消息并接受响应
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
r, err := c.SayHello(ctx, &pb.HelloRequest{Name: *name})
if err != nil {
log.Fatalf("could not greet: %v", err)
}

可以看出一共经历了三步

  1. 监听TCP连接
  2. 初始化grpc客户端
  3. 发送并接受响应

proto 生成的文件可以发现 helloworld_grpc.pb.go 描述信息

1
2
3
4
5
6
7
8
9
10
11
12
var Greeter_ServiceDesc = grpc.ServiceDesc{
ServiceName: "helloworld.Greeter",
HandlerType: (*GreeterServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "SayHello",
Handler: _Greeter_SayHello_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "examples/helloworld/helloworld/helloworld.proto",
}

实现架构

运用上述的代码执行一次通信,可以看到通信的效果是

grpc_sayhello

客户端发送

代码路径(带备注,版本 1.45.0-dev):read-grpc-go

发送端的使用流程只有三个步骤

  1. 完成grpc连接
  2. 初始化grpc客户端
  3. 开始发送消息
建立grpc连接

从时序图可以看出,完成grpc一共会分为两个步骤,建立TCP连接,设置帧

  1. 通过解析器解析地址完成
  2. 与解析器的地址建立TCP连接
  3. 设置 Setting 帧
构建grpc client

这里不需要初始化什么,直接将 grpc 连接对象封装到 proto 客户端对象中

1
2
3
func NewGreeterClient(cc grpc.ClientConnInterface) GreeterClient {
return &greeterClient{cc}
}
方法发送

通过proto生成的 一元调用方法类似

1
2
3
4
5
6
7
8
func (c *greeterClient) SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error) {
out := new(HelloReply) //回复对象
err := c.cc.Invoke(ctx, "/helloworld.Greeter/SayHello", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}

核心就是这个 Invoke 函数进行请求调用

1
2
3
4
5
6
7
8
9
10
func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
if err != nil {
return err
}
if err := cs.SendMsg(req); err != nil {
return err
}
return cs.RecvMsg(reply)
}

这里就分为了三步:

  1. 构建一个新的流
  2. 流进行消息发送
  3. 流接收消息

也就是说每次进行单一请求的时候都会使用单独的流进行构建,而 unaryStreamDesc 表示的表示这个流是一个单次调用的流

1
var unaryStreamDesc = &StreamDesc{ServerStreams: false, ClientStreams: false}

服务端接受

代码路径(带备注,版本 1.45.0-dev):read-grpc-go

服务端的使用可以分为四个步骤

  1. 构建网络监听句柄
  2. 构建新grpc server
  3. 将第二步的服务指针注册到proto声明的服务注册函数中
  4. 开始监听服务
构建grpc server
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func NewServer(opt ...ServerOption) *Server {
opts := defaultServerOptions
for _, o := range opt {
o.apply(&opts)
}
s := &Server{
lis: make(map[net.Listener]bool),
opts: opts,
conns: make(map[string]map[transport.ServerTransport]bool),
services: make(map[string]*serviceInfo),
quit: grpcsync.NewEvent(),
done: grpcsync.NewEvent(),
czData: new(channelzData),
}
//...
if s.opts.numServerWorkers > 0 {
s.initServerWorkers()
}

//...
return s
}

这里有二个地方需要注意的

可选参数
1
2
3
4
//可选参数接口
type ServerOption interface {
apply(*serverOptions)
}

对全局变量进行可选参数应用,这样能够通过在默认的基础上支持外部的自定义应用

1
2
3
4
5
//应用可选参数
opts := defaultServerOptions
for _, o := range opt {
o.apply(&opts)
}

当中的 serverOptions 或者说 defaultServerOptions 就是内部默认参数

服务工人

这里会设置服务端的消息消费数量

1
2
3
4
5
6
7
func (s *Server) initServerWorkers() {
s.serverWorkerChannels = make([]chan *serverWorkerData, s.opts.numServerWorkers)
for i := uint32(0); i < s.opts.numServerWorkers; i++ {
s.serverWorkerChannels[i] = make(chan *serverWorkerData) //阻塞隧道
go s.serverWorker(s.serverWorkerChannels[i]) //对应服务端数量与隧道
}
}

每个 serverWorker 都会从ch中读取数据,并再次启动一个 serverWorker

1
2
3
4
5
6
7
8
9
10
11
12
13
func (s *Server) serverWorker(ch chan *serverWorkerData) {
//确保不会所有的 work 不会同时 reset,增加一个随机数
threshold := serverWorkerResetThreshold + grpcrand.Intn(serverWorkerResetThreshold)
for completed := 0; completed < threshold; completed++ {
data, ok := <-ch
if !ok {
return
}
s.handleStream(data.st, data.stream, s.traceInfo(data.st, data.stream))
data.wg.Done() //记住这个 wg.Done()
}
go s.serverWorker(ch)
}

这个地方与固定的 worker有什么区别吗,为什么要在执行一段时间后,再次拉起一个协程work?

参考:

  1. https://github.com/golang/go/issues/18138
  2. https://github.com/grpc/grpc-go/pull/3204
  3. https://www.zenlife.tk/goroutine-pool.md?hmsr=toutiao.io&utm_campaign=toutiao.io&utm_medium=toutiao.io&utm_source=toutiao.io

这样干的原因是:Goroutine 的默认堆栈大小是 2KB。处理GRPC的时候,至少需要4KB的堆栈。导致每次RPCruntime.morestack:重新分配堆栈信息,并复制更改指针指向。为了避免栈中大量信息每次都需要复制,于是go在运行一段时间后就构建一个新的协程

问题是:

  1. 栈中的信息不是会被垃圾回收吗,为什么会随着栈的增长而都要复制?
    • 垃圾回收可能并不及时,导致执行 runtime.morestack 的时候还是需要将历史堆栈信息复制过去
  2. 如何避免这种情况
    • 使用指针传递参数,减少递归调用
    • 使用协程池
服务注册

服务注册其实是将 proto 服务对象与 grpc server 对象关联到一起

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func (s *Server) register(sd *ServiceDesc, ss interface{}) {
s.mu.Lock() //写锁
defer s.mu.Unlock()
s.printf("RegisterService(%q)", sd.ServiceName)
if s.serve {
logger.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
}
if _, ok := s.services[sd.ServiceName]; ok { //重复注册没有关系
logger.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
}
info := &serviceInfo{
serviceImpl: ss, //具体实现服务(proto 服务实现 对象)
methods: make(map[string]*MethodDesc), //指的值 rpc service 的方法
streams: make(map[string]*StreamDesc), //指的是 流 service 的方式
mdata: sd.Metadata,
}
for i := range sd.Methods {
d := &sd.Methods[i]
info.methods[d.MethodName] = d //存储 rpc方法
}
for i := range sd.Streams {
d := &sd.Streams[i]
info.streams[d.StreamName] = d //存储 流 方法
}
s.services[sd.ServiceName] = info //存储方法
}
开启服务

开启服务之后,表示已经能接受消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func (s *Server) Serve(lis net.Listener) error {
//...

var tempDelay time.Duration // how long to sleep on accept failure

for {
rawConn, err := lis.Accept() //可以监听多个链接
if err != nil {
//退避逻辑 一开始 5ms,后续的等待时间 增加2倍,最大值是1s
}
tempDelay = 0
//单独起协程监听
s.serveWG.Add(1)
go func() {
s.handleRawConn(lis.Addr().String(), rawConn)
s.serveWG.Done()
}()
}
}

注意点:

  1. 当监听失败,那么会使用退避逻辑 一开始 5ms,然后每次延时时间 增加2倍,最大值是1s
  2. 每个连接都使用单独的协程进行处理

然后在连接成功的链接中,再次使用独立的写成处理每个流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func (s *Server) handleRawConn(lisAddr string, rawConn net.Conn) {
//...
rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout)) //设置连接超时时间

// Finish handshaking (HTTP2)
st := s.newHTTP2Transport(rawConn)
rawConn.SetDeadline(time.Time{}) //完成连接,那么连接永远不超时了
if st == nil {
return
}

if !s.addConn(lisAddr, st) {
return
}
go func() {
s.serveStreams(st) //使用独立协程处理流(阻塞)
s.removeConn(lisAddr, st)
}()
}

到此就达到了完成连接,开始后续的处理流逻辑

参考文档

  1. https://blog.csdn.net/u011582922/article/details/119834758
  2. https://blog.csdn.net/u011582922/article/details/119944259
  3. https://github.com/grpc/grpc-go/pull/3204
  4. https://www.zenlife.tk/goroutine-pool.md?hmsr=toutiao.io&utm_campaign=toutiao.io&utm_medium=toutiao.io&utm_source=toutiao.io