async-streamify

Stream and serialize nested promises and async iterables over HTTP, workers, etc

async-streamify enables seamless transmission of complex async objects (including promises and async iterators) over HTTP and other text-based protocols, while maintaining their async behavior on the receiving end.

Features

  • ✨ Stream promises and async iterators as they resolve/yield
  • 🚀 Receive native promises and async iterables on the client
  • 🔄 Support for deeply nested async objects (iterable in a promise, etc)
  • 🎯 Type-safe serialization and deserialization
  • 🌊 Automatic backpressure handling
  • 📦 Zero dependencies
  • 🛡️ Works in all modern runtimes (browser, bun, deno, node, edge).

Installation

  • From npm: install with bun add, deno add, npm install, pnpm add, or yarn add, followed by the package name async-streamify.
  • From JSR: install with deno add, npx jsr add, yarn dlx jsr add, pnpm dlx jsr add, or bunx jsr add, followed by the package name @gadicc/async-streamify.

Quick Start

server.ts

import { AsyncResponse } from "async-streamify";

// Helper functions; integers() generates a new integer every 200ms.
const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));
// deno-fmt-ignore
async function* integers(max=10) { let i=0; while (i <= max) { yield i++; await sleep(200); }}

// Create an object with mixed async values
const data = () => ({
  availability: "immediate",
  promise: sleep(100).then(() => "completed"),
  stream: integers(3),
  nested: {
    iteratorInPromise: sleep(100).then(() => integers(3)),
  },
});
export type data = typeof data;

// HTTP handler
export function handler(request: Request) {
  return new AsyncResponse(data(), {
    headers: {
      "Access-Control-Allow-Origin": "*",
      "Cache-Control": "no-cache", // Recommended for streaming responses
    },
  });
}

client.ts

import { deserializeResponse } from "async-streamify";
import type { data } from "./server";

const response = await fetch("http://localhost:8000");
const result = await deserializeResponse<ReturnType<data>>(response);

// Values are received as they become available
console.log(result.availability); // "immediate"

result.promise.then((value) => {
  console.log(value); // "completed" (after 100ms + network latency)
});

// Async iterators retain their native behaviour
for await (const num of result.stream) {
  console.log(num); // 0, 1, 2, 3 (streamed every 200ms)
}

// Nested values work and stream as they resolve
for await (const num of (await result.nested.iteratorInPromise)) {
  console.log(num); // 0, 1, 2, 3
}

API Reference

AsyncResponse

Creates a streaming response that serializes an object containing promises and async iterables. The response is streamed as newline-delimited JSON (NDJSON).

new AsyncResponse(data: any, init?: ResponseInit)

Parameters:

  • data: The object to serialize and stream. Can contain promises, async iterables, and nested objects.
  • init: Standard ResponseInit options. The Content-Type: application/x-ndjson header is set automatically; any additional headers can be provided through init.headers.
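
For example, a minimal sketch reusing the data() helper from the Quick Start (the status value and Cache-Control header here are only illustrative):

const response = new AsyncResponse(data(), {
  status: 200,
  // Merged with the automatically set Content-Type: application/x-ndjson
  headers: { "Cache-Control": "no-cache" },
});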

deserializeResponse

Deserializes a streaming response back into an object with promises and async iterables.

deserializeResponse<T>(response: Response): Promise<T>

Low-level API

For more control, you can use the serializer/deserializer directly:

import { AsyncObjectSerializer, deserialize } from "async-streamify";

const serializer = new AsyncObjectSerializer(data);
for await (const chunk of serializer) {
  // Send chunks over your transport
  // Each chunk is a JSON object that should be serialized
}

const deserializedData = await deserialize<typeof data>(receivedStream);

For a full implementation example (wrapping Response types with NDJSON), see send/response.ts and receive/response.ts.
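
As a small illustration, the sending half of such a transport could look roughly like this. This is only a sketch: writeLine is a hypothetical callback standing in for whatever transport you use (a socket, worker postMessage, etc.); only AsyncObjectSerializer comes from the library.

import { AsyncObjectSerializer } from "async-streamify";

// Serialize `data` and frame each chunk as one NDJSON line.
async function sendAsNdjson(
  data: unknown,
  writeLine: (line: string) => Promise<void>,
) {
  for await (const chunk of new AsyncObjectSerializer(data)) {
    // Each chunk is a plain JSON-serializable object.
    await writeLine(JSON.stringify(chunk) + "\n");
  }
}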

How It Works

  1. The server serializes the object into a stream of objects, each containing either resolved values or references to pending async operations.
  2. Values are transmitted as soon as they become available, subject to backpressure (a value is only sent once the stream is ready for more).
  3. The AsyncResponse and deserializeResponse helpers additionally frame the stream as NDJSON (Newline Delimited JSON) for HTTP streaming.
  4. The client reassembles the stream back into native objects, promises, and async iterables.

Protocol Details

The serialized stream consists of the original root object on the first line, with each async value substituted by a unique index; those indices are then resolved on subsequent lines as [idx, value] pairs, e.g.:

const object = { promise: sleep(100).then(() => "resolved"), integers: integers(2) };
console.log(await Array.fromAsync(new AsyncObjectSerializer(object)));
[
  { promise: { $promise: 1 }, integers: { $asyncIterable: 2 } },
  [ 1, { $resolve: "resolved" } ],
  [ 2, { value: 0, done: false }],
  [ 2, { value: 1, done: false }],
  [ 2, { value: 2, done: false }],
  [ 2, { value: undefined, done: true }],
]

Limitations

  • Async generators on the client yield as fast as the stream delivers values, not when the client explicitly requests them.
  • Because of how promise chains work, if a bare promise is the only thing you serialize, calling await reassemble(...) gives you its resolved value (or throws its rejection) rather than a promise. The workaround is to nest it, e.g. { promise: new Promise(...) } (see the sketch after this list).
  • Errors from rejected promises are (de)serialized, and error instanceof Error works on the receiving end. Instances of custom error classes, however, cannot be sent over the wire, so check error.name === "CustomError" rather than error instanceof CustomError (see the sketch after this list).
  • Errors in async generators are not handled yet (TODO)
  • Circular references are not supported
  • WebSocket and bi-directional streaming are not currently supported
  • The transport must support streaming and handle backpressure correctly
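
A minimal sketch of the two workarounds mentioned above, combining both sides in one file for brevity. The names Payload and CustomError and the localhost URL are purely illustrative; the AsyncResponse / deserializeResponse usage follows the Quick Start.

import { AsyncResponse, deserializeResponse } from "async-streamify";

// Purely illustrative error class; only its name survives (de)serialization.
class CustomError extends Error {
  constructor(message?: string) {
    super(message);
    this.name = "CustomError";
  }
}

type Payload = { promise: Promise<string> };

// Workaround 1 (server side): nest the bare promise in an object so the
// client receives it as a promise property, not an already-awaited value.
export const handler = () =>
  new AsyncResponse({ promise: Promise.reject(new CustomError("boom")) } satisfies Payload);

// Workaround 2 (client side): custom error classes can't cross the wire,
// so match on error.name instead of using instanceof CustomError.
const result = await deserializeResponse<Payload>(await fetch("http://localhost:8000"));
try {
  await result.promise;
} catch (error) {
  if (error instanceof Error && error.name === "CustomError") {
    // Handle the custom error case here.
  }
}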

TypeScript Support

The library is written in TypeScript and provides full type safety. Use the generic parameter in deserializeResponse<T> to ensure type-safe deserialization:

interface MyData {
  promise: Promise<string>;
  stream: AsyncIterable<number>;
}

const data = await deserializeResponse<MyData>(response);
// data is fully typed

License

Copyright (c) 2024 by Gadi Cohen. MIT Licensed.