Skip to main content

Streaming

The SDK provides built-in support for Server-Sent Events (SSE) streaming for real-time data.

Chat Streaming

useStreamingChat Hook

The most common streaming use case - real-time chat responses:

import { useStreamingChat } from "@dgidgi-one/sdk";

/**
 * Minimal chat UI driven by `useStreamingChat`.
 *
 * Renders a message form, a typing indicator while a response is streaming,
 * the accumulated response text, the agent's current tasks, a stop button,
 * and any stream error.
 */
function ChatInterface() {
  const {
    // State
    isStreaming,
    currentText, // Accumulated response text
    tasks, // Current agent tasks
    checkpoints, // Execution checkpoints
    oodaPhases, // OODA loop phases
    runInfo, // Run metadata
    error,

    // Actions
    streamMessage, // Start streaming
    cancelStream, // Cancel current stream
    resetStream, // Reset all state
  } = useStreamingChat();

  /** Starts streaming a response to `message` in the example session/project. */
  const handleSend = async (message: string) => {
    await streamMessage({
      sessionId: "session-123",
      content: message,
      projectId: "project-456",
    });
  };

  return (
    <div>
      {/* Message input: submit events fire on the <form>, not on the <input>,
          so the handler lives on the form and reads the text from it. */}
      <form
        onSubmit={(submitEvent) => {
          // Prevent the browser's default full-page form submission
          submitEvent.preventDefault();
          const raw = new FormData(submitEvent.currentTarget).get("message");
          const message = typeof raw === "string" ? raw.trim() : "";
          if (message === "") {
            return; // Nothing to send
          }
          void handleSend(message);
        }}
      >
        <input name="message" disabled={isStreaming} />
      </form>

      {/* Streaming indicator */}
      {isStreaming && <div className="typing-indicator">AI is typing...</div>}

      {/* Response text (updates in real-time) */}
      <div className="response">{currentText}</div>

      {/* Tasks being executed */}
      {tasks.map((task) => (
        <TaskCard key={task.id} task={task} />
      ))}

      {/* Cancel button */}
      {isStreaming && <button onClick={cancelStream}>Stop Generation</button>}

      {/* Error handling */}
      {error && <div className="error">{error.message}</div>}
    </div>
  );
}

StreamMessage Options

// Shape of the options object accepted by streamMessage().
// Type notation for reference only — this is not a runnable call.
await streamMessage({
sessionId: string; // Required: Chat session ID
content: string; // Required: User message
projectId?: string; // Optional: Project context
agentId?: string; // Optional: Specific agent
mode?: "fast" | "autonomous"; // Optional: Agent mode
});

Streaming Events

The stream emits various event types:

// Each event is an object with an `event` name and a `data` payload.
// The lines below are illustrative shapes, not runnable statements.

// Token events (text chunks)
{ event: "token", data: { text: "Hello" } }

// Task events
// NOTE(review): `task_started` / `task_completed` do not appear in the
// Event Types Reference tables on this page — confirm the event names.
{ event: "task_started", data: { id: "task-1", name: "Reading file" } }
{ event: "task_completed", data: { id: "task-1" } }

// Checkpoint events
// NOTE(review): the Event Types Reference lists `checkpoint_saved` with
// `{ stepId }` rather than `checkpoint_created` — confirm which one is emitted.
{ event: "checkpoint_created", data: { step: 1, description: "..." } }

// Run lifecycle
{ event: "run_started", data: { runId: "run-123" } }
{ event: "run_completed", data: { runId: "run-123" } }

// Error events
{ event: "error", data: { message: "Something went wrong" } }

Structured Streaming

For more control over streaming events:

import { useStructuredStreaming } from "@dgidgi-one/sdk";

/**
 * Displays a single agent run using `useStructuredStreaming`.
 *
 * @param props.runId - ID of the run whose event stream should be opened;
 *   it is interpolated into the `/runs/{runId}/stream` endpoint.
 */
function AgentRunView({ runId }: { runId: string }) {
  const {
    isStreaming,
    events, // All received events
    currentPhase, // Current execution phase
    taskCards, // Rendered task cards
    stats, // Token/cost statistics
    error,
    startStream,
    stopStream,
  } = useStructuredStreaming();

  /** Opens the run's stream and wires up the per-event callbacks. */
  const handleStart = async () => {
    await startStream({
      endpoint: `/runs/${runId}/stream`,
      onEvent: (event) => {
        // Custom event handling
        // NOTE(review): other examples on this page name the discriminator
        // `event`, not `type` — confirm the field name for this hook.
        switch (event.type) {
          case "tool_started":
            console.log("Tool:", event.data.toolName);
            break;
          case "file_edited":
            console.log("Edited:", event.data.path);
            break;
        }
      },
      onComplete: () => {
        console.log("Stream completed");
      },
      onError: (error) => {
        console.error("Stream error:", error);
      },
    });
  };

  return (
    <div>
      <button onClick={handleStart}>Start Run</button>
      <button onClick={stopStream}>Stop</button>

      {/* Render task cards */}
      {taskCards.map((card) => (
        <TaskCard key={card.id} {...card} />
      ))}

      {/* Show stats */}
      <div>Tokens: {stats.tokens}</div>
      <div>Cost: ${stats.cost.toFixed(4)}</div>
    </div>
  );
}

Direct Streaming API

For non-React usage or custom implementations:

import { getClient } from "@dgidgi-one/sdk";

const client = getClient();

/**
 * Async generator over the SSE stream for one chat message.
 *
 * POSTs `message` to the session's stream endpoint and re-yields every
 * `{ event, data }` pair received from `client.stream`.
 */
async function* streamChat(sessionId: string, message: string) {
  const url = `/chat/${sessionId}/stream`;
  const payload = { content: message };

  for await (const chunk of client.stream(url, { method: "POST", body: payload })) {
    yield { event: chunk.event, data: chunk.data };
  }
}

/** Narrows an event payload to the `{ text: string }` shape carried by `token` events. */
function isTokenData(data: unknown): data is { text: string } {
  return (
    typeof data === "object" &&
    data !== null &&
    typeof (data as { text?: unknown }).text === "string"
  );
}

// Usage
for await (const { event, data } of streamChat("session-123", "Hello")) {
  // `data` is typed `unknown`, so it must be narrowed before reading `text`
  if (event === "token" && isTokenData(data)) {
    process.stdout.write(data.text);
  }
}

Stream Method

// Returns AsyncGenerator<{ event: string; data: unknown }>
// NOTE: `options?` marks the second argument as optional; this line is
// signature notation, not valid call syntax.
const stream = client.stream(endpoint, options?);

// Options
interface StreamOptions {
// HTTP method for the request (the chat example on this page uses "POST")
method?: "GET" | "POST";
// Request payload; the chat example passes a plain object —
// presumably serialized by the client (TODO confirm)
body?: unknown;
// Extra request headers — presumably merged with the client's defaults (TODO confirm)
headers?: Record<string, string>;
// Abort signal used to cancel the stream (see "Cancel with AbortController")
signal?: AbortSignal;
}

Event Types Reference

Chat Events

| Event | Data | Description |
| --- | --- | --- |
| `token` | `{ text: string }` | Text chunk |
| `message_complete` | `{ content: string }` | Full message |
| `error` | `{ message: string }` | Error occurred |

Agent Run Events

| Event | Data | Description |
| --- | --- | --- |
| `run_started` | `{ runId, goal }` | Run began |
| `run_completed` | `{ runId, result }` | Run finished |
| `run_failed` | `{ runId, error }` | Run failed |
| `step_started` | `{ stepId, description }` | Step began |
| `step_completed` | `{ stepId, result }` | Step finished |
| `tool_started` | `{ toolId, toolName, params }` | Tool executing |
| `tool_output` | `{ toolId, output }` | Tool output |
| `tool_completed` | `{ toolId, result }` | Tool finished |
| `approval_required` | `{ toolId, message }` | Needs approval |

File Events

| Event | Data | Description |
| --- | --- | --- |
| `file_opened` | `{ path }` | File opened |
| `file_edited` | `{ path, changes }` | File modified |
| `file_created` | `{ path }` | New file |
| `file_deleted` | `{ path }` | File removed |

Memory Events

| Event | Data | Description |
| --- | --- | --- |
| `memory_updated` | `{ type, content }` | Memory added |
| `checkpoint_saved` | `{ stepId }` | Checkpoint created |

Cancellation

Cancel with Hook

// cancelStream is one of the actions returned by useStreamingChat()
const { cancelStream } = useStreamingChat();

// Cancel the current stream
cancelStream();

Cancel with AbortController

const controller = new AbortController();

// Start stream with signal
const stream = client.stream("/endpoint", {
  signal: controller.signal,
});

// Cancel after 10 seconds
const timeoutId = setTimeout(() => controller.abort(), 10000);

// Handle in iteration
try {
  for await (const event of stream) {
    // Process event
  }
} catch (error: unknown) {
  // Catch variables are `unknown` under strict mode, so narrow before reading `name`
  if (error instanceof Error && error.name === "AbortError") {
    console.log("Stream cancelled");
  } else {
    // Anything other than a cancellation is a real failure; do not swallow it
    throw error;
  }
} finally {
  // Avoid a stray abort() call once the stream has already ended
  clearTimeout(timeoutId);
}

Error Handling

const { error, resetStream, streamMessage } = useStreamingChat();

// Check for errors
if (error) {
  console.error("Stream error:", error.message);
}

/**
 * Reset and retry.
 * Call this from an event handler (for example a "Retry" button) rather than
 * during render, so the retry is not re-triggered on every re-render.
 */
const retryMessage = async (sessionId: string, content: string) => {
  resetStream();
  await streamMessage({ sessionId, content });
};

Common Errors

| Error | Cause | Solution |
| --- | --- | --- |
| `TIMEOUT` | Stream took too long | Increase timeout or check server |
| `DISCONNECTED` | Network issue | Retry with exponential backoff |
| `UNAUTHORIZED` | Token expired | Re-authenticate |
| `RATE_LIMITED` | Too many requests | Wait and retry |

Best Practices

  1. Always handle cancellation - Users may navigate away
  2. Show streaming indicator - Let users know something is happening
  3. Buffer text updates - Don't update DOM on every character
  4. Handle reconnection - Network can be unreliable
  5. Clean up on unmount - Cancel streams when component unmounts
// Register an unmount-only cleanup (empty dependency list) that stops the stream.
useEffect(
  () => () => {
    cancelStream(); // Clean up on unmount
  },
  [],
);