Skip to content

Commit

Permalink
Optimize bloom filter application (#6992)
Browse files Browse the repository at this point in the history
  • Loading branch information
milaGGL authored Feb 9, 2023
1 parent dee7744 commit 277f8e1
Show file tree
Hide file tree
Showing 14 changed files with 137 additions and 53 deletions.
8 changes: 8 additions & 0 deletions common/api-review/util.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,14 @@ export function createSubscribe<T>(executor: Executor<T>, onNoObservers?: Execut
// @public
export const decode: (token: string) => DecodedToken;

// Warning: (ae-missing-release-tag) "DecodeBase64StringError" is exported by the package, but it is missing a release tag (@alpha, @beta, @public, or @internal)
//
// @public
export class DecodeBase64StringError extends Error {
// (undocumented)
readonly name = "DecodeBase64StringError";
}

// Warning: (ae-missing-release-tag) "deepCopy" is exported by the package, but it is missing a release tag (@alpha, @beta, @public, or @internal)
//
// @public
Expand Down
5 changes: 0 additions & 5 deletions packages/firestore/src/model/path.ts
Original file line number Diff line number Diff line change
Expand Up @@ -163,11 +163,6 @@ abstract class BasePath<B extends BasePath<B>> {
return this.segments.slice(this.offset, this.limit());
}

// TODO(Mila): Use database info and toString() to get full path instead.
toFullPath(): string {
return this.segments.join('/');
}

static comparator<T extends BasePath<T>>(
p1: BasePath<T>,
p2: BasePath<T>
Expand Down
13 changes: 12 additions & 1 deletion packages/firestore/src/platform/base64.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,24 @@
* limitations under the License.
*/

import { Base64DecodeError } from '../util/base64_decode_error';

// This file is only used under ts-node.
// eslint-disable-next-line @typescript-eslint/no-require-imports
const platform = require(`./${process.env.TEST_PLATFORM ?? 'node'}/base64`);

/** Converts a Base64 encoded string to a binary string. */
export function decodeBase64(encoded: string): string {
return platform.decodeBase64(encoded);
const decoded = platform.decodeBase64(encoded);

// A quick sanity check as node and rn will not throw error if input is an
// invalid base64 string, e.g., "A===".
const expectedEncodedLength = 4 * Math.ceil(decoded.length / 3);
if (encoded.length !== expectedEncodedLength) {
throw new Base64DecodeError('Invalid base64 string');
}

return decoded;
}

/** Converts a binary string to a Base64 encoded string. */
Expand Down
12 changes: 11 additions & 1 deletion packages/firestore/src/platform/browser/base64.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,19 @@
* limitations under the License.
*/

import { Base64DecodeError } from '../../util/base64_decode_error';

/** Converts a Base64 encoded string to a binary string. */
export function decodeBase64(encoded: string): string {
return atob(encoded);
try {
return atob(encoded);
} catch (e) {
if (e instanceof DOMException) {
throw new Base64DecodeError('Invalid base64 string: ' + e);
} else {
throw e;
}
}
}

/** Converts a binary string to a Base64 encoded string. */
Expand Down
26 changes: 18 additions & 8 deletions packages/firestore/src/platform/rn/base64.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,31 @@
* limitations under the License.
*/

import { base64 } from '@firebase/util';
import { base64, DecodeBase64StringError } from '@firebase/util';

import { Base64DecodeError } from '../../util/base64_decode_error';

// WebSafe uses a different URL-encoding safe alphabet that doesn't match
// the encoding used on the backend.
const WEB_SAFE = false;

/** Converts a Base64 encoded string to a binary string. */
export function decodeBase64(encoded: string): string {
return String.fromCharCode.apply(
null,
// We use `decodeStringToByteArray()` instead of `decodeString()` since
// `decodeString()` returns Unicode strings, which doesn't match the values
// returned by `atob()`'s Latin1 representation.
base64.decodeStringToByteArray(encoded, WEB_SAFE)
);
try {
return String.fromCharCode.apply(
null,
// We use `decodeStringToByteArray()` instead of `decodeString()` since
// `decodeString()` returns Unicode strings, which doesn't match the values
// returned by `atob()`'s Latin1 representation.
base64.decodeStringToByteArray(encoded, WEB_SAFE)
);
} catch (e) {
if (e instanceof DecodeBase64StringError) {
throw new Base64DecodeError('Invalid base64 string: ' + e);
} else {
throw e;
}
}
}

/** Converts a binary string to a Base64 encoded string. */
Expand Down
1 change: 1 addition & 0 deletions packages/firestore/src/remote/datastore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ import {
*/
export abstract class Datastore {
abstract terminate(): void;
abstract serializer: JsonProtoSerializer;
}

/**
Expand Down
3 changes: 2 additions & 1 deletion packages/firestore/src/remote/remote_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,8 @@ function startWatchStream(remoteStoreImpl: RemoteStoreImpl): void {
getRemoteKeysForTarget: targetId =>
remoteStoreImpl.remoteSyncer.getRemoteKeysForTarget!(targetId),
getTargetDataForTarget: targetId =>
remoteStoreImpl.listenTargets.get(targetId) || null
remoteStoreImpl.listenTargets.get(targetId) || null,
getDatabaseId: () => remoteStoreImpl.datastore.serializer.databaseId
});
ensureWatchStream(remoteStoreImpl).start();
remoteStoreImpl.onlineStateTracker.handleWatchStreamStart();
Expand Down
61 changes: 35 additions & 26 deletions packages/firestore/src/remote/watch_change.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* limitations under the License.
*/

import { DatabaseId } from '../core/database_info';
import { SnapshotVersion } from '../core/snapshot_version';
import { targetIsDocumentTarget } from '../core/target';
import { TargetId } from '../core/types';
Expand All @@ -29,6 +30,7 @@ import { MutableDocument } from '../model/document';
import { DocumentKey } from '../model/document_key';
import { normalizeByteString } from '../model/normalize';
import { debugAssert, fail, hardAssert } from '../util/assert';
import { Base64DecodeError } from '../util/base64_decode_error';
import { ByteString } from '../util/byte_string';
import { FirestoreError } from '../util/error';
import { logDebug, logWarn } from '../util/log';
Expand Down Expand Up @@ -253,6 +255,11 @@ export interface TargetMetadataProvider {
* has become inactive
*/
getTargetDataForTarget(targetId: TargetId): TargetData | null;

/**
* Returns the database ID of the Firestore instance.
*/
getDatabaseId(): DatabaseId;
}

const LOG_TAG = 'WatchChangeAggregator';
Expand Down Expand Up @@ -416,8 +423,7 @@ export class WatchChangeAggregator {
if (currentSize !== expectedCount) {
// Apply bloom filter to identify and mark removed documents.
const bloomFilterApplied = this.applyBloomFilter(
watchChange.existenceFilter,
targetId,
watchChange,
currentSize
);
if (!bloomFilterApplied) {
Expand All @@ -433,12 +439,11 @@ export class WatchChangeAggregator {

/** Returns whether a bloom filter removed the deleted documents successfully. */
private applyBloomFilter(
existenceFilter: ExistenceFilter,
targetId: number,
watchChange: ExistenceFilterChange,
currentCount: number
): boolean {
const unchangedNames = existenceFilter.unchangedNames;
const expectedCount = existenceFilter.count;
const { unchangedNames, count: expectedCount } =
watchChange.existenceFilter;

if (!unchangedNames || !unchangedNames.bits) {
return false;
Expand All @@ -449,17 +454,22 @@ export class WatchChangeAggregator {
hashCount = 0
} = unchangedNames;

// TODO(Mila): Remove this validation, add try catch to normalizeByteString.
if (typeof bitmap === 'string') {
const isValidBitmap = this.isValidBase64String(bitmap);
if (!isValidBitmap) {
logWarn('Invalid base64 string. Applying bloom filter failed.');
let normalizedBitmap: Uint8Array;
try {
normalizedBitmap = normalizeByteString(bitmap).toUint8Array();
} catch (err) {
if (err instanceof Base64DecodeError) {
logWarn(
'Decoding the base64 bloom filter in existence filter failed (' +
err.message +
'); ignoring the bloom filter and falling back to full re-query.'
);
return false;
} else {
throw err;
}
}

const normalizedBitmap = normalizeByteString(bitmap).toUint8Array();

let bloomFilter: BloomFilter;
try {
// BloomFilter throws error if the inputs are invalid.
Expand All @@ -474,37 +484,36 @@ export class WatchChangeAggregator {
}

const removedDocumentCount = this.filterRemovedDocuments(
bloomFilter,
targetId
watchChange.targetId,
bloomFilter
);

return expectedCount === currentCount - removedDocumentCount;
}

// TODO(Mila): Move the validation into normalizeByteString.
private isValidBase64String(value: string): boolean {
const regExp = new RegExp(
'^(?:[A-Za-z0-9+/]{4})*(?:[A-Za-z0-9+/]{2}==|[A-Za-z0-9+/]{3}=)?$'
);
return regExp.test(value);
}

/**
* Filter out removed documents based on bloom filter membership result and
* return number of documents removed.
*/
private filterRemovedDocuments(
bloomFilter: BloomFilter,
targetId: number
targetId: number,
bloomFilter: BloomFilter
): number {
const existingKeys = this.metadataProvider.getRemoteKeysForTarget(targetId);
let removalCount = 0;

existingKeys.forEach(key => {
if (!bloomFilter.mightContain(key.path.toFullPath())) {
const databaseId = this.metadataProvider.getDatabaseId();
const documentPath = `projects/${databaseId.projectId}/databases/${
databaseId.database
}/documents/${key.path.canonicalString()}`;

if (!bloomFilter.mightContain(documentPath)) {
this.removeDocumentFromTarget(targetId, key, /*updatedDocument=*/ null);
removalCount++;
}
});

return removalCount;
}

Expand Down
23 changes: 23 additions & 0 deletions packages/firestore/src/util/base64_decode_error.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/**
* @license
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* An error encountered while decoding base64 string.
*/
export class Base64DecodeError extends Error {
readonly name = 'Base64DecodeError';
}
2 changes: 1 addition & 1 deletion packages/firestore/test/lite/integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -852,7 +852,7 @@ describe('DocumentSnapshot', () => {

it('returns Bytes', () => {
return withTestDocAndInitialData(
{ bytes: Bytes.fromBase64String('aa') },
{ bytes: Bytes.fromBase64String('aa==') },
async docRef => {
const docSnap = await getDoc(docRef);
const bytes = docSnap.get('bytes');
Expand Down
9 changes: 6 additions & 3 deletions packages/firestore/test/unit/local/local_store.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1273,7 +1273,8 @@ function genericLocalStoreTests(
);
const aggregator = new WatchChangeAggregator({
getRemoteKeysForTarget: () => documentKeySet(),
getTargetDataForTarget: () => targetData
getTargetDataForTarget: () => targetData,
getDatabaseId: () => persistenceHelpers.TEST_DATABASE_ID
});
aggregator.handleTargetChange(watchChange);
const remoteEvent = aggregator.createRemoteEvent(version(1000));
Expand Down Expand Up @@ -1313,7 +1314,8 @@ function genericLocalStoreTests(
);
const aggregator1 = new WatchChangeAggregator({
getRemoteKeysForTarget: () => documentKeySet(),
getTargetDataForTarget: () => targetData
getTargetDataForTarget: () => targetData,
getDatabaseId: () => persistenceHelpers.TEST_DATABASE_ID
});
aggregator1.handleTargetChange(watchChange1);
const remoteEvent1 = aggregator1.createRemoteEvent(version(1000));
Expand All @@ -1326,7 +1328,8 @@ function genericLocalStoreTests(
);
const aggregator2 = new WatchChangeAggregator({
getRemoteKeysForTarget: () => documentKeySet(),
getTargetDataForTarget: () => targetData
getTargetDataForTarget: () => targetData,
getDatabaseId: () => persistenceHelpers.TEST_DATABASE_ID
});
aggregator2.handleTargetChange(watchChange2);
const remoteEvent2 = aggregator2.createRemoteEvent(version(2000));
Expand Down
6 changes: 4 additions & 2 deletions packages/firestore/test/unit/remote/remote_event.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import {
key,
forEachNumber
} from '../../util/helpers';
import { TEST_DATABASE_ID } from '../local/persistence_test_helpers';

interface TargetMap {
[targetId: string]: TargetData;
Expand Down Expand Up @@ -110,11 +111,11 @@ describe('RemoteEvent', () => {
targetIds.push(targetId);
});
}

const aggregator = new WatchChangeAggregator({
getRemoteKeysForTarget: () => options.existingKeys || documentKeySet(),
getTargetDataForTarget: targetId =>
options.targets ? options.targets[targetId] : null
options.targets ? options.targets[targetId] : null,
getDatabaseId: () => TEST_DATABASE_ID
});

if (options.outstandingResponses) {
Expand Down Expand Up @@ -155,6 +156,7 @@ describe('RemoteEvent', () => {
version(options.snapshotVersion)
);
}

it('will accumulate document added and removed events', () => {
const targets = listens(1, 2, 3, 4, 5, 6);

Expand Down
12 changes: 8 additions & 4 deletions packages/firestore/test/util/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,8 @@ export function noChangeEvent(
const aggregator = new WatchChangeAggregator({
getRemoteKeysForTarget: () => documentKeySet(),
getTargetDataForTarget: targetId =>
targetData(targetId, TargetPurpose.Listen, 'foo')
targetData(targetId, TargetPurpose.Listen, 'foo'),
getDatabaseId: () => TEST_DATABASE_ID
});
aggregator.handleTargetChange(
new WatchTargetChange(
Expand All @@ -439,7 +440,8 @@ export function existenceFilterEvent(
const aggregator = new WatchChangeAggregator({
getRemoteKeysForTarget: () => syncedKeys,
getTargetDataForTarget: targetId =>
targetData(targetId, TargetPurpose.Listen, 'foo')
targetData(targetId, TargetPurpose.Listen, 'foo'),
getDatabaseId: () => TEST_DATABASE_ID
});
aggregator.handleExistenceFilter(
new ExistenceFilterChange(
Expand Down Expand Up @@ -476,7 +478,8 @@ export function docAddedRemoteEvent(
} else {
return null;
}
}
},
getDatabaseId: () => TEST_DATABASE_ID
});

let version = SnapshotVersion.min();
Expand Down Expand Up @@ -523,7 +526,8 @@ export function docUpdateRemoteEvent(
? TargetPurpose.LimboResolution
: TargetPurpose.Listen;
return targetData(targetId, purpose, doc.key.toString());
}
},
getDatabaseId: () => TEST_DATABASE_ID
});
aggregator.handleDocumentChange(docChange);
return aggregator.createRemoteEvent(doc.version);
Expand Down
Loading

0 comments on commit 277f8e1

Please sign in to comment.