@chance-get-yours/nestjs-nats-jetstream-microservice

v0.1.0

NATS JetStream Custom Transporter for NestJS

NATS JetStream Custom Transporter for NestJS microservice

Goals of this library

Install

Library

npm i @nestjs/microservices
npm i nats
npm i @chance-get-yours/nestjs-nats-jetstream-microservice

Server

The NATS server can run locally with Docker:

docker run -d --name nats -p 4222:4222 -p 6222:6222 -p 8222:8222 nats --jetstream -m 8222

Alternatively, use Synadia NGS for a quick setup.

CLI

The NATS CLI covers all needs from the command line:

brew tap nats-io/nats-tools
brew install nats-io/nats-tools/nats

Or download the official release.

Configuration

Streams

Streams are message stores. Each stream defines how messages are stored and what the retention limits (duration, size, interest) are. Streams consume normal NATS subjects: any message published on those subjects is captured in the defined storage system. You can do a normal publish to the subject for unacknowledged delivery, but the JetStream publish calls are preferable because the JetStream server replies with an acknowledgement that the message was successfully stored.

Subjects can be matched using the NATS wildcard syntax: "*" matches exactly one token and ">" matches one or more trailing tokens.
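To make the wildcard rules concrete, here is a minimal sketch of subject matching. It is an illustration only, not the NATS server's matcher: the function name `subjectMatches` is hypothetical, and it assumes tokens are separated by ".", that "*" matches exactly one token, and that ">" is only valid as the last token.

```typescript
// Illustrative sketch of NATS subject matching (hypothetical helper, not a library API).
// Tokens are separated by "."; "*" matches exactly one token,
// ">" matches one or more trailing tokens and must be the last token.
function subjectMatches(pattern: string, subject: string): boolean {
  const p = pattern.split(".");
  const s = subject.split(".");
  for (let i = 0; i < p.length; i++) {
    if (p[i] === ">") {
      // ">" must be last and needs at least one remaining subject token
      return i === p.length - 1 && s.length > i;
    }
    if (i >= s.length) return false; // subject ran out of tokens
    if (p[i] !== "*" && p[i] !== s[i]) return false; // literal token mismatch
  }
  // Without ">", pattern and subject must have the same number of tokens
  return p.length === s.length;
}

console.log(subjectMatches("booking.>", "booking.42.room-booked-event.v1")); // true
console.log(subjectMatches("booking.*", "booking.42.room-booked-event.v1")); // false
```

With these rules, the stream subject "booking.>" captures every subject below "booking", while a filter such as "booking.*.room-booked-event.>" narrows that down to one event type.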

Configuration options can be set using the library:

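import * as os from "os";
import { NestFactory } from "@nestjs/core";
import { CustomStrategy } from "@nestjs/microservices";
import { FastifyAdapter, NestFastifyApplication } from "@nestjs/platform-fastify";
import { StreamConfig } from "nats";
// NatsJetStreamServer is assumed to be exported from the package root
import { NatsJetStreamServer } from "@chance-get-yours/nestjs-nats-jetstream-microservice";
// HotelBookingModule is the application module shown under "Publishing events"

const bootstrap = async () => {
  const options: CustomStrategy = {
    strategy: new NatsJetStreamServer({
      connectionOptions: {
        servers: "127.0.0.1:4222",
        // Include the hostname in the client name so connections do not overlap
        name: `nats-connection.${os.hostname()}`,
      },
      // Each stream listed here is created if it does not exist yet.
      // The application needs all of these streams to be available.
      assertStreams: [
        {
          name: "booking",
          description: "Booking domain with all its events",
          subjects: ["booking.>"],
        } as Partial<StreamConfig>
      ],
    }),
  };

  // Hybrid application: Fastify web server plus NATS JetStream microservice
  const app = await NestFactory.create<NestFastifyApplication>(
    HotelBookingModule,
    new FastifyAdapter()
  );
  const microService = app.connectMicroservice(options);
  await microService.listen();
  return app;
};
bootstrap();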

Consumer options

A consumer is a stateful view of a stream. It acts as an interface for clients to consume a subset of the messages stored in a stream and keeps track of which messages were delivered to and acknowledged by clients. Unlike core NATS, which provides an at-most-once delivery guarantee, a consumer can provide an at-least-once delivery guarantee.
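The at-least-once guarantee follows from the acknowledgement tracking described above: a message that was delivered but never acknowledged is delivered again. The toy in-memory model below sketches that idea. It is not how JetStream is implemented, and every name in it (`ToyConsumer`, `Delivery`, `deliverNext`, `redeliverPending`) is hypothetical.

```typescript
// Toy model of a consumer's acknowledgement tracking (hypothetical, for illustration only).
type Delivery = { seq: number; data: string; deliveryCount: number };

class ToyConsumer {
  // Messages that were delivered but not acknowledged yet, keyed by stream sequence
  private readonly pending = new Map<number, Delivery>();
  // Index of the next message in the stream that was never delivered
  private cursor = 0;

  constructor(private readonly stream: readonly string[]) {}

  // Deliver the next never-delivered message, or undefined when caught up.
  deliverNext(): Delivery | undefined {
    if (this.cursor >= this.stream.length) return undefined;
    const delivery: Delivery = {
      seq: this.cursor + 1,
      data: this.stream[this.cursor],
      deliveryCount: 1,
    };
    this.cursor++;
    this.pending.set(delivery.seq, delivery);
    return { ...delivery };
  }

  // Simulate the acknowledgement wait expiring: every unacknowledged message is delivered again.
  redeliverPending(): Delivery[] {
    const redelivered: Delivery[] = [];
    for (const delivery of this.pending.values()) {
      delivery.deliveryCount++;
      redelivered.push({ ...delivery });
    }
    return redelivered;
  }

  // Acknowledging a message removes it from the pending set for good.
  ack(seq: number): void {
    this.pending.delete(seq);
  }
}

const consumer = new ToyConsumer(["room-booked #1", "room-booked #2"]);
const first = consumer.deliverNext();
consumer.deliverNext(); // delivered, but the handler crashes before acknowledging
if (first) consumer.ack(first.seq);
console.log(consumer.redeliverPending()); // only the second message comes back
```

Because a handler can see the same message more than once, side effects triggered by a consumer should be safe to repeat.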

Configuration options can be set through the library with the decorator. Alternatively, create the consumer with the CLI and reference it by name in the decorator.

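import { Controller } from "@nestjs/common";
import { Ctx, EventPattern, Payload } from "@nestjs/microservices";
// ConsumeOptions and NatsJetStreamContext are assumed to be exported from the package root
import {
  ConsumeOptions,
  NatsJetStreamContext,
} from "@chance-get-yours/nestjs-nats-jetstream-microservice";

@Controller()
export class BotNatsController {
  constructor(private scheduleCleaning: ScheduleCleaningCommandHandler) {}

  // The consumer is created if it does not exist and updated if it does
  @EventPattern("ConsumerName", {
    description: "Trigger cleaning side effect when room is booked",
    filter_subject: "booking.*.room-booked-event.>",
    deliver_to: "cleanupInbox",
    durable: "cleanupStack",
    manual_ack: true,
  } as ConsumeOptions)
  async cleanup(
    @Payload() event: RoomBookedEvent,
    @Ctx() context: NatsJetStreamContext
  ) {
    // Do the work, then acknowledge the message manually (manual_ack is true)
    context.message.ack();
  }
}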

Publishing events

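import { Module } from "@nestjs/common";
// NatsJetStreamTransport is assumed to be exported from the package root
import { NatsJetStreamTransport } from "@chance-get-yours/nestjs-nats-jetstream-microservice";

@Module({
  imports: [
    NatsJetStreamTransport.register({
      connectionOptions: {
        servers: "127.0.0.1:4222",
        name: "hotel-booking-publisher",
      },
    }),
  ],
  controllers: [BotNatsController],
  providers: [],
})
export class HotelBookingModule {}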
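
import { ConflictException, Injectable } from "@nestjs/common";
import { PubAck } from "nats";
// NatsJetStreamClient is assumed to be exported from the package root
import { NatsJetStreamClient } from "@chance-get-yours/nestjs-nats-jetstream-microservice";

@Injectable()
export class BookRoomCommandHandler {
  constructor(private client: NatsJetStreamClient) {}

  async handle(command: BookRoomCommand): Promise<PubAck> {
    // CloudEvent-like syntax here, but nothing is mandatory.
    // isoDate, source and correlationId come from the surrounding application code.
    const event = new RoomBookedEvent(
      { ...command.data, date: isoDate.toISOString() },
      source,
      correlationId
    );
    const uniqueBookingSlug = `booked-${correlationId}`;
    // Await the publish so that failures and the exception below reach the caller
    const res: PubAck = await this.client.publish(
      'my.super.subject',
      event,
      // Deduplication trick: the unique booking slug is used as the message ID.
      // The duplicate window is configured on the stream (default: 2 minutes).
      { msgID: uniqueBookingSlug }
    );
    if (res.duplicate) {
      throw new ConflictException('MsgID already exists error');
    }
    return res;
  }
}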
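
The deduplication used above relies on the stream remembering message IDs for the length of its duplicate window. The toy model below sketches that behaviour in memory. It is not the JetStream server's implementation, and `ToyDedupStream` and `ToyPubAck` are hypothetical names; the only parts taken from the example above are the message ID, the `duplicate` flag on the acknowledgement, and the 2 minute default window.

```typescript
// Toy model of message-ID deduplication (hypothetical, for illustration only).
type ToyPubAck = { seq: number; duplicate: boolean };

class ToyDedupStream {
  private seq = 0;
  // Message ID -> sequence and timestamp of the stored message
  private readonly seen = new Map<string, { seq: number; at: number }>();

  constructor(private readonly windowMs: number) {}

  // `now` is passed in (milliseconds) so the behaviour is easy to follow and test.
  publish(msgID: string, now: number): ToyPubAck {
    const previous = this.seen.get(msgID);
    if (previous !== undefined && now - previous.at < this.windowMs) {
      // Same ID inside the window: nothing new is stored
      return { seq: previous.seq, duplicate: true };
    }
    this.seq++;
    this.seen.set(msgID, { seq: this.seq, at: now });
    return { seq: this.seq, duplicate: false };
  }
}

const stream = new ToyDedupStream(2 * 60 * 1000); // 2 minute window
console.log(stream.publish("booked-abc", 0));         // stored, duplicate: false
console.log(stream.publish("booked-abc", 30_000));    // inside the window, duplicate: true
console.log(stream.publish("booked-abc", 5 * 60_000)); // window elapsed, stored again
```

The last call shows the limit of the trick: once the window has elapsed, the same message ID is accepted again, so the window has to cover the period in which retries are expected.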