npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

metatasks

v1.1.1

Published

FIFO queue for Promise-based tasks. Optionally persistent to ensure tasks survive reboots. Supports concurrency-limiting, metadata and state-tracking for statistics.

Downloads

25

Readme

Metatasks

Metatasks is a FIFO queue for Promise-based tasks. Optionally persistent to ensure potentially long, fault-prone running tasks survive reboots. Supports concurrency-limiting, metadata and state-tracking for statistics or debugging. Makes use of ES2015 decorators and optionally the new ES6 Proxy object.

Installation

$ npm install metatasks --save

This library supports ES7 decorators proposal which is supported by babel and typescript. To use it with babel you should enable experimental es7.decorators feature in babel as described here. To use it with typescript you should enable experimentalDecorators and emitDecoratorMetadata in tsconfig.json

Usage

The simplest way to use metatasks queue system is to decorate an asynchronous method (one which returns a Promise).

import {useQueue} from 'metatasks';

class Example {
  @useQueue(3)
  async exampleMethod(retVal) {
    return retVal;
  }
}

Now, however many times you invoke the exampleMethod, only 3 instances will be running at the same time. You may still await the result, as the Promise will pass it eventually, when it's the task's turn to start and complete.

This is great if you want to queue all the calls to the method using the same queue. If you'd like to make queueing optional, you can use the experimental method proxying described below or generate a queue manually.

Persistence

Persistence is achieved through an interface class of your design, thus any database can be used for task storage.

class Example {
  @useQueue({
      concurrency: 3,
      queueId: 'storedExample',
      injectTask: true,
      taskNameResolver: function(someParam) { return `${this.somethingFromThisObject} : ${someParam}` },
      persistence: {
        interface: PersistentTaskDatabase,
        thisRestorer: async function (objectStoredInTheDatabase) { return new Example(objectStoredInTheDatabase) },
        thisStorer: async function (target) { return { ... }}
      }
  })
  async exampleMethod(someParam, task) {
    return someParam;
  }
}

Persistence interface

The interface must be a class that implements static methods as in the following example:

export default class PersistentTaskDatabaseExample {
  static async taskEnqueue(queueId, taskJson:Object) {
  }

  static async taskSetStarted(queueId, taskId, metadata) {
  }

  static async taskSetCompleted(queueId, taskId, value, metadata) {
  }

  static async taskSetFailed(queueId, taskId, error, metadata) {
  }

  static async taskSetMetadata(queueId, taskId, key, value) {
  }
  
  static async taskMergeMetadata(queueId, taskId, obj) {
  }

  static async setAllStartedAsInterrupted(queueId) {
  }

  static async getTasksToQueue(queueId, includeInterrupted = true):Array<Object> {
    return [];
  }
}

Available configuration options for useQueue

  • concurrency: how many instances of this method are allowed to run simultaneously
  • queueId: the unique identifier for this queue, can be used to access the queue from the script and by the database to identify the stored tasks
    defaults to: automatically generated name from filename, class and method name
  • injectTask: if true, will make it possible to access the task from the method as the last parameter (note: when this is enabled, be careful to always pass the right amount of parameters to the method, even when they are 'undefined')
    defaults to: false
  • taskNameResolver: a method returning a String with the name of the task, which makes it possible to identify the instance of the task; invoked when the task is enqueued with the same parameters as the actual method and on the same target object (can access this)
    defaults to: name automatically generated from the class name, method name and a sequential number (or random when using persistence)
  • metadataResolver: a method returning an Object like { name: 'taskName', ... }, similar to above, if you want to store more information about the given task to the database
    when used: overrides taskNameResolver and requires 'name' property to be present
  • persistence: an object containing:
    • interface: prototype of a class, implementing static async methods that store or retrieve information about tasks in the database
    • thisStorer: a method that converts the target object - execution context of the task (this) - into an object that can be stored in the database
    • thisRestorer: a method that recreates the context (this), reversing the above
  • queueModifier: a function that is invoked on the immediately after the queue is created, before any tasks are added
    mostly useful for adding additional callbacks for queue events

Example queueModifier

function(queue) {
  // ...
  queue.on('started', onStarted);
  queue.on('finished', onFinished);
  queue.on('drained', onDrained);
  queue.on('added', onAdded);
}

Method proxying

Method proxying will only work under V8 if you run your application with the --harmony_proxies flag (as of October 2015, subject to change).

import {enableQueuedCalls} from 'metatasks';

@enableQueuedCalls
class Example {
  async exampleMethod(retVal) {
    ...
    return retVal;
  }
}

let example = new Example();
example.useQueue({concurrency: 1, queueId: 'my-queue'}).exampleMethod(123);

Adding metadata during task execution

This is useful for creating super-tasks, long-running tasks that embed other tasks and continue execution from a certain point, instead of starting over from beginning in case of a system failure.

class Example {
  @useQueue({
      concurrency: 3,
      injectTask: true,
      persistence: { ... }
  })
  async exampleMethod(someParam, task) {
    if (task.metadata.status != 'part-1-completed') {
      // doing something...
      // finished part 1 of task
      await task.setMetadata('status', 'part-1-completed');
    }
    // no need to redo what was already done...
    // ...
    return someParam;
  }
}

Accessing queues in memory and the states of their tasks

import {Queues} from 'metatasks';

let queueId = 'storedExample';
let queue = Queues.get(queueId);

Available properties and actions on a TaskQueue:

  • queue.whenDrained - a Promise that resolves once all tasks have finished running and their states were synced with the persistent database
  • queue.empty - Boolean, true when empty
  • queue.queue - Array<TaskInstance> waiting to be started
  • queue.runningTasks - Map<processID, TaskInstance>
  • queue.pause() - pause the queue (won't interrupt tasks which are already running)
  • queue.start() - resume the queue
  • queue.scheduleTask(metadata:Object|string, target:Object, ...params:Array)
  • queue.asyncEventPromises - can be used to ensure all the up-to-date information is stored in the database, e.g. used when doing a graceful shutdown queue.pause(); await Promise.all(queue.asyncEventPromises)
  • queue.gracefulShutdown() - halts the system while making sure all tasks have their states synced
  • queue.happyShutdown() - halts the system while making sure all running tasks complete and have their states synced
  • queue can be iterated with for-of to go through all currently running and queued tasks
  • queue is an EventEmitter and emits the following events:
    • added => function(task)
    • started => function(task)
    • finished => function(task)
    • drained => function(tasks[])

Available properties and actions on a TaskInstance:

  • task.metadata - the object that contains tasks metadata
  • task.setMetadata(key, value) - returns Promise
  • task.mergeMetadata(obj) - returns Promise
  • task.promise - promise that is fulfilled once the task completes
  • task.generatedPromise - the actual promise of the method, once it is started
  • task.setCancelMethod(method) - to be used from inside of the task - pass a method that will cancel (fail) the task prematurely
  • task.name - returns the task's name

Creating a queue without using decorators

See this test for an example of how to do that.

Alternatives

I have not tested these, however they look like they could be used to achieve similar results.

  • https://www.npmjs.com/package/qtask
  • https://www.npmjs.com/package/schedule-drone