关于eureka:限流实现Eurekaclient-的RateLimiter

37次阅读

共计 2642 个字符,预计需要花费 7 分钟才能阅读完成。

/*

  • Copyright 2014 Netflix, Inc.
    *
  • Licensed under the Apache License, Version 2.0 (the “License”);
  • you may not use this file except in compliance with the License.
  • You may obtain a copy of the License at
    *
  • http://www.apache.org/license…
    *
  • Unless required by applicable law or agreed to in writing, software
  • distributed under the License is distributed on an “AS IS” BASIS,
  • WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  • See the License for the specific language governing permissions and
  • limitations under the License.
    */

package com.netflix.discovery.util;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/**

  • Rate limiter implementation is based on token bucket algorithm. There are two parameters:
  • <ul>
  • <li>
  • burst size – maximum number of requests allowed into the system as a burst
  • </li>
  • <li>
  • average rate – expected number of requests per second (RateLimiters using MINUTES is also supported)
  • </li>
  • </ul>
    *
  • @author Tomasz Bak
    */

public class RateLimiter {

private final long rateToMsConversion;

private final AtomicInteger consumedTokens = new AtomicInteger();
private final AtomicLong lastRefillTime = new AtomicLong(0);

@Deprecated
public RateLimiter() {this(TimeUnit.SECONDS);
}

public RateLimiter(TimeUnit averageRateUnit) {switch (averageRateUnit) {
        case SECONDS:
            rateToMsConversion = 1000;
            break;
        case MINUTES:
            rateToMsConversion = 60 * 1000;
            break;
        default:
            throw new IllegalArgumentException("TimeUnit of" + averageRateUnit + "is not supported");
    }
}

public boolean acquire(int burstSize, long averageRate) {return acquire(burstSize, averageRate, System.currentTimeMillis());
}

public boolean acquire(int burstSize, long averageRate, long currentTimeMillis) {if (burstSize <= 0 || averageRate <= 0) { // Instead of throwing exception, we just let all the traffic go
        return true;
    }

    refillToken(burstSize, averageRate, currentTimeMillis);
    return consumeToken(burstSize);
}

private void refillToken(int burstSize, long averageRate, long currentTimeMillis) {long refillTime = lastRefillTime.get();
    long timeDelta = currentTimeMillis - refillTime;

    long newTokens = timeDelta * averageRate / rateToMsConversion;
    if (newTokens > 0) {
        long newRefillTime = refillTime == 0
                ? currentTimeMillis
                : refillTime + newTokens * rateToMsConversion / averageRate;
        if (lastRefillTime.compareAndSet(refillTime, newRefillTime)) {while (true) {int currentLevel = consumedTokens.get();
                int adjustedLevel = Math.min(currentLevel, burstSize); // In case burstSize decreased
                int newLevel = (int) Math.max(0, adjustedLevel - newTokens);
                if (consumedTokens.compareAndSet(currentLevel, newLevel)) {return;}
            }
        }
    }
}

private boolean consumeToken(int burstSize) {while (true) {int currentLevel = consumedTokens.get();
        if (currentLevel >= burstSize) {return false;}
        if (consumedTokens.compareAndSet(currentLevel, currentLevel + 1)) {return true;}
    }
}

public void reset() {consumedTokens.set(0);
    lastRefillTime.set(0);
}

}

正文完
 0