The anti-entropy service on a data node actively checks for shards that are missing on the local node and copies each missing shard from a peer node.

// services/anti-entropy/service.go
func (s *Service) Open() error {
    if !s.cfg.Enabled {
        return nil
    }
    // check for missing shards
    go s.CheckWorker()
    // repair missing shards
    go s.ProcessMissingShards(s.ProcessMissingShard)
    return nil
}
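The two goroutines started by Open() form a producer/consumer pair: the checker finds missing shards and the repairer handles them. Below is a minimal sketch of that shape, not the project's code. The `service` type, its fields, and the `check` and `repair` callbacks are hypothetical names chosen for illustration. The sketch also simplifies shutdown by closing the channel once the checker returns, whereas the real service uses a `closing` channel.

```go
package main

import "sync"

// service is a hypothetical stand-in for the anti-entropy Service.
type service struct {
	enabled bool
	missing chan uint64 // IDs of shards found missing
	wg      sync.WaitGroup
}

// open starts a checker and a repairer, mirroring the shape of Open().
// It starts nothing when the service is disabled.
func (s *service) open(check func(out chan<- uint64), repair func(id uint64)) {
	if !s.enabled {
		return
	}
	s.wg.Add(2)
	go func() { // checker: produces missing shard IDs
		defer s.wg.Done()
		defer close(s.missing) // lets the repairer's range loop end
		check(s.missing)
	}()
	go func() { // repairer: consumes them one at a time
		defer s.wg.Done()
		for id := range s.missing {
			repair(id)
		}
	}()
}
```

A caller would build a `service` with `enabled: true` and a buffered `missing` channel, call `open`, and then `wg.Wait()` to block until both goroutines finish.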

Checking for missing shards

The check runs periodically, at the interval set by checkInterval in the configuration file:

// services/anti-entropy/service.go
func (s *Service) CheckWorker() {
    ticker := time.NewTicker(time.Duration(s.cfg.CheckInterval))
    for {
        select {
        case <-s.closing:
            return
        case <-ticker.C:
            s.Check()
        }
    }
}
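The same ticker loop can be written as a standalone function, which makes it easy to try out in isolation. This is a sketch only: `runChecker` and its parameters are hypothetical names. It also adds a `defer ticker.Stop()`, which the listing above does not have.

```go
package main

import "time"

// runChecker calls check once per interval until closing is closed.
// (Hypothetical helper; it mirrors the select loop in CheckWorker.)
func runChecker(interval time.Duration, closing <-chan struct{}, check func()) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop() // release the ticker when the loop exits
	for {
		select {
		case <-closing:
			return
		case <-ticker.C:
			check()
		}
	}
}
```

Calling `go runChecker(10*time.Minute, closing, check)` and later `close(closing)` would stop the loop. The interval shown here is only an example value.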

How a missShard is found:

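  • Find every shard that this node owns and that no longer receives writes: now() > shard.EndTime;
  • If the local node has no record of the shard, or the shard's directory does not exist on disk (each shard has its own directory), a missing shard has been found;
  • Look up the shard's peer nodes, create a missShard, and send it to the channel; the shard's data is later copied from a peer;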
// services/anti-entropy/service.go
func (s *Service) Check() {
    dir := s.svrInfo.Path()
    // shards that no longer receive writes: now() > endTime
    shards := s.MetaClient.ColdShardIDsByNode(s.Node.GetDataID())
    for db, rp := range shards {
        for r, ss := range rp {
            for _, sh := range ss {
                shardDir := filepath.Join(dir, db, r, strconv.FormatUint(sh.ID, 10))
                _, err := os.Stat(shardDir)
                // the shard is missing on this node
                if storeSH := s.svrInfo.Shard(sh.ID); storeSH == nil || os.IsNotExist(err) {
                    // find the peers
                    var peerOwners []meta.ShardOwner
                    // remove my node id
                    for i, owner := range sh.Owners {
                        if owner.NodeID == s.Node.GetDataID() {
                            peerOwners = append(sh.Owners[:i], sh.Owners[i+1:]...)
                            break
                        }
                    }
                    // emit one missShard
                    if len(sh.Owners) > 1 {
                        s.missingShards <- ShardInfo{ID: sh.ID, Path: shardDir, Database: db, RetentionPolicy: r, PeerOwners: peerOwners}
                    }
                }
            }
        }
    }
}
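The two decisions inside Check() can be isolated into small helpers: whether a shard counts as missing, and which owners are peers. The sketch below is an illustration under assumptions. `shardOwner`, `shardMissing`, and `peersOf` are hypothetical names, not part of the project. One difference is deliberate. In the listing, `append(sh.Owners[:i], sh.Owners[i+1:]...)` writes into the backing array of sh.Owners. Whether that matters depends on whether the metadata client hands back a copy, which the excerpt does not show. The sketch sidesteps the question by building a fresh slice.

```go
package main

import "os"

// shardOwner is a hypothetical stand-in for meta.ShardOwner.
type shardOwner struct{ NodeID uint64 }

// shardMissing reports whether a shard counts as missing locally: either the
// store has no record of it, or its directory is absent from disk.
func shardMissing(knownToStore bool, shardDir string) bool {
	_, err := os.Stat(shardDir)
	return !knownToStore || os.IsNotExist(err)
}

// peersOf returns the owners other than self in a freshly allocated slice,
// so the caller's owners slice is left untouched.
func peersOf(owners []shardOwner, self uint64) []shardOwner {
	peers := make([]shardOwner, 0, len(owners))
	for _, o := range owners {
		if o.NodeID != self {
			peers = append(peers, o)
		}
	}
	return peers
}
```

With these helpers, a shard would be queued for repair when `shardMissing(...)` is true and `len(peersOf(...)) > 0`.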

Repairing missing shards

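The repair first waits for a while (11 min; in the code, s.WaitCompactCacheTime + time.Minute), so that a shard whose data has not yet been written to a TSM file is not mistaken for a missing one. It then checks again whether the shard's files are missing on the local node, and finally calls s.CopyShard() to copy the shard.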

// services/anti-entropy/service.go
func (s *Service) ProcessMissingShard(idpath ShardInfo) {
    .......
    select {
    case <-s.closing:
        return
    // maybe the shard is not compacted to a tsm file, let's wait first
    case <-time.After(s.WaitCompactCacheTime + time.Minute):
        // check existence again
        if _, err := os.Stat(idpath.Path); os.IsNotExist(err) {
            if err := s.CopyShard(idpath); err != nil {
                s.Logger.Info("restore missed shard failed", zap.Uint64("ShardID", idpath.ID), zap.Error(err))
            }
            ......
        }
    }
    return
}
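The wait, re-check, then copy sequence can be tried outside the service with a small function. This is a sketch under assumptions: `repairAfter` and the `copyShard` callback are hypothetical, and the delay is passed in rather than derived from WaitCompactCacheTime.

```go
package main

import (
	"os"
	"time"
)

// repairAfter waits for delay, then calls copyShard only if path is still
// absent. It reports whether a copy was attempted. Closing the closing
// channel aborts the wait without attempting a copy.
func repairAfter(delay time.Duration, closing <-chan struct{}, path string, copyShard func() error) (bool, error) {
	select {
	case <-closing:
		return false, nil
	case <-time.After(delay):
		// Check existence again: the shard may have appeared during the wait.
		if _, err := os.Stat(path); os.IsNotExist(err) {
			return true, copyShard()
		}
		return false, nil
	}
}
```

Re-checking after the delay is what keeps a shard that was merely late in reaching disk from being copied unnecessarily.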

In CopyShard(), a peer owner is the source (src) and the current node is the destination (dst); SubmitCopyShard() is called to carry out the copy:

// services/anti-entropy/service.go
func (s *Service) CopyShard(sh ShardInfo) error {
    if len(sh.PeerOwners) == 0 {
        return fmt.Errorf("ae copy shard %d failed due to no peers", sh.ID)
    }
    // get my tcp host
    nodeinfo, err := s.MetaClient.DataNode(s.Node.GetDataID())
    if err != nil {
        return fmt.Errorf("node %d is removed from cluster", s.Node.GetDataID())
    }
    dest := nodeinfo.TCPHost
    for _, peer := range sh.PeerOwners {
        nodeinfo, err = s.MetaClient.DataNode(peer.NodeID)
        source := nodeinfo.TCPHost
        // start new copy
        if err = s.MetaClient.SubmitCopyShard(source, dest, sh.ID, s.MaxCopyShardRecords); err != nil {
            return fmt.Errorf("ae submit copy shard failed: %v", err)
        }
        return nil
    }
    // all failed
    return fmt.Errorf("copy shard %d to %s failed, all peer nodes are tried", sh.ID, dest)
}
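As excerpted, the loop in CopyShard() returns after the first peer whether the submit succeeds or fails, so later peers are never tried. For comparison, here is a sketch of a variant that falls back to the next peer on failure. It is not the project's behavior. `copyFromPeers` and the `submit` callback are hypothetical, with `submit` standing in for a call such as SubmitCopyShard, and peers are given directly as host strings.

```go
package main

import "fmt"

// copyFromPeers tries each peer in turn and stops at the first success.
// submit is a hypothetical callback that performs one copy attempt.
func copyFromPeers(peers []string, dest string, shardID uint64,
	submit func(source, dest string, shardID uint64) error) error {
	if len(peers) == 0 {
		return fmt.Errorf("copy shard %d failed: no peers", shardID)
	}
	var lastErr error
	for _, source := range peers {
		if err := submit(source, dest, shardID); err != nil {
			lastErr = err // remember the failure and move on to the next peer
			continue
		}
		return nil
	}
	return fmt.Errorf("copy shard %d to %s failed on all %d peers, last error: %w",
		shardID, dest, len(peers), lastErr)
}
```

Whether falling back is desirable depends on how the copy task is scheduled. If a failed submit can still leave a task registered, retrying against another peer might start a duplicate copy.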