InfluxDB Cluster: anti-entropy Source Code Analysis


The anti-entropy service on a data node periodically checks for shards that are missing on the local node and copies the missing shards over from peer nodes.

//services/anti-entropy/service.go
func (s *Service) Open() error {
    if !s.cfg.Enabled {
        return nil
    }
    // check for missing shards
    go s.CheckWorker()
    // repair missing shards
    go s.ProcessMissingShards(s.ProcessMissingShard)
    return nil
}
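
For context, a minimal sketch of how the two goroutines communicate, assuming a ShardInfo type and a missingShards channel whose fields mirror the ones used in Check() below (the actual definitions are not shown in the source, so the exact shapes here are assumptions):

// Sketch (assumption): Check() produces ShardInfo values, ProcessMissingShards() consumes them.
type ShardInfo struct {
    ID              uint64
    Path            string            // local directory expected to hold the shard
    Database        string
    RetentionPolicy string
    PeerOwners      []meta.ShardOwner // shard owners other than the local node
}

type Service struct {
    // ... other fields omitted ...
    missingShards chan ShardInfo
    closing       chan struct{}
}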

Checking for missing shards

The check runs periodically, using the checkInterval from the configuration file:

//services/anti-entropy/service.go
func (s *Service) CheckWorker() {
    ticker := time.NewTicker(time.Duration(s.cfg.CheckInterval))
    for {
        select {
        case <-s.closing:
            return
        case <-ticker.C:
            s.Check()
        }
    }
}
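
A minimal sketch of what the corresponding service configuration might look like, assuming the influxdb toml.Duration helper type; apart from Enabled and CheckInterval, which appear in the code above, the tag names are assumptions:

// Sketch (assumption): anti-entropy service configuration.
type Config struct {
    Enabled       bool          `toml:"enabled"`        // gates Open()
    CheckInterval toml.Duration `toml:"check-interval"` // period used by CheckWorker()
}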

The process of finding a missing shard (missShard):

  • Find all shards owned by the local node that no longer receive writes: now() > shard.EndTime;
  • If the local node has no record of the shard and the shard's directory does not exist on disk (one directory per shard), treat it as a missing shard;
  • Look up the shard's peer nodes and emit a missShard onto a channel; the shard data will later be copied from one of those peers (see the Check() code and the sketch that follow).
//services/anti-entropy/service.go
func (s *Service) Check() {
    dir := s.svrInfo.Path()
    // shards that no longer receive writes: now() > endTime
    shards := s.MetaClient.ColdShardIDsByNode(s.Node.GetDataID())
    for db, rp := range shards {
        for r, ss := range rp {
            for _, sh := range ss {
                shardDir := filepath.Join(dir, db, r, strconv.FormatUint(sh.ID, 10))
                _, err := os.Stat(shardDir)
                // shard missing on the local node
                if storeSH := s.svrInfo.Shard(sh.ID); storeSH == nil || os.IsNotExist(err) {
                    // find the peer owners
                    var peerOwners []meta.ShardOwner
                    // remove my node id
                    for i, owner := range sh.Owners {
                        if owner.NodeID == s.Node.GetDataID() {
                            peerOwners = append(sh.Owners[:i], sh.Owners[i+1:]...)
                            break
                        }
                    }
                    // emit a missing shard
                    if len(sh.Owners) > 1 {
                        s.missingShards <- ShardInfo{ID: sh.ID, Path: shardDir, Database: db, RetentionPolicy: r, PeerOwners: peerOwners}
                    }
                }
            }
        }
    }
}
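
The implementation of ColdShardIDsByNode is not shown here; judging from the triple loop above, it returns the node's cold shards grouped by database and retention policy. A hypothetical sketch of the underlying filter, under the assumption that a shard counts as cold once its shard group's end time has passed:

// Hypothetical sketch of the cold-shard condition Check() relies on; the real
// meta-client method additionally groups the result by database and retention policy.
func coldShardsForNode(groups []meta.ShardGroupInfo, nodeID uint64, now time.Time) []meta.ShardInfo {
    var cold []meta.ShardInfo
    for _, sg := range groups {
        if sg.EndTime.After(now) {
            continue // shard group is still inside its time range and may receive writes
        }
        for _, sh := range sg.Shards {
            for _, owner := range sh.Owners {
                if owner.NodeID == nodeID {
                    cold = append(cold, sh)
                    break
                }
            }
        }
    }
    return cold
}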

Repairing missing shards

During repair, the service first waits for a while (about 11 minutes: WaitCompactCacheTime plus one minute) to avoid treating a shard as missing just because its data has not yet been compacted from the cache into a TSM file, then re-checks whether the shard's files are actually missing on the local node, and finally calls s.CopyShard() to copy the shard.

//services/anti-entropy/service.go
func (s *Service) ProcessMissingShard(idpath ShardInfo) {
    .......
    select {
    case <-s.closing:
        return
    // maybe the shard is not compacted to a tsm file, let's wait first
    case <-time.After(s.WaitCompactCacheTime + time.Minute):        
        // check existence again
        if _, err := os.Stat(idpath.Path); os.IsNotExist(err) {
            if err := s.CopyShard(idpath); err != nil {
                s.Logger.Info("restore missed shard failed", zap.Uint64("ShardID", idpath.ID), zap.Error(err))
            }
            ......
        }
    }
    return
}
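
ProcessMissingShards itself is not shown above. Judging from the call in Open(), it is a worker loop that drains s.missingShards and hands each entry to the supplied handler; a minimal sketch under that assumption:

// Sketch (assumption): dispatcher started in Open(); fn is s.ProcessMissingShard.
func (s *Service) ProcessMissingShards(fn func(ShardInfo)) {
    for {
        select {
        case <-s.closing:
            return
        case sh := <-s.missingShards:
            fn(sh) // the wait-then-copy handler shown above
        }
    }
}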

In CopyShard(), a peer owner acts as the source (src) and the current node as the destination (dst); SubmitCopyShard() is called to carry out the copy:

//services/anti-entropy/service.go
func (s *Service) CopyShard(sh ShardInfo) error {
    if len(sh.PeerOwners) == 0 {
        return fmt.Errorf("ae copy shard %d failed due to no peers", sh.ID)
    }
    // get my tcp host
    nodeinfo, err := s.MetaClient.DataNode(s.Node.GetDataID())
    if err != nil {
        return fmt.Errorf("node %d is removed from cluster", s.Node.GetDataID())
    }
    dest := nodeinfo.TCPHost

    for _, peer := range sh.PeerOwners {
        nodeinfo, err = s.MetaClient.DataNode(peer.NodeID)
        source := nodeinfo.TCPHost
        // start new copy
        if err = s.MetaClient.SubmitCopyShard(source, dest, sh.ID, s.MaxCopyShardRecords); err != nil {
            return fmt.Errorf("ae submit copy shard failed: %v", err)
        }
        return nil
    }
    // all failed
    return fmt.Errorf("copy shard %d to %s failed, all peer nodes are tried", sh.ID, dest)
}
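
Note that the service does not stream shard data itself: it only submits a copy-shard job through the meta client (source = a peer's TCP host, destination = the local node's TCP host), and the actual transfer is presumably carried out by the cluster's copy-shard machinery. Also, as written the loop returns immediately after submitting to the first peer in PeerOwners, so the trailing "all peer nodes are tried" error path is never reached.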