@requence/consumer

v0.1.0

This package connects a JavaScript runtime (e.g. Node.js / Bun) to the operator bus. It manages the retrieval and responses of messages.

Usage

import createConsumer from '@requence/consumer'

createConsumer((ctx) => {
  return 'this is my response'
})

Every consumer instance needs a url to connect to the operator and a version. In the basic example above, both are read automatically from the environment variables REQUENCE_URL and VERSION. To set them explicitly, pass a configuration object as the first parameter:

createConsumer(
  {
    url: 'your operator connection url string',
    version: 'your version in format major.minor.patch',
  },
  (ctx) => {
    return 'this is my response'
  },
)
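
The fallback from explicit options to environment variables can be pictured with a small sketch. resolveConsumerConfig is a hypothetical helper and not part of @requence/consumer; the precedence (explicit options win over the environment) and the errors thrown for missing values are assumptions made for illustration.

```javascript
// Hypothetical helper, NOT part of @requence/consumer.
// Assumption: explicit options take precedence over REQUENCE_URL and VERSION.
function resolveConsumerConfig(options = {}, env = process.env) {
  const url = options.url ?? env.REQUENCE_URL
  const version = options.version ?? env.VERSION

  if (!url) {
    throw new Error('Missing operator url: pass `url` or set REQUENCE_URL')
  }
  // The README asks for major.minor.patch, so reject anything else early.
  if (!/^\d+\.\d+\.\d+$/.test(version ?? '')) {
    throw new Error('Missing or invalid version: expected major.minor.patch')
  }
  // Keep any other options (e.g. prefetch) untouched.
  return { ...options, url, version }
}
```

Validating the configuration before subscribing surfaces a missing variable at startup instead of at connection time.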

Additional configuration

By default, the consumer retrieves one message at a time from the operator, processes it, and passes the response back to the operator. If your service can process multiple messages in parallel, you can set a higher prefetch.

createConsumer(
  {
    prefetch: 10, // this will process max. 10 messages at once when available
  },
  async (ctx) => {
    return 'this is my response'
  },
)
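
To make the effect of prefetch concrete, here is a local model of bounded parallelism. It is an illustration only and says nothing about how the library or the operator implement prefetch; processWithPrefetch and its parameters are hypothetical names.

```javascript
// Illustration only: a local model of "prefetch", not the library's implementation.
// At most `prefetch` handler calls are in flight at any time.
async function processWithPrefetch(messages, prefetch, handler) {
  const results = new Array(messages.length)
  let next = 0

  async function worker() {
    while (next < messages.length) {
      const index = next++ // claim the next message before awaiting
      results[index] = await handler(messages[index])
    }
  }

  // Start up to `prefetch` workers that pull messages until none are left.
  const workers = Array.from({ length: Math.min(prefetch, messages.length) }, worker)
  await Promise.all(workers)
  return results
}
```

With prefetch set to 1 this degenerates to the default behaviour described above: one message is processed at a time.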

Unsubscribing service

Should you ever need to unsubscribe your service from the operator programmatically, createConsumer returns a promise that resolves to an unsubscribe function.

const unsubscribe = await createConsumer(...)

// later
unsubscribe()

To resubscribe, you have to call createConsumer again.
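
A common use of the unsubscribe function is a clean shutdown. The sketch below is one possible way to wire it up, under assumptions: subscribeUntilShutdown is a hypothetical helper, subscribe stands in for a call such as () => createConsumer(config, handler), and signals is any object with a once method (the Node.js process object by default).

```javascript
// Sketch: unsubscribe exactly once when a shutdown signal arrives.
// `subscribe` is expected to return a promise that resolves to an unsubscribe
// function, as createConsumer does according to the text above.
async function subscribeUntilShutdown(subscribe, signals = process) {
  const unsubscribe = await subscribe()
  let stopped = false

  const stop = () => {
    if (stopped) return // ignore repeated signals
    stopped = true
    unsubscribe()
  }

  signals.once('SIGINT', stop)
  signals.once('SIGTERM', stop)
  return stop // also allows stopping manually
}
```

Passing the subscribe call as a function keeps the helper independent of the concrete configuration and makes it easy to exercise with a stub.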

Processing messages

The message handler callback receives one argument: the message context. The context provides helper methods to access previous operator results, as well as methods to abort the processing early.

Context API: data retrieval

ctx.getInput()

The input that was defined when the task started

ctx.getMeta()

The additional meta information added to the task

ctx.getConfiguration(): unknown

The optional configuration of the current service

ctx.getServiceMeta(serviceIdentifier): {
  executionDate: Date | null, // null when the service was not yet executed
  id: string, // service id used for internal routing
  alias?: string, // service alias (see service Identifier)
  name: string,
  configuration?: unknown,
  version: string
}

The meta parameters of a service, usually not needed

ctx.getServiceData(serviceIdentifier): unknown

The response of a previously executed service

ctx.getLastServiceData(serviceIdentifier): unknown

Same as ctx.getServiceData but only returns the last data when a service is used multiple times

ctx.getServiceError(serviceIdentifier): string | null

The error message of a previously executed service, or null when that service has not yet been executed or completed without error.

ctx.getLastServiceError(serviceIdentifier): string | null

Same as ctx.getServiceError but only returns the last error when a service is used multiple times

ctx.getResults(): Array<{
  executionDate: Date | null, // null when the service was not yet executed
  id: string, // service id used for internal routing
  alias?: string, // service alias (see service Identifier)
  name: string,
  configuration?: unknown,
  version: string,
  error?: string,
  data?: unknown | null
}>

Returns the results of all configured services in this task. When a service ran before the current service, executionDate and either error or data will be available.

ctx.getTenantName(): string

The name of the tenant that initiated the task
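
As a rough illustration of how these getters combine, the sketch below groups the entries of ctx.getResults() by state. It relies only on the fields documented above; summarizeTask and the shape of its return value are hypothetical and not something the operator requires.

```javascript
// Sketch of a handler body built only on the getters documented above.
// The summary shape is an illustration, not part of the library.
function summarizeTask(ctx) {
  const summary = { tenant: ctx.getTenantName(), succeeded: [], failed: [], pending: [] }

  for (const result of ctx.getResults()) {
    const label = result.alias ?? result.name
    if (result.executionDate === null) {
      summary.pending.push(label) // not executed yet
    } else if (result.error) {
      summary.failed.push(label) // executed and reported an error
    } else {
      summary.succeeded.push(label) // executed without error
    }
  }
  return summary
}
```

Because the function only reads from the context, it can be tried out with a plain object that implements getTenantName and getResults.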

Context API: processing control

ctx.retry(delay?: number): never

Instructs the operator to retry the service after an optional delay in milliseconds (the minimum is 100 ms). No code gets executed after this line. Currently, it is your responsibility to prevent infinite retry loops.

ctx.abort(reason?: string): never

Instructs the operator to abort the processing of this service immediately. If the service is not configured with a failover, the complete task will fail.
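
Since guarding against endless retries is left to the service, one possible approach is a retry budget with backoff. The sketch below makes several assumptions: retryOrAbort is a hypothetical helper, key is a caller-chosen string identifying the unit of work (the context API shown here does not document a task id), and the counter lives in process memory, so it is lost on restart and not shared between instances.

```javascript
// Hypothetical retry budget, NOT part of @requence/consumer.
// In-memory only: lost on restart and not shared between service instances.
const attempts = new Map()

function retryOrAbort(ctx, key, { maxAttempts = 5, baseDelay = 200 } = {}) {
  const attempt = (attempts.get(key) ?? 0) + 1

  if (attempt > maxAttempts) {
    attempts.delete(key)
    return ctx.abort(`Giving up after ${maxAttempts} retries`)
  }

  attempts.set(key, attempt)
  // Exponential backoff, never below the documented 100 ms minimum.
  const delay = Math.max(100, baseDelay * 2 ** (attempt - 1))
  return ctx.retry(delay)
}
```

A handler that eventually succeeds should also call attempts.delete(key), otherwise the map keeps growing.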

Service identifier

Most context methods require a service identifier as a parameter. This identifier can either be a service name or a service alias. The latter is useful for situations where a service is used multiple times in a task and you need the data from one specific execution.
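
The difference between a name and an alias can be illustrated on top of ctx.getResults(). How the library resolves identifiers internally is not documented here; findResults is a hypothetical helper that only mirrors the rule stated above.

```javascript
// Illustration of name-versus-alias lookups, NOT the library's own resolution logic.
// A name can match several executions; an alias is meant to single out one of them.
function findResults(ctx, serviceIdentifier) {
  return ctx
    .getResults()
    .filter((r) => r.alias === serviceIdentifier || r.name === serviceIdentifier)
}
```

For a task that runs a service named ocr twice under two different aliases, looking up the name would yield both entries while looking up one alias would yield a single entry.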

Full example

Pseudo implementation of a database service

import createConsumer from '@requence/consumer'
import db from './database.js' // pseudo code
createConsumer(
  {
    url: 'amqp://username:password@host',
    version: '1.2.3',
    prefetch: 2, // we can process 2 messages in parallel
  },
  async (ctx) => {
    const ocrData = ctx.getServiceData('ocr')

    if (!ocrData) {
      // abort() never returns, so nothing below runs without OCR data
      ctx.abort('OCR data is mandatory for this database service')
    }

    if (!db.isConnected) {
      // let's wait 2 s for the db to recover
      ctx.retry(2000)
    }

    const response = await db.getDataBasedOnOcr(ocrData)

    return response
  },
)