npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@codecompose/typed-pubsub

v1.2.0

Published

A type-safe pub/sub abstraction for Google Cloud and Firebase

Readme

typed-pubsub

A type-safe PubSub abstraction for Google Cloud and Firebase.

Features

  • Type-safe messaging: Full TypeScript support for publishers and handlers
  • Runtime validation: Schema validation using Zod prevents invalid messages
  • Duplicate prevention: Optional event marking to prevent duplicate processing
  • Stale message handling: Optional time-based dropping of messages
  • Customizable defaults: Configure your own library-wide defaults with per-handler overrides. Out-of-the box you get:
    • Retries enabled with 7-day exponential backoff
    • Scale to maximum of 250 instances per topic (safer than Firebase's default of 3000)
    • 512MB memory allocation per instance

Installation

npm install @codecompose/typed-pubsub

Peer Dependencies

This package has the following peer dependencies:

  • @google-cloud/pubsub
  • firebase-functions
  • zod

Quick Start

  1. Define your message schemas with Zod
  2. Create a PubSub client
  3. Initialize the typed PubSub with your schemas
  4. Create type-safe publishers and handlers
import { PubSub } from "@google-cloud/pubsub";
import { createTypedPubsub } from "@codecompose/typed-pubsub";
import { z } from "zod";

// 1. Define your schemas
const schemas = {
  user_created: z.object({
    userId: z.string(),
    email: z.string().email(),
  }),
};

// 2. Create PubSub client
const client = new PubSub();

// 3. Initialize typed PubSub
const pubsub = createTypedPubsub({
  client,
  schemas,
  region: "us-central1",
});

// 4a. Create and use a publisher
await pubsub.createPublisher("user_created")({
  userId: "123",
  email: "[email protected]",
});

// 4b. Create a handler
export const handle_user_created = pubsub.createHandler({
  topic: "user_created",
  handler: async (data) => {
    // data is fully typed based on the schema
    console.log(`New user created: ${data.email}`);
  },
});

Usage Examples

Basic Setup

import { PubSub } from "@google-cloud/pubsub";
import { createTypedPubsub } from "@codecompose/typed-pubsub";
import { z } from "zod";

// Define your schemas
const schemas = {
  user_created: z.object({
    userId: z.string(),
    email: z.string().email(),
    createdAt: z.string().datetime(),
  }),
  order_placed: z.object({
    orderId: z.string(),
    userId: z.string(),
    items: z.array(
      z.object({
        productId: z.string(),
        quantity: z.number().int().positive(),
      }),
    ),
    total: z.number().positive(),
  }),
};

// Create typed Pubsub client
const pubsub = createTypedPubsub({
  client: new PubSub(),
  schemas,
  region: "us-central1",
});

// Publish a message
await pubsub.createPublisher("user_created")({
  userId: "123",
  email: "[email protected]",
  createdAt: new Date().toISOString(),
});

// Create a handler
export const handle_order_placed = pubsub.createHandler({
  topic: "order_placed",
  handler: async (data) => {
    // data is fully typed based on the schema
    console.log(`Processing order ${data.orderId} for user ${data.userId}`);
    // Process the order...
  },
  options: {
    memory: "1GiB",
    timeoutSeconds: 60,
    markEvent: true, // Never process this event more than once
  },
});

Note: The examples use snake_case for topic names (like "user_created") and exported cloud functions (like handle_order_placed). This is because casing is currently ignored in Cloud Run function names in GCP, so using snake_case is preferred for consistency.

Preventing Duplicate Processing with Event Marking

PubSub has at-least-once delivery semantics, meaning messages might occasionally be delivered more than once. For operations that must be executed exactly once (like payment processing or user credit awards), this library provides event marking.

The markEvent option uses your provided storage (like Redis) to track which event IDs have already been processed, ensuring each event is handled exactly once despite potential redeliveries.

import { PubSub } from "@google-cloud/pubsub";
import { createTypedPubsub } from "@codecompose/typed-pubsub";
import { redis } from "./redis-client";

// Create event marking functions
const eventMarkingFunctions = {
  isEventProcessed: async (eventId) => {
    return Boolean(await redis.get(`event:${eventId}`));
  },
  markEventAsProcessed: async (eventId) => {
    await redis.set(`event:${eventId}`, "1", "EX", 86400); // 24 hours TTL
  },
};

// Create typed Pubsub client with event marking
const pubsub = createTypedPubsub({
  client: new PubSub(),
  schemas,
  region: "us-central1",
  options: {
    eventMarkingFunctions,
    defaultHandlerOptions: {
      markEvent: true, // Enable for all handlers by default
      vpcConnector: "redis-connector", // If Redis is in a VPC
    },
  },
});

Handling Time-Sensitive Events

For time-sensitive operations, you may want to discard messages that are too old. The retryMaxAgeMinutes option automatically drops events that exceed the specified age, preventing the processing of stale data.

// Configure time-based message dropping in the handler
export const handle_time_sensitive_events = pubsub.createHandler({
  topic: "time_sensitive_events",
  handler: async (data) => {
    // Process time-sensitive event
  },
  options: {
    retryMaxAgeMinutes: 60, // Only process events less than 1 hour old
  },
});

API Reference

createTypedPubsub(options)

Creates a type-safe Pubsub client for handling messages with schema validation.

Parameters

  • client: Google Cloud Pubsub client instance
  • schemas: Record of Zod schemas for each topic
  • region: GCP region for the Pubsub functions
  • options: (Optional) Configuration options
    • eventMarkingFunctions: (Optional) Functions for tracking processed events
    • defaultHandlerOptions: (Optional) Default options for all handlers
  • onMessagePublished: (Optional) Firebase message published handler, defaults to firebase implementation

Key Handler Options

| Option | Description | Default | | -------------------- | --------------------------------- | ---------------------- | | retry | Enable/disable automatic retries | true | | retryMaxAgeMinutes | Maximum event age before dropping | undefined (no limit) | | memory | Memory allocation | "512MiB" | | markEvent | Enable duplicate prevention | false | | timeoutSeconds | Function timeout in seconds | 20 | | maxInstances | Maximum concurrent instances | 250 | | vpcConnector | Name of the VPC connector | undefined |

Important note about retry behavior: While you can enable/disable retries and set maximum age for retried events, the actual retry timing is controlled by the PubSub subscription's ACK deadline, not by the timeoutSeconds setting. The timeout will shut down the server process, but the event will only be retried after the ACK deadline expires, which cannot be configured through this library.

Limitations and Alternatives

While this library provides excellent type safety and schema validation for PubSub messages, there are some limitations inherent to PubSub:

  • Limited control over retry timing (controlled by subscription ACK deadline)
  • No built-in support for manually triggered retries
  • No fine-grained error handling for different failure scenarios

If your use case requires more advanced features like controlled retry intervals, manual retry triggering, or long-running background processes, consider using a dedicated task queue system. The typed-tasks library provides similar type-safety guarantees with more control over task execution.