
exthos

v0.2.3


stream processing in nodejs using the power of golang


Project status

exthos is incubating: while the implementation of the feature list is a WIP, the project can be used in production with its existing features.

| API | Status | Comments |
|--|--|--|
| Low level APIs | Stable | Not all components are implemented yet |
| High level APIs | Early days | The API is not stable and far from complete. Use with caution |

Website

exthos

Click to jump straight to installation and usage


Theory

Motivation

exthos brings together JavaScript, for its ubiquity and ease of use, and Golang, for its high performance, to solve stream processing problems.

JavaScript is the most commonly used programming language (source: Stack Overflow). Its popularity seems to stem from the fact that it is both lightweight and easy to integrate with other frameworks/languages. However, it is not made for heavy computation, which stream processing often requires. Golang's speed, simplicity and memory efficiency, on the other hand, make it a great candidate for compute-heavy workloads.

exthos attempts to bring the two together to provide a stream processing engine with intuitive APIs, heavily inspired by the Apache Camel project.

JavaScript, together with TypeScript, provides a solid foundation for building a Domain Specific Language (DSL). exthos attempts to bring a streaming/integration DSL to benthos, an otherwise configuration-based stream engine.

How exthos works

exthos has two modes of operation:

  • local (isLocal=true): in this mode, the engine takes on the responsibility of managing the engineProcess itself
  • remote (isLocal=false): in this mode, the engine communicates with a remote engineProcess over HTTP

The default mode is isLocal=true.

The remote mode can be used for deploying into remote benthos processes, and in the future it will also be used to define the exthos deployment framework.

How to use exthos

exthos has two sets of APIs:

  • the low level (LL) APIs and
  • the high level (HL) APIs

Users are encouraged to use the HL APIs, although an overview of the LL APIs helps create a good foundation.

Overview: Low level APIs

At the core of exthos is the engine, which is responsible for running the engineProcess. The engineProcess is essentially a child process that runs benthos (in streams mode). Streams represent inputs connected to outputs, optionally via a pipeline containing processors, and are created using new Stream(...). Stream objects can be added to the engine instance at any time using engine.add(stream), and can likewise be updated or removed using engine.update() and engine.remove().
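In streams mode, benthos manages its streams through a REST API, where POST, PUT and DELETE on /streams/{id} create, replace and remove a stream. A minimal sketch, assuming the engine's add/update/remove operations map onto those endpoints; EngineOp, HttpCall and toHttpCall are hypothetical names and not part of exthos:

```typescript
// Hypothetical mapping of LL API operations onto the benthos streams-mode REST API.
// The endpoints are benthos's; that exthos issues exactly these calls is an assumption.

type EngineOp = "add" | "update" | "remove";

interface StreamConfig {
  input: Record<string, unknown>;
  pipeline?: { processors: Record<string, unknown>[] };
  output: Record<string, unknown>;
}

interface HttpCall {
  method: "POST" | "PUT" | "DELETE";
  path: string;
  body?: string; // JSON is valid YAML, so benthos can parse it as a stream config
}

function toHttpCall(op: EngineOp, streamId: string, cfg?: StreamConfig): HttpCall {
  if (op !== "remove" && cfg === undefined) {
    throw new Error(`operation "${op}" needs a stream config`);
  }
  const path = `/streams/${encodeURIComponent(streamId)}`;
  switch (op) {
    case "add": // create a new stream with this id
      return { method: "POST", path, body: JSON.stringify(cfg) };
    case "update": // replace the config of an existing stream
      return { method: "PUT", path, body: JSON.stringify(cfg) };
    case "remove": // stop and delete the stream
      return { method: "DELETE", path };
    default:
      throw new Error(`unknown operation: ${String(op)}`);
  }
}
```

Sending the resulting call with any HTTP client to a benthos process running in streams mode would perform the operation; in remote mode the same calls would simply target another host.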

Overview: High level APIs

The high level APIs abstract a lot of complexity away from users, enabling them to focus on the integration/streaming problem. As mentioned before, the HL APIs are heavily inspired by Apache Camel's DSL; however, it is important to note that there is no overlap or commonality between exthos and Camel.

With the HL APIs, users create what we call a route. Internally, a route is akin to an LL API stream. A simple route consists of an input and an output, specified in a fluent API style using the from and to constructs, e.g. let route01 = from(...).to(...)

Once a route is created, it can be started. Internally, exthos starts the engine if it is not already running and adds the route to the engine, e.g. route01.start()

The route can subsequently be stopped using: route01.stop()
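To make the route/stream relationship concrete, here is a hypothetical builder that turns a from(...).via(...).to(...) chain into the input/pipeline/output shape of a stream. RouteSketch, fromSketch and toStreamConfig are illustrative names, not exthos exports; the real HL API also starts and stops the route, which this sketch does not model:

```typescript
// Hypothetical fluent builder mirroring from(...).via(...).to(...).
// It only produces an LL-style stream config; it does not start or stop anything.

type Component = Record<string, unknown>;

interface StreamConfig {
  input: Component;
  pipeline: { processors: Component[] };
  output: Component;
}

class RouteSketch {
  private readonly input: Component;
  private readonly processors: Component[] = [];
  private output?: Component;

  constructor(input: Component) {
    this.input = input;
  }

  // each via(...) appends one processor to the pipeline, in call order
  via(processor: Component): this {
    this.processors.push(processor);
    return this;
  }

  to(output: Component): this {
    this.output = output;
    return this;
  }

  toStreamConfig(): StreamConfig {
    if (this.output === undefined) {
      throw new Error("route has no output; call .to(...) first");
    }
    return {
      input: this.input,
      pipeline: { processors: [...this.processors] },
      output: this.output,
    };
  }
}

const fromSketch = (input: Component): RouteSketch => new RouteSketch(input);
```

For example, fromSketch({ generate: { mapping: 'root = count("gen")' } }).to({ stdout: {} }).toStreamConfig() would yield a stream with the generate input, an empty pipeline and the stdout output.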

Components

Inputs

exthos supports a number of inputs as sources of data. Take a look here.

Some special inputs are:

  • inport: enables sending data from JS-land to Golang-land
  • direct: acts as an alias and can be used to receive data from another direct output

Outputs

exthos supports a number of outputs as destinations of data. Take a look here.

Some special outputs are:

  • outport: enables receiving data from Golang-land to JS-land
  • direct: acts as an alias and can be used to send data to another direct input

Processors

exthos supports a number of processors. Processors act on a stream of data to perform operations such as lookup, filter, enrichment etc. Take a look here.

Some special processors are:

  • javascript: allows you to write JavaScript to manipulate the content and meta of messages

Messages

Messages are the events that are produced by sources, processed by processors and consumed by destinations. Messages are always part of a batch. If unspecified, a batch contains a single message only; however, you can create batches of multiple messages and operate on them. Each message contains three parts:

  • meta: contains the metadata for the message as key-value pairs
  • content: contains the actual data you may be interested in. This data can be anything, e.g. raw bytes, structured JSON, text etc.
  • error: contains a description of the error associated with a message. This is important because messages are not dropped on error: if a processor fails on a message, the error property is tagged with the error description and the message is moved to the next processor.
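The message semantics above, in particular that a failing processor tags the message instead of dropping it, can be modelled in a few lines. This is a hypothetical sketch: Message, Processor and runPipeline are illustrative names, and in exthos the processors actually run inside benthos rather than in JavaScript:

```typescript
// Hypothetical model of exthos message semantics: meta, content and error,
// processed in batches. Not the exthos or benthos implementation.

interface Message {
  meta: Record<string, string>; // key-value metadata
  content: string | Uint8Array; // payload: text, JSON, raw bytes, ...
  error?: string;               // set when a processor fails on this message
}

type Batch = Message[];
type Processor = (msg: Message) => Message;

// Runs every processor over every message in the batch. A failing processor
// does not drop the message: its error is recorded and the message moves on.
function runPipeline(batch: Batch, processors: Processor[]): Batch {
  return batch.map((original) => {
    let msg = original;
    for (const step of processors) {
      try {
        msg = step(msg);
      } catch (e) {
        msg = { ...msg, error: e instanceof Error ? e.message : String(e) };
      }
    }
    return msg;
  });
}
```

Note that processors after the failing one still see the message, with its error property set, so they can decide whether to act on it.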

In practice

Installation

npm i exthos

Usage

Now that you hopefully have a fair idea of what exthos is and is not, it's time to get your hands dirty.

A good starting point is to read the notes below and then head to the examples folder.

Using high level APIs: from/via/to

The following examples illustrate basic usage. Most examples below make use of bloblang; please refer to the bloblang documentation for details on how to use it.

// import the required constructs
import { from, to } from "exthos"

// example: create a route, start it and stop it after 5 seconds
from({ generate: { mapping: 'root = count("gen")' } }).to({ stdout: {} }).start().stopAfter(5000)

// example: create a route with a terminating source and start it.
// a generate input with count = 2 will generate 2 messages and then stop
from({ generate: { mapping: 'root = count("gen")', count: 2 } }).to({ stdout: {} }).start()
// no need to specify stop explicitly here as the route will stop after 2 messages are processed

For more examples refer to: examples dir

Other/non-functional features

Configuration

Like any application, exthos relies on sensible defaults, but at the same time allows users to provide their own configuration via a config file or environment variables.

The order of precedence is as follows:

  1. default configuration as defined under config/config.default.ts
  2. overwritten by any matching keys in exthos.config.json placed in the CWD (Current Working Directory: usually where your project root is)
  3. overwritten by any matching keys in environment variables prefixed with EXTHOS_ e.g.
    1. EXTHOS_engineExtraConfig_benthosDir=/tmp/test
    2. EXTHOS_engineConfig_logger_level=TRACE
  4. overwritten by parameters sent to the constructor via new Engine({<engineConfig>}, {<extraEngineConfig>})

e.g. to use your pre-installed version of benthos, create a file named exthos.config.json in the CWD with the following content:

{
    "engineExtraConfig": {
        "benthosDir": "",
        "benthosFileName": "benthos"
    }
}
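The precedence rules can be read as a sequence of deep merges, each layer overwriting matching keys of the previous one. A minimal sketch, assuming environment variable names map to nested keys by splitting on underscores (so it cannot represent keys that themselves contain underscores); deepMerge, configFromEnv and resolveConfig are hypothetical helpers, not exthos functions:

```typescript
// Hypothetical sketch of the precedence rules: each later layer overwrites
// matching keys of the earlier ones; nested objects are merged key by key.

type Config = { [key: string]: unknown };

function isPlainObject(v: unknown): v is Config {
  return typeof v === "object" && v !== null && !Array.isArray(v);
}

function deepMerge(base: Config, override: Config): Config {
  const out: Config = { ...base };
  for (const [key, value] of Object.entries(override)) {
    const prev = out[key];
    out[key] = isPlainObject(prev) && isPlainObject(value) ? deepMerge(prev, value) : value;
  }
  return out;
}

// EXTHOS_engineConfig_logger_level=TRACE -> { engineConfig: { logger: { level: "TRACE" } } }
// Assumption: names are split on "_" and values stay strings (no type coercion).
function configFromEnv(env: Record<string, string | undefined>, prefix = "EXTHOS_"): Config {
  let out: Config = {};
  for (const [name, value] of Object.entries(env)) {
    if (!name.startsWith(prefix) || value === undefined) continue;
    const keys = name.slice(prefix.length).split("_");
    // wrap the value from the innermost key outwards
    const nested = keys.reduceRight<unknown>((acc, key) => ({ [key]: acc }), value);
    out = deepMerge(out, nested as Config);
  }
  return out;
}

// defaults < exthos.config.json < EXTHOS_* env vars < constructor parameters
function resolveConfig(...layers: Config[]): Config {
  return layers.reduce<Config>((acc, layer) => deepMerge(acc, layer), {});
}
```

In Node.js, configFromEnv(process.env) would supply the environment layer; variables without the EXTHOS_ prefix are ignored.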

Labels

This section covers the LL APIs only.

Labels can optionally be assigned to all components. They aid in debugging and tracing. Labels are sanitised by the stream instance to follow these rules:

  • should match the regular expression /^[a-z0-9_]+$/
  • must not start with an underscore
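One way to enforce the two rules is sketched below. sanitizeLabel is a hypothetical helper; exthos's own sanitisation may, for example, replace invalid characters differently:

```typescript
// Hypothetical sanitiser enforcing the two label rules above.
const LABEL_RE = /^[a-z0-9_]+$/;

function sanitizeLabel(raw: string): string {
  const cleaned = raw
    .toLowerCase()
    .replace(/[^a-z0-9_]/g, "_") // replace every disallowed character
    .replace(/^_+/, "");         // labels must not start with an underscore
  if (!LABEL_RE.test(cleaned)) {
    throw new Error(`cannot derive a valid label from "${raw}"`);
  }
  return cleaned;
}
```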

Error handling

This section covers the HL APIs only.

exthos makes a best effort to swallow any errors it generates.

There are two sources of errors we must handle:

  1. errors in JS land
    1. For JS land, errors can be handled using try {} catch (e) {} finally {} or <Promise>.then().catch().finally()
    2. Check out the examples here
  2. errors in Golang land
    1. For Golang land, stream and message related errors must be handled using the component configurations, which are then handed over to benthos. As a rule of thumb, when an error occurs on a message, the error property of the message is set to the error description and the message moves on to the next component of the stream. To understand error handling, please go through this link

Logging

exthos makes use of the debug package to log output to stdout.

You can configure the debug namespace using one of the following:

  1. DEBUG=...* environment variable
  2. When initializing the engine
    1. llapi: new Engine({}, {debugNamespace: "...*"})
    2. hlapi: engine.setEngineConfigOptions({}, {debugNamespace: "...*"})

The default namespace is empty, i.e. ""

The following namespaces are available within exthos:

| namespace | description |
|---|---|
| exthos* | all exthos logs |
| exthos:engine:* | engine logs of all log types; engine logs originate in the JavaScript/Node land |
| exthos:engineProcess:* | engineProcess logs of all log levels; engineProcess logs originate from the Golang runtime |
| exthos:engine:debugLog | engine logs of debugLog type, providing debug and info level information |
| exthos:engine:traceLog | engine logs of traceLog type, providing very detailed trace level information such as the flow of code execution |
| exthos:eventLog* | all logs of eventLog type, providing generated events as logs; each log line is a JSON object containing the eventObj |
| exthos:eventLog:<eventName> | logs of eventLog type for the event named <eventName>; refer to Events for the complete list of <eventName>s |
| exthos:engineProcess:trace | engineProcess logs of trace level |
| exthos:engineProcess:debug | engineProcess logs of debug level |
| exthos:engineProcess:info | engineProcess logs of info level |
| exthos:engineProcess:warn | engineProcess logs of warn level |
| exthos:engineProcess:error | engineProcess logs of error level |
| exthos:engineProcess:fatal | engineProcess logs of fatal level |
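Namespace selection follows the debug package's pattern syntax: patterns are separated by commas or spaces, * is a wildcard and a leading - excludes matching namespaces. The simplified matcher below (isEnabled is a hypothetical name, not part of debug or exthos, and the real matching logic lives inside debug) shows which of the namespaces above a given pattern would enable:

```typescript
// Simplified, illustrative re-implementation of debug-style namespace matching.

// Turns one pattern such as "exthos:engine:*" into an anchored RegExp.
function compile(pattern: string): RegExp {
  const escaped = pattern
    .replace(/[.+?^${}()|[\]\\]/g, "\\$&") // escape regex metacharacters except '*'
    .replace(/\*/g, ".*?");                // '*' matches any run of characters
  return new RegExp(`^${escaped}$`);
}

function isEnabled(namespaces: string, name: string): boolean {
  const parts = namespaces.split(/[\s,]+/).filter(Boolean);
  const skips = parts.filter((p) => p.startsWith("-")).map((p) => compile(p.slice(1)));
  const names = parts.filter((p) => !p.startsWith("-")).map(compile);
  if (skips.some((re) => re.test(name))) return false; // exclusions win
  return names.some((re) => re.test(name));
}
```

With this logic, "exthos:engine:*" enables exthos:engine:debugLog but not exthos:engineProcess:info, and the empty default enables nothing.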

Events

exthos emits a lot of events. These can be used to trigger custom actions (aka handlers) in JS-land e.g. sending an alert notification on an error type category, or stopping the engine on a fatal type category, etc.

  • You could provide your own event handling functions (actions/handlers) using any of the following. For even more options on how to handle events refer to EventEmitter2 documentation
    • engine.on("<eventName>", (eventObj) => {...})
    • engine.onAny((eventName, eventObj) => {...}) - listens to all events generated by the engine and allows you to handle them in one place
  • Or, you could use the builtin default event handler using: engine.useDefaultEventHandler(addnCustomHandler)
    • the default event handler prints all events on the console/stdout,
    • stops the stream on receiving an "engineProcess.stream.error" event 5 times
    • you could add custom eventHandler on top of already available using addnCustomHandler e.g.
engine.useDefaultEventHandler({
  "engine.fatal": (eventObj) => {
    console.log("\nTest EXITED with CODE=1", JSON.stringify(eventObj));
    process.exit(1);
  },
});
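The default handler's rule of stopping a stream after 5 engineProcess.stream.error events can be sketched as a plain function suitable for engine.onAny(...). This assumes the count is kept per stream and that the same Stream object accompanies every event for that stream; makeErrorLimiter and stopStream are hypothetical names, and this is not the built-in handler's code:

```typescript
// Hypothetical onAny-style callback that stops a stream after N error events.

interface EventObj {
  msg: string;
  time: string;
  stream?: object; // the Stream instance, when the event concerns a stream
  error?: Error;
}

function makeErrorLimiter(
  stopStream: (stream: object) => void, // stand-in for whatever stops the stream
  limit = 5,
): (eventName: string, eventObj: EventObj) => void {
  // WeakMap keyed by the Stream object, so removed streams can be garbage-collected
  const errorCounts = new WeakMap<object, number>();
  return (eventName, eventObj) => {
    const stream = eventObj.stream;
    if (eventName !== "engineProcess.stream.error" || stream === undefined) return;
    const count = (errorCounts.get(stream) ?? 0) + 1;
    errorCounts.set(stream, count);
    // stop exactly once, when the limit is reached
    if (count === limit) stopStream(stream);
  };
}
```

Events other than engineProcess.stream.error, and events without a stream, are ignored by this sketch.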

As mentioned under Logging, all events are also logged under the exthos:eventLog namespace.

All possible events are shown in the table below:

| Event name (eventName) | Event category | Event object properties (eventObj) | Description |
| -- | -- | -- | -- |
| engine.active | general | {msg: string, time: string} | signifies that the engine is running |
| engine.inactive | general | {msg: string, time: string} | signifies that the engine is NOT running |
| engine.warn | warn | {msg: string, time: string, error: Error} | signifies a potential abnormality that was ignored/recovered |
| engine.error | error | {msg: string, time: string, error: Error} | signifies a failure of an operation, not of the entire engine |
| engine.fatal | fatal | {msg: string, time: string, error: Error} | signifies a failure of the entire engine, leading to shutdown of the engine |
| engine.stream.add | general | {msg: string, time: string, stream: Stream} | signifies that a stream has been added to the engine |
| engine.stream.update | general | {msg: string, time: string, stream: Stream} | signifies that a stream has been updated in the engine |
| engine.stream.remove | general | {msg: string, time: string, stream: Stream} | signifies that a stream has been removed from the engine |
| engine.stream.error | error | {msg: string, time: string, stream: Stream, error: Error} | signifies an error when the engine tried to add/update/remove the stream |
| engineProcess.stream.fatal | fatal | {msg: string, time: string, stream: Stream, error: Error} | jury is out on when engineProcess emits this |
| engineProcess.stream.error | error | {msg: string, time: string, stream: Stream, error: Error} | jury is out on when engineProcess emits this |
| engineProcess.stream.warn | error | {msg: string, time: string, stream: Stream, error: Error} | jury is out on when engineProcess emits this |
| engineProcess.stream.info | error | {msg: string, time: string, stream: Stream, error: Error} | jury is out on when engineProcess emits this |
| engineProcess.stream.debug | error | {msg: string, time: string, stream: Stream, error: Error} | jury is out on when engineProcess emits this |
| engineProcess.stream.trace | error | {msg: string, time: string, stream: Stream, error: Error} | jury is out on when engineProcess emits this |

Each event emits the eventObj that takes the form of { stream?: Stream, msg: string } i.e. it always contains the msg string value and may contain the stream object if the event was generated in the context of a stream.

Metrics

Coming soon.

Tracing and Telemetry

Coming soon.

Compatibility & Support

  • only Linux-based operating systems are supported
  • the exthos to benthos compatibility mapping is as follows:

| exthos | benthos |
|--|--|
| v0.0.1 - v0.2.1 | v4.5.1 |

Acknowledgement

While we thank all the projects that exthos depends on, we would like to show special gratitude towards the following projects: