@appolo/queue

v8.0.2

appolo queue module

Downloads

49

Appolo Queue

queue module for appolo built with appolo-queue

Installation

npm i @appolo/queue

Options

| key | Description | Type | Default |
| --- | --- | --- | --- |
| id | queue injection id | string | queue |
| config | queue options | object | {} |

Queue options

| key | Description | Type | Default |
| --- | --- | --- | --- |
| redis | redis connection string | string | `` |
| queueName | queue prefix in redis | string | appolo-queue |
| checkInterval | queue check time in ms | number | 1000 |
| maxConcurrency | max number of jobs to process in parallel per server | number | 1 |
| lockTime | interval in ms of how long the job stays locked when pulled from the queue | number | 60000 |

In config/modules/all.ts:

import {App} from 'appolo';
import {QueueModule} from '@appolo/queue';

export = async function (app: App) {
    await app.module(new QueueModule({
        config:{
            redis:"redis://redis-connection-string",
            queueName:"myQueue"
        }
    }));
}
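
As a rough illustration of how the values passed in config might combine with the defaults listed in the Queue options table, the sketch below merges user options over those defaults. The QueueOptions interface, the DEFAULTS constant and the withDefaults helper are hypothetical names invented for this example; they are not part of the @appolo/queue API, and the real module may resolve its options differently.

```typescript
// Hypothetical type mirroring the "Queue options" table above.
// The real @appolo/queue typings may differ.
interface QueueOptions {
    redis: string;          // redis connection string
    queueName: string;      // queue prefix in redis
    checkInterval: number;  // queue check time in ms
    maxConcurrency: number; // max jobs processed in parallel per server
    lockTime: number;       // how long a pulled job stays locked, in ms
}

// Default values copied from the table.
const DEFAULTS: QueueOptions = {
    redis: "",
    queueName: "appolo-queue",
    checkInterval: 1000,
    maxConcurrency: 1,
    lockTime: 60000,
};

// Merge user-supplied options over the documented defaults (illustrative helper).
function withDefaults(options: Partial<QueueOptions>): QueueOptions {
    return { ...DEFAULTS, ...options };
}

const resolved = withDefaults({
    redis: "redis://redis-connection-string",
    queueName: "myQueue",
});

console.log(resolved);
```

Only the keys that are set explicitly change; everything else keeps the value from the table.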

Usage

import {define, singleton, inject} from 'appolo';
import {Queue} from "@appolo/queue";

@define()
@singleton()
export class SomeManager {

    @inject() queue: Queue;

    async createTask(data: any): Promise<void> {
        // Save a job with id "test"; the handler registered for "test" will process it.
        await this.queue.create("test", data).exec();
    }
}

In the task handler, the handler name must match the job id passed to queue.create.

import {define, inject} from 'appolo';
import {Job, handler} from "@appolo/queue";

@define()
export class SomeManager {

    @handler("test")
    async handle(job: Job): Promise<any> {
        // do something with job.params
    }
}

Create Job

create

create(jobId, [params]):Job

Creates a new job instance. params is an optional object passed to the handler. Call exec at the end to save the job; the job is then scheduled to run at Date.now(), that is, as soon as possible. Each job must have a unique id.

    queue.handle("test", async (job) => {
        console.log(job.params.param)
    });

    await queue.create("test", {param: "value"})
        .exec();

delay

delay(time):Job

Creates a delayed job that runs only once. time is one of the following:

  • interval in milliseconds
  • Date object
  • string in date syntax

    let job = await queue.create("test", {param: "value"})
        .delay(2000)
        .exec();

    await queue.create("test", {param: "value"})
        .delay("in 5 hours")
        .exec();
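
To make the three accepted forms of time more concrete, here is a minimal sketch of how such a value could be turned into a unix timestamp in milliseconds. It is not the library's implementation: resolveRunTime, DelayTime and UNIT_MS are hypothetical names, and the string handling only covers a simple "in N units" pattern plus anything Date.parse understands, which is an assumption about what "date syntax" means.

```typescript
// Hypothetical union of the three forms described above.
type DelayTime = number | Date | string;

// Milliseconds per unit for the simple "in N units" pattern (assumption).
const UNIT_MS: Record<string, number> = {
    second: 1000,
    minute: 60 * 1000,
    hour: 60 * 60 * 1000,
    day: 24 * 60 * 60 * 1000,
};

// Convert a delay value into an absolute unix timestamp in milliseconds.
function resolveRunTime(time: DelayTime, now: number = Date.now()): number {
    if (typeof time === "number") {
        return now + time; // interval in milliseconds from now
    }
    if (time instanceof Date) {
        return time.getTime(); // absolute date
    }
    // Simple relative expression such as "in 5 hours".
    const match = /^in (\d+) (second|minute|hour|day)s?$/.exec(time.trim());
    if (match) {
        return now + Number(match[1]) * UNIT_MS[match[2]];
    }
    // Fall back to the built-in date parser for absolute date strings.
    const parsed = Date.parse(time);
    if (Number.isNaN(parsed)) {
        throw new Error(`Unsupported time expression: ${time}`);
    }
    return parsed;
}

console.log(resolveRunTime(2000));
console.log(resolveRunTime("in 5 hours"));
```

Passing now explicitly keeps the function deterministic, which is convenient when testing scheduling logic.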

schedule

schedule(time):Job

Creates a scheduled job that runs repeatedly, once every interval. time is one of the following:

  • interval in milliseconds
  • cron job syntax
  • Date object
  • string in date syntax

    let job = await queue.create("test", {param: "value"})
        .schedule(10 * 60 * 1000)
        .exec();

    await queue.create("test", {param: "value"})
        .schedule("every 10 minutes")
        .exec();

    await queue.create("test", {param: "value"})
        .schedule("* */10 * * * *")
        .exec();

Handle jobs

Each job has its own handler. The handler is called with the job instance and returns a promise with the job result.

handle

handle(jobId,handler,[options]):Queue

Adds a handler to the queue by job id. Options:

  • lockTime: interval in milliseconds to lock the job while the handler is running

    queue.handle("test", async (job) => {
        console.log(job.params.someValue)

        let result = await doSomething(job.params.someValue)

        return result
    });

    await queue.create("test", {someValue: "value"})
        .exec();

handler with multiple jobs

You can define one handler that handles multiple jobs.

    queue.handle("test", async (job)=>{
       //do something
    });

    await queue.create("someId", {someValue: "value"})
         .handler("test")
         .exec();

    await queue.create("someId2", {someValue: "value"})
        .handler("test")
        .exec();
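
The relationship between job ids and handler ids can be pictured as a lookup table: by default a job is dispatched to the handler registered under its own id, and handler("test") points it at a different entry. The sketch below models only that lookup in memory. HandlerRegistry, register and dispatch are hypothetical names for this illustration and say nothing about how the library stores handlers internally.

```typescript
// A handler receives the job params and returns a result.
type Handler = (params: unknown) => unknown;

// Hypothetical in-memory registry; not the library's implementation.
class HandlerRegistry {
    private handlers = new Map<string, Handler>();

    // Register a handler under an id, like queue.handle(id, fn).
    register(id: string, fn: Handler): this {
        this.handlers.set(id, fn);
        return this;
    }

    // Dispatch a job: use the explicit handler id if given, else the job id.
    dispatch(jobId: string, params: unknown, handlerId: string = jobId): unknown {
        const fn = this.handlers.get(handlerId);
        if (!fn) {
            throw new Error(`No handler registered for "${handlerId}" (job "${jobId}")`);
        }
        return fn(params);
    }
}

const registry = new HandlerRegistry();
const seen: unknown[] = [];
registry.register("test", (params) => {
    seen.push(params);
    return "done";
});

// Two different job ids share the single "test" handler.
registry.dispatch("someId", { someValue: "a" }, "test");
registry.dispatch("someId2", { someValue: "b" }, "test");
console.log(seen.length);
```

Without the explicit handler id, dispatching "someId" would fail here because no handler is registered under that job id.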

Job

await queue.create("someId2", {someValue: "value"})
    .handler("test")
    .lockTime(10*60*1000) // 10 min
    .repeat(2)
    .retry(3)
    .backoff(2000)
    .exec();

lockTime

lockTime(lockTime: number):Job

Changes the job lock time in milliseconds (default: 60000).

repeat

repeat(value: number):Job

Sets the maximum number of times the job will run. The default is unlimited for scheduled jobs and 1 for delayed jobs.

retry

retry(value: number):Job

Sets the number of retries when the job fails (default: 10). When that number is reached, the job is rescheduled.

backoff

backoff(value: number) :Job

Sets the backoff interval in milliseconds between retries (default: 1000).
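
One way to read the interaction between retry and backoff is as a small state transition applied each time a run fails. The sketch below assumes a constant backoff between retries and assumes that "reschedule" means returning to the job's regular interval; neither detail is confirmed by this README. RetryState, RetryOptions and onJobFail are hypothetical names.

```typescript
// Hypothetical retry settings; interval stands for the job's regular schedule.
interface RetryOptions {
    retry: number;    // max retries after a failure (README default: 10)
    backoff: number;  // ms to wait before each retry (README default: 1000)
    interval: number; // ms between regular runs (assumed field)
}

// Hypothetical per-job bookkeeping.
interface RetryState {
    attempts: number; // retries used so far for the current run
    nextRun: number;  // unix timestamp in ms of the next execution
}

// Compute the next state after a failed run at time `now`.
function onJobFail(state: RetryState, options: RetryOptions, now: number): RetryState {
    if (state.attempts < options.retry) {
        // Retries left: try again after the backoff interval.
        return { attempts: state.attempts + 1, nextRun: now + options.backoff };
    }
    // Retries exhausted: reset the counter and fall back to the regular schedule.
    return { attempts: 0, nextRun: now + options.interval };
}

const retryOptions: RetryOptions = { retry: 2, backoff: 1000, interval: 600000 };
let retryState: RetryState = { attempts: 0, nextRun: 0 };
retryState = onJobFail(retryState, retryOptions, 0);    // first retry scheduled
retryState = onJobFail(retryState, retryOptions, 1000); // second retry scheduled
retryState = onJobFail(retryState, retryOptions, 2000); // retries exhausted
console.log(retryState);
```

Keeping the transition a pure function makes the policy easy to swap, for example for an exponential backoff.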

handler

handler(value: string | Function) :Job

set job handler id or function

exec

exec() :Promise<Job>

Saves the job to redis. If the schedule changed, the job is rescheduled.

lock

lock(interval:number) :Promise<Job>

Locks the job for the given interval. This method is called automatically when the handler is called.

queue.handle("test", async (job) => {
    await job.lock(60 * 1000);
    //do something
})

await queue.create("test", {someValue: "value"})
    .exec();
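
The purpose of the lock is easier to see in a small in-memory model: a pulled job is marked as locked until a timestamp, other pulls skip it until the lock expires, and calling lock again extends the window. The sketch below only models that idea under the assumption that a lock is a simple expiry time; LockableJob and pullJob are hypothetical names, and the real module keeps its state in redis.

```typescript
// Hypothetical in-memory job with an expiring lock.
class LockableJob {
    readonly id: string;
    private lockedUntil = 0; // unix ms; 0 means never locked

    constructor(id: string) {
        this.id = id;
    }

    // Lock (or re-lock) the job for `interval` ms starting at `now`.
    lock(interval: number, now: number): void {
        this.lockedUntil = now + interval;
    }

    // A job is locked while the current time is before the expiry.
    isLocked(now: number): boolean {
        return now < this.lockedUntil;
    }
}

// Pull the first unlocked job and lock it for `lockTime` ms.
function pullJob(jobs: LockableJob[], lockTime: number, now: number): LockableJob | undefined {
    const job = jobs.find((candidate) => !candidate.isLocked(now));
    if (job) {
        job.lock(lockTime, now);
    }
    return job;
}

const jobs = [new LockableJob("test")];
const first = pullJob(jobs, 60000, 0);      // pulled and locked until t = 60000
const second = pullJob(jobs, 60000, 1000);  // still locked, nothing to pull
const third = pullJob(jobs, 60000, 60000);  // lock expired, pulled again
console.log(first?.id, second, third?.id);
```

A handler that needs more time than the default lock would call lock again before the window closes, which is the situation the job.lock example above addresses.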

run

run(waitForResults:boolean) :Promise<Job> | Promise<any>

Saves the job to redis and runs it immediately. If waitForResults is true, the returned promise resolves with the job result.

queue.handle("test", async (job) => {
    return "some value"
})

let job = await queue.create("test", {someValue: "value"}).run()

let result = await queue.create("test", {someValue: "value"}).run(true)

console.log(result) //some value

cancel

cancel() :Promise<void>

cancel the job and delete from redis

id

get id():string

return job id

params

get params():any

return job params

nextRun

get nextRun(): number

return the job's next run time as a unix timestamp in milliseconds

interval

get interval(): number

return the job's run interval in milliseconds

options

get options()

return job options

Job Events

Job events are fired on the Job instance via Redis pubsub. All callbacks are called with the job instance.

  • Events.JobStart - the job is pulled from the queue and the handler is called
  • Events.JobSuccess - the job run completed successfully; the result is added to the callback args
  • Events.JobFail - the job run failed with an error
  • Events.JobComplete - the job run succeeded or failed and the job is returned to the queue

let job = await queue.create("someId2", {someValue: "value"})
    .handler("test")
    .once(Events.JobSuccess, (job, result) => console.log('weeeee'))
    .exec();

on

on(eventName,callback, [scope]):Job

register event listener

once

once(eventName,callback, [scope]):Job

register event listener, will be removed after one call

un

un(eventName,callback, [scope]):Job

remove event listener
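
The difference between on, once and un can be shown with a tiny stand-alone emitter. This is only a sketch of the listener semantics described above: TinyEmitter and its fire method are hypothetical, the event name is a plain string rather than a member of Events, and the real Job and Queue classes deliver events through Redis pubsub rather than in-process calls.

```typescript
type Callback = (...args: any[]) => void;

interface Listener {
    callback: Callback;
    scope: object | undefined; // optional `this` for the callback
    once: boolean;             // remove after the first call
}

// Hypothetical minimal emitter with the same method names as the API above.
class TinyEmitter {
    private listeners = new Map<string, Listener[]>();

    on(eventName: string, callback: Callback, scope?: object): this {
        return this.add(eventName, { callback, scope, once: false });
    }

    once(eventName: string, callback: Callback, scope?: object): this {
        return this.add(eventName, { callback, scope, once: true });
    }

    // Remove listeners that match both the callback and the scope.
    un(eventName: string, callback: Callback, scope?: object): this {
        const kept = (this.listeners.get(eventName) ?? []).filter(
            (l) => l.callback !== callback || l.scope !== scope
        );
        this.listeners.set(eventName, kept);
        return this;
    }

    fire(eventName: string, ...args: any[]): void {
        const current = this.listeners.get(eventName) ?? [];
        // Drop one-shot listeners before invoking, so each fires at most once.
        this.listeners.set(eventName, current.filter((l) => !l.once));
        for (const l of current) {
            l.callback.apply(l.scope, args);
        }
    }

    private add(eventName: string, listener: Listener): this {
        const list = this.listeners.get(eventName) ?? [];
        list.push(listener);
        this.listeners.set(eventName, list);
        return this;
    }
}

const emitter = new TinyEmitter();
const calls: string[] = [];
const onSuccess = () => calls.push("on");

emitter.on("jobSuccess", onSuccess).once("jobSuccess", () => calls.push("once"));
emitter.fire("jobSuccess");          // both listeners run
emitter.fire("jobSuccess");          // only the `on` listener runs
emitter.un("jobSuccess", onSuccess);
emitter.fire("jobSuccess");          // no listeners left
console.log(calls.join(","));        // on,once,on
```

Because every method returns the emitter, calls can be chained in the same style as the job builder.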

Queue

initialize

initialize():Promise<void>

Initializes the queue and starts the pulling interval. The returned promise resolves when everything is ready.

start

start()

start pulling jobs from the queue

stop

stop()

stop pulling jobs from the queue

run

run(jobId: string,waitForResult:boolean): Promise<this | any>

Runs a job by id. Returns the job instance, or the job result when waitForResult is true.

getJob

getJob(id: string): Promise<Job>

get job instance by id

getAllJobs

getAllJobs(): Promise<Job[]>

get all jobs in the queue

hasJob

hasJob(id: string): Promise<boolean>

return true if job id exist in the queue

purge

purge()

delete all jobs in the queue

reset

reset()

stop job pulling and purge the queue

Queue Events

Queue events are fired on the Queue instance via Redis pubsub. Callbacks for job events are called with the job instance.

  • Events.JobStart - the job is pulled from the queue and the handler is called
  • Events.JobSuccess - the job run completed successfully; the result is added to the callback args
  • Events.JobFail - the job run failed with an error
  • Events.JobComplete - the job run succeeded or failed and the job is returned to the queue
  • Events.Ready - the queue finished initializing and started the pulling interval
  • Events.Error - an error occurred while processing a job

await queue.create("someIs", {someValue: "value"})
    .exec();

queue.once(Events.JobSuccess, (job, result) => console.log(job.id))

queue.on(Events.Error, (e) => console.log(e))

on

on(eventName,callback, [scope]):Queue

register event listener

once

once(eventName,callback, [scope]):Queue

register event listener, will be removed after one call

un

un(eventName,callback, [scope]):Queue

remove event listener