
A Look at Cassandra's FailureDetector

This article examines Cassandra's FailureDetector.

IFailureDetector

cassandra-3.11.4/src/java/org/apache/cassandra/gms/IFailureDetector.java

public interface IFailureDetector
{
    /**
     * Failure Detector's knowledge of whether a node is up or
     * down.
     *
     * @param ep endpoint in question.
     * @return true if UP and false if DOWN.
     */
    public boolean isAlive(InetAddress ep);

    /**
     * This method is invoked by any entity wanting to interrogate the status of an endpoint.
     * In our case it would be the Gossiper. The Failure Detector will then calculate Phi and
     * deem an endpoint as suspicious or alive as explained in the Hayashibara paper.
     *
     * param ep endpoint for which we interpret the inter arrival times.
     */
    public void interpret(InetAddress ep);

    /**
     * This method is invoked by the receiver of the heartbeat. In our case it would be
     * the Gossiper. Gossiper inform the Failure Detector on receipt of a heartbeat. The
     * FailureDetector will then sample the arrival time as explained in the paper.
     *
     * param ep endpoint being reported.
     */
    public void report(InetAddress ep);

    /**
     * remove endpoint from failure detector
     */
    public void remove(InetAddress ep);

    /**
     * force conviction of endpoint in the failure detector
     */
    public void forceConviction(InetAddress ep);

    /**
     * Register interest for Failure Detector events.
     *
     * @param listener implementation of an application provided IFailureDetectionEventListener
     */
    public void registerFailureDetectionEventListener(IFailureDetectionEventListener listener);

    /**
     * Un-register interest for Failure Detector events.
     *
     * @param listener implementation of an application provided IFailureDetectionEventListener
     */
    public void unregisterFailureDetectionEventListener(IFailureDetectionEventListener listener);
}
  • The IFailureDetector interface defines the isAlive, interpret, report, remove, forceConviction, registerFailureDetectionEventListener and unregisterFailureDetectionEventListener methods

FailureDetector

cassandra-3.11.4/src/java/org/apache/cassandra/gms/FailureDetector.java

/**
 * This FailureDetector is an implementation of the paper titled
 * "The Phi Accrual Failure Detector" by Hayashibara.
 * Check the paper and the <i>IFailureDetector</i> interface for details.
 */
public class FailureDetector implements IFailureDetector, FailureDetectorMBean
{
    private static final Logger logger = LoggerFactory.getLogger(FailureDetector.class);
    public static final String MBEAN_NAME = "org.apache.cassandra.net:type=FailureDetector";
    private static final int SAMPLE_SIZE = 1000;
    protected static final long INITIAL_VALUE_NANOS = TimeUnit.NANOSECONDS.convert(getInitialValue(), TimeUnit.MILLISECONDS);
    private static final int DEBUG_PERCENTAGE = 80; // if the phi is larger than this percentage of the max, log a debug message
    private static final long DEFAULT_MAX_PAUSE = 5000L * 1000000L; // 5 seconds
    private static final long MAX_LOCAL_PAUSE_IN_NANOS = getMaxLocalPause();
    private long lastInterpret = Clock.instance.nanoTime();
    private long lastPause = 0L;

    private static long getMaxLocalPause()
    {
        if (System.getProperty("cassandra.max_local_pause_in_ms") != null)
        {
            long pause = Long.parseLong(System.getProperty("cassandra.max_local_pause_in_ms"));
            logger.warn("Overriding max local pause time to {}ms", pause);
            return pause * 1000000L;
        }
        else
            return DEFAULT_MAX_PAUSE;
    }

    public static final IFailureDetector instance = new FailureDetector();

    // this is useless except to provide backwards compatibility in phi_convict_threshold,
    // because everyone seems pretty accustomed to the default of 8, and users who have
    // already tuned their phi_convict_threshold for their own environments won't need to
    // change.
    private final double PHI_FACTOR = 1.0 / Math.log(10.0); // 0.434...

    private final ConcurrentHashMap<InetAddress, ArrivalWindow> arrivalSamples = new ConcurrentHashMap<>();
    private final List<IFailureDetectionEventListener> fdEvntListeners = new CopyOnWriteArrayList<>();

    //......

    public boolean isAlive(InetAddress ep)
    {
        if (ep.equals(FBUtilities.getBroadcastAddress()))
            return true;

        EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(ep);
        // we could assert not-null, but having isAlive fail screws a node over so badly that
        // it's worth being defensive here so minor bugs don't cause disproportionate
        // badness.  (See CASSANDRA-1463 for an example).
        if (epState == null)
            logger.error("Unknown endpoint:" + ep, new IllegalArgumentException(""));
        return epState != null && epState.isAlive();
    }

    public void interpret(InetAddress ep)
    {
        ArrivalWindow hbWnd = arrivalSamples.get(ep);
        if (hbWnd == null)
        {
            return;
        }
        long now = Clock.instance.nanoTime();
        long diff = now - lastInterpret;
        lastInterpret = now;
        if (diff > MAX_LOCAL_PAUSE_IN_NANOS)
        {
            logger.warn("Not marking nodes down due to local pause of {} > {}", diff, MAX_LOCAL_PAUSE_IN_NANOS);
            lastPause = now;
            return;
        }
        if (Clock.instance.nanoTime() - lastPause < MAX_LOCAL_PAUSE_IN_NANOS)
        {
            logger.debug("Still not marking nodes down due to local pause");
            return;
        }
        double phi = hbWnd.phi(now);
        if (logger.isTraceEnabled())
            logger.trace("PHI for {} : {}", ep, phi);

        if (PHI_FACTOR * phi > getPhiConvictThreshold())
        {
            if (logger.isTraceEnabled())
                logger.trace("Node {} phi {} > {}; intervals: {} mean: {}", new Object[]{ep, PHI_FACTOR * phi, getPhiConvictThreshold(), hbWnd, hbWnd.mean()});
            for (IFailureDetectionEventListener listener : fdEvntListeners)
            {
                listener.convict(ep, phi);
            }
        }
        else if (logger.isDebugEnabled() && (PHI_FACTOR * phi * DEBUG_PERCENTAGE / 100.0 > getPhiConvictThreshold()))
        {
            logger.debug("PHI for {} : {}", ep, phi);
        }
        else if (logger.isTraceEnabled())
        {
            logger.trace("PHI for {} : {}", ep, phi);
            logger.trace("mean for {} : {}", ep, hbWnd.mean());
        }
    }

    //......
}
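  • FailureDetector implements the IFailureDetector and FailureDetectorMBean interfaces
  • PHI_FACTOR is defined as 1.0 / Math.log(10.0), and phiConvictThreshold defaults to 8; arrivalSamples maps each InetAddress to its ArrivalWindow
  • isAlive returns true for the local broadcast address and otherwise returns epState.isAlive() (false, with an error logged, when the endpoint is unknown); interpret returns early if no samples exist for the endpoint or a local pause longer than MAX_LOCAL_PAUSE_IN_NANOS was detected, otherwise it calls ArrivalWindow.phi(now), multiplies the result by PHI_FACTOR, and calls convict on every registered IFailureDetectionEventListener if the product exceeds phiConvictThreshold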
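
To make the threshold check concrete, here is a minimal sketch of the comparison that interpret performs. It is not the Cassandra code: the class PhiThresholdSketch, its method names, and the 1-second mean interval used in main are assumptions chosen for illustration only. Since phi is t / mean, the check PHI_FACTOR * phi > threshold is equivalent to t > threshold * ln(10) * mean, so with a threshold of 8 an endpoint has to stay silent for roughly 18.4 mean intervals before it is convicted.

```java
final class PhiThresholdSketch {
    // Same constant as in FailureDetector: converts a natural-log phi to base 10.
    static final double PHI_FACTOR = 1.0 / Math.log(10.0);

    // Mirrors the comparison in interpret(): phi = silence / mean, scaled by PHI_FACTOR.
    static boolean shouldConvict(long silenceNanos, double meanIntervalNanos, double phiConvictThreshold) {
        double phi = silenceNanos / meanIntervalNanos;
        return PHI_FACTOR * phi > phiConvictThreshold;
    }

    // Rearranged form of the same inequality: the silence needed to cross the threshold.
    static double silenceNeededNanos(double meanIntervalNanos, double phiConvictThreshold) {
        return phiConvictThreshold * Math.log(10.0) * meanIntervalNanos;
    }

    public static void main(String[] args) {
        double meanIntervalNanos = 1_000_000_000.0; // assumption: heartbeats arrive about once per second
        double needed = silenceNeededNanos(meanIntervalNanos, 8.0);
        System.out.println("silence needed (s): " + needed / 1e9);
        System.out.println("convict after 19 s: " + shouldConvict(19_000_000_000L, meanIntervalNanos, 8.0));
    }
}
```

Raising the threshold makes the detector more tolerant of slow heartbeats at the cost of slower detection; the required silence grows linearly with both the threshold and the observed mean interval.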

EndpointState

cassandra-3.11.4/src/java/org/apache/cassandra/gms/EndpointState.java

public class EndpointState
{
    protected static final Logger logger = LoggerFactory.getLogger(EndpointState.class);

    public final static IVersionedSerializer<EndpointState> serializer = new EndpointStateSerializer();

    private volatile HeartBeatState hbState;
    private final AtomicReference<Map<ApplicationState, VersionedValue>> applicationState;

    /* fields below do not get serialized */
    private volatile long updateTimestamp;
    private volatile boolean isAlive;

    public boolean isAlive()
    {
        return isAlive;
    }

    void markAlive()
    {
        isAlive = true;
    }

    void markDead()
    {
        isAlive = false;
    }

    //......
}
  • EndpointState.isAlive() returns the volatile isAlive field; markAlive() sets it to true and markDead() sets it to false

ArrivalWindow

cassandra-3.11.4/src/java/org/apache/cassandra/gms/FailureDetector.java

class ArrivalWindow
{
    private static final Logger logger = LoggerFactory.getLogger(ArrivalWindow.class);
    private long tLast = 0L;
    private final ArrayBackedBoundedStats arrivalIntervals;
    private double lastReportedPhi = Double.MIN_VALUE;

    // in the event of a long partition, never record an interval longer than the rpc timeout,
    // since if a host is regularly experiencing connectivity problems lasting this long we'd
    // rather mark it down quickly instead of adapting
    // this value defaults to the same initial value the FD is seeded with
    private final long MAX_INTERVAL_IN_NANO = getMaxInterval();

    ArrivalWindow(int size)
    {
        arrivalIntervals = new ArrayBackedBoundedStats(size);
    }

    private static long getMaxInterval()
    {
        String newvalue = System.getProperty("cassandra.fd_max_interval_ms");
        if (newvalue == null)
        {
            return FailureDetector.INITIAL_VALUE_NANOS;
        }
        else
        {
            logger.info("Overriding FD MAX_INTERVAL to {}ms", newvalue);
            return TimeUnit.NANOSECONDS.convert(Integer.parseInt(newvalue), TimeUnit.MILLISECONDS);
        }
    }

    synchronized void add(long value, InetAddress ep)
    {
        assert tLast >= 0;
        if (tLast > 0L)
        {
            long interArrivalTime = (value - tLast);
            if (interArrivalTime <= MAX_INTERVAL_IN_NANO)
            {
                arrivalIntervals.add(interArrivalTime);
                logger.trace("Reporting interval time of {} for {}", interArrivalTime, ep);
            }
            else
            {
                logger.trace("Ignoring interval time of {} for {}", interArrivalTime, ep);
            }
        }
        else
        {
            // We use a very large initial interval since the "right" average depends on the cluster size
            // and it's better to err high (false negatives, which will be corrected by waiting a bit longer)
            // than low (false positives, which cause "flapping").
            arrivalIntervals.add(FailureDetector.INITIAL_VALUE_NANOS);
        }
        tLast = value;
    }

    double mean()
    {
        return arrivalIntervals.mean();
    }

    // see CASSANDRA-2597 for an explanation of the math at work here.
    double phi(long tnow)
    {
        assert arrivalIntervals.mean() > 0 && tLast > 0; // should not be called before any samples arrive
        long t = tnow - tLast;
        lastReportedPhi = t / mean();
        return lastReportedPhi;
    }

    double getLastReportedPhi()
    {
        return lastReportedPhi;
    }

    public String toString()
    {
        return Arrays.toString(arrivalIntervals.getArrivalIntervals());
    }
}
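  • ArrivalWindow stores the arrivalIntervals values in an ArrayBackedBoundedStats
  • Its add method is synchronized; it calls arrivalIntervals.add(interArrivalTime) only when tLast is greater than 0 and interArrivalTime is at most MAX_INTERVAL_IN_NANO (longer intervals are ignored), and when tLast is still 0, i.e. on the first report, it calls arrivalIntervals.add(FailureDetector.INITIAL_VALUE_NANOS) instead
  • phi uses the exponential-distribution approximation: it returns t / mean(), where t is the time since the last heartbeat; under that distribution this equals -ln P(X > t), the negative natural log of the probability that a heartbeat would still arrive this late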
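
The bookkeeping described in the bullets above can be sketched as a small standalone class. This is an illustration under stated assumptions, not the Cassandra implementation: ArrivalWindowSketch, its ring buffer, and the constructor parameters initialValueNanos and maxIntervalNanos are hypothetical stand-ins for ArrayBackedBoundedStats, INITIAL_VALUE_NANOS and MAX_INTERVAL_IN_NANO.

```java
final class ArrivalWindowSketch {
    private final long[] intervals;       // bounded ring buffer of inter-arrival times
    private final long initialValueNanos; // seed recorded on the first report (hypothetical parameter)
    private final long maxIntervalNanos;  // longer intervals are ignored (hypothetical parameter)
    private long sum = 0L;
    private int count = 0;
    private int next = 0;
    private long tLast = 0L;

    ArrivalWindowSketch(int size, long initialValueNanos, long maxIntervalNanos) {
        this.intervals = new long[size];
        this.initialValueNanos = initialValueNanos;
        this.maxIntervalNanos = maxIntervalNanos;
    }

    // Called once per heartbeat with the arrival timestamp.
    synchronized void add(long nowNanos) {
        if (tLast > 0L) {
            long interval = nowNanos - tLast;
            if (interval <= maxIntervalNanos) {
                record(interval);
            } // else: ignore the sample, but still move tLast forward below
        } else {
            record(initialValueNanos); // first report: err on the high side
        }
        tLast = nowNanos;
    }

    // Overwrites the oldest sample once the buffer is full and keeps a running sum.
    private void record(long interval) {
        if (count == intervals.length) {
            sum -= intervals[next];
        } else {
            count++;
        }
        intervals[next] = interval;
        sum += interval;
        next = (next + 1) % intervals.length;
    }

    synchronized double mean() {
        return count == 0 ? 0.0 : (double) sum / count;
    }

    // Natural-log phi: time since the last heartbeat divided by the mean interval.
    synchronized double phi(long nowNanos) {
        return (nowNanos - tLast) / mean();
    }

    public static void main(String[] args) {
        ArrivalWindowSketch window = new ArrivalWindowSketch(3, 2000L, 2000L);
        window.add(1000L); // first report, records the seed value
        window.add(2000L); // interval of 1000 is recorded
        System.out.println("mean=" + window.mean() + " phi=" + window.phi(5000L));
    }
}
```

Note that an ignored interval still advances tLast, so a single long gap does not inflate the mean but also does not keep phi growing once the next heartbeat arrives.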

Although the original paper suggests that the distribution is approximated by the Gaussian distribution we found the Exponential Distribution to be a better approximation, because of the nature of the gossip channel and its impact on latency
Regular message transmissions experiencing typical random jitter will follow a normal distribution, but since gossip messages from endpoint A to endpoint B are sent at random intervals, they likely make up a Poisson process, making the exponential distribution appropriate.
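
The link between t / mean() and PHI_FACTOR can be written out as a short derivation. It assumes, as the explanation quoted above does, that heartbeat inter-arrival times are exponentially distributed with mean \mu (the value returned by mean()); the symbols \mu, \varphi_e and \varphi_{10} are notation introduced here for illustration.

```latex
\begin{align*}
P(X > t) &= e^{-t/\mu} \\
\varphi_e(t) &= -\ln P(X > t) = \frac{t}{\mu} \\
\varphi_{10}(t) &= -\log_{10} P(X > t) = \frac{t}{\mu \ln 10}
  = \texttt{PHI\_FACTOR} \cdot \frac{t}{\mu},
  \qquad \texttt{PHI\_FACTOR} = \frac{1}{\ln 10} \approx 0.434
\end{align*}
```

In other words, ArrivalWindow.phi returns the natural-log form \varphi_e, and interpret rescales it by PHI_FACTOR to the base-10 form \varphi_{10} before comparing it with phi_convict_threshold, which is why thresholds tuned for the base-10 definition keep their meaning.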

Gossiper

cassandra-3.11.4/src/java/org/apache/cassandra/gms/Gossiper.java

public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
{
    //......

    /**
     * This method is part of IFailureDetectionEventListener interface. This is invoked
     * by the Failure Detector when it convicts an end point.
     *
     * @param endpoint end point that is convicted.
     */
    public void convict(InetAddress endpoint, double phi)
    {
        EndpointState epState = endpointStateMap.get(endpoint);
        if (epState == null)
            return;

        if (!epState.isAlive())
            return;

        logger.debug("Convicting {} with status {} - alive {}", endpoint, getGossipStatus(epState), epState.isAlive());


        if (isShutdown(endpoint))
        {
            markAsShutdown(endpoint);
        }
        else
        {
            markDead(endpoint, epState);
        }
    }

    /**
     * This method is used to mark a node as shutdown; that is it gracefully exited on its own and told us about it
     * @param endpoint endpoint that has shut itself down
     */
    protected void markAsShutdown(InetAddress endpoint)
    {
        EndpointState epState = endpointStateMap.get(endpoint);
        if (epState == null)
            return;
        epState.addApplicationState(ApplicationState.STATUS, StorageService.instance.valueFactory.shutdown(true));
        epState.addApplicationState(ApplicationState.RPC_READY, StorageService.instance.valueFactory.rpcReady(false));
        epState.getHeartBeatState().forceHighestPossibleVersionUnsafe();
        markDead(endpoint, epState);
        FailureDetector.instance.forceConviction(endpoint);
    }

    @VisibleForTesting
    public void markDead(InetAddress addr, EndpointState localState)
    {
        if (logger.isTraceEnabled())
            logger.trace("marking as down {}", addr);
        localState.markDead();
        liveEndpoints.remove(addr);
        unreachableEndpoints.put(addr, System.nanoTime());
        logger.info("InetAddress {} is now DOWN", addr);
        for (IEndpointStateChangeSubscriber subscriber : subscribers)
            subscriber.onDead(addr, localState);
        if (logger.isTraceEnabled())
            logger.trace("Notified {}", subscribers);
    }

    //......
}
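  • Gossiper implements the IFailureDetectionEventListener interface; its convict method looks up the EndpointState, returns if the endpoint is unknown or already marked dead, and otherwise calls markAsShutdown if the endpoint has announced a shutdown or markDead if it has not
  • markAsShutdown sets STATUS to shutdown and RPC_READY to false, forces the heartbeat to the highest possible version, calls markDead, and then calls FailureDetector.instance.forceConviction(endpoint)
  • markDead calls localState.markDead(), moves the endpoint from liveEndpoints to unreachableEndpoints, and then invokes onDead on every IEndpointStateChangeSubscriber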
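
The conviction path described above can be modelled with a few collections. The following is a simplified, standalone sketch, not Gossiper itself: ConvictFlowSketch, DeadSubscriber, addLive and the use of String endpoints are all hypothetical, and the shutdown branch (markAsShutdown) is deliberately left out.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

final class ConvictFlowSketch {
    // Hypothetical stand-in for IEndpointStateChangeSubscriber.onDead.
    interface DeadSubscriber {
        void onDead(String endpoint);
    }

    private final Map<String, Boolean> aliveByEndpoint = new ConcurrentHashMap<>();
    private final Set<String> liveEndpoints = ConcurrentHashMap.newKeySet();
    private final Map<String, Long> unreachableEndpoints = new ConcurrentHashMap<>();
    private final List<DeadSubscriber> subscribers = new CopyOnWriteArrayList<>();

    void subscribe(DeadSubscriber subscriber) {
        subscribers.add(subscriber);
    }

    void addLive(String endpoint) {
        aliveByEndpoint.put(endpoint, true);
        liveEndpoints.add(endpoint);
        unreachableEndpoints.remove(endpoint);
    }

    boolean isAlive(String endpoint) {
        return Boolean.TRUE.equals(aliveByEndpoint.get(endpoint));
    }

    // Same guards as convict(): ignore unknown endpoints and endpoints already marked dead.
    void convict(String endpoint) {
        Boolean alive = aliveByEndpoint.get(endpoint);
        if (alive == null || !alive) {
            return;
        }
        markDead(endpoint);
    }

    // Flip the flag, move the endpoint to the unreachable map, then notify subscribers.
    private void markDead(String endpoint) {
        aliveByEndpoint.put(endpoint, false);
        liveEndpoints.remove(endpoint);
        unreachableEndpoints.put(endpoint, System.nanoTime());
        for (DeadSubscriber subscriber : subscribers) {
            subscriber.onDead(endpoint);
        }
    }

    public static void main(String[] args) {
        ConvictFlowSketch gossiper = new ConvictFlowSketch();
        gossiper.subscribe(ep -> System.out.println(ep + " is now DOWN"));
        gossiper.addLive("10.0.0.2");
        gossiper.convict("10.0.0.2");
        gossiper.convict("10.0.0.2"); // second conviction is a no-op
    }
}
```

The early return for already-dead endpoints matters because the failure detector may report a high phi on every status check, while subscribers should hear about the transition only once.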

GossipTask

cassandra-3.11.4/src/java/org/apache/cassandra/gms/Gossiper.java

public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
{
    //......

    public void start(int generationNumber)
    {
        start(generationNumber, new EnumMap<ApplicationState, VersionedValue>(ApplicationState.class));
    }

    /**
     * Start the gossiper with the generation number, preloading the map of application states before starting
     */
    public void start(int generationNbr, Map<ApplicationState, VersionedValue> preloadLocalStates)
    {
        buildSeedsList();
        /* initialize the heartbeat state for this localEndpoint */
        maybeInitializeLocalState(generationNbr);
        EndpointState localState = endpointStateMap.get(FBUtilities.getBroadcastAddress());
        localState.addApplicationStates(preloadLocalStates);

        //notify snitches that Gossiper is about to start
        DatabaseDescriptor.getEndpointSnitch().gossiperStarting();
        if (logger.isTraceEnabled())
            logger.trace("gossip started with generation {}", localState.getHeartBeatState().getGeneration());

        scheduledGossipTask = executor.scheduleWithFixedDelay(new GossipTask(),
                                                              Gossiper.intervalInMillis,
                                                              Gossiper.intervalInMillis,
                                                              TimeUnit.MILLISECONDS);
    }

    private class GossipTask implements Runnable
    {
        public void run()
        {
            try
            {
                //wait on messaging service to start listening
                MessagingService.instance().waitUntilListening();

                taskLock.lock();

                /* Update the local heartbeat counter. */
                endpointStateMap.get(FBUtilities.getBroadcastAddress()).getHeartBeatState().updateHeartBeat();
                if (logger.isTraceEnabled())
                    logger.trace("My heartbeat is now {}", endpointStateMap.get(FBUtilities.getBroadcastAddress()).getHeartBeatState().getHeartBeatVersion());
                final List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
                Gossiper.instance.makeRandomGossipDigest(gDigests);

                if (gDigests.size() > 0)
                {
                    GossipDigestSyn digestSynMessage = new GossipDigestSyn(DatabaseDescriptor.getClusterName(),
                                                                           DatabaseDescriptor.getPartitionerName(),
                                                                           gDigests);
                    MessageOut<GossipDigestSyn> message = new MessageOut<GossipDigestSyn>(MessagingService.Verb.GOSSIP_DIGEST_SYN,
                                                                                          digestSynMessage,
                                                                                          GossipDigestSyn.serializer);
                    /* Gossip to some random live member */
                    boolean gossipedToSeed = doGossipToLiveMember(message);

                    /* Gossip to some unreachable member with some probability to check if he is back up */
                    maybeGossipToUnreachableMember(message);

                    /* Gossip to a seed if we did not do so above, or we have seen less nodes
                       than there are seeds.  This prevents partitions where each group of nodes
                       is only gossiping to a subset of the seeds.

                       The most straightforward check would be to check that all the seeds have been
                       verified either as live or unreachable.  To avoid that computation each round,
                       we reason that:

                       either all the live nodes are seeds, in which case non-seeds that come online
                       will introduce themselves to a member of the ring by definition,

                       or there is at least one non-seed node in the list, in which case eventually
                       someone will gossip to it, and then do a gossip to a random seed from the
                       gossipedToSeed check.

                       See CASSANDRA-150 for more exposition. */
                    if (!gossipedToSeed || liveEndpoints.size() < seeds.size())
                        maybeGossipToSeed(message);

                    doStatusCheck();
                }
            }
            catch (Exception e)
            {
                JVMStabilityInspector.inspectThrowable(e);
                logger.error("Gossip error", e);
            }
            finally
            {
                taskLock.unlock();
            }
        }
    }

    private void doStatusCheck()
    {
        if (logger.isTraceEnabled())
            logger.trace("Performing status check ...");

        long now = System.currentTimeMillis();
        long nowNano = System.nanoTime();

        long pending = ((JMXEnabledThreadPoolExecutor) StageManager.getStage(Stage.GOSSIP)).metrics.pendingTasks.getValue();
        if (pending > 0 && lastProcessedMessageAt < now - 1000)
        {
            // if some new messages just arrived, give the executor some time to work on them
            Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);

            // still behind?  something's broke
            if (lastProcessedMessageAt < now - 1000)
            {
                logger.warn("Gossip stage has {} pending tasks; skipping status check (no nodes will be marked down)", pending);
                return;
            }
        }

        Set<InetAddress> eps = endpointStateMap.keySet();
        for (InetAddress endpoint : eps)
        {
            if (endpoint.equals(FBUtilities.getBroadcastAddress()))
                continue;

            FailureDetector.instance.interpret(endpoint);
            EndpointState epState = endpointStateMap.get(endpoint);
            if (epState != null)
            {
                // check if this is a fat client. fat clients are removed automatically from
                // gossip after FatClientTimeout.  Do not remove dead states here.
                if (isGossipOnlyMember(endpoint)
                    && !justRemovedEndpoints.containsKey(endpoint)
                    && TimeUnit.NANOSECONDS.toMillis(nowNano - epState.getUpdateTimestamp()) > fatClientTimeout)
                {
                    logger.info("FatClient {} has been silent for {}ms, removing from gossip", endpoint, fatClientTimeout);
                    removeEndpoint(endpoint); // will put it in justRemovedEndpoints to respect quarantine delay
                    evictFromMembership(endpoint); // can get rid of the state immediately
                }

                // check for dead state removal
                long expireTime = getExpireTimeForEndpoint(endpoint);
                if (!epState.isAlive() && (now > expireTime)
                    && (!StorageService.instance.getTokenMetadata().isMember(endpoint)))
                {
                    if (logger.isDebugEnabled())
                    {
                        logger.debug("time is expiring for endpoint : {} ({})", endpoint, expireTime);
                    }
                    evictFromMembership(endpoint);
                }
            }
        }

        if (!justRemovedEndpoints.isEmpty())
        {
            for (Entry<InetAddress, Long> entry : justRemovedEndpoints.entrySet())
            {
                if ((now - entry.getValue()) > QUARANTINE_DELAY)
                {
                    if (logger.isDebugEnabled())
                        logger.debug("{} elapsed, {} gossip quarantine over", QUARANTINE_DELAY, entry.getKey());
                    justRemovedEndpoints.remove(entry.getKey());
                }
            }
        }
    }

    //......
}
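  • Gossiper defines a start method that schedules a GossipTask through executor.scheduleWithFixedDelay, using Gossiper.intervalInMillis as both the initial delay and the delay between runs
  • GossipTask's run method updates the local heartbeat, calls doGossipToLiveMember and maybeGossipToUnreachableMember, calls maybeGossipToSeed when needed, and finally calls doStatusCheck
  • doStatusCheck iterates over the InetAddress keys of endpointStateMap, skipping the local node, and calls FailureDetector.instance.interpret(endpoint) for each one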
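
The scheduling pattern used by start and GossipTask can be sketched with the JDK alone. This is a rough illustration, not the Gossiper code: GossipLoopSketch, runRounds, the latch, and the 5-second wait are assumptions made so the example terminates on its own. One detail worth noting is that scheduleWithFixedDelay suppresses all later runs once a run throws, so catching exceptions inside the task, as GossipTask does, keeps the loop alive.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

final class GossipLoopSketch {
    // Runs a periodic task until at least `rounds` rounds have completed; returns the heartbeat counter.
    static int runRounds(int rounds, long intervalMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        ReentrantLock taskLock = new ReentrantLock();
        AtomicInteger heartbeat = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(rounds);

        Runnable gossipTask = () -> {
            taskLock.lock();
            try {
                heartbeat.incrementAndGet(); // stands in for updateHeartBeat()
                // gossip to peers and run the status check here (omitted in this sketch)
                done.countDown();
            } catch (RuntimeException e) {
                // swallow and log so the scheduler keeps running the task
                System.err.println("Gossip error: " + e);
            } finally {
                taskLock.unlock();
            }
        };

        // Same delay for the first run and between runs, as in Gossiper.start.
        executor.scheduleWithFixedDelay(gossipTask, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
        try {
            done.await(5, TimeUnit.SECONDS); // assumption: bound the wait so the sketch always ends
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdownNow();
        }
        return heartbeat.get();
    }

    public static void main(String[] args) {
        System.out.println("rounds completed: " + runRounds(3, 10L));
    }
}
```

With a fixed delay, the interval is measured from the end of one run to the start of the next, so a slow round pushes later rounds back instead of causing them to pile up.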

Summary

  • The IFailureDetector interface defines the isAlive, interpret, report, remove, forceConviction, registerFailureDetectionEventListener and unregisterFailureDetectionEventListener methods
  • FailureDetector implements the IFailureDetector and FailureDetectorMBean interfaces; its isAlive method returns epState.isAlive(), which in EndpointState is simply the isAlive field that markDead sets to false; its interpret method calls ArrivalWindow.phi(now), multiplies the result by PHI_FACTOR, and calls convict on every IFailureDetectionEventListener if the product exceeds phiConvictThreshold
  • ArrivalWindow stores the arrivalIntervals values in an ArrayBackedBoundedStats; its synchronized add method records interArrivalTime only when tLast is greater than 0 and the interval is at most MAX_INTERVAL_IN_NANO, and records FailureDetector.INITIAL_VALUE_NANOS on the first report; phi uses the exponential-distribution approximation and returns t / mean(), which under that distribution equals -ln P(X > t)
  • Gossiper implements the IFailureDetectionEventListener interface; its convict method looks up the EndpointState and calls markAsShutdown if the endpoint has shut down or markDead otherwise; markAsShutdown calls markDead and then FailureDetector.instance.forceConviction(endpoint); markDead calls localState.markDead() and then invokes onDead on every IEndpointStateChangeSubscriber
  • Gossiper defines a start method that schedules a GossipTask through executor.scheduleWithFixedDelay; GossipTask's run method calls doGossipToLiveMember and maybeGossipToUnreachableMember and finally doStatusCheck; doStatusCheck iterates over the InetAddress keys of endpointStateMap and calls FailureDetector.instance.interpret(endpoint) for each one

doc

  • The Phi Accrual Failure Detector by Hayashibara et al
  • Cassandra – A Decentralized Structured Storage System
  • inconsistent implementation of ‘cumulative distribution function’ for Exponential Distribution
  • A detailed explanation of the failure detection principle in Cassandra
  • How Cassandra detects node failure: the Phi Accrual Failure Detector, with the paper attached