@ambassify/queue
v1.1.8
Queue implementation for node
Queue
This library acts as a wrapper around different queue implementations that we might end up using.
Currently implemented backends: SQS
API
The public API that each queue exposes is defined in Queue. A new backend can be implemented by creating a class that extends the public API. The public API is:
- `Queue.create( QueueType : class, ...args )`
  `args` is passed to the constructor of the `QueueType`.
- `constructor( queueName : string, options : object )`
  - Available options are:
    - `itemPoolSize : int`
- `getName() : string`
- `getItemPool() : ItemPool` ( see item-pool.js )
- `receive( count : int ) : Promise`
  Attempt to receive at most `count` items.
- `release( item : object, handled : boolean ) : Promise`
  Release the item. If not `handled`, the item will not be deleted from the queue. `handled` defaults to `false`.
- `touch( item : object, options : object ) : Promise`
  Touch / ping a message to keep it in use.
- `send( body : object ) : Promise`
  Submit a new queue item.
- `connect() : Promise`
- `start() : Promise`
  Start watching the queue for new items.
- `stop() : void`
  Stop watching the queue for new items. A final batch might still arrive after calling `stop()`.
- `lock( item : object, options : object ) : Promise`
  Prevents a message from re-entering the queue.
- `unlock( item : object ) : Promise`
  Release an earlier acquired lock.
- `on( event : string, callback : function ) : void`
  Attach an event handler to the queue. The `message` event is triggered for each queue item that arrives. The `error` event is triggered for errors in the `_eventLoop` or `_lock`.
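As a rough illustration of the `message` / `error` events and the `release` semantics listed above, the sketch below wires a worker function to any object that exposes `on()` and `release()`. The names `consume`, `worker` and `FakeQueue` are hypothetical and not part of this library. `FakeQueue` is a stand-in built on Node's `EventEmitter` purely so the example runs on its own.

```javascript
'use strict';

const { EventEmitter } = require('events');

// Hypothetical helper (not part of this library): runs `worker` for every
// item and releases the item afterwards. `handled` is only true when the
// worker succeeded, so failed items are not deleted from the queue.
function consume(queue, worker, onError = console.error) {
    queue.on('message', async (item) => {
        let handled = false;
        try {
            await worker(item);
            handled = true;
        } catch (err) {
            onError(err);
        }
        try {
            await queue.release(item, handled);
        } catch (err) {
            onError(err);
        }
    });

    // Errors raised by the queue itself are reported through the same callback.
    queue.on('error', onError);
}

// Stand-in queue for demonstration only: it records calls to release().
class FakeQueue extends EventEmitter {
    constructor() {
        super();
        this.released = [];
    }

    async release(item, handled = false) {
        this.released.push({ item, handled });
    }
}
```

With a real queue instance, the same `consume( queue, worker )` call would presumably be followed by `queue.start()`. That wiring is an assumption based on the method list above.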
The public API calls into implementation-specific methods through an internal API that each backend must implement. The required private methods are:
- `_fetch( itemsToFetch : int ) : Promise`
  Request `itemsToFetch` items from the queue. Do not perform any mutations on the raw objects before resolving them.
- `_transform( item : object ) : object`
  This method will receive the items retrieved using `_fetch` one by one. You can return altered objects from this method to change the queue items.
- `_delete( item : object ) : Promise`
  Remove the `item` from the queue / mark as finished. This method should always receive the instance from the `_transform` step, such that you could add hidden fields to identify the item.
- `_touch( item : object, options : object ) : Promise`
  Touch the message to keep it from becoming visible again.
- `_send( item : object ) : Promise`
  Add `item` to the queue.
- `_connect() : Promise`
  Start to connect with the backend.
- `_lock() : Promise`
  Prevents a message from re-entering the queue. Default implementation uses `queue.touch`.
- `_unlock() : Promise`
  Releases the lock and allows the item to re-enter the queue.
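To make the split between public and private methods more concrete, here is a minimal in-memory sketch. `SketchQueue` is a stand-in for the library's base class, written only for this example, and `MemoryQueue` is a hypothetical backend. Neither is shipped by this package, and the real base class may delegate differently. The sketch does not model visibility timeouts, touching or locking.

```javascript
'use strict';

// Stand-in for the library's base class, for illustration only. It mirrors
// the delegation described above: public methods call the private ones.
class SketchQueue {
    constructor(queueName) {
        this.queueName = queueName;
    }

    getName() {
        return this.queueName;
    }

    async receive(count) {
        const raw = await this._fetch(count);
        return raw.map((item) => this._transform(item));
    }

    async release(item, handled = false) {
        // Only handled items are removed from the queue.
        if (handled) {
            await this._delete(item);
        }
    }

    send(body) {
        return this._send(body);
    }
}

// Hypothetical in-memory backend implementing the private methods.
class MemoryQueue extends SketchQueue {
    constructor(queueName) {
        super(queueName);
        this.items = new Map();
        this.nextId = 1;
    }

    async _fetch(itemsToFetch) {
        // Resolve raw entries untouched; shaping happens in _transform.
        return Array.from(this.items.entries()).slice(0, itemsToFetch);
    }

    _transform([id, body]) {
        // Keep the id on the item so _delete can identify it later.
        return { _id: id, body };
    }

    async _delete(item) {
        this.items.delete(item._id);
    }

    async _send(item) {
        this.items.set(this.nextId++, item);
    }
}
```

The `_id` field plays the role of the hidden field mentioned for `_delete`: it is added in `_transform` and read back when the item is released as handled.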
Libraries
- BatchOperation: Utility to batch `batchSize` items unless `timeout` expires. The SQS implementation uses this to batch `delete` and `send` operations.
- ItemPool: Currently only a counter which ensures that no more than the pool size number of items are in flight.
- sleep: Returns a promise that resolves after a timeout.
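The three helpers above are small enough to sketch. The code below is not the library's implementation. `BatchSketch` and `PoolSketch` are hypothetical names, and their constructor arguments and methods are assumptions chosen to match the one-line descriptions.

```javascript
'use strict';

// Resolves after `ms` milliseconds.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Hypothetical batcher: collects items and hands them to `handler` once
// `batchSize` items are pending or `timeout` ms passed since the first one.
class BatchSketch {
    constructor(handler, { batchSize = 10, timeout = 1000 } = {}) {
        this.handler = handler;
        this.batchSize = batchSize;
        this.timeout = timeout;
        this.pending = [];
        this.timer = null;
    }

    add(item) {
        this.pending.push(item);
        if (this.pending.length >= this.batchSize) {
            this.flush();
        } else if (this.timer === null) {
            this.timer = setTimeout(() => this.flush(), this.timeout);
        }
    }

    flush() {
        clearTimeout(this.timer);
        this.timer = null;
        if (this.pending.length === 0) return;
        const batch = this.pending;
        this.pending = [];
        this.handler(batch);
    }
}

// Hypothetical pool counter: tracks how many items are in flight and never
// grants more than the remaining capacity.
class PoolSketch {
    constructor(size) {
        this.size = size;
        this.inFlight = 0;
    }

    available() {
        return this.size - this.inFlight;
    }

    acquire(count = 1) {
        const granted = Math.max(0, Math.min(count, this.available()));
        this.inFlight += granted;
        return granted;
    }

    release(count = 1) {
        this.inFlight = Math.max(0, this.inFlight - count);
    }
}
```

In this sketch a full batch is flushed immediately, while a partial batch waits for the timer, which is one plausible reading of "unless `timeout` expires".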
Runtime configuration options
Configuration is done through environment variables. The options are:
- `BATCH_SIZE` defaults to `10`
- `QUEUE_POOL_SIZE` defaults to `20`
- `SQS_AWS_REGION` defaults to the `AWS_REGION` environment variable.
- `SQS_FETCH_WAIT` defaults to `20` seconds
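As an illustration of how these variables and their defaults fit together, the sketch below resolves them from an environment object. `loadConfig` and the property names it returns are hypothetical. The library reads these variables internally and may parse them differently.

```javascript
'use strict';

// Hypothetical helper: reads the documented variables from `env` and
// falls back to the documented defaults.
function loadConfig(env = process.env) {
    const toInt = (value, fallback) => {
        const parsed = Number.parseInt(value, 10);
        return Number.isNaN(parsed) ? fallback : parsed;
    };

    return {
        batchSize: toInt(env.BATCH_SIZE, 10),
        queuePoolSize: toInt(env.QUEUE_POOL_SIZE, 20),
        // SQS_AWS_REGION wins; otherwise fall back to AWS_REGION.
        sqsRegion: env.SQS_AWS_REGION || env.AWS_REGION,
        sqsFetchWaitSeconds: toInt(env.SQS_FETCH_WAIT, 20),
    };
}
```

Setting, for example, `BATCH_SIZE=5` in the environment before starting the process would override only the batch size and leave the other defaults in place.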