@alexy4744/nestjs-nats-jetstream-transporter

v1.0.2

NATS JetStream transporter for NestJS

Downloads

57

NestJS NATS JetStream Transporter

A NATS transporter for NestJS that takes advantage of JetStream for event patterns.

Installation

$ npm install @alexy4744/nestjs-nats-jetstream-transporter [email protected]

⚠️ This library uses NATS.js, which means that the underlying JetStream API is subject to change and does not follow semver. It is recommended to use [email protected] until the JetStream API is out of beta as it has been fully tested with this package.

Publishing Messages

NatsClient works much the same way as the client for the built-in NATS transporter in NestJS. The only difference is that you must instantiate NatsClient yourself.

Request-Reply

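// main.ts
import { NestFactory } from "@nestjs/core";
import { NatsTransportStrategy } from "@alexy4744/nestjs-nats-jetstream-transporter";
import { AppModule } from "./app.module"; // assumed location of your root module

const app = await NestFactory.createMicroservice(AppModule, {
  strategy: new NatsTransportStrategy()
});

await app.listenAsync();

// math.controller.ts
import { Controller } from "@nestjs/common";
import { Observable } from "rxjs";
import { NatsClient } from "@alexy4744/nestjs-nats-jetstream-transporter";

@Controller()
export class MathController {
  private readonly client = new NatsClient();

  // Sends a request on the "sum" pattern and returns the reply as an Observable
  accumulate(): Observable<number> {
    return this.client.send<number>("sum", [1, 2, 3]);
  }
}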

Event-Based

// main.ts
import { NestFactory } from "@nestjs/core";
import { NatsTransportStrategy } from "@alexy4744/nestjs-nats-jetstream-transporter";
import { AppModule } from "./app.module"; // assumed location of your root module

const app = await NestFactory.createMicroservice(AppModule, {
  strategy: new NatsTransportStrategy({
    // Create a stream with a subject called "orders.created"
    // This is important later on when we publish an event with NatsClient
    streams: [
      {
        name: "orders-events",
        subjects: ["orders.created"]
      }
    ]
  })
});

await app.listenAsync();

// orders.controller.ts
import { Controller } from "@nestjs/common";
import { NatsClient } from "@alexy4744/nestjs-nats-jetstream-transporter";
import { OrdersService } from "./orders.service"; // assumed location of your service

@Controller()
export class OrdersController {
  private readonly client = new NatsClient();

  constructor(private readonly ordersService: OrdersService) {}

  async create(): Promise<void> {
    const order = await this.ordersService.create();

    // The subject must be one of the subjects captured by the stream configured in main.ts
    this.client.emit("orders.created", order);
  }
}
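
The comment in main.ts notes that the stream's subjects matter when publishing. As a rough illustration of why, the sketch below models how NATS matches a published subject against a stream's subject list, including the standard wildcards (* matches exactly one token, > matches one or more trailing tokens). The names subjectMatches and streamCaptures are hypothetical helpers written for this example; they are not part of this package or of NATS.js.

```typescript
// Returns true when `subject` is matched by `pattern` under NATS wildcard rules:
// "*" matches exactly one token, ">" matches one or more trailing tokens.
function subjectMatches(pattern: string, subject: string): boolean {
  const p = pattern.split(".");
  const s = subject.split(".");

  for (let i = 0; i < p.length; i++) {
    if (p[i] === ">") {
      // ">" is only valid as the last token and must consume at least one token
      return i === p.length - 1 && s.length > i;
    }
    if (i >= s.length) {
      return false; // the subject ran out of tokens before the pattern did
    }
    if (p[i] !== "*" && p[i] !== s[i]) {
      return false; // literal token mismatch
    }
  }

  // Without a trailing ">", both sides must have the same number of tokens
  return p.length === s.length;
}

// Returns true when at least one of the stream's subjects matches `subject`.
function streamCaptures(streamSubjects: string[], subject: string): boolean {
  return streamSubjects.some((pattern) => subjectMatches(pattern, subject));
}
```

Under this model, a stream configured with subjects: ["orders.created"] captures an event emitted on "orders.created" but not one emitted on "orders.cancelled". A stream configured with "orders.>" would capture both.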

Receiving Messages

There are no special changes needed to receive messages. Just use the @EventPattern() and @MessagePattern() decorators provided by NestJS.

@Ctx() works exactly the same; however, you should use the NatsContext provided by this package as the parameter type. It exposes an additional getMessage() method that returns the original message object if needed.

Customizing JetStream consumer options

You can customize how the JetStream push consumer behaves. One example is making the consumer durable so that it survives application restarts. Another is taking advantage of distributed queues for horizontal scaling.

Read more about JetStream consumers in the NATS documentation, and refer to the NATS.js consumer options builder for the underlying API.

const app = await NestFactory.createMicroservice(AppModule, {
  strategy: new NatsTransportStrategy({
    consumer: (options) => {
      options.durable("my-durable-name");

      // When using a queue group with JetStream, deliverTo() and queue() must use the same name.
      // This is a requirement for NATS.js v2.2.0, see this issue for more details:
      // https://github.com/nats-io/nats.js/issues/446
      options.deliverTo("my-queue-group");
      options.queue("my-queue-group");
    }
  })
});

NACK and TERM JetStream messages

By default, all JetStream messages are automatically acknowledged. However, you can also NACK or TERM a message by returning the respective symbol from your event handler.

Returning NACK will ask for the message to be redelivered, while TERM will terminate future deliveries of the message.

// shipping.controller.ts
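import { Controller } from "@nestjs/common";
import { EventPattern } from "@nestjs/microservices";
import { NACK, TERM } from "@alexy4744/nestjs-nats-jetstream-transporter";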

@Controller()
export class ShippingController {
  constructor(private readonly shippingService: ShippingService) {}

  @EventPattern("orders.created")
  handleCreatedOrder(order) {
    // If a shipment cannot be scheduled at this time, then ask for the message to be redelivered
    if (this.shippingService.isBusy()) {
      return NACK;
    }

    // If a shipment already exists for this order, then terminate the redelivery of this message
    if (this.shippingService.exists(order)) {
      return TERM;
    }

    this.shippingService.scheduleShipment(order);
    
    // Otherwise, the message will be auto-acked
  }
}

If the handler for your event pattern throws an error, the message will automatically be terminated. You can change this behavior by providing an onError function to the transport strategy options.

const app = await NestFactory.createMicroservice(AppModule, {
  strategy: new NatsTransportStrategy({
    // Messages that caused an exception will be acked instead
    onError: (message) => message.ack()
  })
});
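
To make the acknowledgement rules above easier to follow, here is a minimal, synchronous sketch of how a transport could turn a handler's outcome into an ack, nak, or term call. It is not the package's actual implementation. The NACK and TERM symbols, the AckableMessage interface (modelled on the ack(), nak(), and term() methods of a NATS.js JetStream message), and the settle function are all hypothetical stand-ins defined only for this example. Real handlers may also be asynchronous, which this sketch ignores.

```typescript
// Hypothetical stand-ins; the real package exports its own NACK and TERM symbols.
const NACK = Symbol("NACK");
const TERM = Symbol("TERM");

// Minimal message shape assumed for this sketch.
interface AckableMessage {
  ack(): void;
  nak(): void;
  term(): void;
}

type Handler = (data: unknown) => unknown;
type OnError = (message: AckableMessage) => void;

// Runs the handler, then settles the message based on what it returned or threw.
function settle(
  message: AckableMessage,
  data: unknown,
  handler: Handler,
  onError: OnError = (m) => m.term() // assumed default: terminate on error
): void {
  let result: unknown;

  try {
    result = handler(data);
  } catch (_error) {
    // The handler threw: defer to the configured error policy
    onError(message);
    return;
  }

  if (result === NACK) {
    message.nak(); // ask for the message to be redelivered
  } else if (result === TERM) {
    message.term(); // stop future deliveries of this message
  } else {
    message.ack(); // any other return value is auto-acknowledged
  }
}
```

Passing onError: (m) => m.ack() as the last argument mirrors the option shown above: a handler that throws then results in an ack instead of a term.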

Queue Groups

You can specify a queue group name to enable distributed queues. This load balances message delivery across all application instances that share the same queue group name. It makes horizontal scaling simple: you can scale up by running another instance of your application with no additional configuration.

const app = await NestFactory.createMicroservice(AppModule, {
  strategy: new NatsTransportStrategy({
    queue: "my-queue-group"
  })
});

If you want to enable this functionality for event patterns, you must also specify the queue group name using the JetStream consumer options builder.

const app = await NestFactory.createMicroservice(AppModule, {
  strategy: new NatsTransportStrategy({
    consumer: (options) => {
      // When using a queue group with JetStream, deliverTo() and queue() must use the same name.
      // This is a requirement for NATS.js v2.2.0, see this issue for more details:
      // https://github.com/nats-io/nats.js/issues/446
      options.deliverTo("my-queue-group");
      options.queue("my-queue-group");
    },
    queue: "my-queue-group"
  })
});
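
To make the load-balancing behaviour of queue groups concrete, the toy in-memory model below delivers each message to exactly one member of a queue group, while subscribers outside any group each receive their own copy. This is only an illustration and does not talk to NATS. The Subscriber type, the deliver function, and the injectable pick parameter are hypothetical names introduced for this sketch. The default random choice of member is an assumption made here; pick exists so that the selection can be made deterministic.

```typescript
interface Subscriber {
  id: string;
  queue?: string; // queue group name, if the subscriber joined one
  received: string[];
}

// Delivers one message: every plain subscriber gets a copy, while each
// queue group gets a single copy handed to one of its members.
function deliver(
  subscribers: Subscriber[],
  message: string,
  pick: (memberCount: number) => number = (n) => Math.floor(Math.random() * n)
): void {
  const groups: Record<string, Subscriber[]> = {};

  for (const sub of subscribers) {
    if (sub.queue === undefined) {
      sub.received.push(message);
    } else {
      const members = groups[sub.queue] || [];
      members.push(sub);
      groups[sub.queue] = members;
    }
  }

  for (const name of Object.keys(groups)) {
    const members = groups[name] || [];
    const chosen = members[pick(members.length)];
    if (chosen) {
      chosen.received.push(message); // only one member of the group sees it
    }
  }
}
```

With two instances in "my-queue-group", four messages are split between the two instances rather than duplicated. Starting a third instance with the same group name simply adds another candidate for each delivery.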

Development

# Run tests
$ nx test nestjs-nats-jetstream-transporter
# Update version
$ nx version nestjs-nats-jetstream-transporter
# Build the project
$ nx build nestjs-nats-jetstream-transporter
# Publish new version on GitHub
$ git push --follow-tags origin master
# Publish new version on NPM
$ npm publish ./dist/packages/nestjs-nats-jetstream-transporter --access=public