zod-stream

v2.0.2

A client for Node or the browser to generate and consume streaming JSON.

Downloads

54,549

zod-stream adds structured output validation and streaming capabilities to LLM responses. Built on top of schema-stream, it enables type-safe extraction with progressive validation.

Key Features

  • 🔄 Stream structured LLM outputs with validation
  • 🎯 Multiple response modes (TOOLS, FUNCTIONS, JSON, etc.)
  • 📝 OpenAI client integration
  • 🌳 Progressive validation with partial results
  • ⚡ Built on schema-stream
  • 🔍 Full TypeScript support

Why zod-stream?

zod-stream solves key challenges in handling streaming LLM responses:

  • Dependency Management: Process data as soon as dependencies are met, rather than waiting for complete responses

    if (isPathComplete(['user', 'preferences'], chunk)) {
      // Start personalizing immediately, don't wait for content
      initializeUserExperience(chunk.user.preferences);
    }
  • Type-Safe LLM Integration: Full TypeScript support for structured outputs from OpenAI and other providers

    const params = withResponseModel({
      response_model: { schema, name: "Extract" },
      mode: "TOOLS"  // or "FUNCTIONS", "JSON", etc.
    });
  • Progressive Processing: Built on schema-stream for immediate access to partial results

    for await (const chunk of stream) {
      // Safely access partial data with full type inference
      chunk._meta._completedPaths.forEach(path => {
        processDependency(path, chunk);
      });
    }
  • Provider Flexibility: Consistent interface across different LLM response formats

    // Works with various response modes
    const toolStream = OAIStream({ res: completion });  // OpenAI tools/functions
    const jsonStream = JSONStream({ res: completion }); // Direct JSON

Think of it as a type-safe pipeline for handling streaming LLM data where you need to:

  • Start processing before the full response arrives
  • Ensure type safety throughout the stream
  • Handle complex data dependencies
  • Work with multiple LLM response formats

Installation

# npm
npm install zod-stream zod openai

# pnpm
pnpm add zod-stream zod openai

# bun
bun add zod-stream zod openai

Core Concepts

The ZodStream client provides real-time validation and metadata for streaming LLM responses:

import ZodStream from "zod-stream";
import { z } from "zod";

const client = new ZodStream({
  debug: true  // Enable debug logging
});

// Define your extraction schema
const schema = z.object({
  content: z.string(),
  metadata: z.object({
    confidence: z.number(),
    category: z.string()
  })
});

// Create streaming extraction
const stream = await client.create({
  completionPromise: async () => {
    const response = await fetch("/api/extract", {
      method: "POST",
      body: JSON.stringify({ prompt: "..." })
    });
    return response.body;
  },
  response_model: {
    schema,
    name: "ContentExtraction"
  }
});

// Process with validation metadata
for await (const chunk of stream) {
  console.log({
    data: chunk,              // Partial extraction result
    isValid: chunk._meta._isValid,    // Current validation state
    activePath: chunk._meta._activePath,    // Currently processing path
    completedPaths: chunk._meta._completedPaths  // Completed paths
  });
}

Progressive Processing

zod-stream enables processing dependent data as soon as relevant paths complete, without waiting for the full response:

// Define schema for a complex analysis
const schema = z.object({
  user: z.object({
    id: z.string(),
    preferences: z.object({
      theme: z.string(),
      language: z.string()
    })
  }),
  content: z.object({
    title: z.string(),
    body: z.string(),
    metadata: z.object({
      keywords: z.array(z.string()),
      category: z.string()
    })
  }),
  recommendations: z.array(z.object({
    id: z.string(),
    score: z.number(),
    reason: z.string()
  }))
});

// Process data as it becomes available
for await (const chunk of stream) {
  // Start personalizing UI as soon as user preferences are ready
  if (isPathComplete(['user', 'preferences'], chunk)) {
    applyUserTheme(chunk.user.preferences.theme);
    setLanguage(chunk.user.preferences.language);
  }

  // Begin content indexing once we have title and keywords
  if (isPathComplete(['content', 'metadata', 'keywords'], chunk) && 
      isPathComplete(['content', 'title'], chunk)) {
    indexContent({
      title: chunk.content.title,
      keywords: chunk.content.metadata.keywords
    });
  }

  // Start fetching recommended content in parallel
  chunk._meta._completedPaths.forEach(path => {
    if (path[0] === 'recommendations' && path.length === 2) {
      const index = path[1] as number;
      const recommendation = chunk.recommendations[index];
      
      if (recommendation?.id) {
        prefetchContent(recommendation.id);
      }
    }
  });
}

This approach enables:

  • Early UI updates based on user preferences
  • Parallel processing of independent data
  • Optimistic loading of related content
  • Better perceived performance
  • Resource optimization
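
One detail worth noting: if completed paths accumulate from chunk to chunk, as the metadata description suggests, a check placed directly in the loop stays true on every later chunk and its handler runs again each time. The sketch below shows one way to run each piece of dependent work exactly once. `createDependencyRunner`, `pathKey`, and the task shape are hypothetical names for illustration, not part of zod-stream.

```typescript
type Path = (string | number)[];
type Meta = { _completedPaths: Path[] };

// Serialize a path so it can be compared and stored in a Set.
const pathKey = (path: Path): string => JSON.stringify(path);

// Hypothetical helper: runs each task once, the first time all of its
// dependency paths appear in a chunk's completed paths.
function createDependencyRunner<T extends { _meta: Meta }>(
  tasks: { id: string; needs: Path[]; run: (chunk: T) => void }[]
) {
  const finished = new Set<string>();
  return (chunk: T): void => {
    const done = new Set(chunk._meta._completedPaths.map(pathKey));
    for (const task of tasks) {
      if (finished.has(task.id)) continue;
      if (task.needs.every((p) => done.has(pathKey(p)))) {
        finished.add(task.id);
        task.run(chunk);
      }
    }
  };
}

// Usage with hand-written chunks standing in for a real stream
const calls: string[] = [];
const onChunk = createDependencyRunner<{ _meta: Meta }>([
  { id: "theme", needs: [["user", "preferences"]], run: () => calls.push("theme") },
  {
    id: "index",
    needs: [["content", "title"], ["content", "metadata", "keywords"]],
    run: () => calls.push("index"),
  },
]);

onChunk({ _meta: { _completedPaths: [["user", "preferences"]] } });
onChunk({ _meta: { _completedPaths: [["user", "preferences"], ["content", "title"]] } });
onChunk({
  _meta: {
    _completedPaths: [
      ["user", "preferences"],
      ["content", "title"],
      ["content", "metadata", "keywords"],
    ],
  },
});
// calls is now ["theme", "index"]
```

Inside a real loop, the body would reduce to `onChunk(chunk)`, with the side effects declared up front next to the paths they depend on.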

Stream Metadata

Every streamed chunk includes metadata about validation state:

type CompletionMeta = {
  _isValid: boolean;           // Schema validation status
  _activePath: (string | number)[];     // Current parsing path
  _completedPaths: (string | number)[][]; // All completed paths
}

// Example chunk
{
  content: "partial content...",
  metadata: {
    confidence: 0.95
  },
  _meta: {
    _isValid: false,  // Not valid yet
    _activePath: ["metadata", "category"],
    _completedPaths: [
      ["content"],
      ["metadata", "confidence"]
    ]
  }
}
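
Because `_meta` travels alongside the extracted fields, it can be convenient to separate the two before storing or rendering the data. The following is a minimal sketch; `splitMeta` is a hypothetical helper and not something zod-stream is known to export.

```typescript
type CompletionMeta = {
  _isValid: boolean;
  _activePath: (string | number)[];
  _completedPaths: (string | number)[][];
};

// Hypothetical helper: separate the extracted fields from the stream metadata.
function splitMeta<T extends { _meta: CompletionMeta }>(
  chunk: T
): { data: Omit<T, "_meta">; meta: CompletionMeta } {
  const { _meta, ...data } = chunk;
  return { data, meta: _meta };
}

const { data: partialData, meta: partialMeta } = splitMeta({
  content: "partial content...",
  metadata: { confidence: 0.95 },
  _meta: {
    _isValid: false,
    _activePath: ["metadata", "category"],
    _completedPaths: [["content"], ["metadata", "confidence"]],
  },
});

console.log(partialData.content);       // the extracted fields, without _meta
console.log(partialMeta._completedPaths.length);
```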

Schema Stubs

Get typed stub objects for initialization:

const schema = z.object({
  users: z.array(z.object({
    name: z.string(),
    age: z.number()
  }))
});

const client = new ZodStream();
const stub = client.getSchemaStub({
  schema,
  defaultData: {
    users: [{ name: "loading...", age: 0 }]
  }
});

Debug Logging

Enable detailed logging for debugging:

const client = new ZodStream({ debug: true });

// Logs will include:
// - Stream initialization
// - Validation results
// - Path completion
// - Errors with full context

Using Response Models

The withResponseModel helper configures OpenAI parameters based on your schema and chosen mode:

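import OpenAI from "openai";
import { withResponseModel } from "zod-stream";
import { z } from "zod";

const oai = new OpenAI(); // Reads OPENAI_API_KEY from the environment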

const schema = z.object({
  sentiment: z.string(),
  keywords: z.array(z.string()),
  confidence: z.number()
});

// Configure for OpenAI tools mode
const params = withResponseModel({
  response_model: {
    schema,
    name: "Analysis",
    description: "Extract sentiment and keywords"
  },
  mode: "TOOLS",
  params: {
    messages: [{ role: "user", content: "Analyze this text..." }],
    model: "gpt-4"
  }
});

const completion = await oai.chat.completions.create({
  ...params,
  stream: true
});

Response Modes

zod-stream supports multiple modes for structured LLM responses:

import { MODE } from "zod-stream";

const modes = {
  FUNCTIONS: "FUNCTIONS",   // OpenAI function calling
  TOOLS: "TOOLS",          // OpenAI tools API
  JSON: "JSON",            // Direct JSON response
  MD_JSON: "MD_JSON",      // JSON in markdown blocks
  JSON_SCHEMA: "JSON_SCHEMA" // JSON with schema validation
} as const;
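
An `as const` object like the one above lends itself to a derived union type, which is useful when the mode comes from configuration or user input. The sketch redeclares the object locally so it is self-contained. `RESPONSE_MODES`, `Mode`, and `isMode` are hypothetical names, and whether zod-stream exports an equivalent type is not confirmed here.

```typescript
const RESPONSE_MODES = {
  FUNCTIONS: "FUNCTIONS",
  TOOLS: "TOOLS",
  JSON: "JSON",
  MD_JSON: "MD_JSON",
  JSON_SCHEMA: "JSON_SCHEMA",
} as const;

// Union of the literal values: "FUNCTIONS" | "TOOLS" | "JSON" | ...
type Mode = (typeof RESPONSE_MODES)[keyof typeof RESPONSE_MODES];

// Type guard for values that arrive as plain strings (env vars, query params).
function isMode(value: string): value is Mode {
  return Object.values(RESPONSE_MODES).includes(value as Mode);
}

const requested = "TOOLS";
const selectedMode: Mode = isMode(requested) ? requested : RESPONSE_MODES.JSON;
```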

Mode-Specific Behaviors

TOOLS Mode

// Results in OpenAI tool configuration
{
  tool_choice: {
    type: "function",
    function: { name: "Analysis" }
  },
  tools: [{
    type: "function",
    function: {
      name: "Analysis",
      description: "Extract sentiment and keywords",
      parameters: {/* Generated from schema */}
    }
  }]
}

FUNCTIONS Mode (Legacy)

// Results in OpenAI function configuration
{
  function_call: { name: "Analysis" },
  functions: [{
    name: "Analysis",
    description: "Extract sentiment and keywords",
    parameters: {/* Generated from schema */}
  }]
}

JSON Mode

// Results in direct JSON response configuration
{
  response_format: { type: "json_object" },
  messages: [
    {
      role: "system",
      content: "Return JSON matching schema..."
    },
    // ... user messages
  ]
}

Response Parsing

Built-in parsers handle different response formats:

import { 
  OAIResponseParser,
  OAIResponseToolArgsParser,
  OAIResponseFnArgsParser,
  OAIResponseJSONParser
} from "zod-stream";

// Automatic format detection
const result = OAIResponseParser(response);

// Format-specific parsing
const toolArgs = OAIResponseToolArgsParser(response);
const fnArgs = OAIResponseFnArgsParser(response);
const jsonContent = OAIResponseJSONParser(response);
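
To illustrate why the format matters, the sketch below shows where the JSON text sits in a non-streaming OpenAI chat completion for each mode: tool calls, the legacy function call, or the plain message content. It is not the library's parser. `ChatCompletionLike` and `extractJsonText` are hypothetical names, and the interface is trimmed to the fields used.

```typescript
// Minimal shape of a non-streaming chat completion, reduced to the fields used here.
interface ChatCompletionLike {
  choices: {
    message: {
      content?: string | null;
      function_call?: { name: string; arguments: string } | null;
      tool_calls?: { function: { name: string; arguments: string } }[] | null;
    };
  }[];
}

// Hypothetical helper: return the raw JSON text from whichever field is populated.
function extractJsonText(response: ChatCompletionLike): string {
  const message = response.choices[0]?.message;
  const toolArgs = message?.tool_calls?.[0]?.function.arguments; // TOOLS mode
  const fnArgs = message?.function_call?.arguments;              // FUNCTIONS mode
  return toolArgs ?? fnArgs ?? message?.content ?? "";           // JSON modes use content
}

const toolResponse: ChatCompletionLike = {
  choices: [
    {
      message: {
        content: null,
        tool_calls: [
          { function: { name: "Analysis", arguments: '{"sentiment":"positive"}' } },
        ],
      },
    },
  ],
};

const parsedArgs = JSON.parse(extractJsonText(toolResponse));
```

The parsed value would still need to be validated against the schema, since the model's output is only text until it has been checked.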

Streaming Utilities

Handle streaming responses with built-in utilities:

import { OAIStream, readableStreamToAsyncGenerator } from "zod-stream";

// Create streaming response
app.post("/api/stream", async (req, res) => {
  const completion = await oai.chat.completions.create({
    ...params,
    stream: true
  });

  return new Response(
    OAIStream({ res: completion })
  );
});

// Convert stream to async generator
const generator = readableStreamToAsyncGenerator(stream);
for await (const chunk of generator) {
  console.log(chunk);
}
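
For readers curious what such a conversion involves, here is a minimal sketch built only on the standard Web Streams API (available globally in browsers and Node 18+). `streamToAsyncGenerator` is a hypothetical stand-in written for illustration. It is not the source of `readableStreamToAsyncGenerator`, whose internals may differ.

```typescript
// Hypothetical stand-in: read a Web ReadableStream chunk by chunk and
// expose it as an async generator usable with `for await`.
async function* streamToAsyncGenerator<T>(
  source: ReadableStream<T>
): AsyncGenerator<T> {
  const reader = source.getReader();
  try {
    while (true) {
      const result = await reader.read();
      if (result.done) return;
      yield result.value;
    }
  } finally {
    reader.releaseLock(); // Runs even if the consumer stops iterating early
  }
}

// A small in-memory stream standing in for a network response body
const demoStream = new ReadableStream<string>({
  start(controller) {
    controller.enqueue('{"content":');
    controller.enqueue('"hello"}');
    controller.close();
  },
});

// Concatenate every part of the demo stream into one string.
async function collect(): Promise<string> {
  let text = "";
  for await (const part of streamToAsyncGenerator(demoStream)) {
    text += part;
  }
  return text;
}
```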

Path Tracking Utilities

Monitor completion status of specific paths:

import { isPathComplete } from "zod-stream";

const activePath = ["analysis", "sentiment"];
const isComplete = isPathComplete(activePath, {
  _meta: {
    _completedPaths: [["analysis", "sentiment"]],
    _activePath: ["analysis", "keywords"],
    _isValid: false
  }
});
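
To make the check concrete, here is a minimal sketch of how a path-completion test could work: a path counts as complete when an identical path appears in `_completedPaths`. This is an assumption about the behavior, not zod-stream's actual implementation. `pathIsComplete` and `ChunkWithMeta` are hypothetical names.

```typescript
type Path = (string | number)[];

interface ChunkWithMeta {
  _meta: {
    _completedPaths: Path[];
    _activePath: Path;
    _isValid: boolean;
  };
}

// Hypothetical helper: true when `target` matches a completed path
// segment for segment (same length, same values, same order).
function pathIsComplete(target: Path, chunk: ChunkWithMeta): boolean {
  return chunk._meta._completedPaths.some(
    (done) =>
      done.length === target.length &&
      done.every((segment, i) => segment === target[i])
  );
}

const sampleChunk: ChunkWithMeta = {
  _meta: {
    _completedPaths: [["analysis", "sentiment"]],
    _activePath: ["analysis", "keywords"],
    _isValid: false,
  },
};

const sentimentDone = pathIsComplete(["analysis", "sentiment"], sampleChunk);
const keywordsDone = pathIsComplete(["analysis", "keywords"], sampleChunk);
```

Under this exact-match reading, a parent path such as `["analysis"]` is not treated as complete just because one of its children is.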

Error Handling

zod-stream provides error handling at multiple levels:

const stream = await client.create({
  completionPromise: async () => response.body,
  response_model: { schema }
});

let finalResult;

// Track progress while the stream is open
for await (const chunk of stream) {
  finalResult = chunk;
  // Check which paths are complete
  console.log("Completed paths:", chunk._meta._completedPaths);
  console.log("Current path:", chunk._meta._activePath);
}

// Final validation happens after the stream completes.
// Guard against an empty stream, where finalResult is still undefined.
const isValid = finalResult?._meta._isValid ?? false;
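
The loop above does not cover a stream that fails partway through. The sketch below wraps consumption so that a thrown error, an empty stream, and an invalid final chunk all end up in one result object. `drainStream`, `DrainResult`, and the mock generator are hypothetical, and how zod-stream itself surfaces stream errors is an assumption here.

```typescript
type MetaChunk = { _meta: { _isValid: boolean } };

type DrainResult<T> =
  | { ok: true; value: T }
  | { ok: false; error: unknown; lastChunk?: T };

// Hypothetical helper: consume a chunk stream, keep the last chunk, and
// report failure if the stream throws, is empty, or ends invalid.
async function drainStream<T extends MetaChunk>(
  chunks: AsyncIterable<T>
): Promise<DrainResult<T>> {
  let lastChunk: T | undefined;
  try {
    for await (const chunk of chunks) {
      lastChunk = chunk;
    }
  } catch (error) {
    return { ok: false, error, lastChunk };
  }
  if (!lastChunk) {
    return { ok: false, error: new Error("Stream produced no chunks") };
  }
  if (!lastChunk._meta._isValid) {
    return { ok: false, error: new Error("Final chunk failed validation"), lastChunk };
  }
  return { ok: true, value: lastChunk };
}

// Mock stream for illustration: optionally fails after the first chunk.
async function* mockChunks(
  fail: boolean
): AsyncGenerator<MetaChunk & { content: string }> {
  yield { content: "par", _meta: { _isValid: false } };
  if (fail) throw new Error("connection dropped");
  yield { content: "partial", _meta: { _isValid: true } };
}
```

Keeping `lastChunk` on the failure branch lets a caller decide whether the partial data is still worth showing.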

Real-World Use Cases

1. Progressive Data Analysis

const analysisSchema = z.object({
  marketData: z.object({
    trends: z.array(z.object({
      metric: z.string(),
      value: z.number()
    })),
    summary: z.string()
  }),
  competitors: z.array(z.object({
    name: z.string(),
    strengths: z.array(z.string()),
    weaknesses: z.array(z.string())
  })),
  recommendations: z.object({
    immediate: z.array(z.string()),
    longTerm: z.array(z.string()),
    budget: z.number()
  })
});

for await (const chunk of stream) {
  // Start visualizing market trends immediately
  if (isPathComplete(['marketData', 'trends'], chunk)) {
    initializeCharts(chunk.marketData.trends);
  }

  // Begin competitor analysis in parallel
  chunk._meta._completedPaths.forEach(path => {
    if (path[0] === 'competitors' && path.length === 2) {
      const competitor = chunk.competitors[path[1] as number];
      fetchCompetitorData(competitor.name);
    }
  });

  // Start budget planning once we have immediate recommendations
  if (isPathComplete(['recommendations', 'immediate'], chunk) && 
      isPathComplete(['recommendations', 'budget'], chunk)) {
    planBudgetAllocation({
      actions: chunk.recommendations.immediate,
      budget: chunk.recommendations.budget
    });
  }
}

2. Document Processing Pipeline

const documentSchema = z.object({
  metadata: z.object({
    title: z.string(),
    author: z.string(),
    topics: z.array(z.string())
  }),
  sections: z.array(z.object({
    heading: z.string(),
    content: z.string(),
    annotations: z.array(z.object({
      type: z.string(),
      text: z.string(),
      confidence: z.number()
    }))
  })),
  summary: z.object({
    abstract: z.string(),
    keyPoints: z.array(z.string()),
    readingTime: z.number()
  })
});

for await (const chunk of stream) {
  // Start document indexing as soon as metadata is available
  if (isPathComplete(['metadata'], chunk)) {
    indexDocument({
      title: chunk.metadata.title,
      topics: chunk.metadata.topics
    });
  }

  // Process sections as they complete
  chunk._meta._completedPaths.forEach(path => {
    if (path[0] === 'sections' && isPathComplete([...path, 'annotations'], chunk)) {
      const sectionIndex = path[1] as number;
      const section = chunk.sections[sectionIndex];
      
      // Process annotations for each completed section
      processAnnotations({
        heading: section.heading,
        annotations: section.annotations
      });
    }
  });

  // Generate preview once we have abstract and reading time
  if (isPathComplete(['summary', 'abstract'], chunk) && 
      isPathComplete(['summary', 'readingTime'], chunk)) {
    generatePreview({
      abstract: chunk.summary.abstract,
      readingTime: chunk.summary.readingTime
    });
  }
}

3. E-commerce Product Enrichment

const productSchema = z.object({
  basic: z.object({
    id: z.string(),
    name: z.string(),
    category: z.string()
  }),
  pricing: z.object({
    base: z.number(),
    discounts: z.array(z.object({
      type: z.string(),
      amount: z.number()
    })),
    final: z.number()
  }),
  inventory: z.object({
    status: z.string(),
    locations: z.array(z.object({
      id: z.string(),
      quantity: z.number()
    }))
  }),
  enrichment: z.object({
    seoDescription: z.string(),
    searchKeywords: z.array(z.string()),
    relatedProducts: z.array(z.string())
  })
});

for await (const chunk of stream) {
  // Render the product card as soon as basic info is available
  if (isPathComplete(['basic'], chunk)) {
    initializeProductCard(chunk.basic);
  }

  // Update pricing as soon as final price is calculated
  if (isPathComplete(['pricing', 'final'], chunk)) {
    updatePriceDisplay(chunk.pricing.final);
    
    // If we also have inventory, update buy button
    if (isPathComplete(['inventory', 'status'], chunk)) {
      updateBuyButton({
        price: chunk.pricing.final,
        status: chunk.inventory.status
      });
    }
  }

  // Start SEO optimization in parallel
  if (isPathComplete(['enrichment', 'seoDescription'], chunk) &&
      isPathComplete(['enrichment', 'searchKeywords'], chunk)) {
    optimizeProductSEO({
      description: chunk.enrichment.seoDescription,
      keywords: chunk.enrichment.searchKeywords
    });
  }

  // Prefetch related products as they're identified
  if (isPathComplete(['enrichment', 'relatedProducts'], chunk)) {
    prefetchRelatedProducts(chunk.enrichment.relatedProducts);
  }
}

With Next.js API Routes

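// app/api/extract/route.ts
import OpenAI from "openai";
import { withResponseModel, OAIStream } from "zod-stream";
import { z } from "zod";

const oai = new OpenAI(); // Reads OPENAI_API_KEY from the environment

const schema = z.object({
  summary: z.string(),
  topics: z.array(z.string()),
  sentiment: z.object({
    score: z.number(),
    label: z.string()
  })
});

// req.json() and Response are Web APIs, so this is written as an App Router route handler
export async function POST(req: Request) {
  const { content } = await req.json();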

  const params = withResponseModel({
    response_model: { 
      schema,
      name: "ContentAnalysis"
    },
    mode: "TOOLS",
    params: {
      messages: [{ 
        role: "user", 
        content: `Analyze: ${content}` 
      }],
      model: "gpt-4"
    }
  });

  const stream = await oai.chat.completions.create({
    ...params,
    stream: true
  });

  return new Response(OAIStream({ res: stream }));
}

With React and stream-hooks

import { useState } from "react";
import { useJsonStream } from "stream-hooks";
import { z } from "zod";

const schema = z.object({
  summary: z.string(),
  topics: z.array(z.string())
});

function AnalysisComponent() {
  const [data, setData] = useState<z.infer<typeof schema>>();

  const { 
    loading, 
    error,
    startStream 
  } = useJsonStream({
    schema,
    onReceive: (data) => {
      setData(data)
    }
  });

  return (
    <div>
      <button 
        onClick={() => startStream({
          url: "/api/extract",
          method: "POST",
          body: { content: "..." }
        })}
        disabled={loading}
      >
        Start Analysis
      </button>

      {/* data is undefined until the first chunk arrives, so guard each access */}
      {loading && <LoadingState paths={data?._meta?._completedPaths ?? []} />}
      {error && <ErrorDisplay error={error} />}
      
      <ProgressiveDisplay
        data={data}
        isComplete={(data?._meta?._completedPaths.length ?? 0) > 0}
      />
    </div>
  );
}

Integration with Island AI

zod-stream is part of the Island AI toolkit.

Contributing

We welcome contributions!

License

MIT © hack.dance