A look at hazelcast's PhiAccrualFailureDetector
Published: 2019-06-27


This post takes a look at hazelcast's PhiAccrualFailureDetector.

FailureDetector

hazelcast-3.12-sources.jar!/com/hazelcast/internal/cluster/fd/FailureDetector.java

/**
 * Failure detector tracks heartbeats of a member and decides liveness/availability of the member.
 */
public interface FailureDetector {

    /**
     * Notifies this failure detector about received heartbeat message from the tracked member.
     *
     * @param timestamp timestamp of heartbeat message in milliseconds
     */
    void heartbeat(long timestamp);

    /**
     * Returns true if the tracked member is considered as alive/available.
     * @param timestamp timestamp in milliseconds
     * @return true if the member is alive
     */
    boolean isAlive(long timestamp);

    /**
     * Returns the last heartbeat timestamp for the tracked member.
     * @return heartbeat timestamp in milliseconds
     */
    long lastHeartbeat();

    /**
     * Returns suspicion level about the tracked member. Returned value is mostly implementation dependent.
     * 0 indicates no suspicion at all.
     * @param timestamp timestamp in milliseconds
     * @return suspicion level
     */
    double suspicionLevel(long timestamp);
}
  • The FailureDetector interface defines four methods: heartbeat, isAlive, lastHeartbeat and suspicionLevel
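To make the contract concrete, here is a minimal sketch of about the simplest detector that could satisfy it: a fixed timeout. The interface is re-declared locally as `Detector` so the snippet compiles on its own; `FixedTimeoutDemo`, `FixedTimeoutDetector` and `timeoutMillis` are hypothetical names used for illustration and are not Hazelcast classes.

```java
class FixedTimeoutDemo {

    // Local copy of the four FailureDetector methods, so the sketch is self-contained.
    interface Detector {
        void heartbeat(long timestamp);
        boolean isAlive(long timestamp);
        long lastHeartbeat();
        double suspicionLevel(long timestamp);
    }

    // Hypothetical detector: alive while the last heartbeat is less than timeoutMillis old.
    static class FixedTimeoutDetector implements Detector {
        private final long timeoutMillis;
        private volatile long lastHeartbeatMillis = -1; // -1 means "no heartbeat yet"

        FixedTimeoutDetector(long timeoutMillis) {
            this.timeoutMillis = timeoutMillis;
        }

        @Override
        public void heartbeat(long timestamp) {
            lastHeartbeatMillis = timestamp;
        }

        @Override
        public boolean isAlive(long timestamp) {
            return suspicionLevel(timestamp) < 1.0;
        }

        @Override
        public long lastHeartbeat() {
            return lastHeartbeatMillis;
        }

        // 0 means no suspicion; a value >= 1 means the timeout has elapsed.
        @Override
        public double suspicionLevel(long timestamp) {
            long last = lastHeartbeatMillis;
            if (last == -1) {
                return 0.0;
            }
            return (double) (timestamp - last) / timeoutMillis;
        }
    }

    public static void main(String[] args) {
        Detector fd = new FixedTimeoutDetector(5_000);
        fd.heartbeat(1_000);
        System.out.println(fd.isAlive(3_000)); // 2 s of silence
        System.out.println(fd.isAlive(7_000)); // 6 s of silence
    }
}
```

A fixed timeout gives a yes/no answer against one hard-coded bound. The accrual detector discussed next instead derives its suspicion level from the observed heartbeat intervals.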

PhiAccrualFailureDetector

hazelcast-3.12-sources.jar!/com/hazelcast/internal/cluster/fd/PhiAccrualFailureDetector.java

/**
 * Port of Akka's PhiAccrualFailureDetector.scala
 *
 * Implementation of 'The Phi Accrual Failure Detector' by Hayashibara et al. as defined in their paper.
 *
 * The suspicion level of failure is given by a value called φ (phi).
 * The basic idea of the φ failure detector is to express the value of φ on a scale that
 * is dynamically adjusted to reflect current network conditions. A configurable
 * threshold is used to decide if φ is considered to be a failure.
 *
 * The value of φ is calculated as:
 *
 *     φ = -log10(1 - F(timeSinceLastHeartbeat))
 *
 * where F is the cumulative distribution function of a normal distribution with mean
 * and standard deviation estimated from historical heartbeat inter-arrival times.
 */
public class PhiAccrualFailureDetector implements FailureDetector {

    static final long NO_HEARTBEAT_TIMESTAMP = -1;

    private final double threshold;
    private final double minStdDeviationMillis;
    private final long acceptableHeartbeatPauseMillis;

    private final HeartbeatHistory heartbeatHistory;
    private volatile long lastHeartbeatMillis = NO_HEARTBEAT_TIMESTAMP;

    /**
     * @param threshold A low threshold is prone to generate many wrong suspicions but ensures
     *                  a quick detection in the event of a real crash. Conversely, a high threshold
     *                  generates fewer mistakes but needs more time to detect actual crashes
     * @param maxSampleSize Number of samples to use for calculation of mean and standard deviation of
     *                      inter-arrival times.
     * @param minStdDeviationMillis Minimum standard deviation to use for the normal distribution used when
     *                              calculating phi. Too low standard deviation might result in too much sensitivity
     *                              for sudden, but normal, deviations in heartbeat inter arrival times.
     * @param acceptableHeartbeatPauseMillis Duration corresponding to number of potentially lost/delayed
     *                                       heartbeats that will be accepted before considering it to be an anomaly.
     *                                       This margin is important to be able to survive sudden, occasional, pauses
     *                                       in heartbeat arrivals, due to for example garbage collect or network drop.
     * @param firstHeartbeatEstimateMillis Bootstrap the stats with heartbeats that corresponds to this duration,
     *                                     with a with rather high standard deviation (since environment is unknown
     *                                     in the beginning)
     */
    public PhiAccrualFailureDetector(double threshold, int maxSampleSize, double minStdDeviationMillis,
            long acceptableHeartbeatPauseMillis, long firstHeartbeatEstimateMillis) {
        this.threshold = checkPositive(threshold, "Threshold must be positive: " + threshold);
        this.minStdDeviationMillis = checkPositive(minStdDeviationMillis,
                "Minimum standard deviation must be positive: " + minStdDeviationMillis);
        this.acceptableHeartbeatPauseMillis = checkNotNegative(acceptableHeartbeatPauseMillis,
                "Acceptable heartbeat pause millis must be >= 0: " + acceptableHeartbeatPauseMillis);
        checkPositive(firstHeartbeatEstimateMillis,
                "First heartbeat value must be > 0: " + firstHeartbeatEstimateMillis);

        heartbeatHistory = new HeartbeatHistory(maxSampleSize);
        firstHeartbeat(firstHeartbeatEstimateMillis);
    }

    // guess statistics for first heartbeat,
    // important so that connections with only one heartbeat becomes unavailable
    // bootstrap with 2 entries with rather high standard deviation
    @SuppressWarnings("checkstyle:magicnumber")
    private void firstHeartbeat(long firstHeartbeatEstimateMillis) {
        long stdDeviationMillis = firstHeartbeatEstimateMillis / 4;
        heartbeatHistory.add(firstHeartbeatEstimateMillis - stdDeviationMillis);
        heartbeatHistory.add(firstHeartbeatEstimateMillis + stdDeviationMillis);
    }

    private double ensureValidStdDeviation(double stdDeviationMillis) {
        return Math.max(stdDeviationMillis, minStdDeviationMillis);
    }

    /**
     * The suspicion level of the accrual failure detector.
     *
     * If a connection does not have any records in failure detector then it is
     * considered healthy.
     */
    private double phi(long timestampMillis) {
        long timeDiffMillis;
        double meanMillis;
        double stdDeviationMillis;

        synchronized (heartbeatHistory) {
            long lastTimestampMillis = lastHeartbeatMillis;
            if (lastTimestampMillis == NO_HEARTBEAT_TIMESTAMP) {
                return 0.0;
            }

            timeDiffMillis = timestampMillis - lastTimestampMillis;
            meanMillis = heartbeatHistory.mean();
            stdDeviationMillis = ensureValidStdDeviation(heartbeatHistory.stdDeviation());
        }

        return phi(timeDiffMillis, meanMillis + acceptableHeartbeatPauseMillis, stdDeviationMillis);
    }

    /**
     * Calculation of phi, derived from the Cumulative distribution function for
     * N(mean, stdDeviation) normal distribution, given by
     * 1.0 / (1.0 + math.exp(-y * (1.5976 + 0.070566 * y * y)))
     * where y = (x - mean) / standard_deviation
     * This is an approximation defined in β Mathematics Handbook (Logistic approximation).
     * Error is 0.00014 at +- 3.16
     * The calculated value is equivalent to -log10(1 - CDF(y))
     */
    @SuppressWarnings("checkstyle:magicnumber")
    private static double phi(long timeDiffMillis, double meanMillis, double stdDeviationMillis) {
        double y = (timeDiffMillis - meanMillis) / stdDeviationMillis;
        double e = Math.exp(-y * (1.5976 + 0.070566 * y * y));
        if (timeDiffMillis > meanMillis) {
            return -Math.log10(e / (1.0 + e));
        } else {
            return -Math.log10(1.0 - 1.0 / (1.0 + e));
        }
    }

    @Override
    public boolean isAlive(long timestampMillis) {
        double phi = phi(timestampMillis);
        return phi < threshold;
    }

    @Override
    public void heartbeat(long timestampMillis) {
        synchronized (heartbeatHistory) {
            long lastTimestampMillis = getAndSetLastHeartbeat(timestampMillis);
            if (lastTimestampMillis == NO_HEARTBEAT_TIMESTAMP) {
                return;
            }

            if (isAlive(timestampMillis)) {
                heartbeatHistory.add(timestampMillis - lastTimestampMillis);
            }
        }
    }

    private long getAndSetLastHeartbeat(long timestampMillis) {
        long lastTimestampMillis = lastHeartbeatMillis;
        lastHeartbeatMillis = timestampMillis;
        return lastTimestampMillis;
    }

    @Override
    public long lastHeartbeat() {
        return lastHeartbeatMillis;
    }

    @Override
    public double suspicionLevel(long timestamp) {
        return phi(timestamp);
    }

    /**
     * Holds the heartbeat statistics for a specific member.
     * It is capped by the number of samples specified in `maxSampleSize`.
     *
     * The stats (mean, variance, stdDeviation) are not defined for
     * for empty HeartbeatHistory, i.e. throws ArithmeticException.
     */
    private static class HeartbeatHistory {
        private final int maxSampleSize;
        private final LinkedList<Long> intervals = new LinkedList<Long>();
        private long intervalSum;
        private long squaredIntervalSum;

        HeartbeatHistory(int maxSampleSize) {
            if (maxSampleSize < 1) {
                throw new IllegalArgumentException("Sample size must be >= 1 : " + maxSampleSize);
            }
            this.maxSampleSize = maxSampleSize;
        }

        double mean() {
            return (double) intervalSum / intervals.size();
        }

        double variance() {
            double mean = mean();
            return ((double) squaredIntervalSum / intervals.size()) - (mean * mean);
        }

        double stdDeviation() {
            return Math.sqrt(variance());
        }

        void add(long interval) {
            if (intervals.size() >= maxSampleSize) {
                dropOldest();
            }
            intervals.add(interval);
            intervalSum += interval;
            squaredIntervalSum += pow2(interval);
        }

        private void dropOldest() {
            long dropped = intervals.pollFirst();
            intervalSum -= dropped;
            squaredIntervalSum -= pow2(dropped);
        }

        private static long pow2(long x) {
            return x * x;
        }
    }
}
  • PhiAccrualFailureDetector implements the FailureDetector interface; it is a Java port of Akka's PhiAccrualFailureDetector.scala
  • φ (phi) is the suspicion level that the tracked member has failed. It is computed as φ = -log10(1 - CDF(timeSinceLastHeartbeat)), where CDF is the cumulative distribution function of a normal distribution
  • The phi method approximates CDF(y) with the logistic approximation defined in the β Mathematics Handbook (error is 0.00014 at ±3.16): CDF(y) = 1.0 / (1.0 + math.exp(-y * (1.5976 + 0.070566 * y * y))), where y = (x - mean) / standard_deviation

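When x > mean (e < 1) the code rewrites the formula as φ = -Math.log10(e / (1.0 + e)); when x <= mean (e >= 1) it uses -Math.log10(1.0 - 1.0 / (1.0 + e)). The two expressions are algebraically identical, since 1 - 1/(1 + e) = e/(1 + e). The source does not explain the split. A plausible reason is floating-point accuracy: for a very small e, 1.0 / (1.0 + e) rounds to exactly 1.0 and the second form would return infinity, while for an e that overflows to infinity the first form becomes infinity divided by infinity.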
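One way to see what the two branches buy is to evaluate both expressions at the same y values and compare. The sketch below does that. `PhiBranchDemo`, `phiUpper` and `phiLower` are hypothetical names, and reading the split as a floating-point guard is an assumption, not something the Hazelcast source states.

```java
class PhiBranchDemo {

    // e as computed inside phi(): exp(-y * (1.5976 + 0.070566 * y * y))
    static double e(double y) {
        return Math.exp(-y * (1.5976 + 0.070566 * y * y));
    }

    // Expression used by the source when timeDiff > mean (y > 0, e < 1).
    static double phiUpper(double y) {
        double e = e(y);
        return -Math.log10(e / (1.0 + e));
    }

    // Expression used by the source when timeDiff <= mean (y <= 0, e >= 1).
    static double phiLower(double y) {
        double e = e(y);
        return -Math.log10(1.0 - 1.0 / (1.0 + e));
    }

    public static void main(String[] args) {
        for (double y : new double[] {-40.0, -1.0, 0.0, 1.0, 8.0, 12.0}) {
            System.out.println("y=" + y
                    + "  e/(1+e) form: " + phiUpper(y)
                    + "  1-1/(1+e) form: " + phiLower(y));
        }
    }
}
```

Around y = 0 the two forms agree. At y = 8, e is far smaller than the double-precision epsilon, so 1.0 / (1.0 + e) is exactly 1.0 and the second form returns Infinity, while the first form still returns a finite value of about 21. At y = -40, e overflows to Infinity, so the first form yields NaN while the second yields zero. Each branch is therefore used on the side where it stays well behaved.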

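  • isAlive computes phi for the given timestampMillis and compares it with the threshold (10 by default in hazelcast); the member is considered alive only if phi is below the threshold
  • heartbeat first checks whether the member is still considered alive at timestampMillis; only if so does it add timestampMillis - lastTimestampMillis to heartbeatHistory
  • This implementation adds the acceptableHeartbeatPauseMillis parameter: when phi is finally computed, the mean passed in is meanMillis + acceptableHeartbeatPauseMillis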
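The effect of the threshold and of shifting the mean by the acceptable pause is easier to see with numbers. The following is a minimal sketch, not the Hazelcast class. `PhiSketch` is a hypothetical stand-in for HeartbeatHistory plus the static phi method, and it uses ArrayDeque instead of LinkedList. The sample intervals, the 100 ms minimum standard deviation and the 3 s pause are illustrative assumptions, not Hazelcast defaults.

```java
import java.util.ArrayDeque;

class PhiSketch {
    // Sliding window of inter-arrival times with running sums, as in HeartbeatHistory.
    private final ArrayDeque<Long> intervals = new ArrayDeque<>();
    private final int maxSampleSize;
    private long sum;
    private long squaredSum;

    PhiSketch(int maxSampleSize) {
        this.maxSampleSize = maxSampleSize;
    }

    void add(long interval) {
        if (intervals.size() >= maxSampleSize) {
            long dropped = intervals.pollFirst(); // evict the oldest sample
            sum -= dropped;
            squaredSum -= dropped * dropped;
        }
        intervals.addLast(interval);
        sum += interval;
        squaredSum += interval * interval;
    }

    double mean() {
        return (double) sum / intervals.size();
    }

    double stdDeviation() {
        double mean = mean();
        return Math.sqrt((double) squaredSum / intervals.size() - mean * mean);
    }

    // Same logistic approximation and branch structure as the quoted phi method.
    static double phi(long timeDiffMillis, double meanMillis, double stdDeviationMillis) {
        double y = (timeDiffMillis - meanMillis) / stdDeviationMillis;
        double e = Math.exp(-y * (1.5976 + 0.070566 * y * y));
        return timeDiffMillis > meanMillis
                ? -Math.log10(e / (1.0 + e))
                : -Math.log10(1.0 - 1.0 / (1.0 + e));
    }

    public static void main(String[] args) {
        // Illustrative numbers only; assumptions, not Hazelcast defaults.
        double threshold = 10.0;
        double minStdDeviationMillis = 100.0;
        long acceptablePauseMillis = 3_000;

        PhiSketch history = new PhiSketch(200);
        for (long interval : new long[] {900, 1100, 1000, 950, 1050}) {
            history.add(interval);
        }
        double mean = history.mean();
        double std = Math.max(history.stdDeviation(), minStdDeviationMillis);

        for (long silence : new long[] {1_000, 1_500, 2_000, 5_000}) {
            double strict = phi(silence, mean, std);
            double lenient = phi(silence, mean + acceptablePauseMillis, std);
            System.out.printf("silence=%dms  no pause: phi=%.2f alive=%b  with pause: phi=%.2f alive=%b%n",
                    silence, strict, strict < threshold, lenient, lenient < threshold);
        }
    }
}
```

With these numbers, the variant without a pause crosses the threshold of 10 somewhere between 1.5 s and 2 s of silence. Adding the pause to the mean keeps phi at zero after 2 s of silence and pushes it past the threshold only later, which is how the parameter lets the detector tolerate an occasional long gap.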

Summary

  • The FailureDetector interface defines the heartbeat, isAlive, lastHeartbeat and suspicionLevel methods; PhiAccrualFailureDetector implements it and is a Java port of Akka's PhiAccrualFailureDetector.scala
  • φ (phi) is the suspicion level that the tracked member has failed. It is computed as φ = -log10(1 - CDF(timeSinceLastHeartbeat)), where CDF is the cumulative distribution function of a normal distribution
  • As in Akka's implementation, the phi method approximates CDF(y) with the logistic approximation defined in the β Mathematics Handbook (error is 0.00014 at ±3.16): CDF(y) = 1.0 / (1.0 + math.exp(-y * (1.5976 + 0.070566 * y * y))), where y = (x - mean) / standard_deviation. It also adds the acceptableHeartbeatPauseMillis parameter: when phi is finally computed, the mean passed in is meanMillis + acceptableHeartbeatPauseMillis
  • isAlive computes phi for the given timestampMillis and compares it with the threshold (10 by default in hazelcast); the member is considered alive only if phi is below the threshold
  • heartbeat first checks whether the member is still considered alive at timestampMillis; only if so does it add timestampMillis - lastTimestampMillis to heartbeatHistory

doc

Reposted from: https://juejin.im/post/5cc864b2e51d456e8c1d3cf6
