wsq

v0.2.0

Task queue on top of websockets


Websocket task queue - DEMO

What is it?

An easy-to-use task queue that handles streaming data. It also works in the browser.

Example

Video encoding

server.js (see wsq-server for a standalone server with logging):

var Server = require('wsq/server')
var leveldown = require('leveldown')
var BlobStore = require('fs-blob-store')

new Server({
  socketOptions: {port: 4242},
  dbLocation: '/var/leveldb/wsq',
  dbOptions: {db: leveldown}, // db can be any 'abstract-leveldown' compatible instance
  blobStore: new BlobStore('/var/storage/wsq') // same here any 'abstract-blob-store' will do
})

add.js:

// usage: node add.js <videofile> <ffmpeg arguments>

var Client = require('wsq/client')
var fs = require('fs')

var client = new Client('ws://localhost:4242')
var queue = client.queue('ffmpeg')

var data = {
	video: fs.createReadStream(process.argv[2]),
	args: process.argv.slice(3)
}

var task = queue.add(data, function(error){
	if (error) {
		console.log('Error queueing video: ' + error.message)
		process.exit(1)
	} else {
		console.log('Video queued for processing.')
		process.exit()
	}
})

worker.js:

var Client = require('wsq/client')
var fs = require('fs')
var os = require('os')
var path = require('path')

var client = new Client('ws://localhost:4242')

var videoQueue = client.queue('ffmpeg')
var resultQueue = client.queue('ffmpeg-results')

videoQueue.process(function(task, callback) {
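	// VideoEncoder is not defined in this example; it is assumed to be a duplex
	// stream that wraps ffmpeg and emits 'progress' events
	var encoder = new VideoEncoder(task.data.args)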

	encoder.on('progress', function(percent) {
		// update task progress, this will also reset the task timeout (default 60 seconds)
		// useful for long running tasks like this one
		task.updateProgress(percent)
	})

	// start encoding
	task.data.video.pipe(encoder)

	// start streaming the encoded video to the result queue, if the stream emits an error
	// the result task will not be created and any partial data streamed is discarded
	resultQueue.add({video: encoder}, function(error){
		if (error) {
			console.log('Encoding failed: ' + error.message)
			callback(error) // task is marked as failed, and possibly re-queued based on its options
		} else {
			// all good, ready to accept next task
			callback()
		}
	})
})

Documentation

Class: Client

This class is a wsq client. It is an EventEmitter.

new Client(address, [options])

  • address String
  • options Object
    • backoff Function

Construct a new client object.

address

Address to wsq server, e.g. 'ws://localhost:1324'

options.backoff

Function with the signature function(tries){} that returns the number of milliseconds to wait before the next connection attempt.

The default function looks like:

function(tries){
	return Math.min(Math.pow(tries * 10, 2), 60 * 1000)
}
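
To get a feel for the default curve, the sketch below evaluates it for a few attempt counts next to a linear alternative. The name linearBackoff and its numbers are made up for illustration and are not part of wsq.

```javascript
// The default backoff from above: quadratic growth, capped at 60 seconds.
function defaultBackoff(tries) {
	return Math.min(Math.pow(tries * 10, 2), 60 * 1000)
}

// Hypothetical alternative: 500 ms per attempt, capped at 10 seconds.
function linearBackoff(tries) {
	return Math.min(tries * 500, 10 * 1000)
}

var attempts = [1, 2, 5, 10, 25]
attempts.forEach(function(tries) {
	console.log(tries, defaultBackoff(tries), linearBackoff(tries))
})

// A custom function would be passed through the documented option:
// new Client('ws://localhost:4242', {backoff: linearBackoff})
```

With the default function the delay is 100 ms after the first attempt and reaches the 60 second cap on the 25th.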

client.queue(name)

Return the ClientQueue instance for name, creating it if it does not exist.

client.listQueues()

Return an array of active ClientQueue instances.

client.getEventStream()

Return an object stream that emits every event as it arrives from the server. Each entry has the form:

{
	"event": "<event name>",
	"args": [..]
}
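
As an illustration of consuming entries of this shape, here is a small sketch that turns each one into a log line. The sample entries and the 'example event' name are invented for the example; the event names the server actually sends are not listed here.

```javascript
// Turn one event stream entry into a single log line.
function formatEntry(entry) {
	var args = (entry.args || []).map(function(arg) {
		return JSON.stringify(arg)
	})
	return entry.event + '(' + args.join(', ') + ')'
}

// Hypothetical sample entries, shaped like the object above.
var samples = [
	{event: 'connect', args: []},
	{event: 'example event', args: ['ffmpeg', 0.5]}
]

samples.forEach(function(entry) {
	console.log(formatEntry(entry))
})
```

Assuming the returned stream behaves like a standard Node.js readable stream in object mode, the same function could be applied to each object it emits.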

Event: 'error'

function(error){}

Event: 'connect'

function(){}

Connected to server.

Event: 'disconnect'

function(){}

Connection was lost.

Class: ClientQueue

This class is the client's representation of a queue. It is an EventEmitter.

queue.add(data, [options], [callback])

Add a task to the queue. The optional callback is called when the task has been queued successfully, or with an Error object on failure.

  • options Object
    • timeout Number - Default 60 * 1000
    • retries Number - Default 0
    • autoremove Boolean - Default false

options.timeout

How long to wait, in milliseconds, for the task to complete without hearing from the worker. Set to -1 to disable the timeout (not recommended; use progress updates for long running tasks instead).

options.retries

How many times the task should be re-queued on failure. A value of zero means no retries: a failed task has to be re-queued or removed explicitly. Can also be set to -1 to retry forever.

options.autoremove

Whether to remove the task, and any associated streams that were buffered, on completion. Note that failed tasks will always have to be handled explicitly.
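
The retry rule described above can be sketched as a small predicate. This mirrors the documented meaning of options.retries only; it is not wsq's internal code, and shouldRetry is a name chosen for the example.

```javascript
// Decide whether a failed task gets another attempt, following the documented
// meaning of options.retries: 0 = never, -1 = forever, n = at most n re-queues.
function shouldRetry(retries, retriesUsed) {
	if (retries === -1) return true
	return retriesUsed < retries
}

// Example options for a long running task (values are illustrative).
var options = {
	timeout: 5 * 60 * 1000, // five minutes without hearing from the worker
	retries: 2,
	autoremove: true
}

console.log(shouldRetry(options.retries, 0)) // first failure
console.log(shouldRetry(options.retries, 2)) // both retries already used
console.log(shouldRetry(0, 0)) // retries disabled
console.log(shouldRetry(-1, 1000)) // retry forever
```

An options object like this one would be passed as the second argument to queue.add.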

queue.process(workerFn)

Add a worker to the queue. workerFn has the signature function(task, callback){}.

Call the callback without arguments when the worker has finished processing the task, or with an Error object on failure.
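
A minimal sketch of the callback contract, run against stand-in task objects so it works without a server. The text field on task.data and the names upperCaseWorker and record are made up for this example.

```javascript
// A worker with the documented signature function(task, callback){}.
var results = []

function upperCaseWorker(task, callback) {
	if (typeof task.data.text !== 'string') {
		// failure: hand an Error object to the callback
		callback(new Error('task.data.text must be a string'))
		return
	}
	results.push(task.data.text.toUpperCase())
	// success: call back without arguments
	callback()
}

// Exercise the worker with stand-in task objects instead of a real queue.
var outcomes = []
function record(error) {
	outcomes.push(error ? 'failed: ' + error.message : 'completed')
}

upperCaseWorker({data: {text: 'hello'}}, record)
upperCaseWorker({data: {}}, record)

console.log(results, outcomes)
```

With a real client the same function would be registered via queue.process(upperCaseWorker).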

queue.all(callback)

Callback with a list of all Task instances in the queue.

queue.waiting(callback)

Callback with a list of all waiting Task instances in the queue.

queue.active(callback)

Callback with a list of all active Task instances in the queue.

queue.completed(callback)

Callback with a list of all completed Task instances in the queue.

queue.failed(callback)

Callback with a list of all failed Task instances in the queue.

Event: 'worker added'

function(worker){}

Worker was added to the queue.

Event: 'worker removed'

function(worker){}

Worker was removed from the queue.

Event: 'worker started'

function(worker, task){}

Worker started processing task.

Event: 'task <task_event>'

See Task events.

Class: Task

This class represents a task. It is an EventEmitter.

task.updateProgress(percentage)

Update the progress of the task. Percentage is a fraction between 0 and 1. Calling this resets the task timeout timer.
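
Many tools report progress as a count or a 0 to 100 percentage rather than a fraction. A small helper along these lines, where toFraction is a hypothetical name, can convert and clamp the value before it is handed to updateProgress.

```javascript
// Convert "units done out of total" into a fraction between 0 and 1.
// Missing, negative or non-numeric input yields 0; overshoot is clamped to 1.
function toFraction(done, total) {
	if (!(total > 0) || !(done >= 0)) return 0
	return Math.min(done / total, 1)
}

console.log(toFraction(25, 100))
console.log(toFraction(150, 100))
console.log(toFraction(3, 0))
```

Inside a worker, an encoder that reports 0 to 100 could then call task.updateProgress(toFraction(percent, 100)).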

task.touch()

Reset the task timeout. Useful if your task has no meaningful progress information but you still want to keep long-lived tasks running.

task.remove(callback)

Remove the task from the system. Do not call this from inside a worker.

task.retry(callback)

Reschedule a failed task. Do not call this from inside a worker.

task.getData(callback)

Return task data with streams resolved. Note that task.data will already be resolved for tasks passed to a worker.

Event: 'added'

function(task){}

Added to queue.

Event: 'queued'

function(task){}

Queued for processing.

Event: 'started'

function(task){}

Started processing.

Event: 'progress'

function(task, percentage){}

Progress updated. Percentage is a fraction between 0 and 1.

Event: 'completed'

function(task){}

Successfully processed.

Event: 'failed'

function(task, willRetry){}

Task failed; task.error will contain the failure message. willRetry is true if the task will be retried.
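
A handler for this event might branch on willRetry as sketched below. The task here is a plain EventEmitter standing in for a real Task, and the error text is invented for the example.

```javascript
var EventEmitter = require('events').EventEmitter

// Stand-in for a Task; a real one would come from a queue.
var task = new EventEmitter()
var log = []

task.on('failed', function(failedTask, willRetry) {
	var message = failedTask.error
	log.push(willRetry ? 'retrying after: ' + message : 'gave up: ' + message)
})

// Simulate two failures: one that will be retried and a final one.
task.error = 'encoder crashed'
task.emit('failed', task, true)
task.emit('failed', task, false)

console.log(log)
```

Since failed tasks always have to be handled explicitly, the final branch is a natural place to decide between task.retry and task.remove.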

Event: 'deleted'

function(task){}

Task and associated streams were removed from the queue.

License

MIT