共计 2612 个字符,预计需要花费 7 分钟才能阅读完成。
data 节点上的 anti-entropy,会主动检查本节点上缺失的 shard,并主动从 peer 节点上 copy 缺失的 shard。
//services/anti-entropy/service.go
// Open starts the anti-entropy background goroutines when the service is
// enabled in its configuration. It always returns nil.
func (s *Service) Open() error {
	if s.cfg.Enabled {
		// Detection: look for shards missing on this node.
		go s.CheckWorker()
		// Repair: handle every shard reported as missing.
		go s.ProcessMissingShards(s.ProcessMissingShard)
	}
	return nil
}
检查缺失的 shard
使用配置文件中的 checkInterval 定期检查:
//services/anti-entropy/service.go
// CheckWorker runs Check once per tick until a receive on s.closing
// succeeds. The tick period is s.cfg.CheckInterval converted to a
// time.Duration.
func (s *Service) CheckWorker() {
	ticker := time.NewTicker(time.Duration(s.cfg.CheckInterval))
	// Release the ticker when the worker exits; an unstopped ticker keeps
	// its timer alive after this goroutine has returned.
	defer ticker.Stop()

	for {
		select {
		case <-s.closing:
			return
		case <-ticker.C:
			s.Check()
		}
	}
}
查找 missShard 的过程:
- 找到所有本机负责的并已不再写入数据的 shard:now() > shard.EndTime;
- 若本机上没有该 shard 信息或者 shard 目录不存在(每个 shard 对应 1 个目录),则找到 1 个缺失的 shard;
- 查找该 shard 的 peer 节点,并产生 1 个 missShard 发送到 channel,后续将从 peer 中 copy shard 的数据;
//services/anti-entropy/service.go
// Check walks the shards returned by MetaClient.ColdShardIDsByNode for this
// data node and sends a ShardInfo on s.missingShards for each shard that
// looks missing locally.
//
// A shard is treated as missing when s.svrInfo.Shard returns nil for its ID
// or when its directory <path>/<db>/<rp>/<shardID> does not exist. A
// ShardInfo is only sent when the shard has more than one owner; its
// PeerOwners field holds every owner except this node.
func (s *Service) Check() {
	dir := s.svrInfo.Path()
	nodeID := s.Node.GetDataID()
	// Shards that are no longer written to (now() > endTime).
	shards := s.MetaClient.ColdShardIDsByNode(nodeID)
	for db, rp := range shards {
		for r, ss := range rp {
			for _, sh := range ss {
				shardDir := filepath.Join(dir, db, r, strconv.FormatUint(sh.ID, 10))
				_, err := os.Stat(shardDir)
				// Shard missing on this node.
				if storeSH := s.svrInfo.Shard(sh.ID); storeSH == nil || os.IsNotExist(err) {
					// Collect the peers: every owner except this node.
					// The list is built in a fresh slice. Appending onto
					// sh.Owners[:i] would overwrite the tail of sh.Owners'
					// backing array, which may be shared with the metadata
					// the shard info came from.
					var peerOwners []meta.ShardOwner
					for i, owner := range sh.Owners {
						if owner.NodeID == nodeID {
							peerOwners = make([]meta.ShardOwner, 0, len(sh.Owners)-1)
							peerOwners = append(peerOwners, sh.Owners[:i]...)
							peerOwners = append(peerOwners, sh.Owners[i+1:]...)
							break
						}
					}
					// Report the missing shard only when a peer could hold a copy.
					if len(sh.Owners) > 1 {
						s.missingShards <- ShardInfo{ID: sh.ID, Path: shardDir, Database: db, RetentionPolicy: r, PeerOwners: peerOwners}
					}
				}
			}
		}
	}
}
修复缺失的 shard
修复时,先等待一段时间 (11min),避免因 shard 还没写入 tsm file 而被误判为缺失,而后再次确认本机上的 shard 文件是否缺失,最后调用 s.CopyShard() 执行 shard 复制。
//services/anti-entropy/service.go
// ProcessMissingShard handles one shard that was reported as missing. It
// waits for s.WaitCompactCacheTime plus one minute, re-checks that
// idpath.Path still does not exist, and only then calls CopyShard. A copy
// failure is logged rather than returned.
//
// NOTE(review): the "......" lines are elisions in this excerpt; the omitted
// statements are not documented here.
func (s *Service) ProcessMissingShard(idpath ShardInfo) {
.......
select {
// Give up without copying when a receive on s.closing succeeds.
case <-s.closing:
return
// maybe the shard is not compacted to a tsm file, let's wait first
case <-time.After(s.WaitCompactCacheTime + time.Minute):
// check existence again
if _, err := os.Stat(idpath.Path); os.IsNotExist(err) {if err := s.CopyShard(idpath); err != nil {s.Logger.Info("restore missed shard failed", zap.Uint64("ShardID", idpath.ID), zap.Error(err))
}
......
}
}
return
}
CopyShard() 时,peerOwner 作为 src,当前 node 作为 dst,调用 SubmitCopyShard() 执行 copy:
//services/anti-entropy/service.go
// CopyShard submits a request to copy shard sh.ID from one of its peer
// owners (the source) to this node (the destination).
//
// Peers are tried in the order given by sh.PeerOwners, and the first
// successful SubmitCopyShard call ends the attempt with a nil error. A peer
// whose node info cannot be looked up, or whose submission fails, is skipped
// so the next peer gets a chance.
//
// An error is returned when sh has no peer owners, when this node cannot be
// looked up through the meta client, or when every peer failed. In the last
// case the error carries the most recent failure.
func (s *Service) CopyShard(sh ShardInfo) error {
	if len(sh.PeerOwners) == 0 {
		return fmt.Errorf("ae copy shard %d failed due to no peers", sh.ID)
	}

	// Resolve this node's TCP host; it is the destination of the copy.
	self, err := s.MetaClient.DataNode(s.Node.GetDataID())
	if err != nil {
		return fmt.Errorf("node %d is removed from cluster", s.Node.GetDataID())
	}
	dest := self.TCPHost

	var lastErr error
	for _, peer := range sh.PeerOwners {
		peerInfo, err := s.MetaClient.DataNode(peer.NodeID)
		if err != nil {
			// Reading TCPHost from a failed lookup is unsafe; try the next peer.
			lastErr = fmt.Errorf("lookup peer node %v failed: %v", peer.NodeID, err)
			continue
		}
		source := peerInfo.TCPHost
		// Start a new copy from this peer.
		if err := s.MetaClient.SubmitCopyShard(source, dest, sh.ID, s.MaxCopyShardRecords); err != nil {
			lastErr = fmt.Errorf("ae submit copy shard failed: %v", err)
			continue
		}
		return nil
	}

	// Every peer failed.
	return fmt.Errorf("copy shard %d to %s failed, all peer nodes are tried: %v", sh.ID, dest, lastErr)
}
正文完