Skip to content

Commit

Permalink
Fix failures in presence tests
Browse files Browse the repository at this point in the history
  • Loading branch information
hackerwins committed Dec 13, 2023
1 parent 7f756ee commit 0da6a70
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 107 deletions.
32 changes: 21 additions & 11 deletions src/client/attachment.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ export class Attachment<T, P extends Indexable> {

watchStream?: WatchStream;
watchLoopTimerID?: ReturnType<typeof setTimeout>;
watchAbort: AbortController;
watchAbortController?: AbortController;

constructor(
reconnectStreamDelay: number,
Expand All @@ -34,7 +34,6 @@ export class Attachment<T, P extends Indexable> {
this.isRealtimeSync = isRealtimeSync;
this.syncMode = SyncMode.PushPull;
this.remoteChangeEventReceived = false;
this.watchAbort = new AbortController();
}

/**
Expand Down Expand Up @@ -76,7 +75,7 @@ export class Attachment<T, P extends Indexable> {
* `runWatchLoop` runs the watch loop.
*/
public async runWatchLoop(
watchStreamCreator: (onDisconnect: () => void, abort: AbortController) => Promise<WatchStream>,
watchStreamCreator: (onDisconnect: () => void) => Promise<WatchStream>,
): Promise<void> {
const doLoop = async (): Promise<void> => {
if (this.watchStream) {
Expand All @@ -87,12 +86,22 @@ export class Attachment<T, P extends Indexable> {
this.watchLoopTimerID = undefined;
}

const onDisconnect = () => {
this.watchStream = undefined;
this.watchLoopTimerID = setTimeout(doLoop, this.reconnectStreamDelay);
};

this.watchStream = await watchStreamCreator(onDisconnect, this.watchAbort);
try {
[this.watchStream, this.watchAbortController] =
await watchStreamCreator(() => {
this.watchStream = undefined;
this.watchAbortController = undefined;
this.watchLoopTimerID = setTimeout(
doLoop,
this.reconnectStreamDelay,
);
});
} catch (err) {
// TODO(hackerwins): For now, if the creation of the watch stream fails,
// it is considered normal and the watch loop is executed again after a
// certain period of time.
// In the future, we need to find a better way to handle this.
}
};

await doLoop();
Expand All @@ -102,9 +111,10 @@ export class Attachment<T, P extends Indexable> {
* `cancelWatchStream` cancels the watch stream.
*/
public cancelWatchStream(): void {
if (this.watchStream && this.watchAbort) {
this.watchAbort.abort();
if (this.watchStream && this.watchAbortController) {
this.watchAbortController.abort();
this.watchStream = undefined;
this.watchAbortController = undefined;
}
clearTimeout(this.watchLoopTimerID);
this.watchLoopTimerID = undefined;
Expand Down
89 changes: 42 additions & 47 deletions src/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -467,24 +467,26 @@ export class Client implements Observable<ClientEvent> {
headers: { 'x-shard-key': `${this.apiKey}/${doc.getKey()}` },
},
)
.then((res) => {
.then(async (res) => {
const pack = converter.fromChangePack<P>(res.changePack!);
doc.applyChangePack(pack);
if (doc.getStatus() !== DocumentStatus.Removed) {
doc.setStatus(DocumentStatus.Attached);
this.attachmentMap.set(
doc.getKey(),
new Attachment(
this.reconnectStreamDelay,
doc,
res.documentId,
isRealtimeSync,
),
);
if (doc.getStatus() === DocumentStatus.Removed) {
return doc;
}

if (isRealtimeSync) {
this.runWatchLoop(doc.getKey());
}
doc.setStatus(DocumentStatus.Attached);
this.attachmentMap.set(
doc.getKey(),
new Attachment(
this.reconnectStreamDelay,
doc,
res.documentId,
isRealtimeSync,
),
);

if (isRealtimeSync) {
await this.runWatchLoop(doc.getKey());
}

logger.info(`[AD] c:"${this.getKey()}" attaches d:"${doc.getKey()}"`);
Expand Down Expand Up @@ -814,25 +816,22 @@ export class Client implements Observable<ClientEvent> {
}

return attachment.runWatchLoop(
(
onDisconnect: () => void,
abort: AbortController,
): Promise<WatchStream> => {
(onDisconnect: () => void): Promise<[WatchStream, AbortController]> => {
if (!this.isActive()) {
throw new YorkieError(
Code.ClientNotActive,
`${this.key} is not active`,
return Promise.reject(
new YorkieError(Code.ClientNotActive, `${this.key} is not active`),
);
}

const ac = new AbortController();
const stream = this.rpcClient.watchDocument(
{
clientId: this.id!,
documentId: attachment.docID,
},
{
headers: { 'x-shard-key': `${this.apiKey}/${docKey}` },
signal: abort.signal,
signal: ac.signal,
},
);

Expand All @@ -842,33 +841,29 @@ export class Client implements Observable<ClientEvent> {
});
logger.info(`[WD] c:"${this.getKey()}" watches d:"${docKey}"`);

return new Promise((resolve, reject) => {
const handleStream = async () => {
try {
for await (const resp of stream) {
this.handleWatchDocumentsResponse(attachment, resp);

// TODO(hackerwins): When the first response is received, we need to
// resolve the promise to notify that the watch stream is ready.
if (resp.body.case === 'initialization') {
resolve(stream);
}
}
} catch (err) {
if (err instanceof ConnectError) {
this.eventStreamObserver.next({
type: ClientEventType.StreamConnectionStatusChanged,
value: StreamConnectionStatus.Disconnected,
});
logger.debug(`[WD] c:"${this.getKey()}" unwatches`);
onDisconnect();
}
return new Promise(async (resolve, reject) => {
try {
for await (const resp of stream) {
this.handleWatchDocumentsResponse(attachment, resp);

reject(err);
// NOTE(hackerwins): When the first response is received, we need to
// resolve the promise to notify that the watch stream is ready.
if (resp.body.case === 'initialization') {
resolve([stream, ac]);
}
}
} catch (err) {
if (err?.constructor.name === 'ConnectError') {
this.eventStreamObserver.next({
type: ClientEventType.StreamConnectionStatusChanged,
value: StreamConnectionStatus.Disconnected,
});
logger.debug(`[WD] c:"${this.getKey()}" unwatches`);
onDisconnect();
}
};

handleStream();
reject(err);
}
});
},
);
Expand Down
83 changes: 34 additions & 49 deletions test/integration/presence_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import {
} from '@yorkie-js-sdk/test/integration/integration_helper';
import { EventCollector, deepSort } from '@yorkie-js-sdk/test/helper/helper';

describe.skip('Presence', function () {
describe('Presence', function () {
afterEach(() => {
vi.restoreAllMocks();
});
Expand Down Expand Up @@ -114,43 +114,38 @@ describe.skip('Presence', function () {
const c2ID = c2.getID()!;

const docKey = toDocKey(`${task.name}-${new Date().getTime()}`);
const eventCollectorP1 = new EventCollector<DocEvent>();
const eventCollectorP2 = new EventCollector<DocEvent>();
type PresenceType = { name: string };
const doc1 = new yorkie.Document<{}, PresenceType>(docKey);
const events1 = new EventCollector<DocEvent>();
const events2 = new EventCollector<DocEvent>();

const doc1 = new yorkie.Document<{}, { name: string }>(docKey);
await c1.attach(doc1, { initialPresence: { name: 'a' } });
const stub1 = vi.fn().mockImplementation((event) => {
eventCollectorP1.add(event);
});
const unsub1 = doc1.subscribe('presence', stub1);
const unsub1 = doc1.subscribe('presence', (event) => events1.add(event));

const doc2 = new yorkie.Document<{}, PresenceType>(docKey);
const doc2 = new yorkie.Document<{}, { name: string }>(docKey);
await c2.attach(doc2, { initialPresence: { name: 'b' } });
const stub2 = vi.fn().mockImplementation((event) => {
eventCollectorP2.add(event);
});
const unsub2 = doc2.subscribe('presence', stub2);
await eventCollectorP1.waitAndVerifyNthEvent(1, {
const unsub2 = doc2.subscribe('presence', (event) => events2.add(event));

await events1.waitAndVerifyNthEvent(1, {
type: DocEventType.Watched,
value: { clientID: c2ID, presence: { name: 'b' } },
});

doc1.update((root, p) => p.set({ name: 'A' }));
doc2.update((root, p) => p.set({ name: 'B' }));
doc1.update((r, p) => p.set({ name: 'A' }));
doc2.update((r, p) => p.set({ name: 'B' }));

await eventCollectorP1.waitAndVerifyNthEvent(2, {
await events1.waitAndVerifyNthEvent(2, {
type: DocEventType.PresenceChanged,
value: { clientID: c1ID, presence: { name: 'A' } },
});
await eventCollectorP1.waitAndVerifyNthEvent(3, {
await events1.waitAndVerifyNthEvent(3, {
type: DocEventType.PresenceChanged,
value: { clientID: c2ID, presence: { name: 'B' } },
});
await eventCollectorP2.waitAndVerifyNthEvent(1, {
await events2.waitAndVerifyNthEvent(1, {
type: DocEventType.PresenceChanged,
value: { clientID: c2ID, presence: { name: 'B' } },
});
await eventCollectorP2.waitAndVerifyNthEvent(2, {
await events2.waitAndVerifyNthEvent(2, {
type: DocEventType.PresenceChanged,
value: { clientID: c1ID, presence: { name: 'A' } },
});
Expand Down Expand Up @@ -313,7 +308,7 @@ describe.skip('Presence', function () {
});
});

describe.skip(`Document.Subscribe('presence')`, function () {
describe(`Document.Subscribe('presence')`, function () {
it(`Should receive presence-changed event for final presence if there are multiple presence changes within doc.update`, async function ({
task,
}) {
Expand Down Expand Up @@ -423,6 +418,9 @@ describe.skip(`Document.Subscribe('presence')`, function () {
it(`Can receive presence-related event only when using realtime sync`, async function ({
task,
}) {
type PresenceType = { name: string; cursor: { x: number; y: number } };
const docKey = toDocKey(`${task.name}-${new Date().getTime()}`);

const c1 = new yorkie.Client(testRPCAddr);
const c2 = new yorkie.Client(testRPCAddr);
const c3 = new yorkie.Client(testRPCAddr);
Expand All @@ -432,17 +430,12 @@ describe.skip(`Document.Subscribe('presence')`, function () {
const c2ID = c2.getID()!;
const c3ID = c3.getID()!;

const docKey = toDocKey(`${task.name}-${new Date().getTime()}`);
type PresenceType = { name: string; cursor: { x: number; y: number } };
const doc1 = new yorkie.Document<{}, PresenceType>(docKey);
await c1.attach(doc1, {
initialPresence: { name: 'a1', cursor: { x: 0, y: 0 } },
});
const eventCollector = new EventCollector<DocEvent>();
const stub = vi.fn().mockImplementation((event) => {
eventCollector.add(event);
});
const unsub = doc1.subscribe('presence', stub);
const events = new EventCollector<DocEvent>();
const unsub = doc1.subscribe('presence', (event) => events.add(event));

// 01. c2 attaches doc in realtime sync, and c3 attached doc in manual sync.
// c1 receives the watched event from c2.
Expand All @@ -455,7 +448,7 @@ describe.skip(`Document.Subscribe('presence')`, function () {
initialPresence: { name: 'c1', cursor: { x: 0, y: 0 } },
isRealtimeSync: false,
});
await eventCollector.waitAndVerifyNthEvent(1, {
await events.waitAndVerifyNthEvent(1, {
type: DocEventType.Watched,
value: {
clientID: c2ID,
Expand All @@ -465,13 +458,9 @@ describe.skip(`Document.Subscribe('presence')`, function () {

// 02. c2 and c3 update the presence.
// c1 receives the presence-changed event from c2.
doc2.update((_, presence) => {
presence.set({ name: 'b2' });
});
doc3.update((_, presence) => {
presence.set({ name: 'c2' });
});
await eventCollector.waitAndVerifyNthEvent(2, {
doc2.update((_, p) => p.set({ name: 'b2' }));
doc3.update((_, p) => p.set({ name: 'c2' }));
await events.waitAndVerifyNthEvent(2, {
type: DocEventType.PresenceChanged,
value: {
clientID: c2ID,
Expand All @@ -481,7 +470,7 @@ describe.skip(`Document.Subscribe('presence')`, function () {

// 03-1. c2 pauses the document, c1 receives an unwatched event from c2.
await c2.pause(doc2);
await eventCollector.waitAndVerifyNthEvent(3, {
await events.waitAndVerifyNthEvent(3, {
type: DocEventType.Unwatched,
value: {
clientID: c2ID,
Expand All @@ -496,7 +485,7 @@ describe.skip(`Document.Subscribe('presence')`, function () {
await c3.sync();
await c1.sync();
await c3.resume(doc3);
await eventCollector.waitAndVerifyNthEvent(4, {
await events.waitAndVerifyNthEvent(4, {
type: DocEventType.Watched,
value: {
clientID: c3ID,
Expand All @@ -506,13 +495,9 @@ describe.skip(`Document.Subscribe('presence')`, function () {

// 04. c2 and c3 update the presence.
// c1 receives the presence-changed event from c3.
doc2.update((_, presence) => {
presence.set({ name: 'b3' });
});
doc3.update((_, presence) => {
presence.set({ name: 'c3' });
});
await eventCollector.waitAndVerifyNthEvent(5, {
doc2.update((_, p) => p.set({ name: 'b3' }));
doc3.update((_, p) => p.set({ name: 'c3' }));
await events.waitAndVerifyNthEvent(5, {
type: DocEventType.PresenceChanged,
value: {
clientID: c3ID,
Expand All @@ -522,7 +507,7 @@ describe.skip(`Document.Subscribe('presence')`, function () {

// 05-1. c3 pauses the document, c1 receives an unwatched event from c3.
await c3.pause(doc3);
await eventCollector.waitAndVerifyNthEvent(6, {
await events.waitAndVerifyNthEvent(6, {
type: DocEventType.Unwatched,
value: {
clientID: c3ID,
Expand All @@ -534,7 +519,7 @@ describe.skip(`Document.Subscribe('presence')`, function () {
await c2.sync();
await c1.sync();
await c2.resume(doc2);
await eventCollector.waitAndVerifyNthEvent(7, {
await events.waitAndVerifyNthEvent(7, {
type: DocEventType.Watched,
value: {
clientID: c2ID,
Expand All @@ -549,7 +534,7 @@ describe.skip(`Document.Subscribe('presence')`, function () {
});
});

describe.skip('Undo/Redo', function () {
describe('Undo/Redo', function () {
it('Can undo/redo with presence', async function ({ task }) {
type TestDoc = { counter: Counter };
type Presence = { color: string };
Expand Down

0 comments on commit 0da6a70

Please sign in to comment.