How to build a rate-limiter in JavaScript or Node.js

In this post we show how to build a rate-limiter in JavaScript or Node.js. A rate-limiter, as its name suggests, limits the amount of traffic sent to a server. Think of it as a dam that collects water and releases it slowly to a city or area.

Let’s get started. There are two things we need to keep track of in a rate-limiter and these are:

  • the number of inactive (or latent) requests, and
  • the number of active (or concurrent) requests

An inactive request is a request that is waiting to be sent to the receiver. We will also call these pending or latent requests. An active (or concurrent) request is a request that has been sent to the receiver but has not yet been processed. We know a request has been processed (successfully or unsuccessfully) when we hear back from the receiver. We will also call these in-flight requests, like a flight that has taken off but not yet landed. We will use the terms message and request interchangeably.

The basic design of the rate-limiter is then as follows: we define a constructor and just one public method:

class RateLimiter extends EventEmitter {
    constructor(config);
    enqueueRequest(request);
}

The reason for EventEmitter will become clearer in a moment. The constructor takes as input a config object which defines:

  • url of the receiver to which requests should be sent
  • maxConcurrentRequests which defines an upper bound on the number of requests that can be concurrent at any given time. This must be a positive number (greater than 0). A value of 1 corresponds to complete serialization of requests, whereby the rate-limiter sends one request to the receiver, waits for it to be processed, then sends the second request, and so on.
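The constraints on the config object can be checked up front. The following is only a sketch: validateConfig is a hypothetical helper name that does not appear in the class described here, the receiver address is a placeholder, and requiring an integer for maxConcurrentRequests is an extra assumption on my part.

```javascript
// Hypothetical helper: validates the config object before the rate-limiter uses it.
function validateConfig(config) {
    // new URL() throws a TypeError if the string is not a valid absolute URL
    const url = new URL(config.url);
    const max = config.maxConcurrentRequests;
    // assumption: we also insist on an integer, since a fractional bound is meaningless
    if (!Number.isInteger(max) || max < 1) {
        throw new RangeError(`maxConcurrentRequests must be an integer >= 1, got ${max}`);
    }
    return { url, maxConcurrentRequests: max };
}

const { url, maxConcurrentRequests } = validateConfig({
    url: 'http://localhost:8080/ingest', // placeholder receiver address
    maxConcurrentRequests: 4,
});
console.log(url.hostname, url.port, url.pathname, maxConcurrentRequests);
```

Failing early like this means a bad bound (0, a negative number, or undefined) is reported at construction time rather than showing up later as a queue that never drains.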

When a request is enqueued by calling enqueueRequest, the rate-limiter will immediately add it to an internal FIFO queue of inactive requests. The rate-limiter will check the number of currently active requests and if that is less than the maximum allowed it will dispatch the first message in the queue. The dispatch logic can be encapsulated in a private #dispatch method:

#dispatch();

This method should:

  1. throw an error if the queue is empty
  2. dequeue a request from the queue
  3. tag it with an auto-generated id (we can use nanoid library for this)
  4. add the request to an internal dictionary that keeps track of the active requests
  5. send it to the receiver
  6. add an event handler that is executed when the request has been processed. This event handler will remove the request from the dictionary. This is where we use the id generated in step 3.

The event handler needs to do more than just the above: it should also check whether the queue is non-empty and whether the number of active requests has dropped below the maximum allowed and, if so, dispatch the next request to the receiver.

So this is the basic design. To implement it, we will define a few basic data structures internal to the RateLimiter class:

#pendingMessages = [];  // messages waiting to be sent. the inactive requests.
#inFlightMessages = {}; // messages that have been sent but not processed. the active requests.

We will define an idle state as the state when the number of active requests < maximum allowed. Note that strictly speaking, the rate-limiter is not really idle when the number of active requests < maximum allowed. To someone, an idle state might mean the number of active requests = 0 but we use the term loosely.

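We can start with the following constructor:

const { EventEmitter } = require('node:events');
const http = require('node:http');
const { nanoid } = require('nanoid'); // nanoid 3.x; later major versions are ESM-only

class RateLimiter extends EventEmitter {
    #pendingMessages;      // private fields must be declared in the class body
    #inFlightMessages;
    #maxInFlightMessages;
    #url;

    constructor(config) {
        super(); // required before using `this` in a derived class
        this.#pendingMessages = [];
        this.#inFlightMessages = {};
        this.#maxInFlightMessages = config.maxConcurrentRequests;
        // more code to populate this.#url from config ...
    }
}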

Let’s look at the enqueueRequest method:

class RateLimiter extends EventEmitter {
    ...
    enqueueRequest = (message) => {
        // add message to FIFO queue of inactive requests
        this.#pendingMessages.push(message);
        // dispatch the message if we have bandwidth to do so
        if (this.#isIdle()) {
            this.#dispatch();
        }
    }
}

The #isIdle method is nothing but:

#isIdle = () => {
    // check we have not maxed out our quota
    return Object.keys(this.#inFlightMessages).length < this.#maxInFlightMessages;
}

Let’s now come to the #dispatch method which will be a lengthy one:

#dispatch = () => {
    if (this.#pendingMessages.length === 0) {
        throw new Error("cannot post message when buffer is empty. This should NOT happen.");
    }
    const id = nanoid();
    const msg = this.#pendingMessages.shift(); // dequeue the oldest pending message

    try {
        // here we are sending the message as JSON object in the body of a POST request
        // change this code according to your needs
        const postData = JSON.stringify(msg);
        const options = {
            hostname: this.#url.hostname,
            port: this.#url.port,
            // `path` exists on the object returned by the legacy url.parse();
            // a WHATWG URL object exposes `pathname` and `search` instead
            path: this.#url.path,
            method: 'POST',
            headers: {
                'Content-Type': 'application/json',
                'Content-Length': Buffer.byteLength(postData),
            },
        };

        const req = http.request(options, (res) => {
            logger.debug(`STATUS: ${res.statusCode}`);
            logger.debug(`HEADERS: ${JSON.stringify(res.headers)}`);
            res.setEncoding('utf8');
            res.on('data', (chunk) => {
                logger.debug(`BODY: ${chunk}`);
            });
            res.on('end', () => {
                this.#removeInFlightMessage(id);
                this.emit('complete', msg);
            });
        });

        req.on('error', (e) => {
            this.#removeInFlightMessage(id);
            this.emit('error', { message: msg, error: e });
        });
        // register the message as active before it goes out
        this.#inFlightMessages[id] = msg;
        // Write data to request body
        req.write(postData);
        req.end();
        this.emit('dispatch', msg);
    } catch (e) {
        this.#removeInFlightMessage(id);
        this.emit('error', { message: msg, error: e });
    }
}

The code is long and I encourage you to study it, but it is basically doing the steps outlined earlier. We use the nanoid library to generate a unique id (or token) for each message, http is Node’s built-in HTTP module, and logger is a reference to a logger (I use winston). There are three important events we raise in the dispatch method to tell consumers of the rate-limiter what’s going on behind the scenes:

  • dispatch event is raised whenever a message is dispatched to the receiver
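  • complete event is raised when the receiver’s response has been read to the end. Note that the code above does not inspect res.statusCode, so a response with an error status also raises complete; add a status check if you only want to treat successful acknowledgements as complete.
  • error event is raised whenever there is an error. Note that the consumer of this class MUST add an event handler to handle the error event. If they do not do so, an unhandled error event will terminate the Node.js process. See the Node.js documentation on error events.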
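The point about the error event can be seen with a bare EventEmitter, independent of the rate-limiter. In this small sketch the error message is made up, and the try/catch is there only so the demonstration does not bring down the process:

```javascript
const { EventEmitter } = require('node:events');

const emitter = new EventEmitter();

// With no 'error' listener registered, emit('error', ...) throws the error itself.
// Left uncaught, that would terminate the process; we catch it here to show the behaviour.
let thrown = null;
try {
    emitter.emit('error', new Error('receiver unreachable'));
} catch (e) {
    thrown = e;
}
console.log('without listener, emit threw:', thrown.message);

// Once a listener is registered, the same emit is delivered to it instead.
let handled = null;
emitter.on('error', (e) => { handled = e; });
emitter.emit('error', new Error('receiver unreachable'));
console.log('with listener, handler received:', handled.message);
```

So a consumer of the rate-limiter should register its error handler before enqueueing the first request.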

The #removeInFlightMessage looks like so:

#removeInFlightMessage = (id) => {
    delete this.#inFlightMessages[id];
    if (this.#isIdle()) {
        this.#handleIsIdleEvent();
    }
}

As mentioned earlier, removing the message from the dictionary of active requests is not enough. We also need to check whether we have the bandwidth to dispatch another message from the FIFO queue. Without this crucial step, messages that were queued while the rate-limiter was at capacity would sit in the queue until another call to enqueueRequest happened to trigger a dispatch. To do this we check if the rate-limiter has become idle and, if so, handle that event appropriately as shown below:

#handleIsIdleEvent = () => {
    // take the first message in the buffer and send it to the downstream receiver
    if (this.#pendingMessages.length > 0) {
        this.#dispatch();
    }
}

And that gives us the basic rate-limiter. The key to getting it right is to recognize we need two data-structures: the FIFO queue of inactive requests and a dictionary of active requests.
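To watch the two data structures cooperate without standing up an HTTP receiver, here is a stripped-down sketch. It is not the class from this post: MiniRateLimiter and the caller-supplied async send function are hypothetical names, a counter stands in for nanoid, and a Map stands in for the dictionary. The simulated receiver records the peak number of concurrent calls.

```javascript
const { EventEmitter } = require('node:events');

// Hypothetical, simplified variant: `send` is an async function standing in for the HTTP POST.
class MiniRateLimiter extends EventEmitter {
    #pending = [];          // FIFO queue of inactive requests
    #inFlight = new Map();  // active requests keyed by id
    #nextId = 0;            // simple counter instead of nanoid
    #max;
    #send;

    constructor({ maxConcurrentRequests, send }) {
        super();
        this.#max = maxConcurrentRequests;
        this.#send = send;
    }

    enqueueRequest(message) {
        this.#pending.push(message);
        if (this.#inFlight.size < this.#max) this.#dispatch();
    }

    #dispatch() {
        const msg = this.#pending.shift();
        const id = this.#nextId++;
        this.#inFlight.set(id, msg);
        this.emit('dispatch', msg);
        this.#send(msg).then(
            () => { this.#settle(id); this.emit('complete', msg); },
            (error) => { this.#settle(id); this.emit('error', { message: msg, error }); },
        );
    }

    // remove the finished request, then use the freed slot for the next pending one
    #settle(id) {
        this.#inFlight.delete(id);
        if (this.#pending.length > 0 && this.#inFlight.size < this.#max) this.#dispatch();
    }
}

// Simulated receiver that tracks how many calls overlap.
let active = 0;
let peak = 0;
const send = async () => {
    active++;
    peak = Math.max(peak, active);
    await new Promise((resolve) => setTimeout(resolve, 10)); // simulated latency
    active--;
};

const limiter = new MiniRateLimiter({ maxConcurrentRequests: 2, send });
let completed = 0;
const done = new Promise((resolve) => {
    limiter.on('complete', () => { if (++completed === 5) resolve(); });
});
limiter.on('error', (e) => console.error(e));
for (let i = 0; i < 5; i++) limiter.enqueueRequest({ n: i });
done.then(() => console.log(`completed=${completed}, peak concurrency=${peak}`));
```

With a bound of 2 and five requests, all five complete and the peak concurrency never exceeds 2.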

Making it better: adding metrics

Wouldn’t it be nice if we could keep track of the rps (requests per second) and the average time the receiver takes to process a request? Let’s see how to do it. To make it possible, we are going to define some additional variables for book-keeping:

  • countOfSuccessfulRequests is a variable that keeps track of the count of successful requests. As a design choice we only consider successful requests in calculation of the metrics. You could track unsuccessful requests and their associated metrics separately if you like.
  • wallClockFlightTimeInSeconds is a variable that will track the amount of time for which the rate-limiter is processing one or more active requests i.e., it is the period of time during which number of active requests > 0.
  • cumulativeTimeInSeconds will add up the response time of each individual request.
  • stopwatch is an object we will define that functions like a real-world stopwatch, with which we can measure time.

These are initialized in the constructor:

constructor(config) {
    ...
    this.#countOfSuccessfulRequests = 0;
    this.#wallClockFlightTimeInSeconds = 0;
    this.#cumulativeTimeInSeconds = 0;
    this.#stopwatch = new Stopwatch();    
}
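To make the difference between wallClockFlightTimeInSeconds and cumulativeTimeInSeconds concrete, here is a small worked sketch that computes both from a list of (take-off, landing) times. The summarize function and the numbers are made up for illustration; the rate-limiter itself accumulates these values incrementally rather than from a list.

```javascript
// Each entry is [takeOffTime, landingTime] in seconds; the numbers are made up.
const flights = [[0, 2], [1, 3], [5, 6]];

function summarize(intervals) {
    const sorted = [...intervals].sort((a, b) => a[0] - b[0]);
    let wallClock = 0;   // time during which at least one request was active
    let cumulative = 0;  // sum of the individual response times
    let coveredUntil = -Infinity;
    for (const [start, end] of sorted) {
        cumulative += end - start;
        // only count the part of this interval not already covered by earlier ones
        if (end > coveredUntil) {
            wallClock += end - Math.max(start, coveredUntil);
            coveredUntil = end;
        }
    }
    const count = intervals.length;
    return { wallClock, cumulative, rps: count / wallClock, meanResponseTime: cumulative / count };
}

console.log(summarize(flights));
```

Here the first two requests overlap for one second and nothing is active between t=3 and t=5, so the wall-clock flight time is 4 seconds while the cumulative time is 5 seconds. That gives an rps of 3 / 4 = 0.75 and a mean response time of 5 / 3 seconds.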

Stopwatch is loosely modelled after the C# Stopwatch class and has following methods:

class Stopwatch {
    start();
    stop();
    elapsedTimeInSeconds();
    isTicking();
    reset();
}

The stopwatch must first be started by calling the start method. This puts it in the ticking state and it starts measuring time. To get a reading we must first stop the stopwatch by calling stop, and then we can read the elapsed time by calling elapsedTimeInSeconds. To use the stopwatch again for a new measurement we must reset it by calling reset, followed by the start, stop, elapsedTimeInSeconds cycle again. I will leave the implementation as an exercise.
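If you want a starting point for that exercise, the following is one possible sketch, not the implementation used in this post. It assumes process.hrtime.bigint() as the clock, lets elapsed time accumulate across start/stop pairs until reset (as the C# class does), and throws on misuse, which is my own choice.

```javascript
// One possible Stopwatch, built on process.hrtime.bigint() (monotonic, nanosecond resolution).
class Stopwatch {
    #startedAt = null;  // bigint timestamp while ticking, otherwise null
    #elapsedNs = 0n;    // time accumulated by completed start/stop pairs

    start() {
        if (this.#startedAt !== null) throw new Error('stopwatch is already ticking');
        this.#startedAt = process.hrtime.bigint();
    }

    stop() {
        if (this.#startedAt === null) throw new Error('stopwatch is not ticking');
        this.#elapsedNs += process.hrtime.bigint() - this.#startedAt;
        this.#startedAt = null;
    }

    elapsedTimeInSeconds() {
        if (this.#startedAt !== null) throw new Error('stop the stopwatch before reading it');
        return Number(this.#elapsedNs) / 1e9;
    }

    isTicking() {
        return this.#startedAt !== null;
    }

    reset() {
        this.#startedAt = null;
        this.#elapsedNs = 0n;
    }
}

const sw = new Stopwatch();
sw.start();
setTimeout(() => {
    sw.stop();
    console.log(`elapsed: ${sw.elapsedTimeInSeconds().toFixed(3)}s`);
}, 50);
```

A monotonic clock is preferable to Date.now() here because it is not affected by adjustments to the system time.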

Calculating RPS – requests per second

First, let us focus on measuring rps; we will address the mean response time later. The rps is given by this.#countOfSuccessfulRequests / this.#wallClockFlightTimeInSeconds, so we just need to keep these two variables up to date. To that effect, whenever #dispatch is called, we start the stopwatch before sending the request to the receiver, but only if it’s not already running. This is done as follows:

#dispatch = () => {
    ...
    if (!this.#stopwatch.isTicking()) {
        this.#stopwatch.start();
    }
    // Write data to request body
    req.write(postData);
}

The necessity of the if condition will become clearer as you read through the whole code. Next, we need to increment the count of successful requests when a request completes successfully. This is done in the dispatcher code:

res.on('end', () => {
    ...
    this.#countOfSuccessfulRequests++;  // count of SUCCESSFUL requests
});

After this, we modify the isIdle event handler so that it accumulates the wall-clock time and, crucially, restarts the stopwatch if the number of active requests is not zero. The #numberOfActiveRequests helper used here is not shown; presumably it returns the number of entries in #inFlightMessages. This is done as follows:

#handleIsIdleEvent = () => {
    assert.ok(this.#stopwatch.isTicking());        
    this.#stopwatch.stop();
    this.#wallClockFlightTimeInSeconds += this.#stopwatch.elapsedTimeInSeconds();
    this.#stopwatch.reset();
    if (this.#numberOfActiveRequests() > 0) {
        this.#stopwatch.start();
    }
    ...
}

That’s it. We can now define a calcRps method:

#calcRps = () => {
    let rps = 0;
    if (this.#wallClockFlightTimeInSeconds > 0) {        
        rps = this.#countOfSuccessfulRequests / this.#wallClockFlightTimeInSeconds;
    }
    logger.info(`rps = ${rps}`);
}

The method can be called anywhere you like in the code. I call it whenever a request completes successfully i.e., in the same event handler that increments the count of successful requests.

Calculating the mean response time

I found this to be actually easier than calculating rps. To do this I defined another class, very similar to Stopwatch, that I call Flight. It has the following methods, which mimic an airline flight that takes off and lands, and let you measure the time in flight:

class Flight {
    takeOff();
    land();
    elapsedTimeInSeconds();
}

The implementation of this is left as an exercise to the reader. In the dispatch method, we simply create a flight for each request; it takes off when the request is sent and lands when the request completes successfully. We have already been keeping track of the count of successful requests since we implemented the rps metric in the previous section, so we can calculate the mean response time as this.#cumulativeTimeInSeconds / this.#countOfSuccessfulRequests. All we need to do is keep track of #cumulativeTimeInSeconds, which is done in the same event handler that increments the count of successful requests, like so:

res.on('end', () => {
    flight.land();
    this.#countOfSuccessfulRequests++;
    this.#cumulativeTimeInSeconds += flight.elapsedTimeInSeconds();
    this.#removeInFlightMessage(id);
    this.#calcRps();
    this.emit('complete', msg);
});

The calcRps method now displays both rps and mean response time like below:

#calcRps = () => {
    let rps = 0;
    let meanResponseTime = 0;
    if (this.#wallClockFlightTimeInSeconds > 0) {        
        rps = this.#countOfSuccessfulRequests / this.#wallClockFlightTimeInSeconds;
    }
    if (this.#countOfSuccessfulRequests > 0) {
        meanResponseTime = this.#cumulativeTimeInSeconds / this.#countOfSuccessfulRequests;
    }
    logger.info(`rps = ${rps}, mean response time = ${meanResponseTime}s`);
}
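For reference, the Flight class described earlier could be as small as the following sketch. It is one possible version rather than the one used in this post: it follows the three-method interface listed above, assumes process.hrtime.bigint() as the clock, and throws on misuse.

```javascript
// One possible Flight: takeOff() and land() record timestamps,
// elapsedTimeInSeconds() reports the difference between them.
class Flight {
    #takeOffAt = null;
    #landedAt = null;

    takeOff() {
        this.#takeOffAt = process.hrtime.bigint();
        this.#landedAt = null;
    }

    land() {
        if (this.#takeOffAt === null) throw new Error('flight has not taken off');
        this.#landedAt = process.hrtime.bigint();
    }

    elapsedTimeInSeconds() {
        if (this.#landedAt === null) throw new Error('flight has not landed');
        return Number(this.#landedAt - this.#takeOffAt) / 1e9;
    }
}

const flight = new Flight();
flight.takeOff();
setTimeout(() => {
    flight.land();
    console.log(`flight time: ${flight.elapsedTimeInSeconds().toFixed(3)}s`);
}, 20);
```

In the dispatcher, takeOff would be called just before the request is written and land in the handler that runs when the response ends.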

With this we have a fully instrumented rate-limiter! Putting the pieces together is left as an exercise for the reader. I do not give out a link to any GitHub repo.
