Skip to main content
Version: 11.x

HTTP Subscription Link

httpSubscriptionLink is a terminating link that's uses Server-sent Events (SSE) for subscriptions.

SSE is a good option for real-time as it's a bit easier to deal with than WebSockets and handles things like reconnecting and continuing where it left off automatically.

info

We have prefixed this as unstable_ as it's a new API, but you're safe to use it! Read more.

Setup

info

If your client's environment doesn't support EventSource, you need an EventSource polyfill. For React Native specific instructions please defer to the compatibility section.

To use httpSubscriptionLink, you need to use a splitLink to make it explicit that we want to use SSE for subscriptions.

client/index.ts
ts
import type { TRPCLink } from '@trpc/client';
import {
httpBatchLink,
loggerLink,
splitLink,
unstable_httpSubscriptionLink,
} from '@trpc/client';
const trpcClient = createTRPCClient<AppRouter>({
/**
* @link https://trpc.io/docs/v11/client/links
*/
links: [
// adds pretty logs to your console in development and logs errors in production
loggerLink(),
splitLink({
// uses the httpSubscriptionLink for subscriptions
condition: (op) => op.type === 'subscription',
true: unstable_httpSubscriptionLink({
url: `/api/trpc`,
}),
false: httpBatchLink({
url: `/api/trpc`,
}),
}),
],
});
client/index.ts
ts
import type { TRPCLink } from '@trpc/client';
import {
httpBatchLink,
loggerLink,
splitLink,
unstable_httpSubscriptionLink,
} from '@trpc/client';
const trpcClient = createTRPCClient<AppRouter>({
/**
* @link https://trpc.io/docs/v11/client/links
*/
links: [
// adds pretty logs to your console in development and logs errors in production
loggerLink(),
splitLink({
// uses the httpSubscriptionLink for subscriptions
condition: (op) => op.type === 'subscription',
true: unstable_httpSubscriptionLink({
url: `/api/trpc`,
}),
false: httpBatchLink({
url: `/api/trpc`,
}),
}),
],
});

Basic example

tip

For a full example, see our full-stack SSE example.

ts
import EventEmitter, { on } from 'events';
import type { Post } from '@prisma/client';
import { z } from 'zod';
import { publicProcedure, router } from '../trpc';
const ee = new EventEmitter();
export const subRouter = router({
onPostAdd: publicProcedure.subscription(async function* (opts) {
// listen for new events
for await (const [data] of on(ee, 'add', {
// Passing the AbortSignal from the request automatically cancels the event emitter when the request is aborted
signal: opts.signal,
})) {
const post = data as Post;
yield post;
}
}),
});
ts
import EventEmitter, { on } from 'events';
import type { Post } from '@prisma/client';
import { z } from 'zod';
import { publicProcedure, router } from '../trpc';
const ee = new EventEmitter();
export const subRouter = router({
onPostAdd: publicProcedure.subscription(async function* (opts) {
// listen for new events
for await (const [data] of on(ee, 'add', {
// Passing the AbortSignal from the request automatically cancels the event emitter when the request is aborted
signal: opts.signal,
})) {
const post = data as Post;
yield post;
}
}),
});

Automatic tracking of id using tracked() (recommended)

If you yield an event using our tracked()-helper and include an id, the browser will automatically reconnect when it gets disconnected and send the last known ID - this is part of the EventSource-spec and will be propagated through lastEventId in your .input().

You can send an initial lastEventId when initializing the subscription and it will be automatically updated as the browser receives data.

tip

If you're fetching data based on the lastEventId, and capturing all events is critical, you may want to use ReadableStream's or a similar pattern as an intermediary as is done in our full-stack SSE example to prevent newly emitted events being ignored while yield'ing the original batch based on lastEventId.

ts
import EventEmitter, { on } from 'events';
import type { Post } from '@prisma/client';
import { tracked } from '@trpc/server';
import { z } from 'zod';
import { publicProcedure, router } from '../trpc';
const ee = new EventEmitter();
export const subRouter = router({
onPostAdd: publicProcedure
.input(
z
.object({
// lastEventId is the last event id that the client has received
// On the first call, it will be whatever was passed in the initial setup
// If the client reconnects, it will be the last event id that the client received
lastEventId: z.string().nullish(),
})
.optional(),
)
.subscription(async function* (opts) {
if (opts.input.lastEventId) {
// [...] get the posts since the last event id and yield them
}
// listen for new events
for await (const [data] of on(ee, 'add'), {
signal: opts.signal,
}) {
const post = data as Post;
// tracking the post id ensures the client can reconnect at any time and get the latest events this id
yield tracked(post.id, post);
}
}),
});
ts
import EventEmitter, { on } from 'events';
import type { Post } from '@prisma/client';
import { tracked } from '@trpc/server';
import { z } from 'zod';
import { publicProcedure, router } from '../trpc';
const ee = new EventEmitter();
export const subRouter = router({
onPostAdd: publicProcedure
.input(
z
.object({
// lastEventId is the last event id that the client has received
// On the first call, it will be whatever was passed in the initial setup
// If the client reconnects, it will be the last event id that the client received
lastEventId: z.string().nullish(),
})
.optional(),
)
.subscription(async function* (opts) {
if (opts.input.lastEventId) {
// [...] get the posts since the last event id and yield them
}
// listen for new events
for await (const [data] of on(ee, 'add'), {
signal: opts.signal,
}) {
const post = data as Post;
// tracking the post id ensures the client can reconnect at any time and get the latest events this id
yield tracked(post.id, post);
}
}),
});

Cleanup of side effects

If you need to clean up any side-effects of your subscription you can use the try...finally pattern, as trpc invokes the .return() of the Generator Instance when the subscription stops for any reason.

ts
import EventEmitter, { on } from 'events';
import type { Post } from '@prisma/client';
import { z } from 'zod';
import { publicProcedure, router } from '../trpc';
const ee = new EventEmitter();
export const subRouter = router({
onPostAdd: publicProcedure.subscription(async function* (opts) {
let timeout;
try {
for await (const [data] of on(ee, 'add'), {
signal: opts.signal,
}) {
timeout = setTimeout(() => console.log('Pretend like this is useful'));
const post = data as Post;
yield post;
}
} finally {
if (timeout) clearTimeout(timeout);
}
}),
});
ts
import EventEmitter, { on } from 'events';
import type { Post } from '@prisma/client';
import { z } from 'zod';
import { publicProcedure, router } from '../trpc';
const ee = new EventEmitter();
export const subRouter = router({
onPostAdd: publicProcedure.subscription(async function* (opts) {
let timeout;
try {
for await (const [data] of on(ee, 'add'), {
signal: opts.signal,
}) {
timeout = setTimeout(() => console.log('Pretend like this is useful'));
const post = data as Post;
yield post;
}
} finally {
if (timeout) clearTimeout(timeout);
}
}),
});

Error handling

Throwing an error in a generator function propagates to trpc's onError() on the backend, but the error will not be sent to the client - the client will automatically reconnect based on the last event id that is tracked using tracked().

Headers and authorization / authentication

Web apps

Same domain

If you're doing a web application, cookies are sent as part of the request as long as your client is on the same domain as the server.

Cross-domain

If the client and server are not on the same domain, you can use withCredentials: true (read more on MDN here).

Example:

tsx
// [...]
unstable_httpSubscriptionLink({
url: 'https://example.com/api/trpc',
eventSourceOptions: {
withCredentials: true, // <---
},
});
tsx
// [...]
unstable_httpSubscriptionLink({
url: 'https://example.com/api/trpc',
eventSourceOptions: {
withCredentials: true, // <---
},
});

Custom headers through polyfill

Recommended for non-web environments

You can polyfill EventSource and use the eventSourceOptions -callback to populate headers.

tsx
import {
createTRPCClient,
httpBatchLink,
splitLink,
unstable_httpSubscriptionLink,
} from '@trpc/client';
import { EventSourcePolyfill } from 'event-source-polyfill';
import type { AppRouter } from '../server/index.js';
// polyfill EventSource
globalThis.EventSource = EventSourcePolyfill;
// Initialize the tRPC client
const trpc = createTRPCClient<AppRouter>({
links: [
splitLink({
condition: (op) => op.type === 'subscription',
true: unstable_httpSubscriptionLink({
url: 'http://localhost:3000',
// options to pass to the EventSourcePolyfill constructor
eventSourceOptions: async () => {
return {
headers: {
authorization: 'Bearer supersecret',
},
}; // you either need to typecast to `EventSourceInit` or use `as any` or override the types by a `declare global` statement
},
}),
false: httpBatchLink({
url: 'http://localhost:3000',
}),
}),
],
});
tsx
import {
createTRPCClient,
httpBatchLink,
splitLink,
unstable_httpSubscriptionLink,
} from '@trpc/client';
import { EventSourcePolyfill } from 'event-source-polyfill';
import type { AppRouter } from '../server/index.js';
// polyfill EventSource
globalThis.EventSource = EventSourcePolyfill;
// Initialize the tRPC client
const trpc = createTRPCClient<AppRouter>({
links: [
splitLink({
condition: (op) => op.type === 'subscription',
true: unstable_httpSubscriptionLink({
url: 'http://localhost:3000',
// options to pass to the EventSourcePolyfill constructor
eventSourceOptions: async () => {
return {
headers: {
authorization: 'Bearer supersecret',
},
}; // you either need to typecast to `EventSourceInit` or use `as any` or override the types by a `declare global` statement
},
}),
false: httpBatchLink({
url: 'http://localhost:3000',
}),
}),
],
});

Connection params

In order to authenticate with EventSource, you can define connectionParams in httpSubscriptionLink. This will be sent as part of the URL, which is why other methods are preferred).

server/context.ts
ts
import type { CreateHTTPContextOptions } from '@trpc/server/adapters/standalone';
 
export const createContext = async (opts: CreateHTTPContextOptions) => {
const token = opts.info.connectionParams?.token;
const token: string | undefined
 
// [... authenticate]
 
return {};
};
 
export type Context = Awaited<ReturnType<typeof createContext>>;
server/context.ts
ts
import type { CreateHTTPContextOptions } from '@trpc/server/adapters/standalone';
 
export const createContext = async (opts: CreateHTTPContextOptions) => {
const token = opts.info.connectionParams?.token;
const token: string | undefined
 
// [... authenticate]
 
return {};
};
 
export type Context = Awaited<ReturnType<typeof createContext>>;
client/trpc.ts
ts
import {
createTRPCClient,
httpBatchLink,
splitLink,
unstable_httpSubscriptionLink,
} from '@trpc/client';
import type { AppRouter } from '../server/index.js';
// Initialize the tRPC client
const trpc = createTRPCClient<AppRouter>({
links: [
splitLink({
condition: (op) => op.type === 'subscription',
true: unstable_httpSubscriptionLink({
url: 'http://localhost:3000',
connectionParams: async () => {
// Will be serialized as part of the URL
return {
token: 'supersecret',
};
},
}),
false: httpBatchLink({
url: 'http://localhost:3000',
}),
}),
],
});
client/trpc.ts
ts
import {
createTRPCClient,
httpBatchLink,
splitLink,
unstable_httpSubscriptionLink,
} from '@trpc/client';
import type { AppRouter } from '../server/index.js';
// Initialize the tRPC client
const trpc = createTRPCClient<AppRouter>({
links: [
splitLink({
condition: (op) => op.type === 'subscription',
true: unstable_httpSubscriptionLink({
url: 'http://localhost:3000',
connectionParams: async () => {
// Will be serialized as part of the URL
return {
token: 'supersecret',
};
},
}),
false: httpBatchLink({
url: 'http://localhost:3000',
}),
}),
],
});

Compatibility (React Native)

The httpSubscriptionLink makes use of the EventSource API, Streams API, and AsyncIterators, these are not natively supported by React Native and will have to be polyfilled.

To polyfill EventSource we recommend to use a polyfill that utilizes the networking library exposed by React Native, over using a polyfill that using the XMLHttpRequest API. Libraries that polyfill EventSource using XMLHttpRequest fail to reconnect after the app has been in the background. Consider using the rn-eventsource-reborn package.

The Streams API can be polyfilled using the web-streams-polyfill package.

AsyncIterators can be polyfilled using the @azure/core-asynciterator-polyfill package.

Installation

Install the required polyfills:

npm install rn-eventsource-reborn web-streams-polyfill @azure/core-asynciterator-polyfill

Add the polyfills to your project before the link is used (e.g. where you add your TRPCReact.Provider):

utils/api.tsx
ts
import '@azure/core-asynciterator-polyfill';
import { RNEventSource } from 'rn-eventsource-reborn';
import { ReadableStream, TransformStream } from 'web-streams-polyfill';
// RNEventSource extends EventSource's functionality, you can add this to make the typing reflect this but it's not a requirement
declare global {
interface EventSource extends RNEventSource {}
}
globalThis.EventSource = globalThis.EventSource || RNEventSource;
globalThis.ReadableStream = globalThis.ReadableStream || ReadableStream;
globalThis.TransformStream = globalThis.TransformStream || TransformStream;
utils/api.tsx
ts
import '@azure/core-asynciterator-polyfill';
import { RNEventSource } from 'rn-eventsource-reborn';
import { ReadableStream, TransformStream } from 'web-streams-polyfill';
// RNEventSource extends EventSource's functionality, you can add this to make the typing reflect this but it's not a requirement
declare global {
interface EventSource extends RNEventSource {}
}
globalThis.EventSource = globalThis.EventSource || RNEventSource;
globalThis.ReadableStream = globalThis.ReadableStream || ReadableStream;
globalThis.TransformStream = globalThis.TransformStream || TransformStream;

Once the polyfills are added, you can continue setting up the httpSubscriptionLink as described in the setup section.

ts
type MaybePromise<TValue> = TValue | Promise<TValue>;
type CallbackOrValue<TValue> = TValue | (() => MaybePromise<TValue>);
type HTTPSubscriptionLinkOptions<TRoot extends AnyClientTypes> = {
/**
* The URL to connect to (can be a function that returns a URL)
*/
url: CallbackOrValue<string>;
/**
* EventSource options
*/
eventSourceOptions?: CallbackOrValue<EventSourceInit>;
/**
* Data transformer
* @link https://trpc.io/docs/v11/data-transformers
**/
transformer?: DataTransformerOptions;
};
ts
type MaybePromise<TValue> = TValue | Promise<TValue>;
type CallbackOrValue<TValue> = TValue | (() => MaybePromise<TValue>);
type HTTPSubscriptionLinkOptions<TRoot extends AnyClientTypes> = {
/**
* The URL to connect to (can be a function that returns a URL)
*/
url: CallbackOrValue<string>;
/**
* EventSource options
*/
eventSourceOptions?: CallbackOrValue<EventSourceInit>;
/**
* Data transformer
* @link https://trpc.io/docs/v11/data-transformers
**/
transformer?: DataTransformerOptions;
};