atlas-throttled-queue
Async job queue that limits the rate of job execution.
install
npm install --save atlas-throttled-queue
why
I was writing a totally legal web-scraper and I didn't want to overburden my sources with a bunch of spam. This package exports a simple queue which takes a period, τ, and lets you run arbitrary jobs at a rate of 1/τ.
examples
web scraper
This example is simplified for the sake of brevity. Let's say we have a service which takes a document, gets its keywords, and then performs a Bing search for each keyword, obtaining the top link for each. We might want to do this if we want to "learn more" about the document -- maybe we recursively scrape the links, feed the new set of documents to a machine learning algorithm so it can "learn more" about the original document, then look up links for those documents, etc.
For this example, let's assume we have the following functions:
- getBingResults: looks up a phrase on Bing, parses the output, and returns a list of links.
- getKeywords: filters out the stop-words from a document, returning a set of important words.
- upsertLink: upserts an obtained link to our database.
const { readFileSync } = require("fs");
const ThrottledQueue = require("atlas-throttled-queue");
const getKeywords = require("./filter-stop-words");
const getBingResults = require("./bing-search-client");
const upsertLink = require("./upsert-link");
// make a queue that fires at most one job every tau milliseconds
const tau = 500;
const queue = new ThrottledQueue(tau);
// get our keywords
const doc = readFileSync("./document.txt", "utf8");
const keywords = getKeywords(doc);
// run our throttled scraper, keywords.length === 20000
for (let i = keywords.length; i--;){
  queue.push(() => {
    getBingResults(keywords[i], links => {
      const topLink = links[0];
      upsertLink(topLink, () => {
        // no-op, don't care about result of write
      })
    })
  })
}
In the example above, we have 20,000 search jobs, but they fire at most once every τ milliseconds. This helps keep us under the radar and prevents us from overloading our system. Note that the throttler does not enforce any rules regarding concurrency: if each search job takes one second, then ~2 search jobs will be running at any given time. If you also need to limit concurrency, you can place queue.push calls inside the jobs of a concurrent queue, capping how many jobs run at once in addition to the rate at which they are fired, as sketched below.
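For instance, here is a sketch combining the two. It assumes a concurrent queue whose constructor takes a concurrency limit and whose jobs receive a done callback, roughly like atlas-concurrent-queue; check that package's docs for its exact signature.

// assumed API: new ConcurrentQueue(limit), jobs receive a `done` callback
const ConcurrentQueue = require("atlas-concurrent-queue");
const ThrottledQueue = require("atlas-throttled-queue");
const throttled = new ThrottledQueue(500); // fire at most one job per 500ms
const concurrent = new ConcurrentQueue(2); // allow at most 2 jobs in flight
for (let i = keywords.length; i--;){
  concurrent.push(done => {
    // the concurrent queue admits a job only when a slot is free;
    // the admitted job is still rate-limited by the throttled queue
    throttled.push(() => {
      getBingResults(keywords[i], links => {
        upsertLink(links[0], done); // free the slot once the write returns
      });
    });
  });
}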
todo
dynamic τ
It might be interesting to implement a dynamic τ that can react to changes in API allowance. For example, we might want to slow down our jobs if we notice rate-limiting headers getting close to their limit. As of right now, I don't need the feature, but implementing it would not be difficult. A rough sketch of what the recomputation could look like follows.
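For illustration only, since the queue does not support changing τ today: a sketch of deriving a new τ from rate-limit response headers. The header names and the recomputation strategy are assumptions, not something this package or any particular API guarantees.

// illustration only: this package does not expose a way to change tau yet
const computeTau = (headers, minTau = 500) => {
  const remaining = Number(headers["x-ratelimit-remaining"]);
  // assumption: the reset header is a unix timestamp in seconds
  const resetMs = Number(headers["x-ratelimit-reset"]) * 1000 - Date.now();
  if (!Number.isFinite(remaining) || resetMs <= 0) return minTau;
  if (remaining === 0) return resetMs; // out of allowance: wait out the window
  // spread the remaining allowance evenly over the time until the limit resets
  return Math.max(minTau, resetMs / remaining);
};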
caveats
capturing errors and data
There's no way to capture errors or results; this queue is only for controlling flow. If you need to capture errors or results, do it at the scope where you define your jobs, as in the sketch below.
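For example, continuing the scraper above, you can collect the top links and note failed keywords in the enclosing scope. Treating an empty result as a failure here is just an assumption for illustration; adapt it to however your jobs actually report errors.

// collect results and failures in the scope where the jobs are defined
const topLinks = [];
const failedKeywords = [];
for (let i = keywords.length; i--;){
  queue.push(() => {
    getBingResults(keywords[i], links => {
      if (!links || !links.length) {
        // assumption for illustration: an empty result means the search failed
        failedKeywords.push(keywords[i]);
        return;
      }
      topLinks.push(links[0]);
    });
  });
}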
done callback
Unlike atlas-concurrent-queue, there's no callback we can call when "our queue is done". I don't think it makes a ton of sense to have a callback for this, since there isn't a well-defined moment when our throttler is finished:
- Do we capture the end of each job, calling done when all jobs have returned?
- Do we call done when the queue has been exhausted and all jobs have been executed?
It doesn't seem very well-defined, but I could be wrong and I'd be open to ideas.
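If you do want something like the first interpretation, it's easy to track in the scope where you define your jobs. A minimal sketch, where allDone is just a hypothetical placeholder for whatever should run at the end:

let pending = keywords.length;
const allDone = () => console.log("all searches have returned");
for (let i = keywords.length; i--;){
  queue.push(() => {
    getBingResults(keywords[i], links => {
      upsertLink(links[0], () => {
        // count each job as it returns; fire allDone after the last one
        if (--pending === 0) allDone();
      });
    });
  });
}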