Skip to content

Commit

Permalink
feat(infra): subscribe for indexer
Browse files Browse the repository at this point in the history
  • Loading branch information
EYHN committed Jun 19, 2024
1 parent 7c2e6c2 commit d1d1eb8
Show file tree
Hide file tree
Showing 9 changed files with 262 additions and 28 deletions.
119 changes: 109 additions & 10 deletions packages/common/infra/src/sync/indexer/__tests__/black-box.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
*/
import 'fake-indexeddb/auto';

import { beforeEach, describe, expect, test } from 'vitest';
import { map } from 'rxjs';
import { beforeEach, describe, expect, test, vitest } from 'vitest';

import { defineSchema, Document, type Index } from '..';
import { IndexedDBIndex } from '../impl/indexeddb';
Expand All @@ -21,7 +22,7 @@ describe.each([
{ name: 'memory', backend: MemoryIndex },
{ name: 'idb', backend: IndexedDBIndex },
])('index tests($name)', ({ backend }) => {
async function initData(
async function writeData(
data: Record<
string,
Partial<Record<keyof typeof schema, string | string[]>>
Expand Down Expand Up @@ -50,7 +51,7 @@ describe.each([
});

test('basic', async () => {
await initData({
await writeData({
'1': {
title: 'hello world',
},
Expand Down Expand Up @@ -79,7 +80,7 @@ describe.each([
});

test('basic integer', async () => {
await initData({
await writeData({
'1': {
title: 'hello world',
size: '100',
Expand Down Expand Up @@ -109,7 +110,7 @@ describe.each([
});

test('fuzz', async () => {
await initData({
await writeData({
'1': {
title: 'hello world',
},
Expand Down Expand Up @@ -137,7 +138,7 @@ describe.each([
});

test('highlight', async () => {
await initData({
await writeData({
'1': {
title: 'hello world',
size: '100',
Expand Down Expand Up @@ -181,7 +182,7 @@ describe.each([
});

test('fields', async () => {
await initData({
await writeData({
'1': {
title: 'hello world',
tag: ['car', 'bike'],
Expand All @@ -206,7 +207,7 @@ describe.each([
});

test('pagination', async () => {
await initData(
await writeData(
Array.from({ length: 100 }).reduce((acc: any, _, i) => {
acc['apple' + i] = {
tag: ['apple'],
Expand Down Expand Up @@ -275,7 +276,7 @@ describe.each([
});

test('aggr', async () => {
await initData({
await writeData({
'1': {
title: 'hello world',
tag: ['car', 'bike'],
Expand Down Expand Up @@ -315,7 +316,7 @@ describe.each([
});

test('hits', async () => {
await initData(
await writeData(
Array.from({ length: 100 }).reduce((acc: any, _, i) => {
acc['apple' + i] = {
title: 'apple',
Expand Down Expand Up @@ -412,4 +413,102 @@ describe.each([
},
});
});

test('subscribe', async () => {
await writeData({
'1': {
title: 'hello world',
},
});

let value = null as any;
index
.search$({
type: 'match',
field: 'title',
match: 'hello world',
})
.pipe(map(v => (value = v)))
.subscribe();

await vitest.waitFor(
() => {
expect(value).toEqual({
nodes: [
{
id: '1',
score: expect.anything(),
},
],
pagination: {
count: 1,
hasMore: false,
limit: expect.anything(),
skip: 0,
},
});
},
{
timeout: 5000,
}
);

await writeData({
'2': {
title: 'hello world',
},
});

await vitest.waitFor(
() => {
expect(value).toEqual({
nodes: [
{
id: '1',
score: expect.anything(),
},
{
id: '2',
score: expect.anything(),
},
],
pagination: {
count: 2,
hasMore: false,
limit: expect.anything(),
skip: 0,
},
});
},
{
timeout: 5000,
}
);

const writer = await index.write();
writer.delete('1');
await writer.commit();

await vitest.waitFor(
() => {
expect(value).toEqual({
nodes: [
{
id: '2',
score: expect.anything(),
},
],
pagination: {
count: 1,
hasMore: false,
limit: expect.anything(),
skip: 0,
},
});
},
{
timeout: 5000,
}
);
});
});
4 changes: 3 additions & 1 deletion packages/common/infra/src/sync/indexer/document.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ export class Document<S extends Schema = any> {

static from<S extends Schema>(
id: string,
map: Record<keyof S, string | string[]> | Map<keyof S, string | string[]>
map:
| Partial<Record<keyof S, string | string[]>>
| Map<keyof S, string | string[]>
): Document<S> {
const doc = new Document(id);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ export class DataStruct {
invertedIndex = new Map<string, InvertedIndex>();

constructor(
private readonly databaseName: string,
readonly databaseName: string,
schema: Schema
) {
for (const [key, type] of Object.entries(schema)) {
Expand Down
51 changes: 49 additions & 2 deletions packages/common/infra/src/sync/indexer/impl/indexeddb/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
import type { Observable } from 'rxjs';
import { from, merge, of, Subject, throttleTime } from 'rxjs';

import { exhaustMapWithTrailing } from '../../../../utils/exhaustmap-with-trailing';
import {
type AggregateOptions,
type AggregateResult,
Expand All @@ -14,11 +18,17 @@ import { DataStruct, type DataStructRWTransaction } from './data-struct';

export class IndexedDBIndex<S extends Schema> implements Index<S> {
data: DataStruct = new DataStruct(this.databaseName, this.schema);
broadcast$ = new Subject();

constructor(
private readonly schema: S,
private readonly databaseName: string = 'indexer'
) {}
) {
const channel = new BroadcastChannel(this.databaseName + ':indexer');
channel.onmessage = () => {
this.broadcast$.next(1);
};
}

async get(id: string): Promise<Document<S> | null> {
return (await this.getAll([id]))[0] ?? null;
Expand Down Expand Up @@ -46,6 +56,23 @@ export class IndexedDBIndex<S extends Schema> implements Index<S> {
return this.data.search(trx, query, options);
}

search$(
query: Query<any>,
options: SearchOptions<any> = {}
): Observable<SearchResult<any, SearchOptions<any>>> {
return merge(of(1), this.broadcast$).pipe(
throttleTime(500, undefined, { leading: false, trailing: true }),
exhaustMapWithTrailing(() => {
return from(
(async () => {
const trx = await this.data.readonly();
return this.data.search(trx, query, options);
})()
);
})
);
}

async aggregate(
query: Query<any>,
field: string,
Expand All @@ -55,6 +82,24 @@ export class IndexedDBIndex<S extends Schema> implements Index<S> {
return this.data.aggregate(trx, query, field, options);
}

aggregate$(
query: Query<any>,
field: string,
options: AggregateOptions<any> = {}
): Observable<AggregateResult<S, AggregateOptions<any>>> {
return merge(of(1), this.broadcast$).pipe(
throttleTime(500, undefined, { leading: false, trailing: true }),
exhaustMapWithTrailing(() => {
return from(
(async () => {
const trx = await this.data.readonly();
return this.data.aggregate(trx, query, field, options);
})()
);
})
);
}

async clear(): Promise<void> {
const trx = await this.data.readwrite();
return this.data.clear(trx);
Expand All @@ -64,6 +109,7 @@ export class IndexedDBIndex<S extends Schema> implements Index<S> {
export class IndexedDBIndexWriter<S extends Schema> implements IndexWriter<S> {
inserts: Document[] = [];
deletes: string[] = [];
channel = new BroadcastChannel(this.data.databaseName + ':indexer');

constructor(
private readonly data: DataStruct,
Expand Down Expand Up @@ -91,7 +137,8 @@ export class IndexedDBIndexWriter<S extends Schema> implements IndexWriter<S> {
}

async commit(): Promise<void> {
return this.data.batchWrite(this.trx, this.deletes, this.inserts);
await this.data.batchWrite(this.trx, this.deletes, this.inserts);
this.channel.postMessage(1);
}

rollback(): void {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ export class DataStruct {
search(
query: Query<any>,
options: SearchOptions<any> = {}
): Promise<SearchResult<any, any>> {
): SearchResult<any, any> {
const pagination = {
skip: options.pagination?.skip ?? 0,
limit: options.pagination?.limit ?? 100,
Expand All @@ -151,22 +151,22 @@ export class DataStruct {
.toArray()
.slice(pagination.skip, pagination.skip + pagination.limit);

return Promise.resolve({
return {
pagination: {
count: match.size(),
hasMore: match.size() > pagination.limit + pagination.skip,
limit: pagination.limit,
skip: pagination.skip,
},
nodes: nids.map(nid => this.resultNode(match, nid, options)),
});
};
}

aggregate(
query: Query<any>,
field: string,
options: AggregateOptions<any> = {}
): Promise<AggregateResult<any, any>> {
): AggregateResult<any, any> {
const pagination = {
skip: options.pagination?.skip ?? 0,
limit: options.pagination?.limit ?? 100,
Expand All @@ -189,7 +189,7 @@ export class DataStruct {
}
}

return Promise.resolve({
return {
buckets: buckets
.slice(pagination.skip, pagination.skip + pagination.limit)
.map(bucket => {
Expand Down Expand Up @@ -231,7 +231,7 @@ export class DataStruct {
limit: pagination.limit,
skip: pagination.skip,
},
});
};
}

has(id: string): boolean {
Expand Down
Loading

0 comments on commit d1d1eb8

Please sign in to comment.