Example: Custom Transport
Build a custom ChatTransport adapter and wire it into the SDK. Two examples: a fetch-based REST transport and a WebSocket transport with reconnection.
The ChatTransport interface
Every transport implements this interface (from gecx-chat):
/**
 * Contract every transport adapter implements so it can be handed to the SDK.
 * `name`, `capabilities`, `reconnect` and `resumeStream` are optional.
 */
interface ChatTransport {
/** Label for this transport; shows up in debug bundles and trace output. */
readonly name?: string;
/** Capability descriptor; the SDK reads `capabilities.class` at construction time. */
readonly capabilities?: TransportCapabilities;
/** Accepts the session id and resolves once the transport is ready to use. */
connect(sessionId: string): Promise<void>;
/** Sends a request without yielding any events back to the caller. */
send(request: SendRequest): Promise<void>;
/** Sends a request and yields the resulting events; `signal` lets the caller abort the turn. */
stream(request: SendRequest, signal?: AbortSignal): AsyncIterable<TransportEvent>;
/** Optional. Called by the SDK's recovery loop before it retries a failed `stream()`. */
reconnect?(reason: string): Promise<void>;
/** Optional. Resumes a turn from `cursor`; goes together with `capabilities.resume`. */
resumeStream?(cursor: TurnCursor, signal?: AbortSignal): AsyncIterable<TransportEvent>;
/** Shuts the transport down; resolves when done. */
close(): Promise<void>;
}
/** Describes what a transport can do so the SDK can adapt its streaming loop. */
interface TransportCapabilities {
/** Interaction model: a single JSON response, an SSE/NDJSON stream, or a long-lived duplex connection. */
class: 'request-response' | 'server-stream' | 'bidi';
/** Set to true only if the transport can reopen its underlying connection via `reconnect()`. */
reconnect: boolean;
/** Set to true only if `resumeStream(cursor)` is implemented and the server can replay from a checkpoint. */
resume: boolean;
/** NOTE(review): not described on this page -- presumably whether several turns can share one connection; confirm. */
multiplex: boolean;
/** Protocol version string; the examples on this page use '1'. */
protocolVersion: string;
}
The SDK reads capabilities.class at construction time and adapts its streaming loop:
- `request-response` -- the SDK calls `stream()`, which yields a synthetic event sequence from a single JSON response.
- `server-stream` -- `stream()` yields events from an SSE or NDJSON stream.
- `bidi` -- `stream()` yields events from a long-lived duplex connection.
Example 1: Fetch-based REST transport
Posts user messages as JSON. Parses the JSON response into synthetic TransportEvents. Good for backends behind API Gateway with hard timeout limits.
// lib/fetchTransport.ts
import type {
ChatTransport,
SendRequest,
TransportCapabilities,
TransportEvent,
} from 'gecx-chat';
/** Options accepted by `createFetchTransport`. */
interface FetchTransportOptions {
/** URL that `send()` and `stream()` POST the JSON-encoded request to. */
endpoint: string;
/** Extra headers spread after the default `Content-Type`, so they can override it. */
headers?: Record<string, string>;
}
/**
 * Builds a stateless request-response transport on top of `fetch`.
 *
 * Every request is POSTed to `options.endpoint` as JSON. `stream()` turns the
 * single JSON reply into a synthetic `response.started` / `text.delta` /
 * `response.completed` sequence.
 *
 * @param options - Endpoint URL plus optional extra request headers.
 * @returns A transport advertising the `request-response` capability class.
 */
export function createFetchTransport(options: FetchTransportOptions): ChatTransport {
  const { endpoint, headers = {} } = options;

  const capabilities: TransportCapabilities = {
    class: 'request-response',
    reconnect: false,
    resume: false,
    multiplex: false,
    protocolVersion: '1',
  };

  /**
   * POSTs `request` as JSON and rejects on any non-2xx status, so `send()` and
   * `stream()` report backend failures the same way.
   */
  async function post(request: SendRequest, signal?: AbortSignal): Promise<Response> {
    const res = await fetch(endpoint, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json', ...headers },
      body: JSON.stringify(request),
      signal,
    });
    if (!res.ok) throw new Error(`Transport returned ${res.status}`);
    return res;
  }

  return {
    name: 'custom-fetch',
    capabilities,

    async connect(): Promise<void> {
      // Nothing to open for a stateless HTTP transport.
    },

    /**
     * Delivers the request and ignores the response body.
     * @throws Error when the backend answers with a non-2xx status.
     */
    async send(request: SendRequest): Promise<void> {
      await post(request);
    },

    /**
     * Delivers the request and replays the JSON reply as transport events.
     * @throws Error when the backend answers with a non-2xx status.
     */
    async *stream(request: SendRequest, signal?: AbortSignal): AsyncIterable<TransportEvent> {
      const res = await post(request, signal);
      // The reply is untyped JSON; only the two fields used below are read.
      const body = (await res.json()) as { responseId?: string; text?: string } | null;

      const responseId = body?.responseId ?? crypto.randomUUID();
      const requestId = crypto.randomUUID();
      let seq = 0;
      // Fields shared by every synthetic event; each call stamps a fresh time and sequence.
      const envelope = () => ({
        responseId,
        requestId,
        timestamp: new Date().toISOString(),
        sequence: seq++,
      });

      yield { type: 'response.started', ...envelope() } as TransportEvent;

      // An empty or missing text produces no delta, only started/completed.
      if (body?.text) {
        yield { type: 'text.delta', delta: body.text, ...envelope() } as TransportEvent;
      }

      yield { type: 'response.completed', ...envelope() } as TransportEvent;
    },

    async close(): Promise<void> {
      // Nothing to release.
    },
  };
}
Use it
import { createChatClient, tokenEndpointAuth } from 'gecx-chat';
import { createFetchTransport } from './lib/fetchTransport';
// Build the auth strategy and the custom transport, then hand both to the client.
const auth = tokenEndpointAuth({ endpoint: '/api/chat/token' });
const transport = createFetchTransport({ endpoint: '/api/chat/completions' });
const client = createChatClient({ auth, transport });
Example 2: WebSocket transport with reconnection
Opens a persistent WebSocket. Sends JSON frames, receives TransportEvents. Declares reconnect: true so the SDK's recovery loop can call reconnect() after a disconnect.
// lib/wsTransport.ts
import type {
ChatTransport,
SendRequest,
TransportCapabilities,
TransportEvent,
} from 'gecx-chat';
/** Options accepted by `createWsTransport`. */
interface WsTransportOptions {
/** URL passed to `new WebSocket(...)` on connect and on every reconnect. */
url: string;
}
/**
 * Builds a bidirectional transport over a single WebSocket.
 *
 * Frames are JSON objects tagged with `kind`: the client sends `hello`, `send`
 * and `bye`; the server answers with `event` frames carrying a TransportEvent.
 * The transport advertises `reconnect: true`, so a `stream()` that loses its
 * socket must fail rather than wait forever.
 *
 * @param options - WebSocket URL to connect to.
 * @returns A transport advertising the `bidi` capability class.
 */
export function createWsTransport(options: WsTransportOptions): ChatTransport {
  let ws: WebSocket | null = null;
  let sessionId: string | null = null;

  const capabilities: TransportCapabilities = {
    class: 'bidi',
    reconnect: true,
    resume: false,
    multiplex: false,
    protocolVersion: '1',
  };

  /**
   * Opens a fresh socket, makes it the current one, and resolves once the
   * hello frame has been sent. Rejects if the socket errors or closes first.
   */
  function open(): Promise<void> {
    return new Promise((resolve, reject) => {
      // Work on a local reference: `ws` may be replaced by a later reconnect
      // before these handlers fire.
      const socket = new WebSocket(options.url);
      ws = socket;

      function detach(): void {
        socket.removeEventListener('open', onOpen);
        socket.removeEventListener('error', onFail);
        socket.removeEventListener('close', onFail);
      }
      function onOpen(): void {
        detach();
        try {
          // Send a hello frame so the server knows which session this is
          socket.send(JSON.stringify({ kind: 'hello', sessionId }));
          resolve();
        } catch (err) {
          reject(err instanceof Error ? err : new Error('WebSocket connect failed'));
        }
      }
      function onFail(): void {
        detach();
        reject(new Error('WebSocket connect failed'));
      }

      socket.addEventListener('open', onOpen);
      socket.addEventListener('error', onFail);
      // A socket can close without ever opening; treat that as a failed connect.
      socket.addEventListener('close', onFail);
    });
  }

  return {
    name: 'custom-ws',
    capabilities,

    /** Remembers the session id and opens the socket. */
    async connect(id: string): Promise<void> {
      sessionId = id;
      await open();
    },

    /**
     * Sends one request frame without waiting for events.
     * @throws Error when the socket is not open.
     */
    async send(request: SendRequest): Promise<void> {
      if (!ws || ws.readyState !== WebSocket.OPEN) {
        throw new Error('WebSocket not connected');
      }
      ws.send(JSON.stringify({ kind: 'send', request }));
    },

    /**
     * Sends a request frame and yields incoming events until the turn ends
     * (`response.completed` or `session.ended`) or `signal` aborts.
     * @throws Error when the socket is not open, closes or errors mid-turn,
     *   or delivers a frame that is not valid JSON.
     */
    async *stream(request: SendRequest, signal?: AbortSignal): AsyncIterable<TransportEvent> {
      // Pin the socket for the whole turn so cleanup targets the socket the
      // listeners were attached to, even if reconnect() swaps `ws` meanwhile.
      const socket = ws;
      if (!socket || socket.readyState !== WebSocket.OPEN) {
        throw new Error('WebSocket not connected');
      }
      // An already-aborted signal never fires 'abort', so check it up front.
      if (signal?.aborted) return;

      // Queue incoming events until the turn completes or aborts
      const queue: TransportEvent[] = [];
      let aborted = false;
      let failure: Error | null = null;
      let wake: (() => void) | null = null;
      const notify = (): void => {
        wake?.();
      };

      const onMessage = (ev: MessageEvent): void => {
        let frame: { kind?: unknown; event?: unknown } | null;
        try {
          frame = JSON.parse(ev.data as string);
        } catch {
          // Fail the turn here instead of throwing inside the event listener.
          failure = new Error('WebSocket received a malformed frame');
          notify();
          return;
        }
        if (frame?.kind === 'event') {
          queue.push(frame.event as TransportEvent);
          notify();
        }
      };
      const onDrop = (): void => {
        failure = new Error('WebSocket closed before the response completed');
        notify();
      };
      const onAbort = (): void => {
        aborted = true;
        notify();
      };

      socket.addEventListener('message', onMessage);
      socket.addEventListener('close', onDrop);
      socket.addEventListener('error', onDrop);
      signal?.addEventListener('abort', onAbort);

      try {
        socket.send(JSON.stringify({ kind: 'send', request }));

        for (;;) {
          if (aborted) return;

          const event = queue.shift();
          if (event !== undefined) {
            yield event;
            if (event.type === 'response.completed' || event.type === 'session.ended') {
              return;
            }
            continue;
          }

          // Events received before the failure are delivered first.
          if (failure) throw failure;

          await new Promise<void>((r) => {
            wake = r;
          });
          wake = null;
        }
      } finally {
        socket.removeEventListener('message', onMessage);
        socket.removeEventListener('close', onDrop);
        socket.removeEventListener('error', onDrop);
        signal?.removeEventListener('abort', onAbort);
      }
    },

    /**
     * Closes the current socket, if any, and opens a new one.
     * `reason` is part of the interface and is not used here.
     */
    async reconnect(reason: string): Promise<void> {
      // Close the old socket and reopen
      if (ws) {
        try {
          ws.close();
        } catch {
          /* ignore */
        }
      }
      await open();
    },

    /** Says goodbye when the socket is open, then closes and forgets it. */
    async close(): Promise<void> {
      const socket = ws;
      ws = null;
      if (!socket) return;
      // send() throws while the socket is still CONNECTING, so only say bye
      // on an open socket; the socket is closed either way.
      if (socket.readyState === WebSocket.OPEN) {
        socket.send(JSON.stringify({ kind: 'bye' }));
      }
      socket.close();
    },
  };
}
Use it
import { createChatClient, tokenEndpointAuth } from 'gecx-chat';
import { createWsTransport } from './lib/wsTransport';
// Build the auth strategy and the WebSocket transport, then hand both to the client.
const auth = tokenEndpointAuth({ endpoint: '/api/chat/token' });
const transport = createWsTransport({ url: 'wss://chat.example.com/ws' });
const client = createChatClient({ auth, transport });
Testing a custom transport
Use the SDK's mock scenario infrastructure to verify your transport emits a valid event stream without hitting a real server.
// __tests__/fetchTransport.test.ts
import { describe, it, expect, vi } from 'vitest';
import { createFetchTransport } from '../lib/fetchTransport';
import type { TransportEvent } from 'gecx-chat';
describe('createFetchTransport', () => {
  it('yields a valid event stream', async () => {
    // Stub fetch to return a canned response. vi.stubGlobal remembers the
    // real fetch so the finally block can restore it for other tests.
    const fetchStub = vi.fn().mockResolvedValue({
      ok: true,
      json: async () => ({
        responseId: 'r-1',
        text: 'Hello from the backend',
      }),
    });
    vi.stubGlobal('fetch', fetchStub);

    try {
      const transport = createFetchTransport({ endpoint: '/api/chat' });
      await transport.connect('session-1');

      const events: TransportEvent[] = [];
      for await (const event of transport.stream({
        sessionId: 'session-1',
        text: 'Hi',
      })) {
        events.push(event);
      }

      // One turn is exactly one POST.
      expect(fetchStub).toHaveBeenCalledTimes(1);
      // Exactly three events, in order; comparing the mapped types avoids
      // indexing into a possibly-short array.
      expect(events.map((event) => event.type)).toEqual([
        'response.started',
        'text.delta',
        'response.completed',
      ]);
      expect(events[1]).toMatchObject({ type: 'text.delta', delta: 'Hello from the backend' });
    } finally {
      vi.unstubAllGlobals();
    }
  });
});
Key design decisions
- `capabilities.class` tells the SDK whether to expect a single response (`request-response`), a chunked stream (`server-stream`), or a persistent connection (`bidi`). Set this honestly -- it controls timeout and recovery behavior.
- `reconnect` should be `true` only if your transport can reopen its underlying connection. The SDK's recovery loop calls `reconnect(reason)` before retrying a failed `stream()`.
- `resume` requires implementing `resumeStream(cursor)` and de-duplicating by `cursor.lastSequence`. Leave it `false` unless your server can replay from a checkpoint.
- `name` shows up in debug bundles and trace output. Set it to something descriptive.
Validate against the contract harness
Before shipping, validate your transport against runTransportContractTests from gecx-chat/testing/vitest:
import { runTransportContractTests } from 'gecx-chat/testing/vitest';
import { describe } from 'vitest';
import { createMyTransport } from './myTransport';
// Factory handed to the contract harness to build the transport under test.
const factory = () => createMyTransport({ /* ... */ });

describe('myTransport', () => {
  runTransportContractTests({ name: 'myTransport', factory });
});
The harness validates: connect() accepts a session id and resolves; capability class advertised matches actual behaviour; protocolVersion is set; stream() returns an AsyncIterable<TransportEvent> that respects AbortSignal; close() resolves; and (for tier-2 transports) conditional reconnect/resume behaviour matches the contract. The bundled mockTransport is itself validated by this harness, so the bar is "behave exactly like the mock for the parts of the contract you implement." See Testing.
docs/examples/custom-transport.md