micro.newService()中newOptions
func newOptions(opts ...Option) Options {
opt := Options{
Auth: auth.DefaultAuth,
Broker: broker.DefaultBroker,
Cmd: cmd.DefaultCmd,
Config: config.DefaultConfig,
Client: client.DefaultClient,
Server: server.DefaultServer,
Store: store.DefaultStore,
Registry: registry.DefaultRegistry,
Router: router.DefaultRouter,
Runtime: runtime.DefaultRuntime,
Transport: transport.DefaultTransport,
Context: context.Background(),
Signal: true,
}
for _, o := range opts {
o(&opt)
}
return opt
}
初始化了一堆根底设置,来看看configconfig.DefaultConfig,
在config/config.go中
var (
// Default Config Manager
DefaultConfig, _ = NewConfig()
)
// NewConfig returns new config
func NewConfig(opts ...Option) (Config, error) {
return newConfig(opts...)
}
func newConfig(opts ...Option) (Config, error) {
var c config
c.Init(opts...)
go c.run()
return &c, nil
}
func (c *config) Init(opts ...Option) error {
c.opts = Options{
Reader: json.NewReader(),
}
c.exit = make(chan bool)
for _, o := range opts {
o(&c.opts)
}
// default loader uses the configured reader
if c.opts.Loader == nil {
c.opts.Loader = memory.NewLoader(memory.WithReader(c.opts.Reader))
}
err := c.opts.Loader.Load(c.opts.Source...)
if err != nil {
return err
}
c.snap, err = c.opts.Loader.Snapshot()
if err != nil {
return err
}
c.vals, err = c.opts.Reader.Values(c.snap.ChangeSet)
if err != nil {
return err
}
return nil
}
func (c *config) run() {
watch := func(w loader.Watcher) error {
for {
// get changeset
snap, err := w.Next()
if err != nil {
return err
}
c.Lock()
if c.snap.Version >= snap.Version {
c.Unlock()
continue
}
// save
c.snap = snap
// set values
c.vals, _ = c.opts.Reader.Values(snap.ChangeSet)
c.Unlock()
}
}
for {
w, err := c.opts.Loader.Watch()
if err != nil {
time.Sleep(time.Second)
continue
}
done := make(chan bool)
// the stop watch func
go func() {
select {
case <-done:
case <-c.exit:
}
w.Stop()
}()
// block watch
if err := watch(w); err != nil {
// do something better
time.Sleep(time.Second)
}
// close done chan
close(done)
// if the config is closed exit
select {
case <-c.exit:
return
default:
}
}
}
看看Init()
左做了什么
- 初始化并设置opts,创立exit用于监听退出信号,设置opts
-
设置默认loader,c.opts.Loader默认是memory
memory.NewLoader()
[config/loader/memory/memory.go]- 初始化并设置opts,蕴含Reader[默认json]
- 初始化memory{}
-
设置m.sets,并
watch()
每个options.Source,看看watch()
做了什么-
定义watch()函数
-
调用watcher.Next(),上面看看next()做了什么
- 定义update()函数,返回loader.Snapshot{}
- 监听watcher.exit,watcher.updates信号,有更新时且版本更新时,调用下面的update()函数,更新watcher.value并返回loader.Snapshot{}
- 保留m.sets[idx],值为loader.Snapshot{}
- 合并所有m.sets
- 读取所有值到m.vals,保留快照到m.snap
-
调用update()
- 获取所有watcher,如果版本有更新,则发送watcher.updates信号
-
-
调用Watch()函数返回watcher,留神W是大写,调用的是memory.Watch()
- 调用Get(),返回m.vals.Get(path…)
- 初始化watcher,并增加到m.watchers【双向链表】
- 开协程,监听watcher.exit信号,收到信号从watchers中移除以后watcher
- 开协程,监听实现信号done和exit信号,收到信号后执行Stop(),敞开exit,updates这2个channel
- 调用下面定义的watch()
- 敞开done channel,监听m.exit信号
-
-
调用c.opts.Loader.Load()
- 循环所有source,更新m.sources,m.sets,并watch()所有source
-
调用
reload()
- 合并所有sets
- 设置m.vals,m.snap
- 调用m.update()
-
调用
c.opts.Loader.Snapshot()
- 如曾经load,间接复制一份并返回m.snap
- 没载入就调用
Sync()
同步配置 - 复制一份m.snap返回
- 调用
c.opts.Reader.Values()
,赋值config.vals【reader.Values
类型】
绕来绕去,终于完了。
主流程其实并不简单,次要是波及到watch更新,所以比拟绕。
config这块其实是比拟独立的包,能够在其余我的项目中援用。
发表回复