tRPC Communication
Code uses tRPC v11 for end-to-end type-safe communication between client and server. The architecture supports both in-process and HTTP/SSE transports.
Overview
tRPC provides zero-overhead type safety without code generation. Changes to the server API are immediately reflected in the client at compile time.
Key Benefits:
- Full TypeScript type safety
- No code generation needed
- Automatic API documentation
- Multiple transport options
- Subscription support
Architecture
Type Flow
// Server defines procedures
const appRouter = router({
message: router({
streamResponse: publicProcedure
.input(z.object({ sessionId: z.string(), content: z.array(...) }))
.subscription(({ input }) => {
return observable<StreamEvent>((emit) => {
// Stream AI responses
})
})
})
})
// Client gets automatic types
type AppRouter = typeof appRouter
// Usage is fully typed
const client = createClient<AppRouter>(...)
const subscription = client.message.streamResponse.subscribe({
sessionId: 'abc', // ✅ Type-safe
content: [...] // ✅ Type-safe
})Transport Options
Code supports two transport mechanisms:
1. In-Process Link (Primary)
Direct function calls within the same process:
import { createInProcessLink } from '@sylphx/code-client'
const client = createTRPCProxyClient<AppRouter>({
links: [
createInProcessLink({
router: appRouter,
createContext: () => ({ appContext })
})
]
})Characteristics:
- Zero serialization overhead
- Direct memory access
- ~0.1ms function call overhead
- 30x faster than HTTP
- Same-process only
Use Cases:
- Terminal UI (TUI)
- Local development
- Embedded scenarios
2. HTTP/SSE Link (Daemon Mode)
HTTP requests + Server-Sent Events for subscriptions:
import { httpBatchLink, splitLink } from '@trpc/client'
const client = createTRPCProxyClient<AppRouter>({
links: [
splitLink({
condition: (op) => op.type === 'subscription',
true: httpSubscriptionLink({ url: 'http://localhost:3000' }),
false: httpBatchLink({ url: 'http://localhost:3000' })
})
]
})Characteristics:
- Network-based communication
- Batched HTTP requests
- SSE for real-time subscriptions
- ~1-3ms latency (localhost)
- Multi-process/remote support
Use Cases:
- Web UI
- Remote clients
- Daemon server
- Multi-user scenarios
Router Structure
Root Router
// packages/code-server/src/trpc/router.ts
export const appRouter = router({
session: sessionRouter,
message: messageRouter,
config: configRouter,
events: eventsRouter
})
export type AppRouter = typeof appRouterSession Router
const sessionRouter = router({
// Query procedures
list: publicProcedure
.query(async ({ ctx }) => {
return await ctx.appContext.core.sessions.list()
}),
get: publicProcedure
.input(z.object({ sessionId: z.string() }))
.query(async ({ ctx, input }) => {
return await ctx.appContext.core.sessions.get(input.sessionId)
}),
// Mutation procedures
create: publicProcedure
.input(z.object({ provider: z.string(), model: z.string() }))
.mutation(async ({ ctx, input }) => {
const session = await ctx.appContext.core.sessions.create(input)
// Emit event
ctx.appContext.eventStream.publish('session-events', {
type: 'session-created',
sessionId: session.id
})
return session
}),
delete: publicProcedure
.input(z.object({ sessionId: z.string() }))
.mutation(async ({ ctx, input }) => {
await ctx.appContext.core.sessions.delete(input.sessionId)
ctx.appContext.eventStream.publish('session-events', {
type: 'session-deleted',
sessionId: input.sessionId
})
}),
compact: moderateProcedure
.input(z.object({ sessionId: z.string() }))
.mutation(async ({ ctx, input }) => {
// Generate summary and create new session
const result = await compactSession(ctx, input.sessionId)
// Emit session-created event
ctx.appContext.eventStream.publish('session-events', {
type: 'session-created',
sessionId: result.newSessionId
})
// Auto-trigger AI streaming (server-side)
streamAIResponse({
sessionId: result.newSessionId,
userMessageContent: null // Use existing messages
}).subscribe({
next: (event) => {
ctx.appContext.eventStream.publish(
`session:${result.newSessionId}`,
event
)
}
})
return result
})
})Message Router
const messageRouter = router({
// Subscription procedures
streamResponse: publicProcedure
.input(z.object({
sessionId: z.string(),
content: z.array(ParsedContentPartSchema)
}))
.subscription(({ ctx, input }) => {
return observable<StreamEvent>((emit) => {
// Stream AI responses
const subscription = streamAIResponse({
sessionId: input.sessionId,
userMessageContent: input.content
}).subscribe({
next: (event) => {
// Path A: Direct subscription
emit.next(event)
// Path B: Event stream (multi-client sync)
ctx.appContext.eventStream.publish(
`session:${input.sessionId}`,
event
)
},
error: (err) => emit.error(err),
complete: () => emit.complete()
})
// Cleanup on unsubscribe
return () => subscription.unsubscribe()
})
})
})Events Router
const eventsRouter = router({
// Subscribe to session-specific events
subscribeToSession: publicProcedure
.input(z.object({
sessionId: z.string(),
replayLast: z.number().optional()
}))
.subscription(({ ctx, input }) => {
return observable<StreamEvent>((emit) => {
const subscription = ctx.appContext.eventStream
.subscribe(`session:${input.sessionId}`, input.replayLast)
.subscribe({
next: (event) => emit.next(event),
error: (err) => emit.error(err)
})
return () => subscription.unsubscribe()
})
}),
// Subscribe to all session lifecycle events
subscribeToAllSessions: publicProcedure
.subscription(({ ctx }) => {
return observable<SessionEvent>((emit) => {
const subscription = ctx.appContext.eventStream
.subscribe('session-events')
.subscribe({
next: (event) => emit.next(event),
error: (err) => emit.error(err)
})
return () => subscription.unsubscribe()
})
})
})Procedure Types
Query
Read-only operations that fetch data:
const getSession = publicProcedure
.input(z.object({ sessionId: z.string() }))
.query(async ({ ctx, input }) => {
return await ctx.appContext.core.sessions.get(input.sessionId)
})Characteristics:
- GET requests (HTTP transport)
- Cacheable
- Idempotent
- No side effects
Mutation
Operations that modify data:
const createSession = publicProcedure
.input(z.object({ provider: z.string(), model: z.string() }))
.mutation(async ({ ctx, input }) => {
return await ctx.appContext.core.sessions.create(input)
})Characteristics:
- POST requests (HTTP transport)
- Not cacheable
- Can have side effects
- State modifications
Subscription
Long-lived connections for real-time updates:
const streamResponse = publicProcedure
.input(z.object({ sessionId: z.string() }))
.subscription(({ ctx, input }) => {
return observable<StreamEvent>((emit) => {
// Stream events over time
const subscription = source.subscribe(emit)
return () => subscription.unsubscribe()
})
})Characteristics:
- SSE (Server-Sent Events) in HTTP transport
- Observable-based in in-process
- Real-time updates
- Bidirectional communication
Context
AppContext
Every procedure receives a context object:
interface Context {
appContext: AppContext
}
interface AppContext {
core: CodeCore // Headless SDK
eventStream: EventStream // Event streaming service
sessions: Map<string, Session>
}Creating Context
In-Process:
createContext: () => ({
appContext: getAppContext()
})HTTP:
createContext: (opts: CreateHTTPContextOptions) => ({
appContext: getAppContext(),
req: opts.req,
res: opts.res
})Input Validation
All inputs are validated with Zod schemas:
const streamResponseInput = z.object({
sessionId: z.string(),
content: z.array(
z.discriminatedUnion('type', [
z.object({ type: z.literal('text'), content: z.string() }),
z.object({ type: z.literal('image'), data: z.string() })
])
)
})
const procedure = publicProcedure
.input(streamResponseInput)
.subscription(({ input }) => {
// input is fully typed and validated
input.sessionId // string ✅
input.content // ContentPart[] ✅
})Benefits:
- Runtime validation
- Compile-time types
- Automatic error handling
- Clear error messages
Error Handling
TRPCError
import { TRPCError } from '@trpc/server'
const getSession = publicProcedure
.input(z.object({ sessionId: z.string() }))
.query(async ({ ctx, input }) => {
const session = await ctx.appContext.core.sessions.get(input.sessionId)
if (!session) {
throw new TRPCError({
code: 'NOT_FOUND',
message: `Session ${input.sessionId} not found`
})
}
return session
})Error Codes:
BAD_REQUEST- Invalid inputUNAUTHORIZED- Auth requiredFORBIDDEN- No permissionNOT_FOUND- Resource not foundINTERNAL_SERVER_ERROR- Server errorTIMEOUT- Request timeout
Client Error Handling
try {
const session = await client.session.get.query({ sessionId: 'abc' })
} catch (error) {
if (error instanceof TRPCClientError) {
console.error('tRPC Error:', error.message)
console.error('Code:', error.data?.code)
console.error('Cause:', error.cause)
}
}Middleware
Rate Limiting
const rateLimitMiddleware = t.middleware(async ({ ctx, next, path }) => {
const key = `${ctx.userId}:${path}`
const allowed = await checkRateLimit(key)
if (!allowed) {
throw new TRPCError({
code: 'TOO_MANY_REQUESTS',
message: 'Rate limit exceeded'
})
}
return next()
})
const moderateProcedure = publicProcedure.use(rateLimitMiddleware)Logging
const loggingMiddleware = t.middleware(async ({ ctx, next, path, type }) => {
const start = Date.now()
const result = await next()
const duration = Date.now() - start
console.log(`${type} ${path} - ${duration}ms`)
return result
})Authentication
const authMiddleware = t.middleware(async ({ ctx, next }) => {
if (!ctx.user) {
throw new TRPCError({ code: 'UNAUTHORIZED' })
}
return next({ ctx: { ...ctx, user: ctx.user } })
})
const protectedProcedure = publicProcedure.use(authMiddleware)Client Usage
Creating Client
import { createClient } from '@sylphx/code-client'
const client = createClient({
transport: 'in-process', // or 'http'
serverUrl: 'http://localhost:3000' // for HTTP transport
})Query
// Fetch data
const sessions = await client.session.list.query()
const session = await client.session.get.query({ sessionId: 'abc' })Mutation
// Modify data
const newSession = await client.session.create.mutate({
provider: 'openrouter',
model: 'anthropic/claude-3.5-sonnet'
})
await client.session.delete.mutate({ sessionId: 'abc' })Subscription
// Real-time updates
const subscription = client.message.streamResponse.subscribe(
{ sessionId: 'abc', content: [{ type: 'text', content: 'Hello' }] },
{
onData: (event) => {
if (event.type === 'text-delta') {
console.log('Token:', event.text)
}
},
onError: (error) => {
console.error('Stream error:', error)
},
onComplete: () => {
console.log('Stream complete')
}
}
)
// Cleanup
subscription.unsubscribe()Performance Optimization
Batching (HTTP Transport)
Multiple queries batched into single HTTP request:
const [sessions, config, stats] = await Promise.all([
client.session.list.query(),
client.config.get.query(),
client.stats.get.query()
])
// Sent as single HTTP request ✅
// Not 3 separate requests ❌Deduplication
Duplicate requests are automatically deduplicated:
// Only makes one actual request
const [result1, result2] = await Promise.all([
client.session.get.query({ sessionId: 'abc' }),
client.session.get.query({ sessionId: 'abc' })
])Subscription Management
// Reuse subscriptions
const subscriptionCache = new Map()
function getSubscription(sessionId: string) {
if (!subscriptionCache.has(sessionId)) {
const sub = client.events.subscribeToSession.subscribe({ sessionId })
subscriptionCache.set(sessionId, sub)
}
return subscriptionCache.get(sessionId)
}
// Cleanup on unmount
useEffect(() => {
const sub = getSubscription(sessionId)
return () => {
sub.unsubscribe()
subscriptionCache.delete(sessionId)
}
}, [sessionId])Testing
Mock Client
import { createMockClient } from '@sylphx/code-client/testing'
const mockClient = createMockClient<AppRouter>({
session: {
list: () => Promise.resolve([]),
get: ({ sessionId }) => Promise.resolve({ id: sessionId }),
create: (input) => Promise.resolve({ id: 'new', ...input })
}
})Integration Tests
import { createTestServer } from '@sylphx/code-server/testing'
describe('tRPC Router', () => {
let server: ReturnType<typeof createTestServer>
let client: TRPCClient<AppRouter>
beforeEach(() => {
server = createTestServer()
client = server.getClient()
})
afterEach(() => {
server.close()
})
it('creates session', async () => {
const session = await client.session.create.mutate({
provider: 'openrouter',
model: 'anthropic/claude-3.5-sonnet'
})
expect(session.id).toBeDefined()
expect(session.provider).toBe('openrouter')
})
})Best Practices
Type Safety
✅ Do:
// Let TypeScript infer types
const session = await client.session.get.query({ sessionId: 'abc' })
// session is fully typed ✅❌ Don't:
// Avoid explicit type casts
const session = (await client.session.get.query({ sessionId: 'abc' })) as SessionError Handling
✅ Do:
try {
await client.session.delete.mutate({ sessionId })
} catch (error) {
if (error instanceof TRPCClientError) {
if (error.data?.code === 'NOT_FOUND') {
// Handle not found
}
}
}❌ Don't:
// Don't ignore errors
client.session.delete.mutate({ sessionId }) // ❌Subscription Cleanup
✅ Do:
useEffect(() => {
const sub = client.events.subscribeToSession.subscribe(...)
return () => sub.unsubscribe() // ✅ Cleanup
}, [sessionId])❌ Don't:
// Don't forget to unsubscribe
useEffect(() => {
client.events.subscribeToSession.subscribe(...)
// ❌ Memory leak
}, [sessionId])Related Documentation
- Event Streaming - Event system details
- Architecture Overview - Overall system design
- API Reference - Complete API docs