从协议-http2 知道了 grpc 是基于 http2,优化了通信效率,整体流程如下图所示,细节后续在慢慢增加
带着问题看世界:
基本的使用是怎么样的
使用的基本原理是什么
基本使用
可参考 grpc-go 源码例子 examples/helloworld/helloworld/
proto
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 package helloworld;service Greeter { rpc SayHello (HelloRequest) returns (HelloReply) {} rpc SayHello1 (HelloRequest) returns (stream HelloReply) {} rpc SayHello2 (stream HelloRequest) returns (HelloReply) {} rpc SayHello3 (stream HelloRequest) returns (stream HelloReply) {} } // The request message containing the user's name. message HelloRequest { string name = 1 ;} message HelloReply { string message = 1; }
消息除了rpc 是否存在其他的定义方法
单一请求
客户端流式请求
服务端流式请求
双向流失请求
服务端:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 lis, err := net.Listen("tcp" , fmt.Sprintf(":%d" , *port)) if err != nil { log.Fatalf("failed to listen: %v" , err) } s := grpc.NewServer() pb.RegisterGreeterServer(s, &server{}) log.Printf("server listening at %v" , lis.Addr()) if err := s.Serve(lis); err != nil { log.Fatalf("failed to serve: %v" , err) }
可以看出一共经历了四步
监听TCP连接
初始化grpc服务端
注册服务端业务
grpc处理TCP请求
客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { log.Fatalf("did not connect: %v" , err) } defer conn.Close()c := pb.NewGreeterClient(conn) ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel()r, err := c.SayHello(ctx, &pb.HelloRequest{Name: *name}) if err != nil { log.Fatalf("could not greet: %v" , err) }
可以看出一共经历了三步
监听TCP连接
初始化grpc客户端
发送并接受响应
从 proto
生成的文件可以发现 helloworld_grpc.pb.go
描述信息
1 2 3 4 5 6 7 8 9 10 11 12 var Greeter_ServiceDesc = grpc.ServiceDesc{ ServiceName: "helloworld.Greeter" , HandlerType: (*GreeterServer)(nil ), Methods: []grpc.MethodDesc{ { MethodName: "SayHello" , Handler: _Greeter_SayHello_Handler, }, }, Streams: []grpc.StreamDesc{}, Metadata: "examples/helloworld/helloworld/helloworld.proto" , }
实现架构
运用上述的代码执行一次通信,可以看到通信的效果是
客户端发送
代码路径(带备注,版本 1.45.0-dev):read-grpc-go
发送端的使用流程只有三个步骤
完成grpc连接
初始化grpc客户端
开始发送消息
建立grpc连接
从时序图可以看出,完成grpc一共会分为两个步骤,建立TCP连接,设置帧
通过解析器解析地址完成
与解析器的地址建立TCP连接
设置 Setting 帧
构建grpc client
这里不需要初始化什么,直接将 grpc 连接对象封装到 proto 客户端对象中
1 2 3 func NewGreeterClient (cc grpc.ClientConnInterface) GreeterClient { return &greeterClient{cc} }
方法发送
通过proto生成的 一元调用方法类似
1 2 3 4 5 6 7 8 func (c *greeterClient) SayHello (ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error) { out := new (HelloReply) err := c.cc.Invoke(ctx, "/helloworld.Greeter/SayHello" , in, out, opts...) if err != nil { return nil , err } return out, nil }
核心就是这个 Invoke
函数进行请求调用
1 2 3 4 5 6 7 8 9 10 func invoke (ctx context.Context, method string , req, reply interface {}, cc *ClientConn, opts ...CallOption) error { cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...) if err != nil { return err } if err := cs.SendMsg(req); err != nil { return err } return cs.RecvMsg(reply) }
这里就分为了三步:
构建一个新的流
流进行消息发送
流接收消息
也就是说每次进行单一请求的时候都会使用单独的流进行构建,而 unaryStreamDesc
表示的表示这个流是一个单次调用的流
1 var unaryStreamDesc = &StreamDesc{ServerStreams: false , ClientStreams: false }
服务端接受
代码路径(带备注,版本 1.45.0-dev):read-grpc-go
服务端的使用可以分为四个步骤
构建网络监听句柄
构建新grpc server
将第二步的服务指针注册到proto声明的服务注册函数中
开始监听服务
构建grpc server
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 func NewServer (opt ...ServerOption) *Server { opts := defaultServerOptions for _, o := range opt { o.apply(&opts) } s := &Server{ lis: make (map [net.Listener]bool ), opts: opts, conns: make (map [string ]map [transport.ServerTransport]bool ), services: make (map [string ]*serviceInfo), quit: grpcsync.NewEvent(), done: grpcsync.NewEvent(), czData: new (channelzData), } if s.opts.numServerWorkers > 0 { s.initServerWorkers() } return s }
这里有二个地方需要注意的
可选参数
1 2 3 4 type ServerOption interface { apply(*serverOptions) }
对全局变量进行可选参数应用,这样能够通过在默认的基础上支持外部的自定义应用
1 2 3 4 5 opts := defaultServerOptions for _, o := range opt { o.apply(&opts) }
当中的 serverOptions
或者说 defaultServerOptions
就是内部默认参数
服务工人
这里会设置服务端的消息消费数量
1 2 3 4 5 6 7 func (s *Server) initServerWorkers () { s.serverWorkerChannels = make ([]chan *serverWorkerData, s.opts.numServerWorkers) for i := uint32 (0 ); i < s.opts.numServerWorkers; i++ { s.serverWorkerChannels[i] = make (chan *serverWorkerData) go s.serverWorker(s.serverWorkerChannels[i]) } }
每个 serverWorker 都会从ch中读取数据,并再次启动一个 serverWorker
1 2 3 4 5 6 7 8 9 10 11 12 13 func (s *Server) serverWorker (ch chan *serverWorkerData) { threshold := serverWorkerResetThreshold + grpcrand.Intn(serverWorkerResetThreshold) for completed := 0 ; completed < threshold; completed++ { data, ok := <-ch if !ok { return } s.handleStream(data.st, data.stream, s.traceInfo(data.st, data.stream)) data.wg.Done() } go s.serverWorker(ch) }
这个地方与固定的 worker有什么区别吗,为什么要在执行一段时间后,再次拉起一个协程work?
参考:
https://github.com/golang/go/issues/18138
https://github.com/grpc/grpc-go/pull/3204
https://www.zenlife.tk/goroutine-pool.md?hmsr=toutiao.io&utm_campaign=toutiao.io&utm_medium=toutiao.io&utm_source=toutiao.io
这样干的原因是:Goroutine
的默认堆栈大小是 2KB。处理GRPC的时候,至少需要4KB的堆栈。导致每次RPCruntime.morestack
:重新分配堆栈信息,并复制更改指针指向。为了避免栈中大量信息每次都需要复制,于是go在运行一段时间后就构建一个新的协程
问题是:
栈中的信息不是会被垃圾回收吗,为什么会随着栈的增长而都要复制?
垃圾回收可能并不及时,导致执行 runtime.morestack 的时候还是需要将历史堆栈信息复制过去
如何避免这种情况
服务注册
服务注册其实是将 proto 服务对象与 grpc server 对象关联到一起
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 func (s *Server) register (sd *ServiceDesc, ss interface {}) { s.mu.Lock() defer s.mu.Unlock() s.printf("RegisterService(%q)" , sd.ServiceName) if s.serve { logger.Fatalf("grpc: Server.RegisterService after Server.Serve for %q" , sd.ServiceName) } if _, ok := s.services[sd.ServiceName]; ok { logger.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q" , sd.ServiceName) } info := &serviceInfo{ serviceImpl: ss, methods: make (map [string ]*MethodDesc), streams: make (map [string ]*StreamDesc), mdata: sd.Metadata, } for i := range sd.Methods { d := &sd.Methods[i] info.methods[d.MethodName] = d } for i := range sd.Streams { d := &sd.Streams[i] info.streams[d.StreamName] = d } s.services[sd.ServiceName] = info }
开启服务
开启服务之后,表示已经能接受消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 func (s *Server) Serve (lis net.Listener) error { var tempDelay time.Duration for { rawConn, err := lis.Accept() if err != nil { } tempDelay = 0 s.serveWG.Add(1 ) go func () { s.handleRawConn(lis.Addr().String(), rawConn) s.serveWG.Done() }() } }
注意点:
当监听失败,那么会使用退避逻辑 一开始 5ms,然后每次延时时间 增加2倍,最大值是1s
每个连接都使用单独的协程进行处理
然后在连接成功的链接中,再次使用独立的写成处理每个流
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 func (s *Server) handleRawConn (lisAddr string , rawConn net.Conn) { rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout)) st := s.newHTTP2Transport(rawConn) rawConn.SetDeadline(time.Time{}) if st == nil { return } if !s.addConn(lisAddr, st) { return } go func () { s.serveStreams(st) s.removeConn(lisAddr, st) }() }
到此就达到了完成连接,开始后续的处理流逻辑
参考文档
https://blog.csdn.net/u011582922/article/details/119834758
https://blog.csdn.net/u011582922/article/details/119944259
https://github.com/grpc/grpc-go/pull/3204
https://www.zenlife.tk/goroutine-pool.md?hmsr=toutiao.io&utm_campaign=toutiao.io&utm_medium=toutiao.io&utm_source=toutiao.io