HTTP Subscription Link
httpSubscriptionLink
is a terminating link that's uses Server-sent Events (SSE) for subscriptions.
SSE is a good option for real-time as it's a bit easier to deal with than WebSockets and handles things like reconnecting and continuing where it left off automatically.
We have prefixed this as unstable_
as it's a new API, but you're safe to use it! Read more.
Setup
If your client's environment doesn't support EventSource, you need an EventSource polyfill. For React Native specific instructions please defer to the compatibility section.
To use httpSubscriptionLink
, you need to use a splitLink to make it explicit that we want to use SSE for subscriptions.
client/index.tsts
import type { TRPCLink } from '@trpc/client';import {httpBatchLink,loggerLink,splitLink,unstable_httpSubscriptionLink,} from '@trpc/client';const trpcClient = createTRPCClient<AppRouter>({/*** @link https://trpc.io/docs/v11/client/links*/links: [// adds pretty logs to your console in development and logs errors in productionloggerLink(),splitLink({// uses the httpSubscriptionLink for subscriptionscondition: (op) => op.type === 'subscription',true: unstable_httpSubscriptionLink({url: `/api/trpc`,}),false: httpBatchLink({url: `/api/trpc`,}),}),],});
client/index.tsts
import type { TRPCLink } from '@trpc/client';import {httpBatchLink,loggerLink,splitLink,unstable_httpSubscriptionLink,} from '@trpc/client';const trpcClient = createTRPCClient<AppRouter>({/*** @link https://trpc.io/docs/v11/client/links*/links: [// adds pretty logs to your console in development and logs errors in productionloggerLink(),splitLink({// uses the httpSubscriptionLink for subscriptionscondition: (op) => op.type === 'subscription',true: unstable_httpSubscriptionLink({url: `/api/trpc`,}),false: httpBatchLink({url: `/api/trpc`,}),}),],});
Basic example
For a full example, see our full-stack SSE example.
ts
import EventEmitter, { on } from 'events';import type { Post } from '@prisma/client';import { z } from 'zod';import { publicProcedure, router } from '../trpc';const ee = new EventEmitter();export const subRouter = router({onPostAdd: publicProcedure.subscription(async function* (opts) {// listen for new eventsfor await (const [data] of on(ee, 'add', {// Passing the AbortSignal from the request automatically cancels the event emitter when the request is abortedsignal: opts.signal,})) {const post = data as Post;yield post;}}),});
ts
import EventEmitter, { on } from 'events';import type { Post } from '@prisma/client';import { z } from 'zod';import { publicProcedure, router } from '../trpc';const ee = new EventEmitter();export const subRouter = router({onPostAdd: publicProcedure.subscription(async function* (opts) {// listen for new eventsfor await (const [data] of on(ee, 'add', {// Passing the AbortSignal from the request automatically cancels the event emitter when the request is abortedsignal: opts.signal,})) {const post = data as Post;yield post;}}),});
Automatic tracking of id using tracked()
(recommended)
If you yield
an event using our tracked()
-helper and include an id
, the browser will automatically reconnect when it gets disconnected and send the last known ID - this is part of the EventSource
-spec and will be propagated through lastEventId
in your .input()
.
You can send an initial lastEventId
when initializing the subscription and it will be automatically updated as the browser receives data.
If you're fetching data based on the lastEventId
, and capturing all events is critical, you may want to use ReadableStream
's or a similar pattern as an intermediary as is done in our full-stack SSE example to prevent newly emitted events being ignored while yield'ing the original batch based on lastEventId
.
ts
import EventEmitter, { on } from 'events';import type { Post } from '@prisma/client';import { tracked } from '@trpc/server';import { z } from 'zod';import { publicProcedure, router } from '../trpc';const ee = new EventEmitter();export const subRouter = router({onPostAdd: publicProcedure.input(z.object({// lastEventId is the last event id that the client has received// On the first call, it will be whatever was passed in the initial setup// If the client reconnects, it will be the last event id that the client receivedlastEventId: z.string().nullish(),}).optional(),).subscription(async function* (opts) {if (opts.input.lastEventId) {// [...] get the posts since the last event id and yield them}// listen for new eventsfor await (const [data] of on(ee, 'add'), {signal: opts.signal,}) {const post = data as Post;// tracking the post id ensures the client can reconnect at any time and get the latest events this idyield tracked(post.id, post);}}),});
ts
import EventEmitter, { on } from 'events';import type { Post } from '@prisma/client';import { tracked } from '@trpc/server';import { z } from 'zod';import { publicProcedure, router } from '../trpc';const ee = new EventEmitter();export const subRouter = router({onPostAdd: publicProcedure.input(z.object({// lastEventId is the last event id that the client has received// On the first call, it will be whatever was passed in the initial setup// If the client reconnects, it will be the last event id that the client receivedlastEventId: z.string().nullish(),}).optional(),).subscription(async function* (opts) {if (opts.input.lastEventId) {// [...] get the posts since the last event id and yield them}// listen for new eventsfor await (const [data] of on(ee, 'add'), {signal: opts.signal,}) {const post = data as Post;// tracking the post id ensures the client can reconnect at any time and get the latest events this idyield tracked(post.id, post);}}),});
Cleanup of side effects
If you need to clean up any side-effects of your subscription you can use the try...finally
pattern, as trpc
invokes the .return()
of the Generator Instance when the subscription stops for any reason.
ts
import EventEmitter, { on } from 'events';import type { Post } from '@prisma/client';import { z } from 'zod';import { publicProcedure, router } from '../trpc';const ee = new EventEmitter();export const subRouter = router({onPostAdd: publicProcedure.subscription(async function* (opts) {let timeout;try {for await (const [data] of on(ee, 'add'), {signal: opts.signal,}) {timeout = setTimeout(() => console.log('Pretend like this is useful'));const post = data as Post;yield post;}} finally {if (timeout) clearTimeout(timeout);}}),});
ts
import EventEmitter, { on } from 'events';import type { Post } from '@prisma/client';import { z } from 'zod';import { publicProcedure, router } from '../trpc';const ee = new EventEmitter();export const subRouter = router({onPostAdd: publicProcedure.subscription(async function* (opts) {let timeout;try {for await (const [data] of on(ee, 'add'), {signal: opts.signal,}) {timeout = setTimeout(() => console.log('Pretend like this is useful'));const post = data as Post;yield post;}} finally {if (timeout) clearTimeout(timeout);}}),});
Error handling
Throwing an error in a generator function propagates to trpc
's onError()
on the backend, but the error will not be sent to the client - the client will automatically reconnect based on the last event id that is tracked using tracked()
.
Headers and authorization / authentication
Web apps
Same domain
If you're doing a web application, cookies are sent as part of the request as long as your client is on the same domain as the server.
Cross-domain
If the client and server are not on the same domain, you can use withCredentials: true
(read more on MDN here).
Example:
tsx
// [...]unstable_httpSubscriptionLink({url: 'https://example.com/api/trpc',eventSourceOptions: {withCredentials: true, // <---},});
tsx
// [...]unstable_httpSubscriptionLink({url: 'https://example.com/api/trpc',eventSourceOptions: {withCredentials: true, // <---},});
Custom headers through polyfill
Recommended for non-web environments
You can polyfill EventSource
and use the eventSourceOptions
-callback to populate headers.
tsx
import {createTRPCClient,httpBatchLink,splitLink,unstable_httpSubscriptionLink,} from '@trpc/client';import { EventSourcePolyfill } from 'event-source-polyfill';import type { AppRouter } from '../server/index.js';// polyfill EventSourceglobalThis.EventSource = EventSourcePolyfill;// Initialize the tRPC clientconst trpc = createTRPCClient<AppRouter>({links: [splitLink({condition: (op) => op.type === 'subscription',true: unstable_httpSubscriptionLink({url: 'http://localhost:3000',// options to pass to the EventSourcePolyfill constructoreventSourceOptions: async () => {return {headers: {authorization: 'Bearer supersecret',},}; // you either need to typecast to `EventSourceInit` or use `as any` or override the types by a `declare global` statement},}),false: httpBatchLink({url: 'http://localhost:3000',}),}),],});
tsx
import {createTRPCClient,httpBatchLink,splitLink,unstable_httpSubscriptionLink,} from '@trpc/client';import { EventSourcePolyfill } from 'event-source-polyfill';import type { AppRouter } from '../server/index.js';// polyfill EventSourceglobalThis.EventSource = EventSourcePolyfill;// Initialize the tRPC clientconst trpc = createTRPCClient<AppRouter>({links: [splitLink({condition: (op) => op.type === 'subscription',true: unstable_httpSubscriptionLink({url: 'http://localhost:3000',// options to pass to the EventSourcePolyfill constructoreventSourceOptions: async () => {return {headers: {authorization: 'Bearer supersecret',},}; // you either need to typecast to `EventSourceInit` or use `as any` or override the types by a `declare global` statement},}),false: httpBatchLink({url: 'http://localhost:3000',}),}),],});
Connection params
In order to authenticate with EventSource
, you can define connectionParams
in httpSubscriptionLink
. This will be sent as part of the URL, which is why other methods are preferred).
server/context.tsts
import type {CreateHTTPContextOptions } from '@trpc/server/adapters/standalone';export constcreateContext = async (opts :CreateHTTPContextOptions ) => {consttoken =opts .info .connectionParams ?.token ;// [... authenticate]return {};};export typeContext =Awaited <ReturnType <typeofcreateContext >>;
server/context.tsts
import type {CreateHTTPContextOptions } from '@trpc/server/adapters/standalone';export constcreateContext = async (opts :CreateHTTPContextOptions ) => {consttoken =opts .info .connectionParams ?.token ;// [... authenticate]return {};};export typeContext =Awaited <ReturnType <typeofcreateContext >>;
client/trpc.tsts
import {createTRPCClient,httpBatchLink,splitLink,unstable_httpSubscriptionLink,} from '@trpc/client';import type { AppRouter } from '../server/index.js';// Initialize the tRPC clientconst trpc = createTRPCClient<AppRouter>({links: [splitLink({condition: (op) => op.type === 'subscription',true: unstable_httpSubscriptionLink({url: 'http://localhost:3000',connectionParams: async () => {// Will be serialized as part of the URLreturn {token: 'supersecret',};},}),false: httpBatchLink({url: 'http://localhost:3000',}),}),],});
client/trpc.tsts
import {createTRPCClient,httpBatchLink,splitLink,unstable_httpSubscriptionLink,} from '@trpc/client';import type { AppRouter } from '../server/index.js';// Initialize the tRPC clientconst trpc = createTRPCClient<AppRouter>({links: [splitLink({condition: (op) => op.type === 'subscription',true: unstable_httpSubscriptionLink({url: 'http://localhost:3000',connectionParams: async () => {// Will be serialized as part of the URLreturn {token: 'supersecret',};},}),false: httpBatchLink({url: 'http://localhost:3000',}),}),],});
Compatibility (React Native)
The httpSubscriptionLink
makes use of the EventSource
API, Streams API, and AsyncIterator
s, these are not natively supported by React Native and will have to be polyfilled.
To polyfill EventSource
we recommend to use a polyfill that utilizes the networking library exposed by React Native, over using a polyfill that using the XMLHttpRequest
API. Libraries that polyfill EventSource
using XMLHttpRequest
fail to reconnect after the app has been in the background. Consider using the rn-eventsource-reborn package.
The Streams API can be polyfilled using the web-streams-polyfill package.
AsyncIterator
s can be polyfilled using the @azure/core-asynciterator-polyfill package.
Installation
Install the required polyfills:
- npm
- yarn
- pnpm
- bun
npm install rn-eventsource-reborn web-streams-polyfill @azure/core-asynciterator-polyfill
yarn add rn-eventsource-reborn web-streams-polyfill @azure/core-asynciterator-polyfill
pnpm add rn-eventsource-reborn web-streams-polyfill @azure/core-asynciterator-polyfill
bun add rn-eventsource-reborn web-streams-polyfill @azure/core-asynciterator-polyfill
Add the polyfills to your project before the link is used (e.g. where you add your TRPCReact.Provider):
utils/api.tsxts
import '@azure/core-asynciterator-polyfill';import { RNEventSource } from 'rn-eventsource-reborn';import { ReadableStream, TransformStream } from 'web-streams-polyfill';// RNEventSource extends EventSource's functionality, you can add this to make the typing reflect this but it's not a requirementdeclare global {interface EventSource extends RNEventSource {}}globalThis.EventSource = globalThis.EventSource || RNEventSource;globalThis.ReadableStream = globalThis.ReadableStream || ReadableStream;globalThis.TransformStream = globalThis.TransformStream || TransformStream;
utils/api.tsxts
import '@azure/core-asynciterator-polyfill';import { RNEventSource } from 'rn-eventsource-reborn';import { ReadableStream, TransformStream } from 'web-streams-polyfill';// RNEventSource extends EventSource's functionality, you can add this to make the typing reflect this but it's not a requirementdeclare global {interface EventSource extends RNEventSource {}}globalThis.EventSource = globalThis.EventSource || RNEventSource;globalThis.ReadableStream = globalThis.ReadableStream || ReadableStream;globalThis.TransformStream = globalThis.TransformStream || TransformStream;
Once the polyfills are added, you can continue setting up the httpSubscriptionLink
as described in the setup section.
httpSubscriptionLink
Options
ts
type MaybePromise<TValue> = TValue | Promise<TValue>;type CallbackOrValue<TValue> = TValue | (() => MaybePromise<TValue>);type HTTPSubscriptionLinkOptions<TRoot extends AnyClientTypes> = {/*** The URL to connect to (can be a function that returns a URL)*/url: CallbackOrValue<string>;/*** EventSource options*/eventSourceOptions?: CallbackOrValue<EventSourceInit>;/*** Data transformer* @link https://trpc.io/docs/v11/data-transformers**/transformer?: DataTransformerOptions;};
ts
type MaybePromise<TValue> = TValue | Promise<TValue>;type CallbackOrValue<TValue> = TValue | (() => MaybePromise<TValue>);type HTTPSubscriptionLinkOptions<TRoot extends AnyClientTypes> = {/*** The URL to connect to (can be a function that returns a URL)*/url: CallbackOrValue<string>;/*** EventSource options*/eventSourceOptions?: CallbackOrValue<EventSourceInit>;/*** Data transformer* @link https://trpc.io/docs/v11/data-transformers**/transformer?: DataTransformerOptions;};