WorkflowChatTransport

Chat transport with automatic reconnection and recovery from interrupted streams.

The @workflow/ai package is currently in active development and should be considered experimental.

WorkflowChatTransport is a chat transport for the AI SDK that streams messages reliably and reconnects automatically when a stream is interrupted. It is a drop-in replacement for the default AI SDK transport and recovers from network issues, page refreshes, and Vercel Function timeouts.

WorkflowChatTransport implements the ChatTransport interface from the AI SDK and is designed to work with workflow-based chat applications. It requires endpoints that return the x-workflow-run-id header to enable stream resumption.

import { useChat } from "@ai-sdk/react";
import { WorkflowChatTransport } from "@workflow/ai";

export default function Chat() {
  const { messages, sendMessage } = useChat({
    transport: new WorkflowChatTransport(),
  });

  return (
    <div>
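      {messages.map((m) => (
        <div key={m.id}>
          {/* AI SDK UI messages expose their content as `parts` */}
          {m.parts.map((part, i) =>
            part.type === "text" ? <span key={i}>{part.text}</span> : null
          )}
        </div>
      ))}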
    </div>
  );
}
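
The header requirement described above can be sketched on the server side. This is a minimal sketch under stated assumptions, not the package's own route handler: `StartedRun`, `StartChatRun`, and `createChatHandler` are hypothetical names, the request body is assumed to carry a `messages` array, and the encoding of the stream itself is left out. The only part taken from this page is that the response must carry the x-workflow-run-id header.

```typescript
// Hypothetical shape of a started workflow run; substitute your framework's own type.
interface StartedRun {
  runId: string;
  readable: ReadableStream<Uint8Array>;
}

// Hypothetical helper that starts a run and exposes its output stream.
type StartChatRun = (messages: unknown[]) => Promise<StartedRun>;

// Builds a POST handler that streams the run output and exposes the run ID
// in the x-workflow-run-id header so the transport can resume the stream later.
export function createChatHandler(startChatRun: StartChatRun) {
  return async function POST(req: Request): Promise<Response> {
    // Assumption: the request body is JSON with a `messages` array.
    const { messages } = (await req.json()) as { messages: unknown[] };
    const run = await startChatRun(messages);

    return new Response(run.readable, {
      headers: { "x-workflow-run-id": run.runId },
    });
  };
}
```

On the client, the same header is what the onChatSendMessage callback reads through `response.headers.get("x-workflow-run-id")`.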

API Signature

Class

  • api: any
  • fetch: any
  • onChatSendMessage: any
  • onChatEnd: any
  • maxConsecutiveErrors: any
  • initialStartIndex: any
  • prepareSendMessagesRequest: any
  • prepareReconnectToStreamRequest: any
  • sendMessages: (options: SendMessagesOptions<UI_MESSAGE> & ChatRequestOptions) => Promise<ReadableStream<UIMessageChunk>>
    Sends messages to the chat endpoint and returns a stream of response chunks. This method handles the entire chat lifecycle, including sending messages to the /api/chat endpoint, streaming response chunks, and automatic reconnection if the stream is interrupted.
  • sendMessagesIterator: any
  • reconnectToStream: (options: ReconnectToStreamOptions & ChatRequestOptions) => Promise<ReadableStream<UIMessageChunk> | null>
    Reconnects to an existing chat stream that was previously interrupted. This method is useful for resuming a chat session after network issues, page refreshes, or Vercel Function timeouts.
  • reconnectToStreamIterator: any
  • onFinish: any

WorkflowChatTransportOptions

  • api: string
    API endpoint for chat requests. Defaults to /api/chat if not provided.
  • fetch: { (input: RequestInfo | URL, init?: RequestInit | undefined): Promise<Response>; (input: string | Request | URL, init?: RequestInit | undefined): Promise<...>; }
    Custom fetch implementation to use for HTTP requests. Defaults to the global fetch function if not provided.
  • onChatSendMessage: OnChatSendMessage<UI_MESSAGE>
    Callback invoked after successfully sending messages to the chat endpoint. Useful for tracking chat history and inspecting response headers.
  • onChatEnd: OnChatEnd
    Callback invoked when a chat stream ends (receives a "finish" chunk). Useful for cleanup operations or state updates.
  • maxConsecutiveErrors: number
    Maximum number of consecutive errors allowed during reconnection attempts. Defaults to 3 if not provided.
  • initialStartIndex: number
    Default startIndex to use when reconnecting to a stream without a known chunk position (i.e. the initial reconnection, not a retry). Negative values read from the end of the stream (e.g. -10 fetches the last 10 chunks), which is useful for resuming a chat UI after a page refresh without replaying the full conversation. Can be overridden per-call via ReconnectToStreamOptions.startIndex. Defaults to 0 (replay from the beginning).
  • prepareSendMessagesRequest: PrepareSendMessagesRequest<UI_MESSAGE>
    Function to prepare the request for sending messages. Allows customizing the API endpoint, headers, credentials, and body.
  • prepareReconnectToStreamRequest: PrepareReconnectToStreamRequest
    Function to prepare the request for reconnecting to a stream. Allows customizing the API endpoint, headers, and credentials.
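
The negative-index behaviour of initialStartIndex can be illustrated with a small helper. This is a sketch and not the transport's own code: `resolveStartIndex` is a hypothetical name, and it assumes the tail index is the zero-based index of the last chunk currently in the stream.

```typescript
// Hypothetical helper: converts a possibly negative startIndex into an absolute
// chunk position. Assumes tailIndex is the zero-based index of the last chunk.
function resolveStartIndex(startIndex: number, tailIndex?: number): number {
  // Non-negative values are already absolute positions.
  if (startIndex >= 0) return startIndex;

  // Without a known tail index there is nothing to count back from,
  // so fall back to replaying from the beginning.
  if (tailIndex === undefined) return 0;

  const chunkCount = tailIndex + 1;
  // Clamp at 0 when the stream holds fewer chunks than requested.
  return Math.max(0, chunkCount + startIndex);
}

console.log(resolveStartIndex(-10, 99)); // 90: the last 10 of 100 chunks
console.log(resolveStartIndex(-10, 3)); // 0: fewer than 10 chunks exist
console.log(resolveStartIndex(5, 99)); // 5: non-negative values pass through
```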

Key Features

  • Automatic Reconnection: Automatically recovers from interrupted streams with configurable retry limits
  • Workflow Integration: Seamlessly works with workflow-based endpoints that provide the x-workflow-run-id header
  • Customizable Requests: Allows intercepting and modifying requests via prepareSendMessagesRequest and prepareReconnectToStreamRequest
  • Stream Callbacks: Provides hooks for tracking chat lifecycle via onChatSendMessage and onChatEnd
  • Custom Fetch: Supports custom fetch implementations for advanced use cases
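
As one illustration of the custom fetch feature listed above, the sketch below wraps a fetch implementation so that every request made through it is recorded. `FetchFn` and `withRequestLog` are hypothetical names introduced for this example; only the existence of the fetch option comes from this page.

```typescript
// Function type matching the standard fetch call signature.
type FetchFn = (input: RequestInfo | URL, init?: RequestInit) => Promise<Response>;

// Hypothetical wrapper: delegates to baseFetch and appends one
// "<url> -> <status>" entry to `log` for each completed request.
function withRequestLog(baseFetch: FetchFn, log: string[]): FetchFn {
  return async (input, init) => {
    const url =
      typeof input === "string"
        ? input
        : input instanceof URL
          ? input.href
          : input.url;

    const response = await baseFetch(input, init);
    log.push(`${url} -> ${response.status}`);
    return response;
  };
}
```

A wrapper like this could be passed as the fetch option, for example `new WorkflowChatTransport({ fetch: withRequestLog(fetch, requestLog) })`, to observe both the initial POST and any reconnection requests.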

Good to Know

  • The transport expects chat endpoints to return the x-workflow-run-id header in the response to enable stream resumption
  • By default, the transport posts to /api/chat and reconnects via /api/chat/{runId}/stream
  • The onChatSendMessage callback receives the full response object, allowing you to extract and store the workflow run ID for session resumption
  • Stream interruptions are automatically detected when a "finish" chunk is not received in the initial response
  • The maxConsecutiveErrors option controls how many reconnection attempts are made before giving up (default: 3)
  • initialStartIndex (constructor option) sets the default chunk position for the first reconnection attempt (e.g. after a page refresh). Subsequent retries within the same reconnection loop always resume from the last received chunk. Negative values (e.g. -20) read from the end of the stream, which is useful for showing only recent output without replaying the full conversation. startIndex (per-call option on reconnectToStream) overrides initialStartIndex for a single reconnection
  • When using a negative initialStartIndex, the reconnection endpoint must return the x-workflow-stream-tail-index response header (via readable.getTailIndex()). The transport reads this header to compute absolute chunk positions for retries. Without it, startIndex is assumed to be 0, replaying the entire stream
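
The reconnection behaviour described in the notes above (detecting a missing "finish" chunk, resuming from the last received chunk, and giving up after maxConsecutiveErrors) can be sketched as a loop. This is an illustrative sketch, not the transport's implementation: `Chunk`, `ReadFrom`, and `collectWithReconnect` are hypothetical names, and it assumes an attempt counts as an error only when it ends without delivering any chunk.

```typescript
type Chunk = { type: string };

// Hypothetical source: opens the stream at the given absolute chunk index.
type ReadFrom = (startIndex: number) => AsyncIterable<Chunk>;

async function collectWithReconnect(
  readFrom: ReadFrom,
  startIndex = 0,
  maxConsecutiveErrors = 3,
): Promise<Chunk[]> {
  const received: Chunk[] = [];
  let nextIndex = startIndex;
  let consecutiveErrors = 0;

  while (true) {
    let progressed = false;
    let failure: unknown = new Error("stream closed before a finish chunk");

    try {
      for await (const chunk of readFrom(nextIndex)) {
        received.push(chunk);
        nextIndex += 1; // retries resume after the last received chunk
        progressed = true;
        if (chunk.type === "finish") return received; // stream completed
      }
    } catch (error) {
      failure = error;
    }

    // Reaching this point means no "finish" chunk arrived: the stream was
    // interrupted. Attempts that delivered nothing count toward the limit.
    consecutiveErrors = progressed ? 0 : consecutiveErrors + 1;
    if (consecutiveErrors >= maxConsecutiveErrors) throw failure;
  }
}
```

Resetting the counter whenever a chunk arrives is what makes the limit apply to consecutive failures rather than to the total number of reconnections over a long stream.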

Examples

Basic Chat Setup

"use client";

import { useChat } from "@ai-sdk/react";
import { WorkflowChatTransport } from "@workflow/ai";
import { useState } from "react";

export default function BasicChat() {
  const [input, setInput] = useState("");
  const { messages, sendMessage } = useChat({
    transport: new WorkflowChatTransport(),
  });

  return (
    <div>
      <div className="space-y-4">
        {messages.map((m) => (
          <div key={m.id}>
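            <strong>{m.role}:</strong>{" "}
            {/* AI SDK UI messages expose their content as `parts` */}
            {m.parts.map((part, i) =>
              part.type === "text" ? <span key={i}>{part.text}</span> : null
            )}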
          </div>
        ))}
      </div>

      <form
        onSubmit={(e) => {
          e.preventDefault();
          sendMessage({ text: input });
          setInput("");
        }}
      >
        <input
          value={input}
          placeholder="Say something..."
          onChange={(e) => setInput(e.currentTarget.value)}
        />
      </form>
    </div>
  );
}

With Session Persistence and Resumption

"use client";

import { useChat } from "@ai-sdk/react";
import { WorkflowChatTransport } from "@workflow/ai";
import { useMemo, useState } from "react";

export default function ChatWithResumption() {
  const [input, setInput] = useState("");
  const activeWorkflowRunId = useMemo(() => {
    if (typeof window === "undefined") return;
    return localStorage.getItem("active-workflow-run-id") ?? undefined;
  }, []);

  const { messages, sendMessage } = useChat({
    resume: !!activeWorkflowRunId,
    transport: new WorkflowChatTransport({
      onChatSendMessage: (response, options) => {
        // Save chat history to localStorage
        localStorage.setItem(
          "chat-history",
          JSON.stringify(options.messages)
        );

        // Extract and store the workflow run ID for session resumption
        const workflowRunId = response.headers.get("x-workflow-run-id");
        if (workflowRunId) {
          localStorage.setItem("active-workflow-run-id", workflowRunId);
        }
      },
      onChatEnd: ({ chatId, chunkIndex }) => {
        console.log(`Chat ${chatId} completed with ${chunkIndex} chunks`);
        // Clear the active run ID when chat completes
        localStorage.removeItem("active-workflow-run-id");
      },
    }),
  });

  return (
    <div>
      <div className="space-y-4">
        {messages.map((m) => (
          <div key={m.id}>
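            <strong>{m.role}:</strong>{" "}
            {/* AI SDK UI messages expose their content as `parts` */}
            {m.parts.map((part, i) =>
              part.type === "text" ? <span key={i}>{part.text}</span> : null
            )}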
          </div>
        ))}
      </div>

      <form
        onSubmit={(e) => {
          e.preventDefault();
          sendMessage({ text: input });
          setInput("");
        }}
      >
        <input
          value={input}
          placeholder="Say something..."
          onChange={(e) => setInput(e.currentTarget.value)}
        />
      </form>
    </div>
  );
}

With Custom Request Configuration

"use client";

import { useChat } from "@ai-sdk/react";
import { WorkflowChatTransport } from "@workflow/ai";
import { useState } from "react";

export default function ChatWithCustomConfig() {
  const [input, setInput] = useState("");
  const { messages, sendMessage } = useChat({
    transport: new WorkflowChatTransport({
      prepareSendMessagesRequest: async (config) => {
        return {
          ...config,
          api: "/api/chat",
          headers: {
            ...config.headers,
            "Authorization": `Bearer ${process.env.NEXT_PUBLIC_API_TOKEN}`,
            "X-Custom-Header": "custom-value",
          },
          credentials: "include",
        };
      },
      prepareReconnectToStreamRequest: async (config) => {
        return {
          ...config,
          headers: {
            ...config.headers,
            "Authorization": `Bearer ${process.env.NEXT_PUBLIC_API_TOKEN}`,
          },
          credentials: "include",
        };
      },
      maxConsecutiveErrors: 5,
    }),
  });

  return (
    <div>
      <div className="space-y-4">
        {messages.map((m) => (
          <div key={m.id}>
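            <strong>{m.role}:</strong>{" "}
            {/* AI SDK UI messages expose their content as `parts` */}
            {m.parts.map((part, i) =>
              part.type === "text" ? <span key={i}>{part.text}</span> : null
            )}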
          </div>
        ))}
      </div>

      <form
        onSubmit={(e) => {
          e.preventDefault();
          sendMessage({ text: input });
          setInput("");
        }}
      >
        <input
          value={input}
          placeholder="Say something..."
          onChange={(e) => setInput(e.currentTarget.value)}
        />
      </form>
    </div>
  );
}

See Also