Skip to content

Commit

Permalink
[infrastructure] Improve how we handle connection failures/reconnects (
Browse files Browse the repository at this point in the history
…#632)

* [document-store] Pass through reconnect events

* [desk-tool] Set form in readOnly and show warning when reconnecting

* [form-builder] Support passing readOnly to top level FormBuilder component

* [components] Using portal beacuse css translate in a parent will destroy fixed positino

* [preview] Map all welcome events to refetch/sync

* [preview] Keep global listener alive + minor cleanup
  • Loading branch information
bjoerge authored Mar 5, 2018
1 parent 2d4a1f0 commit 582a5ee
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 34 deletions.
4 changes: 2 additions & 2 deletions packages/@sanity/base/src/datastores/document/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ function fetchQuerySnapshot(query, params) {

const serverConnection = {
byId(id) {
return Observable.from(client.listen('*[_id == $id]', {id: id}, {includeResult: false, events: ['welcome', 'mutation']}))
return Observable.from(client.listen('*[_id == $id]', {id: id}, {includeResult: false, events: ['welcome', 'mutation', 'reconnect']}))
.concatMap(event => {
return (event.type === 'welcome')
? Observable.from(fetchDocumentSnapshot(id))
Expand All @@ -33,7 +33,7 @@ const serverConnection = {
},

query(query, params) {
return Observable.from(client.observable.listen(query, params || {}, {includeResult: false, events: ['welcome', 'mutation']}))
return Observable.from(client.observable.listen(query, params || {}, {includeResult: false, events: ['welcome', 'mutation', 'reconnect']}))
.concatMap(event => {
return (event.type === 'welcome')
? Observable.from(fetchQuerySnapshot(query, params))
Expand Down
21 changes: 12 additions & 9 deletions packages/@sanity/components/src/snackbar/DefaultSnackbar.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import PropTypes from 'prop-types'
import React from 'react'
import styles from 'part:@sanity/components/snackbar/default-style'
import Button from 'part:@sanity/components/buttons/default'
import Portal from 'react-portal'

export default class DefaultSnackbar extends React.PureComponent {
static propTypes = {
Expand Down Expand Up @@ -85,18 +86,20 @@ export default class DefaultSnackbar extends React.PureComponent {
const style = `${styles[kind] || styles.root} ${this.state.visible ? styles.visible : styles.hidden}`

return (
<div className={style}>
<div className={styles.inner} onMouseOver={this.handleMouseOver} onMouseLeave={this.handleMouseLeave}>
{action && (
<div className={styles.action}>
<Button inverted color="white" onClick={this.handleAction}>{action.title}</Button>
<Portal isOpened>
<div className={style}>
<div className={styles.inner} onMouseOver={this.handleMouseOver} onMouseLeave={this.handleMouseLeave}>
{action && (
<div className={styles.action}>
<Button inverted color="white" onClick={this.handleAction}>{action.title}</Button>
</div>
)}
<div className={styles.content}>
{children}
</div>
)}
<div className={styles.content}>
{children}
</div>
</div>
</div>
</Portal>
)
}
}
36 changes: 30 additions & 6 deletions packages/@sanity/desk-tool/src/pane/Editor.js
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ export default withRouterHOC(
isCreatingDraft: PropTypes.bool,
isUnpublishing: PropTypes.bool,
isPublishing: PropTypes.bool,
isReconnecting: PropTypes.bool,
isLoading: PropTypes.bool,
isSaving: PropTypes.bool,
deletedSnapshot: PropTypes.object
Expand All @@ -194,6 +195,7 @@ export default withRouterHOC(
isSaving: false,
isUnpublishing: false,
isPublishing: false,
isReconnecting: false,
isCreatingDraft: false,
deletedSnapshot: null,
transactionResult: null,
Expand Down Expand Up @@ -429,7 +431,7 @@ export default withRouterHOC(
}

renderFunctions = () => {
const {draft, published, markers, type} = this.props
const {draft, published, markers, type, isReconnecting} = this.props
const {showSavingStatus, showValidationTooltip} = this.state

const value = draft || published
Expand Down Expand Up @@ -457,8 +459,25 @@ export default withRouterHOC(
Syncing…
</Tooltip>
)}
{isReconnecting && (
<Tooltip
className={styles.syncStatusSyncing}
arrow
theme="light"
size="small"
distance="0"
title="Connection lost. Reconnecting…"
>
<span className={styles.spinnerContainer}>
<span className={styles.spinner}>
<SyncIcon />
</span>
</span>{' '}
Reconnecting…
</Tooltip>
)}
{value &&
!showSavingStatus && (
!showSavingStatus && !isReconnecting && (
<Tooltip
className={styles.syncStatusSynced}
arrow
Expand Down Expand Up @@ -509,7 +528,7 @@ export default withRouterHOC(
title={errors.length > 0 ? 'Fix errors before publishing' : 'Ctrl+Alt+P'}
>
<Button
disabled={!draft || errors.length > 0}
disabled={isReconnecting || !draft || errors.length > 0}
onClick={this.handlePublishRequested}
color="primary"
>
Expand Down Expand Up @@ -544,6 +563,7 @@ export default withRouterHOC(
isPublishing,
isUnpublishing,
isCreatingDraft,
isReconnecting,
patchChannel,
transactionResult,
onClearTransactionResult
Expand Down Expand Up @@ -621,14 +641,14 @@ export default withRouterHOC(
patchChannel={patchChannel}
value={draft || published || {_type: type.name}}
type={type}
readOnly={isReconnecting}
onBlur={this.handleBlur}
onFocus={this.handleFocus}
focusPath={focusPath}
onChange={this.handleChange}
markers={markers}
/>
</form>

{afterEditorComponents.map((AfterEditorComponent, i) => (
<AfterEditorComponent key={i} documentId={published._id} />
))}
Expand Down Expand Up @@ -666,11 +686,15 @@ export default withRouterHOC(
onConfirm={this.handleConfirmUnpublish}
/>
)}

{isReconnecting && (
<Snackbar kind="warning">
<WarningIcon /> Connection lost. Reconnecting…
</Snackbar>
)}
{transactionResult &&
transactionResult.type === 'error' && (
<Snackbar
kind={'danger'}
kind="danger"
action={{title: 'Ok, got it'}}
onAction={onClearTransactionResult}
>
Expand Down
9 changes: 8 additions & 1 deletion packages/@sanity/desk-tool/src/pane/EditorWrapper.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const INITIAL_DOCUMENT_STATE = {

const INITIAL_STATE = {
isSaving: true,
isReconnecting: false,
isCreatingDraft: false,
transactionResult: null,
validationPending: true,
Expand Down Expand Up @@ -53,6 +54,9 @@ function documentEventToState(event) {
: event.document
}
}
case 'reconnect': {
return {}
}
default: {
// eslint-disable-next-line no-console
console.log('Unhandled document event type "%s"', event.type, event)
Expand Down Expand Up @@ -124,6 +128,7 @@ export default class EditorWrapper extends React.Component {
}

receiveDraftEvent = event => {
this.setState({isReconnecting: event.type === 'reconnect'})
if (event.type !== 'mutation') {
return
}
Expand Down Expand Up @@ -393,7 +398,8 @@ export default class EditorWrapper extends React.Component {
transactionResult,
isPublishing,
isSaving,
validationPending
validationPending,
isReconnecting
} = this.state

if (isRecoverable(draft, published)) {
Expand All @@ -410,6 +416,7 @@ export default class EditorWrapper extends React.Component {
validationPending={validationPending}
isLoading={draft.isLoading || published.isLoading}
isSaving={isSaving}
isReconnecting={isReconnecting}
isPublishing={isPublishing}
isUnpublishing={isUnpublishing}
transactionResult={transactionResult}
Expand Down
4 changes: 3 additions & 1 deletion packages/@sanity/document-store/src/createDocumentStore.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ const NOOP = () => {}
function createBufferedDocument(documentId, server) {

const serverEvents$ = Observable.from(server.byId(documentId)).share()
const reconnects$ = serverEvents$.filter(event => event.type === 'reconnect')
const saves = pubsub()

const bufferedDocs$ = serverEvents$
Expand Down Expand Up @@ -70,6 +71,7 @@ function createBufferedDocument(documentId, server) {
observer.next({type: 'snapshot', document: bufferedDocument.LOCAL})
return mutation$
.merge(rebase$)
.merge(reconnects$)
.subscribe(observer)
}),
patch(patches) {
Expand Down Expand Up @@ -143,7 +145,7 @@ function createBufferedDocument(documentId, server) {
}
}

module.exports = function createDocumentStore({serverConnection}) {
export default function createDocumentStore({serverConnection}) {

return {
byId,
Expand Down
3 changes: 3 additions & 0 deletions packages/@sanity/form-builder/src/sanity/SanityFormBuilder.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type Props = {
markers: Array<Marker>,
patchChannel: PatchChannel,
onFocus: Path => void,
readOnly: boolean,
onChange: () => {},
onBlur: () => void,
autoFocus: boolean,
Expand Down Expand Up @@ -43,6 +44,7 @@ export default class SanityFormBuilder extends React.Component<Props> {
patchChannel,
type,
onChange,
readOnly,
markers,
onFocus,
onBlur,
Expand All @@ -61,6 +63,7 @@ export default class SanityFormBuilder extends React.Component<Props> {
markers={markers}
focusPath={focusPath}
isRoot
readOnly={readOnly}
ref={this.setInput}
/>
</SanityFormBuilderContext>
Expand Down
54 changes: 39 additions & 15 deletions packages/@sanity/preview/src/observeFields.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,40 @@ import type {FieldName, Id} from './types'
import {INCLUDE_FIELDS} from './constants'

let _globalListener
const getGlobalListener = () => {
const getGlobalEvents = () => {
if (!_globalListener) {
_globalListener = Observable.from(
client.listen('*[!(_id in path("_.**"))]', {}, {includeResult: false})
const allEvents$ = Observable.from(
client.listen(
'*[!(_id in path("_.**"))]',
{},
{events: ['welcome', 'mutation'], includeResult: false}
)
).share()

// This will keep the listener active forever and in turn reduce the number of initial fetches
// as less 'welcome' events will be emitted.
// @todo: see if we can delay unsubscribing or connect with some globally defined shared listener
allEvents$.subscribe()

_globalListener = {
// This is a stream of welcome events from the server, each telling us that we have established listener connection
// We map these to snapshot fetch/sync. It is good to wait for the first welcome event before fetching any snapshots as, we may miss
// events that happens in the time period after initial fetch and before the listener is established.
welcome$: allEvents$
.filter(event => event.type === 'welcome')
.publishReplay(1)
.refCount(),
mutations$: allEvents$.filter(event => event.type === 'mutation')
}
}
return _globalListener
}

function listen(id: Id) {
return getGlobalListener().filter(event => event.documentId === id)
const globalEvents = getGlobalEvents()
return globalEvents.welcome$.merge(
globalEvents.mutations$.filter(event => event.documentId === id)
)
}

function fetchAllDocumentPaths(selections: Selection[]) {
Expand All @@ -32,8 +55,13 @@ const fetchDocumentPathsFast = debounceCollect(fetchAllDocumentPaths, 100)
const fetchDocumentPathsSlow = debounceCollect(fetchAllDocumentPaths, 1000)

function listenFields(id: Id, fields: FieldName[]) {
// console.log('listening on doc #%s for fields %O', id, fields)
return fetchDocumentPathsFast(id, fields)
return listen(id)
.switchMap(
event =>
event.type === 'welcome'
? fetchDocumentPathsFast(id, fields)
: fetchDocumentPathsSlow(id, fields)
)
.mergeMap(
result =>
result === undefined
Expand All @@ -42,7 +70,6 @@ function listenFields(id: Id, fields: FieldName[]) {
fetchDocumentPathsSlow(id, fields)
: Observable.of(result)
)
.concat(listen(id).switchMap(event => fetchDocumentPathsSlow(id, fields)))
}

// keep for debugging purposes for now
Expand All @@ -63,18 +90,15 @@ const CACHE: Cache = {} // todo: use a LRU cache instead (e.g. hashlru or quick-
function createCachedFieldObserver(id, fields): CachedFieldObserver {
let latest = null
const changes$ = new Observable(observer => {
if (latest) {
// Re-emit last known value immediately
observer.next(latest)
return fetchDocumentPathsSlow(id, fields)
.concat(listen(id).switchMap(event => fetchDocumentPathsSlow(id, fields)))
.subscribe(observer)
}
return listenFields(id, fields).subscribe(observer)
observer.next(latest)
observer.complete()
})
.filter(Boolean)
.merge(listenFields(id, fields))
.do(v => (latest = v))
.publishReplay()
.refCount()

return {id, fields, changes$}
}

Expand Down

0 comments on commit 582a5ee

Please sign in to comment.