Skip to content

Commit 043987f

Browse files
authored
fix: fix event flush process (#673)
1 parent e8216bc commit 043987f

File tree

2 files changed

+71
-17
lines changed

2 files changed

+71
-17
lines changed

packages/analytics-core/src/plugins/destination.ts

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,6 @@ export class Destination implements DestinationPlugin {
5959

6060
this.storageKey = `${STORAGE_PREFIX}_${this.config.apiKey.substring(0, 10)}`;
6161
const unsent = await this.config.storageProvider?.get(this.storageKey);
62-
this.saveEvents(); // sets storage to '[]'
6362
if (unsent && unsent.length > 0) {
6463
void Promise.all(unsent.map((event) => this.execute(event))).catch();
6564
}
@@ -83,6 +82,7 @@ export class Destination implements DestinationPlugin {
8382
const tryable = list.filter((context) => {
8483
if (context.attempts < this.config.flushMaxRetries) {
8584
context.attempts += 1;
85+
8686
return true;
8787
}
8888
void this.fulfillRequest([context], 500, MAX_RETRIES_EXCEEDED_MESSAGE);
@@ -102,7 +102,7 @@ export class Destination implements DestinationPlugin {
102102
}, context.timeout);
103103
});
104104

105-
this.saveEvents();
105+
void this.updateEventStorage([], this.queue);
106106
}
107107

108108
schedule(timeout: number) {
@@ -186,19 +186,19 @@ export class Destination implements DestinationPlugin {
186186

187187
switch (status) {
188188
case Status.Success: {
189-
this.handleSuccessResponse(res, list);
189+
void this.handleSuccessResponse(res, list);
190190
break;
191191
}
192192
case Status.Invalid: {
193-
this.handleInvalidResponse(res, list);
193+
void this.handleInvalidResponse(res, list);
194194
break;
195195
}
196196
case Status.PayloadTooLarge: {
197-
this.handlePayloadTooLargeResponse(res, list);
197+
void this.handlePayloadTooLargeResponse(res, list);
198198
break;
199199
}
200200
case Status.RateLimit: {
201-
this.handleRateLimitResponse(res, list);
201+
void this.handleRateLimitResponse(res, list);
202202
break;
203203
}
204204
default: {
@@ -241,6 +241,7 @@ export class Destination implements DestinationPlugin {
241241
// log intermediate event status before retry
242242
this.config.loggerProvider.warn(getResponseBodyString(res));
243243
}
244+
244245
this.addToQueue(...retry);
245246
}
246247

@@ -297,21 +298,37 @@ export class Destination implements DestinationPlugin {
297298
}
298299

299300
fulfillRequest(list: Context[], code: number, message: string) {
300-
this.saveEvents();
301301
list.forEach((context) => context.callback(buildResult(context.event, code, message)));
302+
void this.updateEventStorage(list);
302303
}
303304

304305
/**
305-
* Saves events to storage
306306
* This is called on
307307
* 1) new events are added to queue; or
308308
* 2) response comes back for a request
309+
*
310+
* update the event storage
309311
*/
310-
saveEvents() {
312+
async updateEventStorage(eventsToRemove: Context[], eventsToAdd?: Context[]) {
311313
if (!this.config.storageProvider) {
312314
return;
313315
}
314-
const events = Array.from(this.queue.map((context) => context.event));
315-
void this.config.storageProvider.set(this.storageKey, events);
316+
317+
const filterEventInsertIdSet = eventsToRemove.reduce((filtered, context) => {
318+
if (context.event.insert_id) {
319+
filtered.add(context.event.insert_id);
320+
}
321+
return filtered;
322+
}, new Set<string>());
323+
324+
const savedEvents = await this.config.storageProvider.get(this.storageKey);
325+
const updatedEvents: Event[] = eventsToAdd?.map((context) => context.event) || [];
326+
327+
savedEvents?.forEach((event) => {
328+
if (event.insert_id && !filterEventInsertIdSet.has(event.insert_id)) {
329+
updatedEvents.push(event);
330+
}
331+
});
332+
await this.config.storageProvider.set(this.storageKey, updatedEvents);
316333
}
317334
}

packages/analytics-core/test/plugins/destination.test.ts

Lines changed: 43 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -548,8 +548,15 @@ describe('destination', () => {
548548
});
549549
});
550550

551-
describe('saveEvents', () => {
552-
test('should save to storage provider', () => {
551+
describe('filterEvent', () => {
552+
test('should be ok with no storage provider', async () => {
553+
const destination = new Destination();
554+
destination.config = useDefaultConfig();
555+
destination.config.storageProvider = undefined;
556+
expect(await destination.updateEventStorage([])).toBe(undefined);
557+
});
558+
559+
test('should filter dropped event and update the storage provider', async () => {
553560
const destination = new Destination();
554561
destination.config = useDefaultConfig();
555562
destination.config.storageProvider = {
@@ -560,16 +567,46 @@ describe('destination', () => {
560567
reset: async () => undefined,
561568
getRaw: async () => undefined,
562569
};
570+
const event1 = { event_type: 'event', insert_id: '1' };
571+
const event2 = { event_type: 'filtered_event', insert_id: '2' };
572+
const get = jest.spyOn(destination.config.storageProvider, 'get').mockResolvedValueOnce([event1, event2]);
563573
const set = jest.spyOn(destination.config.storageProvider, 'set').mockResolvedValueOnce(undefined);
564-
destination.saveEvents();
574+
const context = {
575+
event: event2,
576+
attempts: 0,
577+
callback: () => undefined,
578+
timeout: 0,
579+
};
580+
await destination.updateEventStorage([context]);
581+
expect(get).toHaveBeenCalledTimes(1);
565582
expect(set).toHaveBeenCalledTimes(1);
583+
expect(set).toHaveBeenCalledWith('', expect.objectContaining([event1]));
566584
});
567585

568-
test('should be ok with no storage provider', () => {
586+
test('should save event to the storage provider', async () => {
569587
const destination = new Destination();
570588
destination.config = useDefaultConfig();
571-
destination.config.storageProvider = undefined;
572-
expect(destination.saveEvents()).toBe(undefined);
589+
destination.config.storageProvider = {
590+
isEnabled: async () => true,
591+
get: async () => undefined,
592+
set: async () => undefined,
593+
remove: async () => undefined,
594+
reset: async () => undefined,
595+
getRaw: async () => undefined,
596+
};
597+
const event = { event_type: 'event', insert_id: '1' };
598+
const get = jest.spyOn(destination.config.storageProvider, 'get').mockResolvedValueOnce([]);
599+
const set = jest.spyOn(destination.config.storageProvider, 'set').mockResolvedValueOnce(undefined);
600+
const context = {
601+
event: event,
602+
attempts: 0,
603+
callback: () => undefined,
604+
timeout: 0,
605+
};
606+
await destination.updateEventStorage([], [context]);
607+
expect(get).toHaveBeenCalledTimes(1);
608+
expect(set).toHaveBeenCalledTimes(1);
609+
expect(set).toHaveBeenCalledWith('', expect.objectContaining([event]));
573610
});
574611
});
575612

0 commit comments

Comments
 (0)