@soundxyz/redis-pubsub
v4.1.2
Full type-safe Redis PubSub with Zod
redis-pubsub
Full type-safe Redis PubSub system with async iterators
Features
- [x] Type-safety with Zod
- [x] Out-of-the-box support for Date/Map/Set/BigInt serialization with superjson
- [x] Full usage of Async Iterators
- [x] Support for AbortController / AbortSignal
- [x] Support for type-safe filtering using Type Guards / Type predicates
- [x] Support for Entity + Identifier pattern subscriptions
- [x] Support for Async Zod Refines and Async Zod Transforms
- [x] GraphQL API ready
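To illustrate why the serialization feature matters, here is a minimal sketch that uses only the built-in JSON functions (not superjson, and not this library). The helper name jsonRoundTrip is hypothetical. It shows that a Date and a Set do not survive a plain JSON round trip, which is the gap a serializer such as superjson is meant to close.

```typescript
// Hypothetical helper: round-trips a value through plain JSON, with no superjson involved
function jsonRoundTrip(value: unknown): unknown {
  return JSON.parse(JSON.stringify(value));
}

const original = {
  createdAt: new Date(0),
  tags: new Set(["a", "b"]),
};

const copy = jsonRoundTrip(original) as { createdAt: unknown; tags: unknown };

// The Date comes back as an ISO string and the Set as an empty object
const dateSurvived = copy.createdAt instanceof Date;
const setSurvived = copy.tags instanceof Set;

console.log({ dateSurvived, setSurvived, createdAt: copy.createdAt });
```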
Install
pnpm add @soundxyz/redis-pubsub
npm install @soundxyz/redis-pubsub
yarn add @soundxyz/redis-pubsub
Peer dependencies
pnpm add zod ioredis
npm install zod ioredis
yarn add zod ioredis
Usage
Create a Redis PubSub instance:
import Redis from "ioredis";
import { z } from "zod";
import { RedisPubSub } from "@soundxyz/redis-pubsub";

const { createChannel } = RedisPubSub({
  publisher: new Redis({
    port: 6379,
  }),
  subscriber: new Redis({
    port: 6379,
  }),
});
Create a channel with any Zod schema and a unique "name" to be used as the main trigger.
const schema = z.object({
  id: z.string(),
  name: z.string(),
});

const userChannel = createChannel({
  name: "User",
  schema,
});

const nonLazyUserChannel = createChannel({
  name: "User",
  schema,
  // By default the channels are lazily connected with Redis
  isLazy: false,
});
Subscribe and publish to the channel
// Using async iterators / async generators to subscribe
(async () => {
  for await (const user of userChannel.subscribe()) {
    console.log("User", {
      id: user.id,
      name: user.name,
    });
  }
})();

// You can explicitly wait until the channel is successfully connected with Redis
await userChannel.isReady();

// Publish data into the channel
await userChannel.publish(
  {
    value: {
      id: "1",
      name: "John",
    },
  },
  // You can also publish more than a single value
  {
    value: {
      id: "2",
      name: "Peter",
    },
  }
);
Filter based on the data
(async () => {
  for await (const user of userChannel.subscribe({
    filter(value) {
      return value.id === "1";
    },
  })) {
    console.log("User 1", {
      id: user.id,
      name: user.name,
    });
  }
})();

// You can also use type predicates / type guards
(async () => {
  for await (const user of userChannel.subscribe({
    filter(value): value is { id: "1"; name: string } {
      return value.id === "1";
    },
  })) {
    // typeof user.id == "1"
    console.log("User 1", {
      id: user.id,
      name: user.name,
    });
  }
})();
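The narrowing performed by a type-predicate filter can be sketched without Redis. The helper filterAsync, the stand-in generator fakeUsers, and the guard isUserOne below are hypothetical names used only for illustration; they are not part of this library and do not claim to reflect its internals.

```typescript
type User = { id: string; name: string };

// Hypothetical helper: filters an async iterable with a type predicate,
// so the yielded element type is narrowed from T to S
async function* filterAsync<T, S extends T>(
  source: AsyncIterable<T>,
  guard: (value: T) => value is S
): AsyncGenerator<S> {
  for await (const value of source) {
    // Inside this branch TypeScript treats `value` as `S`
    if (guard(value)) yield value;
  }
}

// Stand-in for a subscription: an async generator over fixed values
async function* fakeUsers(): AsyncGenerator<User> {
  yield { id: "1", name: "John" };
  yield { id: "2", name: "Peter" };
}

// Type guard equivalent to the filter used in the example above
const isUserOne = (value: User): value is User & { id: "1" } =>
  value.id === "1";

// Collects every value that passes the guard
async function collectUserOne(): Promise<Array<User & { id: "1" }>> {
  const result: Array<User & { id: "1" }> = [];
  for await (const user of filterAsync(fakeUsers(), isUserOne)) {
    result.push(user);
  }
  return result;
}
```

Because the guard is a type predicate rather than a plain boolean function, the loop variable is typed with the literal id "1" instead of string.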
Use custom identifiers
It will create a separate Redis channel for every identifier, concatenating "name" and "identifier". For example, with "name"="User" and "identifier"=1, the channel trigger name will be "User1".
(async () => {
  for await (const user of userChannel.subscribe({
    // number or string
    identifier: 1,
  })) {
    console.log("User with identifier=1", {
      id: user.id,
      name: user.name,
    });
  }
})();

await userChannel.isReady({
  // number or string
  identifier: 1,
});

await userChannel.publish({
  value: {
    id: "1",
    name: "John",
  },
  identifier: 1,
});
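The concatenation described in this section can be sketched as a small function. The name triggerName is hypothetical, and the behavior without an identifier (returning the bare name) is an assumption; only the "User" + 1 = "User1" case comes from the description above.

```typescript
// Hypothetical helper mirroring the described concatenation of name and identifier.
// Assumption: without an identifier, the trigger is just the channel name.
function triggerName(name: string, identifier?: string | number): string {
  return identifier === undefined ? name : `${name}${identifier}`;
}

console.log(triggerName("User", 1)); // "User1"
console.log(triggerName("User")); // "User"
```

Under plain concatenation, a channel named "User" with identifier 11 and a channel named "User1" with identifier 1 would both map to "User11", so it may be worth choosing channel names that cannot collide this way.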
Separate input from output
You can leverage Zod Transforms to separate input types from output types, and receive any custom class or output on your subscriptions.
class CustomClass {
  constructor(public name: string) {}
}

const inputSchema = z.string();
const outputSchema = z.string().transform((input) => new CustomClass(input));

// pubSub is the instance returned by RedisPubSub({ ... })
const channel = pubSub.createChannel({
  name: "separate-type",
  inputSchema,
  outputSchema,
});

const subscription = (async () => {
  for await (const data of channel.subscribe()) {
    return data;
  }
})();

await channel.isReady();

await channel.publish({
  value: "test",
});

const result = await subscription;

// true
console.log(result instanceof CustomClass);

// true
console.log(result.name === "test");
Use AbortController / AbortSignal
If isLazy is not disabled, the channel is automatically unsubscribed from Redis once its last subscription ends.
const abortController = new AbortController();

// The function must be async to use "for await"
const abortedSubscription = (async () => {
  for await (const data of userChannel.subscribe({
    abortSignal: abortController.signal,
  })) {
    console.log({ data });
  }
})();

// ...

// Aborting ends the subscription loop above
abortController.abort();

await abortedSubscription;
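The general pattern of ending a "for await" loop through an AbortSignal can be sketched without Redis. The generator ticks and the function consumeUntilAborted below are hypothetical stand-ins; they show cooperative cancellation of an async generator and make no claim about how this library implements it.

```typescript
// Hypothetical stand-in for a subscription: yields increasing numbers
// until the given signal is aborted
async function* ticks(
  signal: AbortSignal,
  intervalMs = 1
): AsyncGenerator<number> {
  let count = 0;
  while (!signal.aborted) {
    yield count++;
    // Wait a little before checking the signal again
    await new Promise<void>((resolve) => setTimeout(resolve, intervalMs));
  }
}

// Consumes values and aborts after the third one has been received
async function consumeUntilAborted(): Promise<number[]> {
  const abortController = new AbortController();
  const seen: number[] = [];

  const subscription = (async () => {
    for await (const n of ticks(abortController.signal)) {
      seen.push(n);
      if (n === 2) abortController.abort();
    }
  })();

  // Resolves once the generator notices the aborted signal and finishes
  await subscription;
  return seen;
}
```

Awaiting the promise returned by the async function, as in the example above with abortedSubscription, is what lets the caller know the loop has actually finished after the abort.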
Unsubscribe specific identifiers
await userChannel.unsubscribe(
  {
    identifier: 1,
  },
  // You can specify more than a single identifier at once
  {
    identifier: 2,
  }
);
Unsubscribe an entire channel
await userChannel.unsubscribeAll();
Close the PubSub instance
const pubSub = RedisPubSub({
  publisher: new Redis({
    port: 6379,
  }),
  subscriber: new Redis({
    port: 6379,
  }),
});

// ...

await pubSub.close();