This article takes a look at Hazelcast's PhiAccrualFailureDetector.

FailureDetector

hazelcast-3.12-sources.jar!/com/hazelcast/internal/cluster/fd/FailureDetector.java

```java
/**
 * Failure detector tracks heartbeats of a member and decides liveness/availability of the member.
 */
public interface FailureDetector {

    /**
     * Notifies this failure detector about received heartbeat message from the tracked member.
     *
     * @param timestamp timestamp of heartbeat message in milliseconds
     */
    void heartbeat(long timestamp);

    /**
     * Returns true if the tracked member is considered as alive/available.
     * @param timestamp timestamp in milliseconds
     * @return true if the member is alive
     */
    boolean isAlive(long timestamp);

    /**
     * Returns the last heartbeat timestamp for the tracked member.
     * @return heartbeat timestamp in milliseconds
     */
    long lastHeartbeat();

    /**
     * Returns suspicion level about the tracked member. Returned value is mostly implementation dependent.
     * <code>0</code> indicates no suspicion at all.
     * @param timestamp timestamp in milliseconds
     * @return suspicion level
     */
    double suspicionLevel(long timestamp);
}
```
  • The FailureDetector interface defines the heartbeat, isAlive, lastHeartbeat and suspicionLevel methods
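A concrete implementation helps show what the four methods promise. The sketch below is a hypothetical fixed-deadline detector. DeadlineFailureDetector is an illustrative name, not part of Hazelcast, and the interface is copied locally so the sketch compiles on its own. Its suspicionLevel is the elapsed fraction of a timeout. That is just one possible reading of "mostly implementation dependent".

```java
// Local copy of the FailureDetector contract above, so that this sketch compiles on its own.
interface FailureDetector {
    void heartbeat(long timestamp);

    boolean isAlive(long timestamp);

    long lastHeartbeat();

    double suspicionLevel(long timestamp);
}

// Hypothetical fixed-deadline detector, not part of Hazelcast: the member is alive
// while less than timeoutMillis has passed since its last heartbeat.
final class DeadlineFailureDetector implements FailureDetector {
    private static final long NO_HEARTBEAT_TIMESTAMP = -1;

    private final long timeoutMillis;
    private volatile long lastHeartbeatMillis = NO_HEARTBEAT_TIMESTAMP;

    DeadlineFailureDetector(long timeoutMillis) {
        if (timeoutMillis <= 0) {
            throw new IllegalArgumentException("Timeout must be > 0: " + timeoutMillis);
        }
        this.timeoutMillis = timeoutMillis;
    }

    @Override
    public void heartbeat(long timestamp) {
        lastHeartbeatMillis = timestamp;
    }

    @Override
    public boolean isAlive(long timestamp) {
        return suspicionLevel(timestamp) < 1.0;
    }

    @Override
    public long lastHeartbeat() {
        return lastHeartbeatMillis;
    }

    // Fraction of the timeout that has elapsed: 0 means no suspicion, >= 1 means the deadline passed.
    // As in PhiAccrualFailureDetector, a member that never sent a heartbeat is not suspected.
    @Override
    public double suspicionLevel(long timestamp) {
        long last = lastHeartbeatMillis;
        if (last == NO_HEARTBEAT_TIMESTAMP) {
            return 0.0;
        }
        return Math.max(0.0, (double) (timestamp - last) / timeoutMillis);
    }
}
```

This detector ignores how heartbeats have actually been arriving, so its single timeout has to be tuned by hand. That is the limitation the phi accrual approach addresses by adapting to observed inter-arrival times.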

PhiAccrualFailureDetector

hazelcast-3.12-sources.jar!/com/hazelcast/internal/cluster/fd/PhiAccrualFailureDetector.java

```java
/**
 * Port of Akka's PhiAccrualFailureDetector.scala
 * <p>
 * Implementation of 'The Phi Accrual Failure Detector' by Hayashibara et al. as defined in their paper.
 * <p>
 * The suspicion level of failure is given by a value called φ (phi).
 * The basic idea of the φ failure detector is to express the value of φ on a scale that
 * is dynamically adjusted to reflect current network conditions. A configurable
 * threshold is used to decide if <code>φ</code> is considered to be a failure.
 * <p>
 * The value of <code>φ</code> is calculated as:
 * <p>
 * <code>
 * <pre>
 * φ = -log10(1 - F(timeSinceLastHeartbeat)
 * </pre>
 * </code>
 * where F is the cumulative distribution function of a normal distribution with mean
 * and standard deviation estimated from historical heartbeat inter-arrival times.
 */
public class PhiAccrualFailureDetector implements FailureDetector {

    static final long NO_HEARTBEAT_TIMESTAMP = -1;

    private final double threshold;
    private final double minStdDeviationMillis;
    private final long acceptableHeartbeatPauseMillis;

    private final HeartbeatHistory heartbeatHistory;
    private volatile long lastHeartbeatMillis = NO_HEARTBEAT_TIMESTAMP;

    /**
     * @param threshold                      A low threshold is prone to generate many wrong suspicions but ensures
     *                                       a quick detection in the event of a real crash. Conversely, a high threshold
     *                                       generates fewer mistakes but needs more time to detect actual crashes
     * @param maxSampleSize                  Number of samples to use for calculation of mean and standard deviation of
     *                                       inter-arrival times.
     * @param minStdDeviationMillis          Minimum standard deviation to use for the normal distribution used when
     *                                       calculating phi. Too low standard deviation might result in too much sensitivity
     *                                       for sudden, but normal, deviations in heartbeat inter arrival times.
     * @param acceptableHeartbeatPauseMillis Duration corresponding to number of potentially lost/delayed
     *                                       heartbeats that will be accepted before considering it to be an anomaly.
     *                                       This margin is important to be able to survive sudden, occasional, pauses
     *                                       in heartbeat arrivals, due to for example garbage collect or network drop.
     * @param firstHeartbeatEstimateMillis   Bootstrap the stats with heartbeats that corresponds to this duration,
     *                                       with a with rather high standard deviation (since environment is unknown
     *                                       in the beginning)
     */
    public PhiAccrualFailureDetector(double threshold, int maxSampleSize, double minStdDeviationMillis,
            long acceptableHeartbeatPauseMillis, long firstHeartbeatEstimateMillis) {
        this.threshold = checkPositive(threshold, "Threshold must be positive: " + threshold);
        this.minStdDeviationMillis = checkPositive(minStdDeviationMillis, "Minimum standard deviation must be positive: "
                + minStdDeviationMillis);
        this.acceptableHeartbeatPauseMillis = checkNotNegative(acceptableHeartbeatPauseMillis,
                "Acceptable heartbeat pause millis must be >= 0: " + acceptableHeartbeatPauseMillis);
        checkPositive(firstHeartbeatEstimateMillis, "First heartbeat value must be > 0: " + firstHeartbeatEstimateMillis);
        heartbeatHistory = new HeartbeatHistory(maxSampleSize);
        firstHeartbeat(firstHeartbeatEstimateMillis);
    }

    // guess statistics for first heartbeat,
    // important so that connections with only one heartbeat becomes unavailable
    // bootstrap with 2 entries with rather high standard deviation
    @SuppressWarnings("checkstyle:magicnumber")
    private void firstHeartbeat(long firstHeartbeatEstimateMillis) {
        long stdDeviationMillis = firstHeartbeatEstimateMillis / 4;
        heartbeatHistory.add(firstHeartbeatEstimateMillis - stdDeviationMillis);
        heartbeatHistory.add(firstHeartbeatEstimateMillis + stdDeviationMillis);
    }

    private double ensureValidStdDeviation(double stdDeviationMillis) {
        return Math.max(stdDeviationMillis, minStdDeviationMillis);
    }

    /**
     * The suspicion level of the accrual failure detector.
     *
     * If a connection does not have any records in failure detector then it is
     * considered healthy.
     */
    private double phi(long timestampMillis) {
        long timeDiffMillis;
        double meanMillis;
        double stdDeviationMillis;

        synchronized (heartbeatHistory) {
            long lastTimestampMillis = lastHeartbeatMillis;
            if (lastTimestampMillis == NO_HEARTBEAT_TIMESTAMP) {
                return 0.0;
            }

            timeDiffMillis = timestampMillis - lastTimestampMillis;
            meanMillis = heartbeatHistory.mean();
            stdDeviationMillis = ensureValidStdDeviation(heartbeatHistory.stdDeviation());
        }

        return phi(timeDiffMillis, meanMillis + acceptableHeartbeatPauseMillis, stdDeviationMillis);
    }

    /**
     * Calculation of phi, derived from the Cumulative distribution function for
     * N(mean, stdDeviation) normal distribution, given by
     * 1.0 / (1.0 + math.exp(-y * (1.5976 + 0.070566 * y * y)))
     * where y = (x - mean) / standard_deviation
     * This is an approximation defined in β Mathematics Handbook (Logistic approximation).
     * Error is 0.00014 at +- 3.16
     * The calculated value is equivalent to -log10(1 - CDF(y))
     */
    @SuppressWarnings("checkstyle:magicnumber")
    private static double phi(long timeDiffMillis, double meanMillis, double stdDeviationMillis) {
        double y = (timeDiffMillis - meanMillis) / stdDeviationMillis;
        double e = Math.exp(-y * (1.5976 + 0.070566 * y * y));
        if (timeDiffMillis > meanMillis) {
            return -Math.log10(e / (1.0 + e));
        } else {
            return -Math.log10(1.0 - 1.0 / (1.0 + e));
        }
    }

    @Override
    public boolean isAlive(long timestampMillis) {
        double phi = phi(timestampMillis);
        return phi < threshold;
    }

    @Override
    public void heartbeat(long timestampMillis) {
        synchronized (heartbeatHistory) {
            long lastTimestampMillis = getAndSetLastHeartbeat(timestampMillis);
            if (lastTimestampMillis == NO_HEARTBEAT_TIMESTAMP) {
                return;
            }

            if (isAlive(timestampMillis)) {
                heartbeatHistory.add(timestampMillis - lastTimestampMillis);
            }
        }
    }

    private long getAndSetLastHeartbeat(long timestampMillis) {
        long lastTimestampMillis = lastHeartbeatMillis;
        lastHeartbeatMillis = timestampMillis;
        return lastTimestampMillis;
    }

    @Override
    public long lastHeartbeat() {
        return lastHeartbeatMillis;
    }

    @Override
    public double suspicionLevel(long timestamp) {
        return phi(timestamp);
    }

    /**
     * Holds the heartbeat statistics for a specific member.
     * It is capped by the number of samples specified in `maxSampleSize`.
     *
     * The stats (mean, variance, stdDeviation) are not defined for
     * for empty HeartbeatHistory, i.e. throws ArithmeticException.
     */
    private static class HeartbeatHistory {
        private final int maxSampleSize;
        private final LinkedList<Long> intervals = new LinkedList<Long>();
        private long intervalSum;
        private long squaredIntervalSum;

        HeartbeatHistory(int maxSampleSize) {
            if (maxSampleSize < 1) {
                throw new IllegalArgumentException("Sample size must be >= 1 : " + maxSampleSize);
            }
            this.maxSampleSize = maxSampleSize;
        }

        double mean() {
            return (double) intervalSum / intervals.size();
        }

        double variance() {
            double mean = mean();
            return ((double) squaredIntervalSum / intervals.size()) - (mean * mean);
        }

        double stdDeviation() {
            return Math.sqrt(variance());
        }

        void add(long interval) {
            if (intervals.size() >= maxSampleSize) {
                dropOldest();
            }
            intervals.add(interval);
            intervalSum += interval;
            squaredIntervalSum += pow2(interval);
        }

        private void dropOldest() {
            long dropped = intervals.pollFirst();
            intervalSum -= dropped;
            squaredIntervalSum -= pow2(dropped);
        }

        private static long pow2(long x) {
            return x * x;
        }
    }
}
```
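The bootstrap in firstHeartbeat and the running-sum statistics in HeartbeatHistory can be checked in isolation. The sketch below is a condensed copy of that logic under the hypothetical name IntervalStats, so that it compiles outside Hazelcast. The static bootstrap helper is also an illustrative addition that mirrors firstHeartbeat.

```java
import java.util.LinkedList;

// Condensed copy of the HeartbeatHistory statistics above, renamed IntervalStats
// (a hypothetical name). Mean and variance come from running sums, so add() is O(1).
final class IntervalStats {
    private final int maxSampleSize;
    private final LinkedList<Long> intervals = new LinkedList<>();
    private long intervalSum;
    private long squaredIntervalSum;

    IntervalStats(int maxSampleSize) {
        if (maxSampleSize < 1) {
            throw new IllegalArgumentException("Sample size must be >= 1 : " + maxSampleSize);
        }
        this.maxSampleSize = maxSampleSize;
    }

    // Mirrors firstHeartbeat(): seed two samples at estimate -/+ estimate / 4, which gives
    // mean = estimate and standard deviation = estimate / 4 (integer division).
    static IntervalStats bootstrap(int maxSampleSize, long firstHeartbeatEstimateMillis) {
        IntervalStats stats = new IntervalStats(maxSampleSize);
        long stdDeviationMillis = firstHeartbeatEstimateMillis / 4;
        stats.add(firstHeartbeatEstimateMillis - stdDeviationMillis);
        stats.add(firstHeartbeatEstimateMillis + stdDeviationMillis);
        return stats;
    }

    void add(long interval) {
        if (intervals.size() >= maxSampleSize) {
            long dropped = intervals.pollFirst(); // evict the oldest sample
            intervalSum -= dropped;
            squaredIntervalSum -= dropped * dropped;
        }
        intervals.add(interval);
        intervalSum += interval;
        squaredIntervalSum += interval * interval;
    }

    int size() {
        return intervals.size();
    }

    double mean() {
        return (double) intervalSum / intervals.size();
    }

    // Population variance E[x^2] - (E[x])^2, as in HeartbeatHistory.variance().
    double variance() {
        double mean = mean();
        return ((double) squaredIntervalSum / intervals.size()) - (mean * mean);
    }

    double stdDeviation() {
        return Math.sqrt(variance());
    }
}
```

With firstHeartbeatEstimateMillis = 1000, the seeds are 750 and 1250. That gives a mean of 1000 and a standard deviation of 250. The two seeded samples stay in the window until maxSampleSize real intervals have pushed them out, so early φ values still reflect the estimate.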
  • PhiAccrualFailureDetector implements the FailureDetector interface; it is a Java port of Akka's PhiAccrualFailureDetector.scala
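  • φ (phi) is the suspicion level of failure. It is computed as φ = -log10(1 - CDF(timeSinceLastHeartbeat)), where CDF is the cumulative distribution function of a normal distribution whose mean and standard deviation are estimated from historical heartbeat inter-arrival times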
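  • The phi method approximates CDF(y) with the logistic approximation defined in the β Mathematics Handbook (error 0.00014 at ±3.16). With y = (x - mean) / standard_deviation and e = Math.exp(-y * (1.5976 + 0.070566 * y * y)), CDF(y) = 1.0 / (1.0 + e)
When x > mean (y > 0, so e < 1), φ is computed as -Math.log10(e / (1.0 + e)). When x <= mean (y <= 0, so e >= 1), it is computed as -Math.log10(1.0 - 1.0 / (1.0 + e)). The two expressions are algebraically equal, because 1 - 1/(1 + e) = e/(1 + e), so the split is most likely about floating-point precision. For large y, e is tiny and 1.0 / (1.0 + e) rounds to 1.0, so the second form would yield log10(0) and φ = Infinity, while e / (1.0 + e) keeps the small value. For very negative y, e can overflow to Infinity, where e / (1.0 + e) is NaN but 1.0 - 1.0 / (1.0 + e) still evaluates to 1.0 (φ = 0)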
  • isAlive computes φ for timestampMillis and compares it with threshold (10 by default in Hazelcast); the member is considered alive only if φ is below the threshold
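  • heartbeat first stores timestampMillis as the new last-heartbeat time via getAndSetLastHeartbeat. If there was no previous heartbeat, it returns immediately. Otherwise it calls isAlive(timestampMillis) and, if that returns true, adds the interval timestampMillis - lastTimestampMillis to heartbeatHistory. Because lastHeartbeatMillis has already been updated at that point, this isAlive call evaluates φ with a time difference of 0, which gives y <= 0 and φ <= -log10(0.5) ≈ 0.3. With the default threshold of 10, the check therefore effectively always passes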
  • The implementation also adds the acceptableHeartbeatPauseMillis parameter: the mean passed to the final phi computation is meanMillis + acceptableHeartbeatPauseMillis
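The two-branch φ formula and the acceptableHeartbeatPauseMillis shift described above can be checked numerically. In the stand-alone sketch below, phiSplit mirrors the private static phi(long, double, double) shown in the listing. phiNaive is a hypothetical single-formula variant included only for comparison. PhiMath, logisticE, phiSplit, phiNaive and isAlive are illustrative names, not Hazelcast API.

```java
// Stand-alone sketch of the φ formula used by PhiAccrualFailureDetector.phi(long, double, double).
// PhiMath, logisticE, phiSplit, phiNaive and isAlive are hypothetical names for this sketch.
final class PhiMath {

    // e = exp(-y * (1.5976 + 0.070566 * y^2)); the logistic approximation is CDF(y) ~ 1 / (1 + e).
    static double logisticE(double y) {
        return Math.exp(-y * (1.5976 + 0.070566 * y * y));
    }

    // Two-branch form, as in the Hazelcast code: both branches equal -log10(1 - CDF(y)).
    static double phiSplit(long timeDiffMillis, double meanMillis, double stdDeviationMillis) {
        double y = (timeDiffMillis - meanMillis) / stdDeviationMillis;
        double e = logisticE(y);
        if (timeDiffMillis > meanMillis) {
            return -Math.log10(e / (1.0 + e));         // y > 0: e < 1, no cancellation
        } else {
            return -Math.log10(1.0 - 1.0 / (1.0 + e)); // y <= 0: e >= 1, avoids Infinity / Infinity
        }
    }

    // Single-formula variant for comparison only: breaks once 1 / (1 + e) rounds to 1.0.
    static double phiNaive(long timeDiffMillis, double meanMillis, double stdDeviationMillis) {
        double y = (timeDiffMillis - meanMillis) / stdDeviationMillis;
        return -Math.log10(1.0 - 1.0 / (1.0 + logisticE(y)));
    }

    // isAlive-style decision: the mean is shifted by acceptableHeartbeatPauseMillis before computing φ.
    static boolean isAlive(long timeDiffMillis, double meanMillis, double stdDeviationMillis,
                           long acceptableHeartbeatPauseMillis, double threshold) {
        double phi = phiSplit(timeDiffMillis, meanMillis + acceptableHeartbeatPauseMillis, stdDeviationMillis);
        return phi < threshold;
    }

    public static void main(String[] args) {
        // mean 1000 ms, std 100 ms: a 2000 ms gap is y = 10 standard deviations late
        System.out.println(phiSplit(2000, 1000, 100)); // about 37.58
        System.out.println(phiNaive(2000, 1000, 100)); // Infinity
    }
}
```

With mean 1000 ms and standard deviation 100 ms, a 2000 ms silence gives φ ≈ 37.6, far above the default threshold of 10. Setting acceptableHeartbeatPauseMillis to 1000 moves the effective mean to 2000 ms, and the same silence yields φ ≈ 0.3.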

Summary

  • The FailureDetector interface defines the heartbeat, isAlive, lastHeartbeat and suspicionLevel methods; PhiAccrualFailureDetector implements it and is a Java port of Akka's PhiAccrualFailureDetector.scala
  • φ (phi) is the suspicion level of failure, computed as φ = -log10(1 - CDF(timeSinceLastHeartbeat)), where CDF is the cumulative distribution function of the normal distribution
  • In Akka's implementation, the phi method approximates CDF(y) with the logistic approximation defined in the β Mathematics Handbook (error 0.00014 at ±3.16): CDF(y) = 1.0 / (1.0 + Math.exp(-y * (1.5976 + 0.070566 * y * y))), where y = (x - mean) / standard_deviation. It also adds the acceptableHeartbeatPauseMillis parameter, so the mean passed to the final phi computation is meanMillis + acceptableHeartbeatPauseMillis
  • isAlive computes φ for timestampMillis and compares it with threshold (10 by default in Hazelcast); the member is considered alive only if φ is below the threshold
  • heartbeat records the new timestamp and, if isAlive(timestampMillis) returns true, adds timestampMillis - lastTimestampMillis to heartbeatHistory

doc

  • PhiAccrualFailureDetector.scala
  • PhiAccrualFailureDetector.java
  • A logistic approximation to the cumulative normal distribution - Core