Kairo

Transports (@kairo/extension-transport-*) - Communication protocols between extensions

Transports define how messages move between the Kairo core and its extensions. Because the transport mechanism is decoupled from the JSON-RPC protocol layer, extensions can run in virtually any execution environment.

Architectural Role

In the Kairo framework, a Transport is the lowest-level layer, a "dumb pipe":

  • It does not interpret the JSON-RPC format, method names, or request IDs
  • It only knows how to serialize an ExtensionMessage object and push bytes down a wire (and vice versa)
  • The higher-level ExtensionProtocol handles request/response matching, timeouts, schema validation, and routing
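
The split between the two layers can be illustrated with a minimal sketch. Everything here is hypothetical and only for illustration: Message, Pipe, and MiniProtocol are stand-ins, not the real ExtensionMessage or ExtensionProtocol from @kairo/core. The pipe moves opaque strings, while the protocol layer owns request IDs and matches responses to callers.

```typescript
// Hypothetical message shape; the real ExtensionMessage type lives in @kairo/core.
type Message = { id?: number; method?: string; params?: unknown; result?: unknown };

// The "dumb pipe": it moves strings and knows nothing about ids or methods.
interface Pipe {
  write(raw: string): void;
  onData?: (raw: string) => void;
}

// The protocol layer: assigns request ids and matches responses to callers.
class MiniProtocol {
  private nextId = 1;
  private readonly pending = new Map<number, (result: unknown) => void>();
  private readonly pipe: Pipe;

  constructor(pipe: Pipe) {
    this.pipe = pipe;
    // Everything arriving on the pipe is parsed and routed here
    pipe.onData = (raw) => this.handle(JSON.parse(raw) as Message);
  }

  // Number of requests still waiting for a response
  get pendingCount(): number {
    return this.pending.size;
  }

  request(method: string, params?: unknown): Promise<unknown> {
    const id = this.nextId++;
    return new Promise((resolve) => {
      this.pending.set(id, resolve);
      this.pipe.write(JSON.stringify({ id, method, params }));
    });
  }

  private handle(message: Message): void {
    if (message.id === undefined) return;
    const resolve = this.pending.get(message.id);
    if (!resolve) return; // unknown or already-settled id
    this.pending.delete(message.id);
    resolve(message.result);
  }
}
```

Because the pipe never inspects the payload, swapping it for another channel leaves the matching logic untouched. Timeouts and schema validation are omitted from this sketch.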

Supported Transports

The Kairo repository ships with three first-party transports:

1. Stdio Transport (@kairo/extension-transport-stdio)

Used for local processes interacting via standard input (stdin) and output (stdout). This is the most common transport for orchestrating side-car CLI tools or system-level extensions (such as the default MCP setup).
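
A stream such as stdin or stdout carries bytes with no message boundaries, so a stdio transport needs some framing scheme. The sketch below assumes newline-delimited JSON, which is a common choice but is not confirmed by this page as the framing @kairo/extension-transport-stdio actually uses. The helper names createLineDecoder and encodeLine are hypothetical.

```typescript
// Hypothetical helper: turns arbitrary stream chunks into whole JSON messages,
// assuming one JSON document per line (newline-delimited JSON).
function createLineDecoder(
  onMessage: (message: unknown) => void,
  onError: (error: Error) => void,
): (chunk: string) => void {
  let buffer = "";
  return (chunk: string): void => {
    buffer += chunk;
    for (;;) {
      const newline = buffer.indexOf("\n");
      if (newline === -1) break; // incomplete line: wait for more data
      const line = buffer.slice(0, newline).trim();
      buffer = buffer.slice(newline + 1);
      if (line.length === 0) continue; // skip blank lines
      let parsed: unknown;
      try {
        parsed = JSON.parse(line);
      } catch {
        onError(new Error("Failed to parse message"));
        continue;
      }
      onMessage(parsed);
    }
  };
}

// Hypothetical counterpart for the sending side: one message per line.
function encodeLine(message: unknown): string {
  return JSON.stringify(message) + "\n";
}
```

In a real process, the decoder would be fed from the child's stdout and encodeLine output written to its stdin. The buffering matters because a chunk boundary can fall in the middle of a message.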

2. HTTP Transport (@kairo/extension-transport-http)

Enables communication over the network using standard HTTP requests and Server-Sent Events (SSE). This allows extensions to be hosted as remote microservices.
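
On the receiving side, an SSE stream has to be split into events before any message can be parsed. The following sketch decodes only the data: field of each event and assumes each event carries one JSON-encoded message, which this page does not confirm. createSseDataDecoder is a hypothetical helper, and the parser is deliberately simplified: it ignores the event:, id:, and retry: fields and does not handle bare "\r" line endings.

```typescript
// Hypothetical helper: extracts the "data" payload of each Server-Sent Event
// from a stream of text chunks. Simplified relative to the full SSE format.
function createSseDataDecoder(onData: (data: string) => void): (chunk: string) => void {
  let buffer = "";
  let dataLines: string[] = [];
  return (chunk: string): void => {
    buffer += chunk;
    for (;;) {
      const newline = buffer.indexOf("\n");
      if (newline === -1) break; // incomplete line: wait for more data
      const line = buffer.slice(0, newline).replace(/\r$/, "");
      buffer = buffer.slice(newline + 1);
      if (line === "") {
        // A blank line terminates the event; multiple data lines join with "\n"
        if (dataLines.length > 0) onData(dataLines.join("\n"));
        dataLines = [];
      } else if (line.startsWith("data:")) {
        const value = line.slice(5);
        // A single leading space after the colon is not part of the value
        dataLines.push(value.startsWith(" ") ? value.slice(1) : value);
      }
      // Comment lines (starting with ":") and other fields are ignored here
    }
  };
}
```

Each decoded payload can then be passed to JSON.parse and handed to the transport's onMessage hook.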

3. Worker Transport (@kairo/extension-transport-worker)

Designed for browser environments (Web Workers) or Node.js (worker_threads). Running an extension in its own worker isolates it from the host's main thread and prevents heavy extension logic (like local RAG indexing) from blocking it.
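
A worker-based transport can be sketched on top of the postMessage/onmessage pattern that workers expose. The names below (PortLike, WorkerPortTransport, Message) are hypothetical and do not reflect the actual @kairo/extension-transport-worker implementation. PortLike is a deliberately minimal shape, so a real Worker or MessagePort object may need a thin adapter to satisfy it.

```typescript
// Hypothetical stand-in for ExtensionMessage.
type Message = Record<string, unknown>;

// Minimal port shape modelled on the postMessage/onmessage pattern.
interface PortLike {
  postMessage(data: unknown): void;
  onmessage: ((event: { data: unknown }) => void) | null;
}

class WorkerPortTransport {
  onMessage?: (message: Message) => void;
  private readonly port: PortLike;

  constructor(port: PortLike) {
    this.port = port;
  }

  async connect(signal?: AbortSignal): Promise<void> {
    if (signal?.aborted) throw new Error("Connection aborted");
    // Forward every incoming payload to the protocol layer
    this.port.onmessage = (event) => this.onMessage?.(event.data as Message);
  }

  async send(message: Message): Promise<void> {
    // Workers copy plain objects between threads, so no JSON.stringify is needed
    this.port.postMessage(message);
  }

  async close(): Promise<void> {
    this.port.onmessage = null;
  }
}
```

Unlike the network transports, nothing here serializes to text: the worker messaging layer copies the object to the other thread.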

Implementation Guide: Building a Custom Transport

If you want to support a new communication channel, such as WebSocket, WebRTC, or MQTT, you only need to implement the ExtensionTransport interface defined in @kairo/core.

The ExtensionTransport Interface Lifecycle

To write a transport, your class just needs to handle these core responsibilities:

  1. connect(signal): Establish the connection
  2. send(message): Serialize ExtensionMessage and write it to your socket
  3. close(): Gracefully clean up the connection
  4. Events: Fire the onMessage, onError, onClose, or onConnectionLost hooks to notify the ExtensionProtocol above
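
The responsibilities above can be sketched as an interface plus the smallest possible implementation. TransportLike is a hypothetical shape inferred from the method and hook names on this page, not the actual ExtensionTransport declaration in @kairo/core. LoopbackTransport and createLoopbackPair are likewise illustrative: an in-memory pair in which whatever one side sends, the other side receives.

```typescript
// Hypothetical stand-in for ExtensionMessage.
type Message = Record<string, unknown>;

// Shape inferred from this page; the real interface is defined in @kairo/core.
interface TransportLike {
  connect(signal: AbortSignal): Promise<void>;
  send(message: Message): Promise<void>;
  close(): Promise<void>;
  onMessage?: (message: Message) => void;
  onError?: (error: Error) => void;
  onClose?: () => void;
  onConnectionLost?: (error: Error, retry: () => Promise<void>) => void;
}

class LoopbackTransport implements TransportLike {
  onMessage?: (message: Message) => void;
  onError?: (error: Error) => void;
  onClose?: () => void;
  onConnectionLost?: (error: Error, retry: () => Promise<void>) => void;
  peer?: LoopbackTransport;
  private open = false;

  async connect(signal: AbortSignal): Promise<void> {
    if (signal.aborted) throw new Error("Connection aborted");
    this.open = true;
  }

  async send(message: Message): Promise<void> {
    if (!this.open || !this.peer) throw new Error("Transport is not connected");
    // Round-trip through a string to mimic what a real wire would do
    const wire = JSON.stringify(message);
    this.peer.onMessage?.(JSON.parse(wire) as Message);
  }

  async close(): Promise<void> {
    if (!this.open) return;
    this.open = false;
    this.onClose?.(); // graceful shutdown notifies the layer above
  }
}

// Links two transports so each one delivers to the other.
function createLoopbackPair(): [LoopbackTransport, LoopbackTransport] {
  const a = new LoopbackTransport();
  const b = new LoopbackTransport();
  a.peer = b;
  b.peer = a;
  return [a, b];
}
```

A pair like this is mostly useful in tests, where it exercises the layer above without a network or child process.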

Example: A Basic WebSocket Transport

Here is a conceptual example of how a developer could implement a custom WebSocket client transport for Kairo in just a few dozen lines of code:

import type { ExtensionTransport, ExtensionMessage } from "@kairo/core";

export class WebSocketTransportClient implements ExtensionTransport {
  #ws: WebSocket | null = null;
  public url: string;

  // Lifecycle Hooks (Populated automatically by ExtensionProtocol)
  public onMessage?: (message: ExtensionMessage) => void;
  public onError?: (error: Error) => void;
  public onClose?: () => void;
  public onConnectionLost?: (error: Error, retry: () => Promise<void>) => void;

  constructor(url: string) {
    this.url = url;
  }

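  // 1. Establish connection
  async connect(signal: AbortSignal): Promise<void> {
    // Fail fast if the caller aborted before the handshake started
    if (signal.aborted) throw new Error("Connection aborted");

    return new Promise<void>((resolve, reject) => {
      const ws = new WebSocket(this.url);
      this.#ws = ws;
      let opened = false;

      // Handle abortion during connection
      const onAbort = () => {
        ws.close();
        reject(new Error("Connection aborted"));
      };
      signal.addEventListener("abort", onAbort, { once: true });

      ws.onopen = () => {
        opened = true;
        // The signal only guards the handshake, so stop listening once connected
        signal.removeEventListener("abort", onAbort);
        resolve();
      };

      ws.onerror = () => {
        const err = new Error("WebSocket Error");
        // Before the socket opens, an error means connect() itself failed
        if (!opened) reject(err);
        this.onError?.(err);
      };

      ws.onclose = (e) => {
        signal.removeEventListener("abort", onAbort);
        if (!opened) {
          // Never connected: fail connect() instead of reporting a lost connection
          reject(new Error("WebSocket closed before opening"));
        } else if (!e.wasClean) {
          // Unexpected drop (Network loss)
          this.onConnectionLost?.(new Error("Connection reset"), async () =>
            this.connect(signal),
          );
        } else {
          // Graceful shutdown
          this.onClose?.();
        }
      };

      // 2. Read incoming messages and pass to Protocol
      ws.onmessage = (event) => {
        let message: ExtensionMessage;
        try {
          message = JSON.parse(event.data);
        } catch {
          this.onError?.(new Error("Failed to parse message"));
          return;
        }
        // Called outside the try block so handler errors are not reported as parse failures
        this.onMessage?.(message);
      };
    });
  }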

  // 3. Send outgoing messages across the wire
  async send(message: ExtensionMessage): Promise<void> {
    if (!this.#ws || this.#ws.readyState !== WebSocket.OPEN) {
      throw new Error("WebSocket is not connected");
    }
    this.#ws.send(JSON.stringify(message));
  }

  // 4. Graceful disconnect routine
  async close(): Promise<void> {
    if (this.#ws) {
      this.#ws.close(1000, "Client intentionally closed");
      this.#ws = null;
    }
  }
}

Once this implementation is complete, you can pass it to any LmPipelineExtensionClient or ExtensionServer. This changes how the application's components are physically connected without modifying your tool or RAG logic.
