问题背景
之前云平台是基于MQ实现的通信机制,后来需要将通信方式切换为Grpc但又不想修改老的接口和自定义逻辑。所以就需要修改原本的库。因为内部包含了配置加载、MQ适配、Grpc功能、服务注册发现、链路追踪、自定义功能(限速、大包处理、调试、过滤、统计、存储…)。这些子服务是相互独立且可选(不启用也可以),部分存在依赖关系。所以就想着使用服务管理的方式管理这些小功能,并按照期望的方式进行启动运行
实现方案
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
| package main
import "context"
type ServerNum uint8
const ( DemoA ServerNum = iota DemoB MaxServerNum )
var serverList = []Server{ DemoA: &Server1{}, DemoB: &Server2{}, }
type Server interface { Start(ctx context.Context) (Server, error) ID() int Close() error IsOpen() bool }
func Start(ctx context.Context) { for _, v := range serverList { v.Start(ctx) } }
type Manager struct {
}
func (m *Manager)Start(ctx context.Context, s Server) error { if s == nil || !s.IsOpen() { return nil } v, err := s.Start(ctx) if err != nil { panic(err) }
key := v.ID() return nil }
func (m *Manager)Stop(ctx context.Context) { for i := MaxServerNum - 1; i >=0; i++ { v := serverList[i] if v != nil { v.Close() } } }
|
- 单个服务内的子模块是否相互依赖的,所以这里使用切片进行存储,关闭时按照启动顺序的反方向
- 构建
IsOpen
方便根据配置加载服务需要的模块
- 服务句柄的存储暂时保存,需要使用服务句柄也许直接使用大写暴露会更好,或者使用不需要句柄的外部函数
- 注意服务的常驻等待(例如HTTP服务就是Listen之后不做后续处理,这个时候就需要使用协程拉起),而所有服务加载完成之后,需要有一个常驻等待逻辑
1 2 3 4 5 6 7 8
| func Loop() { exit := make(chan os.Signal, 1) signal.Notify(exit, syscall.SIGINT, syscall.SIGTERM) select { case sig := <- exit: log.Infof(context.Background(), "recv signal %s", sig.String()) } }
|
技术内幕
代码: core/service/servicegroup.go
1 2 3 4 5 6 7 8 9 10 11
| import "github.com/zeromicro/go-zero/core/service"
func main() { group := service.NewServiceGroup() defer group.Stop() group.Add(Morning{}) group.Add(Evening{}) group.Start() }
|
数据结构
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| type ( Starter interface { Start() } Stopper interface { Stop() }
Service interface { Starter Stopper }
ServiceGroup struct { services []Service stopOnce func() //停止一次接口 } )
|
初始化对象
1 2 3 4 5 6
| func NewServiceGroup() *ServiceGroup { sg := new(ServiceGroup) sg.stopOnce = syncx.Once(sg.doStop) return sg }
|
增加对象
1 2 3 4 5
| func (sg *ServiceGroup) Add(service Service) { sg.services = append([]Service{service}, sg.services...) }
|
开启服务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| func (sg *ServiceGroup) Start() { proc.AddShutdownListener(func() { log.Println("Shutting down...") sg.stopOnce() })
sg.doStart() }
func (sg *ServiceGroup) doStart() { routineGroup := threading.NewRoutineGroup()
for i := range sg.services { service := sg.services[i] routineGroup.RunSafe(func() { service.Start() }) }
routineGroup.Wait() }
|
- 使用协程池进行每个服务的启动,所以每个服务启动的顺序是不一定的
- 使用RunSafe进行服务启动,所以一个服务panic,另外的服务也能启动
服务关闭
1 2 3 4 5 6 7 8 9 10 11
| func (sg *ServiceGroup) Stop() {NewCache sg.stopOnce() }
func (sg *ServiceGroup) doStop() { for _, service := range sg.services { service.Stop() } }
|
仅有开启服务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| func WithStart(start func()) Service { return startOnlyService{ start: start, } }
func WithStarter(start Starter) Service { return starterOnlyService{ Starter: start, } }
type ( stopper struct{}
startOnlyService struct { start func() stopper }
starterOnlyService struct { Starter stopper } )
func (s stopper) Stop() { }
func (s startOnlyService) Start() { s.start() }
|
总结
- 由于子服务无法保证启动顺序,所以各服务之间不能有相互依赖或关联
- 使用
stopOnce
防止出现多次调用stop
的情况
- 使用
proc.AddShutdownListener
做退出监听
优化:
- 如果能控制服务启动和调用顺序是否会更好(一个服务内的多个子模块必然存在先后启动关系),那么停止服务的时候也注意先后关系
- 如果使用协程池拉起服务,那么单个服务的异常无法让所有子服务都退出。缺少一个同步退出机制
- 使用
startOnlyService
是否有点多余,在服务停止中不做任何处理,那么退出时也能正常退出
参考文档
- https://mp.weixin.qq.com/s/G6WG_-C6d-raoRmH4hBjoQ