概述
在 Go Micro 框架中,Server 是对 Broker、Register、Codec、Transort 等服务的一个封装,从下图中就可以看到。
再看一下 Server 定义的接口
- Init:初始化
- Handler:注册 rpchandler
- NewHandler:封装 rpchandler
- NewSubscriber:封装 Subscriber 给 Subscribe 方法
- Subscribe:// 注入订阅事件
- Start:启动服务、监听端口、处理请求
- Stop:停止服务,broker 等资源关闭
- String
type Server interface {Options() Options
Init(...Option) error
Handle(Handler) error
NewHandler(interface{}, ...HandlerOption) Handler
NewSubscriber(string, interface{}, ...SubscriberOption) Subscriber
Subscribe(Subscriber) error
Start() error
Stop() error
String() string
主要方法 源码 以 rpcServer 为例
Init
server 的初始化方法,初始化 options 之后 利用 once 函数初始化了 Cmd
func (s *service) Init(opts ...Option) {
// process options
for _, o := range opts {o(&s.opts)
}
s.once.Do(func() {
// Initialise the command flags, overriding new service
_ = s.opts.Cmd.Init(cmd.Broker(&s.opts.Broker),
cmd.Registry(&s.opts.Registry),
cmd.Transport(&s.opts.Transport),
cmd.Client(&s.opts.Client),
cmd.Server(&s.opts.Server),
)
})
}
Handle
调用 router 的 handler 方法把方法注册到 server 中
func (s *rpcServer) Handle(h Handler) error {s.Lock()
defer s.Unlock()
if err := s.router.Handle(h); err != nil {return err}
s.handlers[h.Name()] = h
return nil
}
NewSubscriber
订阅某个主题,调用 subscriber 中的 newSubscriber
newSubscriber 函数我们在 Go Micro Broker 源码分析文章中已经分析过这里就不展开了
func (s *rpcServer) NewSubscriber(topic string, sb interface{}, opts ...SubscriberOption) Subscriber {return newSubscriber(topic, sb, opts...)
}
Subscribe
Subscribe 函数接受一个 Subscriber 的接口,下面可以看到接口的定义
Subscribe 函数接受到接口之后保存到 rpcServer 中的 subscribers map 中
在 Deregister、Register、Subscribe 方法中调用注册的 Subscriber() Unsubscribe()函数
type Subscriber interface {Topic() string
Subscriber() interface{}
Endpoints() []*registry.Endpoint
Options() SubscriberOptions}
// broker.go
type Subscriber interface {Options() SubscribeOptions
Topic() string
Unsubscribe() error}
func (s *rpcServer) Subscribe(sb Subscriber) error {sub, ok := sb.(*subscriber)
if !ok {return fmt.Errorf("invalid subscriber: expected *subscriber")
}
if len(sub.handlers) == 0 {return fmt.Errorf("invalid subscriber: no handler functions")
}
if err := validateSubscriber(sb); err != nil {return err}
s.Lock()
defer s.Unlock()
_, ok = s.subscribers[sub]
if ok {return fmt.Errorf("subscriber %v already exists", s)
}
s.subscribers[sub] = nil
return nil
}
Start
start 函数是 server 中最重要的函数,在 service run 的时候回调用 server 的 start 方法来正式开启这个服务。
代码如下
在 Go Micro Register 源码分析这篇文章中已经分析过 Start 函数的中对于服务发现注册的项管代码这边就不重复了。主要来看一下的是 监听端口、处理清酒、和初始化 broker
,总体来说就是实现了监听端口 / 处理请求 / 初始化 Broker。
func (s *rpcServer) Start() error {registerDebugHandler(s)
config := s.Options()
// 开始监听 transport 处理客户端接收到的请求
ts, err := config.Transport.Listen(config.Address)
if err != nil {return err}
log.Logf("Transport [%s] Listening on %s", config.Transport.String(), ts.Addr())
// swap address
s.Lock()
addr := s.opts.Address
s.opts.Address = ts.Addr()
s.Unlock()
// 连接 broker 用于订阅服务
if err := config.Broker.Connect(); err != nil {return err}
bname := config.Broker.String()
log.Logf("Broker [%s] Connected to %s", bname, config.Broker.Address())
// 调用 RegisterCheck 检查注册服务是否可用 可从外部注入函数
if err = s.opts.RegisterCheck(s.opts.Context); err != nil {log.Logf("Server %s-%s register check error: %s", config.Name, config.Id, err)
} else {
// 注册服务
if err = s.Register(); err != nil {log.Logf("Server %s-%s register error: %s", config.Name, config.Id, err)
}
}
exit := make(chan bool)
// 处理上面监听的 Transport.Listen listerer
go func() {
for {
// 监听到请求处理请求代码 在 Transport 会深入详细说明
err := ts.Accept(s.ServeConn)
select {
// check if we're supposed to exit
case <-exit:
return
// check the error and backoff
default:
if err != nil {log.Logf("Accept error: %v", err)
time.Sleep(time.Second)
continue
}
}
// no error just exit
return
}
}()
// 开启 goroutine 检查服务是否可用,间隔时间为设置的 RegisterInterval
go func() {t := new(time.Ticker)
// only process if it exists
if s.opts.RegisterInterval > time.Duration(0) {
// new ticker
t = time.NewTicker(s.opts.RegisterInterval)
}
// return error chan
var ch chan error
Loop:
for {
select {
// register self on interval
case <-t.C:
s.RLock()
registered := s.registered
s.RUnlock()
// 调用 Register 组件中的 RegisterCheck 方法 测试服务是否正常
if err = s.opts.RegisterCheck(s.opts.Context); err != nil && registered {log.Logf("Server %s-%s register check error: %s, deregister it", config.Name, config.Id, err)
// deregister self in case of error
if err := s.Deregister(); err != nil {log.Logf("Server %s-%s deregister error: %s", config.Name, config.Id, err)
}
} else {if err := s.Register(); err != nil {log.Logf("Server %s-%s register error: %s", config.Name, config.Id, err)
}
}
// wait for exit
case ch = <-s.exit:
t.Stop()
close(exit)
break Loop
}
}
// deregister self
if err := s.Deregister(); err != nil {log.Logf("Server %s-%s deregister error: %s", config.Name, config.Id, err)
}
// wait for requests to finish
if s.wg != nil {s.wg.Wait()
}
// close transport listener
ch <- ts.Close()
// disconnect the broker
config.Broker.Disconnect()
// swap back address
s.Lock()
s.opts.Address = addr
s.Unlock()}()
return nil
}
总结
server 是对于底层一些方法的封装,比如实现服务的开启、关闭、节点的注册和订阅的注册。同样级别的封装在最上面图可以看到还有 CLient,然而在 Client 和 Server 上层还有 service,这些会在另外的地方再分析。