Skip to content

Commit

Permalink
refactor: unify sse endpoints handling (#950)
Browse files Browse the repository at this point in the history
* refactor: unify sse endpoints handling

this refactors our current EventSource/SSE handling and unifies it under a common implementation shared between the live content api and the listener api.

* feat: export connectEventSource

* fix(tests): update listener tests to reflect new handling of disconnect messages

* fix: add OpenEvent to ListenEvent union
  • Loading branch information
bjoerge authored Jan 13, 2025
1 parent dac1f7c commit ddfd244
Show file tree
Hide file tree
Showing 8 changed files with 406 additions and 270 deletions.
255 changes: 255 additions & 0 deletions src/data/eventsource.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,255 @@
import {defer, isObservable, mergeMap, Observable, of} from 'rxjs'

import {type Any} from '../types'

/**
* @public
* Thrown if the EventSource connection could not be established.
* Note that ConnectionFailedErrors are rare, and disconnects will normally be handled by the EventSource instance itself and emitted as `reconnect` events.
*/
export class ConnectionFailedError extends Error {
readonly name = 'ConnectionFailedError'
}

/**
* The listener has been told to explicitly disconnect.
* This is a rare situation, but may occur if the API knows reconnect attempts will fail,
* eg in the case of a deleted dataset, a blocked project or similar events.
* @public
*/
export class DisconnectError extends Error {
readonly name = 'DisconnectError'
readonly reason?: string
constructor(message: string, reason?: string, options: ErrorOptions = {}) {
super(message, options)
this.reason = reason
}
}

/**
* @public
* The server sent a `channelError` message. Usually indicative of a bad or malformed request
*/
export class ChannelError extends Error {
readonly name = 'ChannelError'
readonly data?: unknown
constructor(message: string, data: unknown) {
super(message)
this.data = data
}
}

/**
* @public
* The server sent an `error`-event to tell the client that an unexpected error has happened.
*/
export class MessageError extends Error {
readonly name = 'MessageError'
readonly data?: unknown
constructor(message: string, data: unknown, options: ErrorOptions = {}) {
super(message, options)
this.data = data
}
}

/**
* @public
* An error occurred while parsing the message sent by the server as JSON. Should normally not happen.
*/
export class MessageParseError extends Error {
readonly name = 'MessageParseError'
}

/**
* @public
*/
export interface ServerSentEvent<Name extends string> {
type: Name
id?: string
data?: unknown
}

// Always listen for these events, no matter what
const REQUIRED_EVENTS = ['channelError', 'disconnect']

/**
* @internal
*/
export type EventSourceEvent<Name extends string> = ServerSentEvent<Name>

/**
* @internal
*/
export type EventSourceInstance = InstanceType<typeof globalThis.EventSource>

/**
* Sanity API specific EventSource handler shared between the listen and live APIs
*
* Since the `EventSource` API is not provided by all environments, this function enables custom initialization of the EventSource instance
* for runtimes that requires polyfilling or custom setup logic (e.g. custom HTTP headers)
* via the passed `initEventSource` function which must return an EventSource instance.
*
* Possible errors to be thrown on the returned observable are:
* - {@link MessageError}
* - {@link MessageParseError}
* - {@link ChannelError}
* - {@link DisconnectError}
* - {@link ConnectionFailedError}
*
* @param initEventSource - A function that returns an EventSource instance or an Observable that resolves to an EventSource instance
* @param events - an array of named events from the API to listen for.
*
* @internal
*/
export function connectEventSource<EventName extends string>(
initEventSource: () => EventSourceInstance | Observable<EventSourceInstance>,
events: EventName[],
) {
return defer(() => {
const es = initEventSource()
return isObservable(es) ? es : of(es)
}).pipe(mergeMap((es) => connectWithESInstance(es, events))) as Observable<
ServerSentEvent<EventName>
>
}

/**
* Provides an observable from the passed EventSource instance, subscribing to the passed list of names of events types to listen for
* Handles connection logic, adding/removing event listeners, payload parsing, error propagation, etc.
*
* @param es - The EventSource instance
* @param events - List of event names to listen for
*/
function connectWithESInstance<EventTypeName extends string>(
es: EventSourceInstance,
events: EventTypeName[],
) {
return new Observable<EventSourceEvent<EventTypeName>>((observer) => {
const emitOpen = (events as string[]).includes('open')
const emitReconnect = (events as string[]).includes('reconnect')

// EventSource will emit a regular Event if it fails to connect, however the API may also emit an `error` MessageEvent
// So we need to handle both cases
function onError(evt: MessageEvent | Event) {
// If the event has a `data` property, then it`s a MessageEvent emitted by the API and we should forward the error
if ('data' in evt) {
const [parseError, event] = parseEvent(evt as MessageEvent)
observer.error(
parseError
? new MessageParseError('Unable to parse EventSource error message', {cause: event})
: new MessageError((event?.data as {message: string}).message, event),
)
return
}

// We should never be in a disconnected state. By default, EventSource will reconnect
// automatically, but in some cases (like when a laptop lid is closed), it will trigger onError
// if it can't reconnect.
// see https://html.spec.whatwg.org/multipage/server-sent-events.html#sse-processing-model
if (es.readyState === es.CLOSED) {
// In these cases we'll signal to consumers (via the error path) that a retry/reconnect is needed.
observer.error(new ConnectionFailedError('EventSource connection failed'))
} else if (emitReconnect) {
observer.next({type: 'reconnect' as EventTypeName})
}
}

function onOpen() {
// The open event of the EventSource API is fired when a connection with an event source is opened.
observer.next({type: 'open' as EventTypeName})
}

function onMessage(message: MessageEvent) {
const [parseError, event] = parseEvent(message)
if (parseError) {
observer.error(
new MessageParseError('Unable to parse EventSource message', {cause: parseError}),
)
return
}
if (message.type === 'channelError') {
// An error occurred. This is different from a network-level error (which will be emitted as 'error').
// Possible causes are things such as malformed filters, non-existant datasets or similar.
observer.error(new ChannelError(extractErrorMessage(event?.data), event.data))
return
}
if (message.type === 'disconnect') {
// The listener has been told to explicitly disconnect and not reconnect.
// This is a rare situation, but may occur if the API knows reconnect attempts will fail,
// eg in the case of a deleted dataset, a blocked project or similar events.
observer.error(
new DisconnectError(
`Server disconnected client: ${
(event.data as {reason?: string})?.reason || 'unknown error'
}`,
),
)
return
}
observer.next({
type: message.type as EventTypeName,
id: message.lastEventId,
...(event.data ? {data: event.data} : {}),
})
}

es.addEventListener('error', onError)

if (emitOpen) {
es.addEventListener('open', onOpen)
}

// Make sure we have a unique list of events types to avoid listening multiple times,
const cleanedEvents = [...new Set([...REQUIRED_EVENTS, ...events])]
// filter out events that are handled separately
.filter((type) => type !== 'error' && type !== 'open' && type !== 'reconnect')

cleanedEvents.forEach((type: string) => es.addEventListener(type, onMessage))

return () => {
es.removeEventListener('error', onError)
if (emitOpen) {
es.removeEventListener('open', onOpen)
}
cleanedEvents.forEach((type: string) => es.removeEventListener(type, onMessage))
es.close()
}
})
}

function parseEvent(
message: MessageEvent,
): [null, {type: string; id: string; data?: unknown}] | [Error, null] {
try {
const data = typeof message.data === 'string' && JSON.parse(message.data)
return [
null,
{
type: message.type,
id: message.lastEventId,
...(isEmptyObject(data) ? {} : {data}),
},
]
} catch (err) {
return [err as Error, null]
}
}

function extractErrorMessage(err: Any) {
if (!err.error) {
return err.message || 'Unknown listener error'
}

if (err.error.description) {
return err.error.description
}

return typeof err.error === 'string' ? err.error : JSON.stringify(err.error, null, 2)
}

function isEmptyObject(data: object) {
for (const _ in data) {
return false
}
return true
}
7 changes: 7 additions & 0 deletions src/data/eventsourcePolyfill.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import {defer, shareReplay} from 'rxjs'
import {map} from 'rxjs/operators'

export const eventSourcePolyfill = defer(() => import('@sanity/eventsource')).pipe(
map(({default: EventSource}) => EventSource as unknown as typeof globalThis.EventSource),
shareReplay(1),
)
Loading

0 comments on commit ddfd244

Please sign in to comment.