firequeue
Task queue for Node and the browser, backed by Firebase.
The API has been completely redesigned. All methods return streams that you can pipe, combine, and fork to accommodate your needs.
Platform Compatibility
The queue engine requires Node 0.11.x or greater and the --harmony flag to get access to generators.
Creating jobs, however, is as simple as pushing a job payload to the queue's incoming node.
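For example, a client can enqueue work with a single push; a minimal sketch (the task name and payload are made up, the URL is the same test location used in the examples below):
import firequeue from 'firequeue'

const queue = firequeue.init('https://firequeue-test.firebaseio.com')

// enqueue a job; the queue engine picks it up when it runs
queue.jobs.push({ task: 'send-email', data: { to: 'user@example.com' } })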
UI
Check out https://github.com/jogabo/firequeue-ui (not yet ported to v1).
Examples
// run ./example.js
import firequeue from 'firequeue'
import { concat, throughSync } from 'stream-util'
import parallel from 'concurrent-transform'
const queue = firequeue.init('https://firequeue-test.firebaseio.com')
const logger = (fn) => throughSync(function(data) {
console.log(fn(data))
this.push(data)
})
// create some jobs
queue.jobs.push({ task: 'task1', data: { name: 'job1' } })
queue.jobs.push({ task: 'task1', data: { name: 'job2' } })
queue.jobs.push({ task: 'task1', data: { name: 'job3' }, delayed: '2s' })
queue.jobs.push({ task: 'task2', data: { name: 'job4' } })
// create job and listen to job updates
queue.jobs
.push({ task: 'task3', data: { name: 'job5' } })
.child('state').on('value', (s) => console.log(`job changed state to ${s.val()}`))
// log 'job changed state to queued'
// log 'job changed state to activated'
// log 'job changed state to completed'
// start queue engine
queue
.start()
.pipe(logger(({ task, key, state }) => `task: ${task}, job: ${key}, state: ${state}`))
// log task: task1, job: job1, state: queued
// log task: task1, job: job2, state: queued
// log task: task1, job: job3, state: delayed
// ...
// process task1
const task1 = queue
.read('task1')
.pipe(queue.process((job) => {
// do some work with job.key(), job.val()
return Promise.resolve()
}))
.pipe(logger(({ task, key, state }) => `task: ${task}, job: ${key}, state: ${state}`))
// log task: task1, job: job1, state: completed
// log task: task1, job: job2, state: completed
// ...
// process task2 with maxAttempts and backoff
const task2 = queue
.read('task2')
.pipe(queue.maxAttempts(2))
.pipe(queue.backoff('2s')) // wait 2s before retrying
.pipe(queue.process((job) => {
console.log('do some work with', job.key(), job.val())
const attempts = job.child('attempts').val() || 0
return attempts < 2
? Promise.reject()
: Promise.resolve()
}))
// process task3 with a concurrency of 10
const task3 = queue
.read('task3')
.pipe(parallel(queue.process((job) => {
console.log('do some work with', job.key(), job.val())
return Promise.resolve()
}), 10))
// remove completed jobs
concat(task1, task2, task3)
.pipe(queue.clean('completed'))
.pipe(logger(({ task, key, state }) => `task: ${task}, job: ${key}, state: ${state}`))
// log task: task1, job: job1, state: cleaned
// log task: task1, job: job2, state: cleaned
// ...
// remove failed jobs after 1 day
queue
.readJobsByStateWithDelay('failed', '1d')
.on('data', (snap) => snap.ref().remove())
// 30sec later...
setTimeout(() => {
queue.stop().then(() => {
console.log('queue was stopped successfully')
})
}, 30000)
API
init(ref: Firebase|String)
Create a new queue at the given Firebase location.
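For example (the URL is a placeholder; per the signature above, an existing Firebase reference can be passed instead of a string):
import firequeue from 'firequeue'

// create a queue rooted at this Firebase location
const queue = firequeue.init('https://your-app.firebaseio.com')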
start()
Start the queue. Returns a stream of { task, state, key } objects.
stop()
Stop the queue. Returns a Promise that resolves once all currently active jobs have finished running.
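A sketch of the start/stop lifecycle, following the example above (the log format is illustrative):
import firequeue from 'firequeue'

const queue = firequeue.init('https://your-app.firebaseio.com')

// start the engine and observe job state changes
queue
  .start()
  .on('data', ({ task, key, state }) => console.log(`task: ${task}, job: ${key}, state: ${state}`))

// later: wait for currently active jobs to finish, then shut down
queue.stop().then(() => console.log('queue stopped'))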
read(taskName: String)
Create a readable stream of jobs for taskName.
jobs
Reference to the jobs list; push a job payload to it to enqueue a new job (see the examples above).
process(fn|fn*)
Create a transform stream that processes a task. fn should return a yieldable object (https://github.com/tj/co#yieldables).
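Since fn may be a generator (run through co), a handler can yield promises directly. A sketch, assuming a queue created with init as above; sendEmail is a hypothetical async helper and 'send-email' an illustrative task name:
queue
  .read('send-email')
  .pipe(queue.process(function* (job) {
    // job.val() returns the stored job; its payload lives under `data`
    const { data } = job.val()
    yield sendEmail(data) // hypothetical helper; any co yieldable (promise, thunk, generator) works here
  }))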
maxAttempts(n: Int)
Create a transform stream that defines the number of attempts before a job is marked as failed.
backoff(time: String|Date)
Create a transform stream that defines the time to wait before retrying a failed job.
clean(fn|string)
Create a transform stream that removes jobs matching the filter.
Firebase structure
The Firebase backend is structured as follows:
-jobs (list of all the jobs / priority = -job.createdAt / indexedOn state)
|-job1: {}
|-job2: {}
|-job3: {}
|-job4: {}
-tasks (list of queued jobs, referenced by task)
|- task-name (priority = job.priority)
|- job1: true
|- job2: true
-delayed (list of delayed jobs)
|- job3: true
|- job4: true
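Since jobs is indexed on state, jobs in a given state can also be queried directly with the plain Firebase client; a sketch assuming the legacy Firebase SDK (the URL is a placeholder):
import Firebase from 'firebase'

const ref = new Firebase('https://your-app.firebaseio.com')

// count the jobs currently in the failed state
ref.child('jobs')
  .orderByChild('state')
  .equalTo('failed')
  .once('value', (snap) => console.log(`${snap.numChildren()} failed jobs`))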
Job structure
- data (Object) recommended path to store the job payload
- attempts (Number) number of failed attempts
- createdAt (Timestamp) creation timestamp
- queuedAt (Timestamp) queued timestamp
- completedAt (Timestamp) completed timestamp
- failedAt (Timestamp) failed timestamp
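Put together, a completed job record might look roughly like this (values are illustrative; the task and state fields are inferred from the examples and the state index rather than listed above):
{
  "task": "task1",
  "state": "completed",
  "data": { "name": "job1" },
  "attempts": 0,
  "createdAt": 1430000000000,
  "queuedAt": 1430000000100,
  "completedAt": 1430000002500
}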