zod-stream
zod-stream adds structured output validation and streaming capabilities to LLM responses. Built on top of schema-stream, it enables type-safe extraction with progressive validation.
Key Features
- 🔄 Stream structured LLM outputs with validation
- 🎯 Multiple response modes (TOOLS, FUNCTIONS, JSON, etc.)
- 📝 OpenAI client integration
- 🌳 Progressive validation with partial results
- ⚡ Built on schema-stream
- 🔍 Full TypeScript support
Why zod-stream?
zod-stream solves key challenges in handling streaming LLM responses:
Dependency Management: Process data as soon as dependencies are met, rather than waiting for complete responses
if (isPathComplete(['user', 'preferences'], chunk)) {
  // Start personalizing immediately, don't wait for content
  initializeUserExperience(chunk.user.preferences);
}
Type-Safe LLM Integration: Full TypeScript support for structured outputs from OpenAI and other providers
const params = withResponseModel({
  response_model: { schema, name: "Extract" },
  mode: "TOOLS" // or "FUNCTIONS", "JSON", etc.
});
Progressive Processing: Built on schema-stream for immediate access to partial results
for await (const chunk of stream) {
  // Safely access partial data with full type inference
  chunk._meta._completedPaths.forEach(path => {
    processDependency(path, chunk);
  });
}
Provider Flexibility: Consistent interface across different LLM response formats
// Works with various response modes
const toolStream = OAIStream({ res: completion }); // OpenAI tools/functions
const jsonStream = JSONStream({ res: completion }); // Direct JSON
Think of it as a type-safe pipeline for handling streaming LLM data where you need to:
- Start processing before the full response arrives
- Ensure type safety throughout the stream
- Handle complex data dependencies
- Work with multiple LLM response formats
Installation
# npm
npm install zod-stream zod openai
# pnpm
pnpm add zod-stream zod openai
# bun
bun add zod-stream zod openai
Core Concepts
The ZodStream client provides real-time validation and metadata for streaming LLM responses:
import ZodStream from "zod-stream";
import { z } from "zod";
const client = new ZodStream({
debug: true // Enable debug logging
});
// Define your extraction schema
const schema = z.object({
content: z.string(),
metadata: z.object({
confidence: z.number(),
category: z.string()
})
});
// Create streaming extraction
const stream = await client.create({
completionPromise: async () => {
const response = await fetch("/api/extract", {
method: "POST",
body: JSON.stringify({ prompt: "..." })
});
return response.body;
},
response_model: {
schema,
name: "ContentExtraction"
}
});
// Process with validation metadata
for await (const chunk of stream) {
console.log({
data: chunk, // Partial extraction result
isValid: chunk._meta._isValid, // Current validation state
activePath: chunk._meta._activePath, // Currently processing path
completedPaths: chunk._meta._completedPaths // Completed paths
});
}
Progressive Processing
zod-stream enables processing dependent data as soon as relevant paths complete, without waiting for the full response:
// Define schema for a complex analysis
const schema = z.object({
user: z.object({
id: z.string(),
preferences: z.object({
theme: z.string(),
language: z.string()
})
}),
content: z.object({
title: z.string(),
body: z.string(),
metadata: z.object({
keywords: z.array(z.string()),
category: z.string()
})
}),
recommendations: z.array(z.object({
id: z.string(),
score: z.number(),
reason: z.string()
}))
});
// Process data as it becomes available
for await (const chunk of stream) {
// Start personalizing UI as soon as user preferences are ready
if (isPathComplete(['user', 'preferences'], chunk)) {
applyUserTheme(chunk.user.preferences.theme);
setLanguage(chunk.user.preferences.language);
}
// Begin content indexing once we have title and keywords
if (isPathComplete(['content', 'metadata', 'keywords'], chunk) &&
isPathComplete(['content', 'title'], chunk)) {
indexContent({
title: chunk.content.title,
keywords: chunk.content.metadata.keywords
});
}
// Start fetching recommended content in parallel
chunk._meta._completedPaths.forEach(path => {
if (path[0] === 'recommendations' && path.length === 2) {
const index = path[1] as number;
const recommendation = chunk.recommendations[index];
if (recommendation?.id) {
prefetchContent(recommendation.id);
}
}
});
}
This approach enables:
- Early UI updates based on user preferences
- Parallel processing of independent data
- Optimistic loading of related content
- Better perceived performance
- Resource optimization
Stream Metadata
Every streamed chunk includes metadata about validation state:
type CompletionMeta = {
_isValid: boolean; // Schema validation status
_activePath: (string | number)[]; // Current parsing path
_completedPaths: (string | number)[][]; // All completed paths
}
// Example chunk
{
content: "partial content...",
metadata: {
confidence: 0.95
},
_meta: {
_isValid: false, // Not valid yet
_activePath: ["metadata", "category"],
_completedPaths: [
["content"],
["metadata", "confidence"]
]
}
}
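Since _isValid reports whether the accumulated object currently parses against the full schema, it can gate work that needs a complete, valid payload. A minimal sketch (renderPartial and persistResult are hypothetical helpers, not part of zod-stream):
for await (const chunk of stream) {
  renderPartial(chunk); // hypothetical progressive UI update
  if (chunk._meta._isValid) {
    // The accumulated object now satisfies the full schema
    await persistResult(chunk); // hypothetical sink
  }
}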
Schema Stubs
Get typed stub objects for initialization:
const schema = z.object({
users: z.array(z.object({
name: z.string(),
age: z.number()
}))
});
const client = new ZodStream();
const stub = client.getSchemaStub({
schema,
defaultData: {
users: [{ name: "loading...", age: 0 }]
}
});
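A minimal usage sketch, assuming a hypothetical renderUserList UI helper: the stub gives the UI a correctly-shaped object to render before the first chunk arrives.
renderUserList(stub.users); // shows the "loading..." placeholder row
for await (const chunk of stream) {
  renderUserList(chunk.users); // placeholders are replaced as data streams in
}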
Debug Logging
Enable detailed logging for debugging:
const client = new ZodStream({ debug: true });
// Logs will include:
// - Stream initialization
// - Validation results
// - Path completion
// - Errors with full context
Using Response Models
The withResponseModel helper configures OpenAI parameters based on your schema and chosen mode:
import { withResponseModel } from "zod-stream";
import { z } from "zod";
const schema = z.object({
sentiment: z.string(),
keywords: z.array(z.string()),
confidence: z.number()
});
// Configure for OpenAI tools mode
const params = withResponseModel({
response_model: {
schema,
name: "Analysis",
description: "Extract sentiment and keywords"
},
mode: "TOOLS",
params: {
messages: [{ role: "user", content: "Analyze this text..." }],
model: "gpt-4"
}
});
const completion = await oai.chat.completions.create({
...params,
stream: true
});
Response Modes
zod-stream supports multiple modes for structured LLM responses:
import { MODE } from "zod-stream";
const modes = {
FUNCTIONS: "FUNCTIONS", // OpenAI function calling
TOOLS: "TOOLS", // OpenAI tools API
JSON: "JSON", // Direct JSON response
MD_JSON: "MD_JSON", // JSON in markdown blocks
JSON_SCHEMA: "JSON_SCHEMA" // JSON with schema validation
} as const;
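Switching modes only changes the generated request parameters; the schema and response_model stay the same. For example, a sketch targeting a provider without native tool calling (the message content and model name are illustrative):
const mdJsonParams = withResponseModel({
  response_model: {
    schema,
    name: "Analysis",
    description: "Extract sentiment and keywords"
  },
  mode: "MD_JSON", // ask for JSON inside a markdown code block
  params: {
    messages: [{ role: "user", content: "Analyze this text..." }],
    model: "gpt-3.5-turbo"
  }
});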
Mode-Specific Behaviors
TOOLS Mode
// Results in OpenAI tool configuration
{
tool_choice: {
type: "function",
function: { name: "Analysis" }
},
tools: [{
type: "function",
function: {
name: "Analysis",
description: "Extract sentiment and keywords",
parameters: {/* Generated from schema */}
}
}]
}
FUNCTIONS Mode (Legacy)
// Results in OpenAI function configuration
{
function_call: { name: "Analysis" },
functions: [{
name: "Analysis",
description: "Extract sentiment and keywords",
parameters: {/* Generated from schema */}
}]
}
JSON Mode
// Results in direct JSON response configuration
{
response_format: { type: "json_object" },
messages: [
{
role: "system",
content: "Return JSON matching schema..."
},
// ... user messages
]
}
Response Parsing
Built-in parsers handle different response formats:
import {
OAIResponseParser,
OAIResponseToolArgsParser,
OAIResponseFnArgsParser,
OAIResponseJSONParser
} from "zod-stream";
// Automatic format detection
const result = OAIResponseParser(response);
// Format-specific parsing
const toolArgs = OAIResponseToolArgsParser(response);
const fnArgs = OAIResponseFnArgsParser(response);
const jsonContent = OAIResponseJSONParser(response);
Streaming Utilities
Handle streaming responses with built-in utilities:
import { OAIStream, readableStreamToAsyncGenerator } from "zod-stream";
// Create streaming response
app.post("/api/stream", async (req) => {
const completion = await oai.chat.completions.create({
...params,
stream: true
});
return new Response(
OAIStream({ res: completion })
);
});
// Convert stream to async generator
const generator = readableStreamToAsyncGenerator(stream);
for await (const chunk of generator) {
console.log(chunk);
}
Path Tracking Utilities
Monitor completion status of specific paths:
import { isPathComplete } from "zod-stream";
const activePath = ["analysis", "sentiment"];
const isComplete = isPathComplete(activePath, {
  _meta: {
    _completedPaths: [["analysis", "sentiment"]],
    _activePath: ["analysis", "keywords"],
    _isValid: false
  }
}); // => true: the path appears in _completedPaths
Error Handling
zod-stream provides error handling at multiple levels:
const stream = await client.create({
completionPromise: async () => response.body,
response_model: { schema }
});
let finalResult;
// Path tracking for progressive updates
for await (const chunk of stream) {
  finalResult = chunk;
  // Check which paths are complete
  console.log("Completed paths:", chunk._meta._completedPaths);
  console.log("Current path:", chunk._meta._activePath);
}
// Final validation happens after the stream completes
const isValid = finalResult._meta._isValid;
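Transport or stream-level failures reject the async iteration itself, so an ordinary try/catch around the loop handles them, while schema problems surface through _isValid. A sketch (the recovery step is illustrative):
try {
  for await (const chunk of stream) {
    finalResult = chunk;
  }
} catch (err) {
  // Network or stream-level failures abort the iteration here
  console.error("Stream failed:", err);
}
if (!finalResult?._meta._isValid) {
  retryExtraction(); // hypothetical recovery step
}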
Real-World Use Cases
1. Progressive Data Analysis
const analysisSchema = z.object({
marketData: z.object({
trends: z.array(z.object({
metric: z.string(),
value: z.number()
})),
summary: z.string()
}),
competitors: z.array(z.object({
name: z.string(),
strengths: z.array(z.string()),
weaknesses: z.array(z.string())
})),
recommendations: z.object({
immediate: z.array(z.string()),
longTerm: z.array(z.string()),
budget: z.number()
})
});
for await (const chunk of stream) {
// Start visualizing market trends immediately
if (isPathComplete(['marketData', 'trends'], chunk)) {
initializeCharts(chunk.marketData.trends);
}
// Begin competitor analysis in parallel
chunk._meta._completedPaths.forEach(path => {
if (path[0] === 'competitors' && path.length === 2) {
const competitor = chunk.competitors[path[1] as number];
fetchCompetitorData(competitor.name);
}
});
// Start budget planning once we have immediate recommendations
if (isPathComplete(['recommendations', 'immediate'], chunk) &&
isPathComplete(['recommendations', 'budget'], chunk)) {
planBudgetAllocation({
actions: chunk.recommendations.immediate,
budget: chunk.recommendations.budget
});
}
}
2. Document Processing Pipeline
const documentSchema = z.object({
metadata: z.object({
title: z.string(),
author: z.string(),
topics: z.array(z.string())
}),
sections: z.array(z.object({
heading: z.string(),
content: z.string(),
annotations: z.array(z.object({
type: z.string(),
text: z.string(),
confidence: z.number()
}))
})),
summary: z.object({
abstract: z.string(),
keyPoints: z.array(z.string()),
readingTime: z.number()
})
});
for await (const chunk of stream) {
// Start document indexing as soon as metadata is available
if (isPathComplete(['metadata'], chunk)) {
indexDocument({
title: chunk.metadata.title,
topics: chunk.metadata.topics
});
}
// Process sections as they complete
chunk._meta._completedPaths.forEach(path => {
if (path[0] === 'sections' && isPathComplete([...path, 'annotations'], chunk)) {
const sectionIndex = path[1] as number;
const section = chunk.sections[sectionIndex];
// Process annotations for each completed section
processAnnotations({
heading: section.heading,
annotations: section.annotations
});
}
});
// Generate preview once we have abstract and reading time
if (isPathComplete(['summary', 'abstract'], chunk) &&
isPathComplete(['summary', 'readingTime'], chunk)) {
generatePreview({
abstract: chunk.summary.abstract,
readingTime: chunk.summary.readingTime
});
}
}
3. E-commerce Product Enrichment
const productSchema = z.object({
basic: z.object({
id: z.string(),
name: z.string(),
category: z.string()
}),
pricing: z.object({
base: z.number(),
discounts: z.array(z.object({
type: z.string(),
amount: z.number()
})),
final: z.number()
}),
inventory: z.object({
status: z.string(),
locations: z.array(z.object({
id: z.string(),
quantity: z.number()
}))
}),
enrichment: z.object({
seoDescription: z.string(),
searchKeywords: z.array(z.string()),
relatedProducts: z.array(z.string())
})
});
for await (const chunk of stream) {
// Initialize the product card as soon as basic info is available
if (isPathComplete(['basic'], chunk)) {
initializeProductCard(chunk.basic);
}
// Update pricing as soon as final price is calculated
if (isPathComplete(['pricing', 'final'], chunk)) {
updatePriceDisplay(chunk.pricing.final);
// If we also have inventory, update buy button
if (isPathComplete(['inventory', 'status'], chunk)) {
updateBuyButton({
price: chunk.pricing.final,
status: chunk.inventory.status
});
}
}
// Start SEO optimization in parallel
if (isPathComplete(['enrichment', 'seoDescription'], chunk) &&
isPathComplete(['enrichment', 'searchKeywords'], chunk)) {
optimizeProductSEO({
description: chunk.enrichment.seoDescription,
keywords: chunk.enrichment.searchKeywords
});
}
// Prefetch related products as they're identified
if (isPathComplete(['enrichment', 'relatedProducts'], chunk)) {
prefetchRelatedProducts(chunk.enrichment.relatedProducts);
}
}
With Next.js API Routes
// pages/api/extract.ts
import { withResponseModel, OAIStream } from "zod-stream";
import { z } from "zod";
import OpenAI from "openai";
const oai = new OpenAI();
// Run as an Edge API route so the handler can return a web Response
export const config = { runtime: "edge" };
const schema = z.object({
summary: z.string(),
topics: z.array(z.string()),
sentiment: z.object({
score: z.number(),
label: z.string()
})
});
export default async function handler(req: Request) {
const { content } = await req.json();
const params = withResponseModel({
response_model: {
schema,
name: "ContentAnalysis"
},
mode: "TOOLS",
params: {
messages: [{
role: "user",
content: `Analyze: ${content}`
}],
model: "gpt-4"
}
});
const stream = await oai.chat.completions.create({
...params,
stream: true
});
return new Response(OAIStream({ res: stream }));
}
With React and stream-hooks
import { useState } from "react";
import { useJsonStream } from "stream-hooks";
import { z } from "zod";
const schema = z.object({
summary: z.string(),
topics: z.array(z.string())
});
function AnalysisComponent() {
const [data, setData] = useState<z.infer<typeof schema>>();
const {
loading,
error,
startStream
} = useJsonStream({
schema,
onReceive: (data) => {
setData(data)
}
});
return (
<div>
<button
onClick={() => startStream({
url: "/api/extract",
method: "POST",
body: { content: "..." }
})}
disabled={loading}
>
Start Analysis
</button>
{loading && <LoadingState paths={data?._meta._completedPaths} />}
{error && <ErrorDisplay error={error} />}
<ProgressiveDisplay
data={data}
isComplete={(data?._meta._completedPaths.length ?? 0) > 0}
/>
</div>
);
}
Integration with Island AI
Part of the Island AI toolkit:
- zod-stream: Structured streaming
- stream-hooks: React streaming hooks
- schema-stream: Streaming JSON parser
- evalz: LLM evaluation
- llm-polyglot: Universal LLM client
- instructor: High-level extraction
Contributing
We welcome contributions!
License
MIT © hack.dance