The anti-entropy service on a data node proactively checks for shards that are missing on the local node and copies them from peer nodes.
// services/anti-entropy/service.go
func (s *Service) Open() error {
	if !s.cfg.Enabled {
		return nil
	}
	// check for missing shards
	go s.CheckWorker()
	// repair missing shards
	go s.ProcessMissingShards(s.ProcessMissingShard)
	return nil
}
Checking for missing shards
The check runs periodically, at the checkInterval set in the configuration file:
// services/anti-entropy/service.go
func (s *Service) CheckWorker() {
	ticker := time.NewTicker(time.Duration(s.cfg.CheckInterval))
	for {
		select {
		case <-s.closing:
			return
		case <-ticker.C:
			s.Check()
		}
	}
}
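The ticker-plus-closing-channel loop above is a common Go pattern for a periodic worker. Below is a minimal standalone sketch of the same shape. The names `runEvery` and `countChecks` are hypothetical and not part of the service, and the `defer ticker.Stop()` is an addition that the quoted code does not have.

```go
package main

import (
	"fmt"
	"time"
)

// runEvery calls check once per interval until closing is closed.
// The name and signature are illustrative, not taken from the service.
func runEvery(interval time.Duration, closing <-chan struct{}, check func()) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop() // release the ticker when the worker exits
	for {
		select {
		case <-closing:
			return
		case <-ticker.C:
			check()
		}
	}
}

// countChecks runs the worker briefly and reports how many checks fired.
func countChecks() int {
	closing := make(chan struct{})
	done := make(chan struct{})
	runs := 0 // only touched by the worker goroutine until done is closed
	go func() {
		defer close(done)
		runEvery(5*time.Millisecond, closing, func() { runs++ })
	}()
	time.Sleep(100 * time.Millisecond)
	close(closing)
	<-done // wait for the worker to return before reading runs
	return runs
}

func main() {
	fmt.Println("checks run:", countChecks())
}
```

Closing the channel, rather than sending on it, lets any number of workers observe the shutdown signal.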
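How a missShard is found:
- Find all shards this node owns that no longer receive writes: now() > shard.EndTime;
- If the local store has no record of the shard, or the shard's path on disk (one per shard) does not exist, the shard is considered missing;
- Look up the shard's peer nodes and send a missShard to the channel; the shard data is later copied from a peer;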
// services/anti-entropy/service.go
func (s *Service) Check() {
	dir := s.svrInfo.Path()
	// shards that no longer receive writes: now() > endTime
	shards := s.MetaClient.ColdShardIDsByNode(s.Node.GetDataID())
	for db, rp := range shards {
		for r, ss := range rp {
			for _, sh := range ss {
				shardDir := filepath.Join(dir, db, r, strconv.FormatUint(sh.ID, 10))
				_, err := os.Stat(shardDir)
				// shard missing on this node
				if storeSH := s.svrInfo.Shard(sh.ID); storeSH == nil || os.IsNotExist(err) {
					// find the peers
					var peerOwners []meta.ShardOwner
					// remove my node id
					for i, owner := range sh.Owners {
						if owner.NodeID == s.Node.GetDataID() {
							peerOwners = append(sh.Owners[:i], sh.Owners[i+1:]...)
							break
						}
					}
					// emit one missShard
					if len(sh.Owners) > 1 {
						s.missingShards <- ShardInfo{ID: sh.ID, Path: shardDir, Database: db, RetentionPolicy: r, PeerOwners: peerOwners}
					}
				}
			}
		}
	}
}
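The two building blocks of the check, the on-disk existence test and the "owners minus myself" peer list, can be sketched in isolation. This is a minimal illustration under stated assumptions: `ShardOwner` is a stand-in for `meta.ShardOwner`, and `peersOf` and `shardMissing` are hypothetical helpers. One deliberate difference: the quoted code builds `peerOwners` with `append` on a sub-slice of `sh.Owners`, which in Go reuses the backing array of `sh.Owners`. Whether that matters depends on whether `sh` is a private copy, which the excerpt does not show. The sketch builds a fresh slice instead.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"strconv"
)

// ShardOwner is a stand-in for meta.ShardOwner (assumption for this sketch).
type ShardOwner struct {
	NodeID uint64
}

// peersOf returns every owner except self in a newly allocated slice,
// so the caller's owners slice is left untouched.
func peersOf(owners []ShardOwner, self uint64) []ShardOwner {
	peers := make([]ShardOwner, 0, len(owners))
	for _, o := range owners {
		if o.NodeID != self {
			peers = append(peers, o)
		}
	}
	return peers
}

// shardMissing builds <dataDir>/<db>/<rp>/<id> and reports whether that
// path is absent on disk.
func shardMissing(dataDir, db, rp string, id uint64) (string, bool) {
	p := filepath.Join(dataDir, db, rp, strconv.FormatUint(id, 10))
	_, err := os.Stat(p)
	return p, os.IsNotExist(err)
}

func main() {
	owners := []ShardOwner{{NodeID: 1}, {NodeID: 2}, {NodeID: 3}}
	fmt.Println("peers:", peersOf(owners, 2), "owners:", owners)

	dir, err := os.MkdirTemp("", "ae-sketch")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)
	path, missing := shardMissing(dir, "db0", "autogen", 42)
	fmt.Println(path, "missing:", missing)
}
```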
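Repairing missing shards
The repair first waits for a while (11 min), so that a shard whose data has not yet been written to a TSM file is not mistaken for a missing one. It then confirms again that the shard files are missing on this node, and finally calls s.CopyShard() to copy the shard.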
// services/anti-entropy/service.go
func (s *Service) ProcessMissingShard(idpath ShardInfo) {
	.......
	select {
	case <-s.closing:
		return
	// maybe the shard is not compacted to a tsm file, let's wait first
	case <-time.After(s.WaitCompactCacheTime + time.Minute):
		// check existence again
		if _, err := os.Stat(idpath.Path); os.IsNotExist(err) {
			if err := s.CopyShard(idpath); err != nil {
				s.Logger.Info("restore missed shard failed", zap.Uint64("ShardID", idpath.ID), zap.Error(err))
			}
			......
		}
	}
	return
}
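The wait-then-recheck step above can be isolated into a small helper. The sketch below is an illustration only: `recheckAfter` is a hypothetical name, and it uses `time.NewTimer` with `Stop` instead of `time.After` so the timer is released if shutdown wins the race.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"time"
)

// recheckAfter waits for the given duration, then reports whether path is
// still absent. If closing is closed first, it returns cancelled == true
// without touching the filesystem.
func recheckAfter(wait time.Duration, closing <-chan struct{}, path string) (missing, cancelled bool) {
	timer := time.NewTimer(wait)
	defer timer.Stop() // free the timer if we return early
	select {
	case <-closing:
		return false, true
	case <-timer.C:
		_, err := os.Stat(path)
		return os.IsNotExist(err), false
	}
}

func main() {
	closing := make(chan struct{})
	path := filepath.Join(os.TempDir(), "no-such-shard-dir-for-sketch")
	missing, cancelled := recheckAfter(10*time.Millisecond, closing, path)
	fmt.Println("missing:", missing, "cancelled:", cancelled)
}
```

The second existence check matters because the shard may have appeared on disk during the wait, in which case no copy is needed.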
In CopyShard(), a peer owner is the source and the current node is the destination; SubmitCopyShard() is called to perform the copy:
// services/anti-entropy/service.go
func (s *Service) CopyShard(sh ShardInfo) error {
	if len(sh.PeerOwners) == 0 {
		return fmt.Errorf("ae copy shard %d failed due to no peers", sh.ID)
	}
	// get my tcp host
	nodeinfo, err := s.MetaClient.DataNode(s.Node.GetDataID())
	if err != nil {
		return fmt.Errorf("node %d is removed from cluster", s.Node.GetDataID())
	}
	dest := nodeinfo.TCPHost
	for _, peer := range sh.PeerOwners {
		nodeinfo, err = s.MetaClient.DataNode(peer.NodeID)
		source := nodeinfo.TCPHost
		// start new copy
		if err = s.MetaClient.SubmitCopyShard(source, dest, sh.ID, s.MaxCopyShardRecords); err != nil {
			return fmt.Errorf("ae submit copy shard failed: %v", err)
		}
		return nil
	}
	// all failed
	return fmt.Errorf("copy shard %d to %s failed, all peer nodes are tried", sh.ID, dest)
}
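As quoted, the loop in CopyShard() returns after the first peer, whether the submit succeeds or fails. The sketch below shows a variation that falls through to the next peer on failure. It is not what the quoted code does: `copyFromAnyPeer`, `copyFunc`, and `errPeerDown` are hypothetical names, and the `submit` callback only stands in for a call such as SubmitCopyShard().

```go
package main

import (
	"errors"
	"fmt"
)

// copyFunc stands in for the real submit call (assumption for this sketch).
type copyFunc func(source, dest string, shardID uint64) error

// errPeerDown is a fake failure used by the demo below.
var errPeerDown = errors.New("peer down")

// copyFromAnyPeer tries each peer in order and returns the first source
// for which submit succeeds. It fails only if there are no peers or if
// every peer returned an error.
func copyFromAnyPeer(peers []string, dest string, shardID uint64, submit copyFunc) (string, error) {
	if len(peers) == 0 {
		return "", fmt.Errorf("copy shard %d failed: no peers", shardID)
	}
	var lastErr error
	for _, src := range peers {
		if err := submit(src, dest, shardID); err != nil {
			lastErr = err // remember the failure and try the next peer
			continue
		}
		return src, nil
	}
	return "", fmt.Errorf("copy shard %d to %s failed on all peers: %w", shardID, dest, lastErr)
}

func main() {
	submit := func(source, dest string, shardID uint64) error {
		if source == "node-a:8088" {
			return errPeerDown
		}
		return nil
	}
	src, err := copyFromAnyPeer([]string{"node-a:8088", "node-b:8088"}, "node-c:8088", 7, submit)
	fmt.Println("copied from:", src, "err:", err)
}
```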