关于node.js:Java构建简单的速率限制器

45次阅读

共计 5284 个字符,预计需要花费 14 分钟才能阅读完成。

速率限度
事实世界中的用户是残忍的,并且没急躁,充斥着各种不确定性。在高并发零碎中,可能会呈现服务器被虚伪申请轰炸的状况,因而您可能心愿管制这种状况。
一些理论应用情景可能如下所示:

API 配额治理 - 作为提供者,您可能心愿依据用户的付款状况限度向服务器收回 API 申请的速率。这能够在客户端或服务端实现。
安全性 - 避免 DDOS 攻打。
老本管制 – 这对服务方甚至客户方来说都不是必须的。如果某个组件以十分高的速率收回一个事件,它可能有助于管制它,它可能有助于管制从客户端发送的遥测。

限速解决时的选项
依据咱们解决的申请 / 事件类型,可能会产生以下状况:

咱们能够放弃额定的申请
咱们能够抉择让申请期待,直到零碎将它们升高到预约义的速率。

罕用限速算法

令牌桶算法
漏桶算法

咱们将不深刻探讨这些算法的外部细节,因为这超出了本文的范畴。
咱们将以令牌桶算法为核心。其要求如下。
令牌桶算法基于以固定速率增加令牌的固定容量桶的类比。在容许 API 持续之前,将查看桶,以查看它过后是否蕴含至多一个令牌。如果令牌存在,则进行 API 调用。如果不是,则抛弃该音讯 / 或使其期待。
需要

应该可能承受每秒所需的(TPS)事务或速率。
如果超过咱们定义的比率,则应放弃交易。
应该在同时产生的状况下起作用。

高级性能(在后续文章中实现)

应该可能平滑突发的申请。例如,如果咱们将 TPS 定义为 5,并且所有五个申请都在同一时刻达到,那么它应该可能以固定的工夫距离将它们排成一行,即以 200ms 的工夫距离执行每个申请。它须要一个外部定时电路。
如果咱们的 TPS 为 5,并且在其中一个 1 秒的时段中,咱们在下一秒只应用 3 个代币,那么咱们应该可能提供 5 +2 = 7 个代币作为处分。但速率为每个令牌 1 /7(142.28ms)。奖金不应结转到下一个插槽。

让咱们首先定义咱们的 速率限制器:
/**

  • Rate limiter helps in limiting the rate of execution of a piece of code. The rate is defined in terms of
  • TPS(Transactions per second). Rate of 5 would suggest, 5 transactions/second. Transaction could be a DB call, API call,
  • or a simple function call.
  • <p>
  • Every {@link RateLimiter} implementation should implement either {@link RateLimiter#throttle(Code)} or, {@link RateLimiter#enter()}.
  • They can also choose to implement all.
  • <p>
  • {@link Code} represents a piece of code that needs to be rate limited. It could be a function call, if the code to be rate limited
  • spreads across multiple functions, we need to use entry() and exit() contract.
    */

public interface RateLimiter {

/**

 * Rate limits the code passed inside as an argument.
 *
 * @param code representation of the piece of code that needs to be rate limited.
 * @return true if executed, false otherwise.
 */
boolean throttle(Code code);
/**
 * When the piece of code that needs to be rate limited cannot be represented as a contiguous
 * code, then entry() should be used before we start executing the code. This brings the code inside the rate
 * limiting boundaries.
 *
 * @return true if the code will execute and false if rate limited.
 * <p
 */
boolean enter();
/**
 * Interface to represent a contiguous piece of code that needs to be rate limited.
 */
interface Code {
    /**
     * Calling this function should execute the code that is delegated to this interface.
     */
    void invoke();}

}
复制代码
咱们的 RateLimit 有两组 API:一个是 throttle(code),另一个是 enter()。这两种办法都满足雷同的性能,但采纳以下两种形式:

boolean throttle(代码)- 如果咱们有间断的代码,能够用来传递一个代码块。
布尔输出()– 通常能够在 API、DB 或任何咱们想要节流的调用之前应用。如果执行此代码前面的代码,则将返回 真,以及 假的如果它是速率受限的话。您能够将这些申请排队或回绝。

在生产环境中您永远不会看到节流(代码)实现,因为它不是最佳的。请在评论中通知我起因。大多数速率限制器应用相似于 enter()的 API。

外围性能
为了构建速率限制器的外围,咱们须要确保在任意两秒之间不容许超过 N 个事务。咱们将如何做到这一点?
思考咱们进行第一笔交易的时刻 t0。t0 . 所以,
直到(t0 + 1)s,咱们只容许进行 N 次交易。(t0 + 1)s , we are allowed to make only N transactions. 如何确保这一点?在下次交易时,咱们将查看
以后工夫≤(t0 + 1)。. 如果没有,那么这意味着咱们进入了不同的秒,并且咱们被容许进行 N 次交易。N transactions. 让咱们看一小段代码,它演示了:
long now = System.nanoTime();
if (now <= mNextSecondBoundary) {// If we are within the time limit of current second

if (mCounter < N) { // If token available
    mLastExecutionNanos = now;
    mCounter++; // Allocate token
    invoke(code); // Invoke the code passed the throttle method.
}

}
复制代码
那么,咱们如何定义 mNextSecondBoundary 呢?这将在咱们进行第一个事务时实现,如前所述,咱们将在实现第一个事务的时刻减少一秒。
if (mLastExecutionNanos == 0L) {

mCounter++; // Allocate the very first token here.
mLastExecutionNanos = System.nanoTime();
mNextSecondBoundary = mLastExecutionNanos + NANO_PER_SEC;  // (10^9)

}
复制代码
当初,如果咱们执行代码并看到咱们进入了不同的秒,咱们应该怎么做?咱们将通过重置上次执行工夫、可用令牌数来加强后面的代码,并通过调用 节流阀()再一次。咱们的办法曾经晓得如何解决新的秒。
@Override
public boolean throttle(Code code) {

if (mTPS <= 0) {
    // We do not want anything to pass.
    return false;
}

synchronized (mLock) {

    if (mLastExecutionNanos == 0L) {
        mCounter++;
        mLastExecutionNanos = System.nanoTime();
        mNextSecondBoundary = mLastExecutionNanos + NANO_PER_SEC;
        invoke(code);
        return true;
    } else {long now = System.nanoTime();
        if (now <= mNextSecondBoundary) {if (mCounter < mTPS) {
                mLastExecutionNanos = now;
                mCounter++;
                invoke(code);
                return true;
            } else {return false;}
        } else {
            // Reset the counter as we in a different second now.
            mCounter = 0;
            mLastExecutionNanos = 0L;
            mNextSecondBoundary = 0L;
            return throttle(code);
        }
    }
}

}
复制代码
在这个实现中,咱们能够传递须要节流的代码块,然而这个代码有一个问题。这将工作,但它会体现不佳。不举荐,但为什么呢?请在评论中通知我。
当初,能够应用雷同的构建块和 enter()构建第二个 API 了。咱们将应用雷同的逻辑,但咱们不会执行办法外部的代码块。相同,它将在调用 enter()之后执行,就像咱们执行状态治理一样。该办法的实现如下:
@Override
public boolean enter() {

if (mTPS == 0L) {return false;}

synchronized (mBoundaryLock) {

    if (mLastExecutionNanos == 0L) {mLastExecutionNanos = System.nanoTime();
        mCounter++;
        mNextSecondBoundary = mLastExecutionNanos + NANO_PER_SEC;
        return true;
    } else {long now = System.nanoTime();
        if (now <= mNextSecondBoundary) {if (mCounter < mTPS) {
                mLastExecutionNanos = now;
                mCounter++;
                return true;
            } else return false;
        } else {
            // Reset the counter as we in a different second now.
            mCounter = 0;
            mLastExecutionNanos = 0L;
            mNextSecondBoundary = 0L;
            return enter();}
    }
}

}
复制代码
当初,咱们简略的速率限制器曾经能够应用了。您能够查看残缺的代码 这里。
后果
咱们将尝试创立一个可创立六个线程的驱动程序代码。每个线程尝试从 0 到 100 计数,提早为 50ms(能够设置为任何数字)。咱们将按如下形式启动咱们的限速器:
public static void main(String[] args) {

RateLimiter limiter = new SimpleTokenBucketRateLimiter(1);
Thread[] group = new Thread[6];
Runnable r = () -> {for (int i = 0; i < 100; i++) {
        try {Thread.sleep(50);
        } catch (InterruptedException e) {throw new RuntimeException(e);
        }
        if (limiter.enter()) {System.out.println("Values:-" + Thread.currentThread().getName() + ":" + i);
        }
    }
};

for (int i = 0; i < 6; i++) {

    group[i] = new Thread(r);
    group[i].start();}

}
复制代码
咱们的 API 不反对平滑事务,而是让事务期待下一个令牌被调配,而不是抛弃申请。在回绝它们之后,它返回 false,所以如果咱们真的想的话,咱们能够把它们排队。
if (limiter.enter()) {

            System.out.println("Values:-" + Thread.currentThread().getName() + ":" + i);

} else {// queue the work again}
复制代码

这是 TPS 设置为 1 时的输入。
当咱们尝试将 TPS 设置为 2 咱们将看到以下输入:

真管用!
从 Android 的角度看

思考这样一种状况:您正在编写代码以捕捉用户签名。当他们拖动指针时,您会捕捉数千个点。平滑签名可能不须要所有这些参数,因而您应用速率限度进行采样。
一些事件调用频率很高。你能管制的。
咱们有 MessageQueue 的闲暇侦听器。当咱们在主线程中侦听它时,它被随便调用。有时候,它在一秒钟内被调用好几次。如果咱们想构建一个心跳零碎来通知咱们主线程何时闲暇,咱们能够应用它来接管每秒的事件。如果咱们一秒钟内没有收到事件,咱们能够假设主线程处于繁忙状态。
对于您的框架 / 库的 API 配额治理,您能够依据用户抉择的付款打算状况 API 调用。

明天先到这里吧。
咱们将在后续文章中构建一个更简单的速率限制器。

正文完
 0