/*
*
* Copyright 2016 Robert Winkler and Bohdan Storozhuk
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package io.github.resilience4j.ratelimiter.internal;
import io.github.resilience4j.ratelimiter.RateLimiter;
import io.github.resilience4j.ratelimiter.RateLimiterConfig;
import io.github.resilience4j.ratelimiter.event.RateLimiterOnDrainedEvent;
import io.github.resilience4j.ratelimiter.event.RateLimiterOnFailureEvent;
import io.github.resilience4j.ratelimiter.event.RateLimiterOnSuccessEvent;
import io.vavr.collection.HashMap;
import io.vavr.collection.Map;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;
import static java.lang.Long.min;
import static java.lang.System.nanoTime;
import static java.lang.Thread.currentThread;
import static java.util.concurrent.locks.LockSupport.parkNanos;
/**
* {@link AtomicRateLimiter} splits all nanoseconds from the start of epoch into cycles.
* <p>Each cycle has duration of {@link RateLimiterConfig#getLimitRefreshPeriod} in nanoseconds.
* <p>By contract on start of each cycle {@link AtomicRateLimiter} should
* set {@link State#activePermissions} to {@link RateLimiterConfig#getLimitForPeriod}. For the {@link
* AtomicRateLimiter} callers it is really looks so, but under the hood there is some optimisations
* that will skip this refresh if {@link AtomicRateLimiter} is not used actively.
* <p>All {@link AtomicRateLimiter} updates are atomic and state is encapsulated in {@link
* AtomicReference} to {@link AtomicRateLimiter.State}
*/
public class AtomicRateLimiter implements RateLimiter {
private final long nanoTimeStart;
private final String name;
private final AtomicInteger waitingThreads;
private final AtomicReference<State> state;
private final Map<String, String> tags;
private final RateLimiterEventProcessor eventProcessor;
public AtomicRateLimiter(String name, RateLimiterConfig rateLimiterConfig) {this(name, rateLimiterConfig, HashMap.empty());
}
public AtomicRateLimiter(String name, RateLimiterConfig rateLimiterConfig,
Map<String, String> tags) {
this.name = name;
this.tags = tags;
this.nanoTimeStart = nanoTime();
waitingThreads = new AtomicInteger(0);
state = new AtomicReference<>(new State(rateLimiterConfig, 0, rateLimiterConfig.getLimitForPeriod(), 0
));
eventProcessor = new RateLimiterEventProcessor();}
/**
* {@inheritDoc}
*/
@Override
public void changeTimeoutDuration(final Duration timeoutDuration) {RateLimiterConfig newConfig = RateLimiterConfig.from(state.get().config)
.timeoutDuration(timeoutDuration)
.build();
state.updateAndGet(currentState -> new State(
newConfig, currentState.activeCycle, currentState.activePermissions,
currentState.nanosToWait
));
}
/**
* {@inheritDoc}
*/
@Override
public void changeLimitForPeriod(final int limitForPeriod) {RateLimiterConfig newConfig = RateLimiterConfig.from(state.get().config)
.limitForPeriod(limitForPeriod)
.build();
state.updateAndGet(currentState -> new State(
newConfig, currentState.activeCycle, currentState.activePermissions,
currentState.nanosToWait
));
}
/**
* Calculates time elapsed from the class loading.
*/
private long currentNanoTime() {return nanoTime() - nanoTimeStart;
}
long getNanoTimeStart() {return this.nanoTimeStart;}
/**
* {@inheritDoc}
*/
@Override
public boolean acquirePermission(final int permits) {long timeoutInNanos = state.get().config.getTimeoutDuration().toNanos();
State modifiedState = updateStateWithBackOff(permits, timeoutInNanos);
boolean result = waitForPermissionIfNecessary(timeoutInNanos, modifiedState.nanosToWait);
publishRateLimiterAcquisitionEvent(result, permits);
return result;
}
/**
* {@inheritDoc}
*/
@Override
public long reservePermission(final int permits) {long timeoutInNanos = state.get().config.getTimeoutDuration().toNanos();
State modifiedState = updateStateWithBackOff(permits, timeoutInNanos);
boolean canAcquireImmediately = modifiedState.nanosToWait <= 0;
if (canAcquireImmediately) {publishRateLimiterAcquisitionEvent(true, permits);
return 0;
}
boolean canAcquireInTime = timeoutInNanos >= modifiedState.nanosToWait;
if (canAcquireInTime) {publishRateLimiterAcquisitionEvent(true, permits);
return modifiedState.nanosToWait;
}
publishRateLimiterAcquisitionEvent(false, permits);
return -1;
}
@Override
public void drainPermissions() {
AtomicRateLimiter.State prev;
AtomicRateLimiter.State next;
do {prev = state.get();
next = calculateNextState(prev.activePermissions, 0, prev);
} while (!compareAndSet(prev, next));
if (eventProcessor.hasConsumers()) {eventProcessor.consumeEvent(new RateLimiterOnDrainedEvent(name, Math.min(prev.activePermissions, 0)));
}
}
/**
* Atomically updates the current {@link State} with the results of applying the {@link
* AtomicRateLimiter#calculateNextState}, returning the updated {@link State}. It differs from
* {@link AtomicReference#updateAndGet(UnaryOperator)} by constant back off. It means that after
* one try to {@link AtomicReference#compareAndSet(Object, Object)} this method will wait for a
* while before try one more time. This technique was originally described in this
* <a href="https://arxiv.org/abs/1305.5800"> paper</a>
* and showed great results with {@link AtomicRateLimiter} in benchmark tests.
*
* @param timeoutInNanos a side-effect-free function
* @return the updated value
*/
private State updateStateWithBackOff(final int permits, final long timeoutInNanos) {
AtomicRateLimiter.State prev;
AtomicRateLimiter.State next;
do {prev = state.get();
next = calculateNextState(permits, timeoutInNanos, prev);
} while (!compareAndSet(prev, next));
return next;
}
/**
* Atomically sets the value to the given updated value if the current value {@code ==} the
* expected value. It differs from {@link AtomicReference#updateAndGet(UnaryOperator)} by
* constant back off. It means that after one try to {@link AtomicReference#compareAndSet(Object,
* Object)} this method will wait for a while before try one more time. This technique was
* originally described in this
* <a href="https://arxiv.org/abs/1305.5800"> paper</a>
* and showed great results with {@link AtomicRateLimiter} in benchmark tests.
*
* @param current the expected value
* @param next the new value
* @return {@code true} if successful. False return indicates that the actual value was not
* equal to the expected value.
*/
private boolean compareAndSet(final State current, final State next) {if (state.compareAndSet(current, next)) {return true;}
parkNanos(1); // back-off
return false;
}
/**
* A side-effect-free function that can calculate next {@link State} from current. It determines
* time duration that you should wait for the given number of permits and reserves it for you,
* if you'll be able to wait long enough.
*
* @param permits number of permits
* @param timeoutInNanos max time that caller can wait for permission in nanoseconds
* @param activeState current state of {@link AtomicRateLimiter}
* @return next {@link State}
*/
private State calculateNextState(final int permits, final long timeoutInNanos,
final State activeState) {long cyclePeriodInNanos = activeState.config.getLimitRefreshPeriod().toNanos();
int permissionsPerCycle = activeState.config.getLimitForPeriod();
long currentNanos = currentNanoTime();
long currentCycle = currentNanos / cyclePeriodInNanos;
long nextCycle = activeState.activeCycle;
int nextPermissions = activeState.activePermissions;
if (nextCycle != currentCycle) {
long elapsedCycles = currentCycle - nextCycle;
long accumulatedPermissions = elapsedCycles * permissionsPerCycle;
nextCycle = currentCycle;
nextPermissions = (int) min(nextPermissions + accumulatedPermissions,
permissionsPerCycle);
}
long nextNanosToWait = nanosToWaitForPermission(
permits, cyclePeriodInNanos, permissionsPerCycle, nextPermissions, currentNanos,
currentCycle
);
State nextState = reservePermissions(activeState.config, permits, timeoutInNanos, nextCycle,
nextPermissions, nextNanosToWait);
return nextState;
}
/**
* Calculates time to wait for the required permits of permissions to get accumulated
*
* @param permits permits of required permissions
* @param cyclePeriodInNanos current configuration values
* @param permissionsPerCycle current configuration values
* @param availablePermissions currently available permissions, can be negative if some
* permissions have been reserved
* @param currentNanos current time in nanoseconds
* @param currentCycle current {@link AtomicRateLimiter} cycle @return nanoseconds to
* wait for the next permission
*/
private long nanosToWaitForPermission(final int permits, final long cyclePeriodInNanos,
final int permissionsPerCycle,
final int availablePermissions, final long currentNanos, final long currentCycle) {if (availablePermissions >= permits) {return 0L;}
long nextCycleTimeInNanos = (currentCycle + 1) * cyclePeriodInNanos;
long nanosToNextCycle = nextCycleTimeInNanos - currentNanos;
int permissionsAtTheStartOfNextCycle = availablePermissions + permissionsPerCycle;
int fullCyclesToWait = divCeil(-(permissionsAtTheStartOfNextCycle - permits), permissionsPerCycle);
return (fullCyclesToWait * cyclePeriodInNanos) + nanosToNextCycle;
}
/**
* Divide two integers and round result to the bigger near mathematical integer.
*
* @param x - should be > 0
* @param y - should be > 0
*/
private static int divCeil(int x, int y) {return (x + y - 1) / y;
}
/**
* Determines whether caller can acquire permission before timeout or not and then creates
* corresponding {@link State}. Reserves permissions only if caller can successfully wait for
* permission.
*
* @param config
* @param permits permits of permissions
* @param timeoutInNanos max time that caller can wait for permission in nanoseconds
* @param cycle cycle for new {@link State}
* @param permissions permissions for new {@link State}
* @param nanosToWait nanoseconds to wait for the next permission
* @return new {@link State} with possibly reserved permissions and time to wait
*/
private State reservePermissions(final RateLimiterConfig config, final int permits,
final long timeoutInNanos,
final long cycle, final int permissions, final long nanosToWait) {
boolean canAcquireInTime = timeoutInNanos >= nanosToWait;
int permissionsWithReservation = permissions;
if (canAcquireInTime) {permissionsWithReservation -= permits;}
return new State(config, cycle, permissionsWithReservation, nanosToWait);
}
/**
* If nanosToWait is bigger than 0 it tries to park {@link Thread} for nanosToWait but not
* longer then timeoutInNanos.
*
* @param timeoutInNanos max time that caller can wait
* @param nanosToWait nanoseconds caller need to wait
* @return true if caller was able to wait for nanosToWait without {@link Thread#interrupt} and
* not exceed timeout
*/
private boolean waitForPermissionIfNecessary(final long timeoutInNanos,
final long nanosToWait) {
boolean canAcquireImmediately = nanosToWait <= 0;
boolean canAcquireInTime = timeoutInNanos >= nanosToWait;
if (canAcquireImmediately) {return true;}
if (canAcquireInTime) {return waitForPermission(nanosToWait);
}
waitForPermission(timeoutInNanos);
return false;
}
/**
* Parks {@link Thread} for nanosToWait.
* <p>If the current thread is {@linkplain Thread#interrupted}
* while waiting for a permit then it won't throw {@linkplain InterruptedException}, but its
* interrupt status will be set.
*
* @param nanosToWait nanoseconds caller need to wait
* @return true if caller was not {@link Thread#interrupted} while waiting
*/
private boolean waitForPermission(final long nanosToWait) {waitingThreads.incrementAndGet();
long deadline = currentNanoTime() + nanosToWait;
boolean wasInterrupted = false;
while (currentNanoTime() < deadline && !wasInterrupted) {long sleepBlockDuration = deadline - currentNanoTime();
parkNanos(sleepBlockDuration);
wasInterrupted = Thread.interrupted();}
waitingThreads.decrementAndGet();
if (wasInterrupted) {currentThread().interrupt();}
return !wasInterrupted;
}
/**
* {@inheritDoc}
*/
@Override
public String getName() {return name;}
/**
* {@inheritDoc}
*/
@Override
public RateLimiterConfig getRateLimiterConfig() {return state.get().config;
}
/**
* {@inheritDoc}
*/
@Override
public Map<String, String> getTags() {return tags;}
/**
* {@inheritDoc}
*/
@Override
public Metrics getMetrics() {return new AtomicRateLimiterMetrics();
}
@Override
public EventPublisher getEventPublisher() {return eventProcessor;}
@Override
public String toString() {
return "AtomicRateLimiter{" +
"name='" + name + '\'' +
", rateLimiterConfig=" + state.get().config +
'}';
}
/**
* Get the enhanced Metrics with some implementation specific details.
*
* @return the detailed metrics
*/
public AtomicRateLimiterMetrics getDetailedMetrics() {return new AtomicRateLimiterMetrics();
}
private void publishRateLimiterAcquisitionEvent(boolean permissionAcquired, int permits) {if (!eventProcessor.hasConsumers()) {return;}
if (permissionAcquired) {eventProcessor.consumeEvent(new RateLimiterOnSuccessEvent(name, permits));
return;
}
eventProcessor.consumeEvent(new RateLimiterOnFailureEvent(name, permits));
}
/**
* <p>{@link AtomicRateLimiter.State} represents immutable state of {@link AtomicRateLimiter}
* where:
* <ul>
* <li>activeCycle - {@link AtomicRateLimiter} cycle number that was used
* by the last {@link AtomicRateLimiter#acquirePermission()} call.</li>
* <p>
* <li>activePermissions - count of available permissions after
* the last {@link AtomicRateLimiter#acquirePermission()} call.
* Can be negative if some permissions where reserved.</li>
* <p>
* <li>nanosToWait - count of nanoseconds to wait for permission for
* the last {@link AtomicRateLimiter#acquirePermission()} call.</li>
* </ul>
*/
private static class State {
private final RateLimiterConfig config;
private final long activeCycle;
private final int activePermissions;
private final long nanosToWait;
private State(RateLimiterConfig config,
final long activeCycle, final int activePermissions, final long nanosToWait) {
this.config = config;
this.activeCycle = activeCycle;
this.activePermissions = activePermissions;
this.nanosToWait = nanosToWait;
}
}
/**
* Enhanced {@link Metrics} with some implementation specific details
*/
public class AtomicRateLimiterMetrics implements Metrics {private AtomicRateLimiterMetrics() { }
/**
* {@inheritDoc}
*/
@Override
public int getNumberOfWaitingThreads() {return waitingThreads.get();
}
/**
* {@inheritDoc}
*/
@Override
public int getAvailablePermissions() {State currentState = state.get();
State estimatedState = calculateNextState(1, -1, currentState);
return estimatedState.activePermissions;
}
/**
* @return estimated time duration in nanos to wait for the next permission
*/
public long getNanosToWait() {State currentState = state.get();
State estimatedState = calculateNextState(1, -1, currentState);
return estimatedState.nanosToWait;
}
/**
* @return estimated current cycle
*/
public long getCycle() {State currentState = state.get();
State estimatedState = calculateNextState(1, -1, currentState);
return estimatedState.activeCycle;
}
}
}