Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize bloom filter application #6992

Merged
merged 15 commits into from
Feb 9, 2023
8 changes: 8 additions & 0 deletions common/api-review/util.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,14 @@ export function createSubscribe<T>(executor: Executor<T>, onNoObservers?: Execut
// @public
export const decode: (token: string) => DecodedToken;

// Warning: (ae-missing-release-tag) "DecodeBase64StringError" is exported by the package, but it is missing a release tag (@alpha, @beta, @public, or @internal)
//
// @public
export class DecodeBase64StringError extends Error {
// (undocumented)
readonly name = "DecodeBase64StringError";
}

// Warning: (ae-missing-release-tag) "deepCopy" is exported by the package, but it is missing a release tag (@alpha, @beta, @public, or @internal)
//
// @public
Expand Down
5 changes: 0 additions & 5 deletions packages/firestore/src/model/path.ts
Original file line number Diff line number Diff line change
Expand Up @@ -163,11 +163,6 @@ abstract class BasePath<B extends BasePath<B>> {
return this.segments.slice(this.offset, this.limit());
}

// TODO(Mila): Use database info and toString() to get full path instead.
/**
 * Returns the entries of `segments` joined with '/'.
 *
 * NOTE(review): this joins the whole backing array, whereas the neighbouring
 * accessor slices it with `offset` and `limit()` — confirm that ignoring the
 * window is intended.
 */
toFullPath(): string {
  const { segments } = this;
  return segments.join('/');
}

static comparator<T extends BasePath<T>>(
p1: BasePath<T>,
p2: BasePath<T>
Expand Down
13 changes: 12 additions & 1 deletion packages/firestore/src/platform/base64.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,24 @@
* limitations under the License.
*/

import { Base64DecodeError } from '../util/base64_decode_error';

// This file is only used under ts-node.
// eslint-disable-next-line @typescript-eslint/no-require-imports
const platform = require(`./${process.env.TEST_PLATFORM ?? 'node'}/base64`);

/**
 * Converts a Base64 encoded string to a binary string by delegating to the
 * platform module loaded above through `require()` (chosen by
 * `TEST_PLATFORM`, defaulting to 'node').
 *
 * @param encoded - The Base64 encoded input.
 * @returns The decoded binary string.
 * @throws Base64DecodeError if the length of `encoded` differs from the
 * padded Base64 length implied by the decoded output.
 */
export function decodeBase64(encoded: string): string {
// NOTE(review): this listing is a rendered diff without +/- markers. The next
// line looks like the single deleted line of this hunk (the old body), and
// everything after it the replacement body — confirm against the real file.
return platform.decodeBase64(encoded);
const decoded = platform.decodeBase64(encoded);

// A quick sanity check, since the node and rn decoders do not throw when the
// input is an invalid base64 string, e.g., "A===".
// Padded Base64 spends 4 characters on every group of up to 3 decoded
// characters, so a well-formed input that decodes to n characters is exactly
// 4 * ceil(n / 3) characters long. Unpadded input such as "aa" fails this
// check as well.
const expectedEncodedLength = 4 * Math.ceil(decoded.length / 3);
if (encoded.length !== expectedEncodedLength) {
throw new Base64DecodeError('Invalid base64 string');
}

return decoded;
}

/** Converts a binary string to a Base64 encoded string. */
Expand Down
12 changes: 11 additions & 1 deletion packages/firestore/src/platform/browser/base64.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,19 @@
* limitations under the License.
*/

import { Base64DecodeError } from '../../util/base64_decode_error';

/**
 * Converts a Base64 encoded string to a binary string using `atob()`.
 *
 * @param encoded - The Base64 encoded input.
 * @returns The decoded binary string.
 * @throws Base64DecodeError when `atob()` throws a `DOMException`; the
 * original exception is appended to the message. Any other error is rethrown
 * unchanged.
 */
export function decodeBase64(encoded: string): string {
// NOTE(review): this listing is a rendered diff without +/- markers. The next
// line looks like the single deleted line of this hunk (the old body), and
// the try/catch after it the replacement body — confirm against the real file.
return atob(encoded);
try {
return atob(encoded);
} catch (e) {
// Translate only the DOMException case into the Firestore-level error type so
// callers can test for it with `instanceof Base64DecodeError`.
if (e instanceof DOMException) {
throw new Base64DecodeError('Invalid base64 string: ' + e);
} else {
throw e;
}
}
}

/** Converts a binary string to a Base64 encoded string. */
Expand Down
26 changes: 18 additions & 8 deletions packages/firestore/src/platform/rn/base64.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,31 @@
* limitations under the License.
*/

import { base64 } from '@firebase/util';
import { base64, DecodeBase64StringError } from '@firebase/util';

import { Base64DecodeError } from '../../util/base64_decode_error';

// WebSafe uses a different URL-encoding safe alphabet that doesn't match
// the encoding used on the backend.
const WEB_SAFE = false;

/**
 * Converts a Base64 encoded string to a binary string using the `base64`
 * helper from `@firebase/util` with the non-web-safe alphabet (`WEB_SAFE` is
 * false).
 *
 * @param encoded - The Base64 encoded input.
 * @returns The decoded binary string, built from the decoded byte values with
 * `String.fromCharCode`.
 * @throws Base64DecodeError when decoding throws a `DecodeBase64StringError`;
 * the original error is appended to the message. Any other error is rethrown
 * unchanged.
 */
export function decodeBase64(encoded: string): string {
// NOTE(review): this listing is a rendered diff without +/- markers. The
// statement below, up to its closing `);`, looks like the deleted old body;
// the try/catch after it looks like the replacement — confirm against the
// real file.
return String.fromCharCode.apply(
null,
// We use `decodeStringToByteArray()` instead of `decodeString()` since
// `decodeString()` returns Unicode strings, which doesn't match the values
// returned by `atob()`'s Latin1 representation.
base64.decodeStringToByteArray(encoded, WEB_SAFE)
);
try {
return String.fromCharCode.apply(
null,
// We use `decodeStringToByteArray()` instead of `decodeString()` since
// `decodeString()` returns Unicode strings, which doesn't match the values
// returned by `atob()`'s Latin1 representation.
base64.decodeStringToByteArray(encoded, WEB_SAFE)
);
} catch (e) {
// Translate only the util-level decode error into the Firestore-level error
// type so callers can test for it with `instanceof Base64DecodeError`.
if (e instanceof DecodeBase64StringError) {
throw new Base64DecodeError('Invalid base64 string: ' + e);
} else {
throw e;
}
}
}

/** Converts a binary string to a Base64 encoded string. */
Expand Down
1 change: 1 addition & 0 deletions packages/firestore/src/remote/datastore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ import {
*/
export abstract class Datastore {
  /**
   * Serializer exposed by the datastore. The remote store reads its
   * `databaseId` when it starts the watch stream.
   */
  abstract serializer: JsonProtoSerializer;

  /** Must be implemented by concrete datastores; takes no arguments. */
  abstract terminate(): void;
}

/**
Expand Down
3 changes: 2 additions & 1 deletion packages/firestore/src/remote/remote_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,8 @@ function startWatchStream(remoteStoreImpl: RemoteStoreImpl): void {
getRemoteKeysForTarget: targetId =>
remoteStoreImpl.remoteSyncer.getRemoteKeysForTarget!(targetId),
getTargetDataForTarget: targetId =>
remoteStoreImpl.listenTargets.get(targetId) || null
remoteStoreImpl.listenTargets.get(targetId) || null,
getDatabaseId: () => remoteStoreImpl.datastore.serializer.databaseId
});
ensureWatchStream(remoteStoreImpl).start();
remoteStoreImpl.onlineStateTracker.handleWatchStreamStart();
Expand Down
61 changes: 35 additions & 26 deletions packages/firestore/src/remote/watch_change.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* limitations under the License.
*/

import { DatabaseId } from '../core/database_info';
import { SnapshotVersion } from '../core/snapshot_version';
import { targetIsDocumentTarget } from '../core/target';
import { TargetId } from '../core/types';
Expand All @@ -29,6 +30,7 @@ import { MutableDocument } from '../model/document';
import { DocumentKey } from '../model/document_key';
import { normalizeByteString } from '../model/normalize';
import { debugAssert, fail, hardAssert } from '../util/assert';
import { Base64DecodeError } from '../util/base64_decode_error';
import { ByteString } from '../util/byte_string';
import { FirestoreError } from '../util/error';
import { logDebug, logWarn } from '../util/log';
Expand Down Expand Up @@ -253,6 +255,11 @@ export interface TargetMetadataProvider {
* has become inactive
*/
getTargetDataForTarget(targetId: TargetId): TargetData | null;

/**
* Returns the database ID of the Firestore instance.
*/
getDatabaseId(): DatabaseId;
}

const LOG_TAG = 'WatchChangeAggregator';
Expand Down Expand Up @@ -416,8 +423,7 @@ export class WatchChangeAggregator {
if (currentSize !== expectedCount) {
// Apply bloom filter to identify and mark removed documents.
const bloomFilterApplied = this.applyBloomFilter(
watchChange.existenceFilter,
targetId,
watchChange,
currentSize
);
if (!bloomFilterApplied) {
Expand All @@ -433,12 +439,11 @@ export class WatchChangeAggregator {

/** Returns whether a bloom filter removed the deleted documents successfully. */
private applyBloomFilter(
existenceFilter: ExistenceFilter,
targetId: number,
watchChange: ExistenceFilterChange,
currentCount: number
): boolean {
const unchangedNames = existenceFilter.unchangedNames;
const expectedCount = existenceFilter.count;
const { unchangedNames, count: expectedCount } =
watchChange.existenceFilter;

if (!unchangedNames || !unchangedNames.bits) {
return false;
Expand All @@ -449,17 +454,22 @@ export class WatchChangeAggregator {
hashCount = 0
} = unchangedNames;

// TODO(Mila): Remove this validation, add try catch to normalizeByteString.
if (typeof bitmap === 'string') {
const isValidBitmap = this.isValidBase64String(bitmap);
if (!isValidBitmap) {
logWarn('Invalid base64 string. Applying bloom filter failed.');
let normalizedBitmap: Uint8Array;
try {
normalizedBitmap = normalizeByteString(bitmap).toUint8Array();
} catch (err) {
if (err instanceof Base64DecodeError) {
logWarn(
'Decoding the base64 bloom filter in existence filter failed (' +
err.message +
'); ignoring the bloom filter and falling back to full re-query.'
);
return false;
} else {
throw err;
}
}

const normalizedBitmap = normalizeByteString(bitmap).toUint8Array();

let bloomFilter: BloomFilter;
try {
// BloomFilter throws error if the inputs are invalid.
Expand All @@ -474,37 +484,36 @@ export class WatchChangeAggregator {
}

const removedDocumentCount = this.filterRemovedDocuments(
bloomFilter,
targetId
watchChange.targetId,
bloomFilter
);

return expectedCount === currentCount - removedDocumentCount;
}

// TODO(Mila): Move the validation into normalizeByteString.
/**
 * Reports whether `value` consists of zero or more complete 4-character
 * groups from the standard Base64 alphabet (`A-Z`, `a-z`, `0-9`, `+`, `/`),
 * optionally followed by one final group padded with `==` or `=`.
 */
private isValidBase64String(value: string): boolean {
  return new RegExp(
    '^(?:[A-Za-z0-9+/]{4})*(?:[A-Za-z0-9+/]{2}==|[A-Za-z0-9+/]{3}=)?$'
  ).test(value);
}

/**
 * Filters out removed documents based on the bloom filter membership result
 * and returns the number of documents removed.
 *
 * Each key returned by `getRemoteKeysForTarget(targetId)` is checked against
 * the filter; keys for which `mightContain()` returns false are removed from
 * the target via `removeDocumentFromTarget()` and counted.
 *
 * NOTE(review): this listing is a rendered diff without +/- markers, so both
 * the old and the new parameter order, and both the old and the new `if`
 * line, appear below. Confirm against the real file which lines survive.
 */
private filterRemovedDocuments(
// Looks like the old parameter order (bloom filter first).
bloomFilter: BloomFilter,
targetId: number
// Looks like the new parameter order (target ID first).
targetId: number,
bloomFilter: BloomFilter
): number {
const existingKeys = this.metadataProvider.getRemoteKeysForTarget(targetId);
let removalCount = 0;

existingKeys.forEach(key => {
// Looks like the old membership test, keyed on the bare joined segments.
if (!bloomFilter.mightContain(key.path.toFullPath())) {
// The new membership test is keyed on a name of the form
// projects/{projectId}/databases/{database}/documents/{document path}.
// NOTE(review): getDatabaseId() is called once per key although its result
// does not depend on the key — presumably it could be hoisted above the
// loop; verify the provider returns a stable value.
const databaseId = this.metadataProvider.getDatabaseId();
const documentPath = `projects/${databaseId.projectId}/databases/${
databaseId.database
}/documents/${key.path.canonicalString()}`;

if (!bloomFilter.mightContain(documentPath)) {
this.removeDocumentFromTarget(targetId, key, /*updatedDocument=*/ null);
removalCount++;
}
});

return removalCount;
}

Expand Down
23 changes: 23 additions & 0 deletions packages/firestore/src/util/base64_decode_error.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/**
* @license
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
 * An error encountered while decoding a base64 string.
 *
 * Callers tell this error apart from unrelated failures with
 * `instanceof Base64DecodeError`, so the prototype chain is restored
 * explicitly in the constructor: when a class that extends the built-in
 * `Error` is compiled down to ES5, `super()` yields a plain `Error` object
 * and the `instanceof` check would otherwise fail.
 */
export class Base64DecodeError extends Error {
  readonly name = 'Base64DecodeError';

  /**
   * @param message - Human-readable description of the decoding failure.
   */
  constructor(message?: string) {
    super(message);
    // `new.target.prototype` (rather than `Base64DecodeError.prototype`) keeps
    // subclasses working as well.
    Object.setPrototypeOf(this, new.target.prototype);
  }
}
2 changes: 1 addition & 1 deletion packages/firestore/test/lite/integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -852,7 +852,7 @@ describe('DocumentSnapshot', () => {

it('returns Bytes', () => {
return withTestDocAndInitialData(
{ bytes: Bytes.fromBase64String('aa') },
{ bytes: Bytes.fromBase64String('aa==') },
async docRef => {
const docSnap = await getDoc(docRef);
const bytes = docSnap.get('bytes');
Expand Down
9 changes: 6 additions & 3 deletions packages/firestore/test/unit/local/local_store.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1273,7 +1273,8 @@ function genericLocalStoreTests(
);
const aggregator = new WatchChangeAggregator({
getRemoteKeysForTarget: () => documentKeySet(),
getTargetDataForTarget: () => targetData
getTargetDataForTarget: () => targetData,
getDatabaseId: () => persistenceHelpers.TEST_DATABASE_ID
});
aggregator.handleTargetChange(watchChange);
const remoteEvent = aggregator.createRemoteEvent(version(1000));
Expand Down Expand Up @@ -1313,7 +1314,8 @@ function genericLocalStoreTests(
);
const aggregator1 = new WatchChangeAggregator({
getRemoteKeysForTarget: () => documentKeySet(),
getTargetDataForTarget: () => targetData
getTargetDataForTarget: () => targetData,
getDatabaseId: () => persistenceHelpers.TEST_DATABASE_ID
});
aggregator1.handleTargetChange(watchChange1);
const remoteEvent1 = aggregator1.createRemoteEvent(version(1000));
Expand All @@ -1326,7 +1328,8 @@ function genericLocalStoreTests(
);
const aggregator2 = new WatchChangeAggregator({
getRemoteKeysForTarget: () => documentKeySet(),
getTargetDataForTarget: () => targetData
getTargetDataForTarget: () => targetData,
getDatabaseId: () => persistenceHelpers.TEST_DATABASE_ID
});
aggregator2.handleTargetChange(watchChange2);
const remoteEvent2 = aggregator2.createRemoteEvent(version(2000));
Expand Down
6 changes: 4 additions & 2 deletions packages/firestore/test/unit/remote/remote_event.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import {
key,
forEachNumber
} from '../../util/helpers';
import { TEST_DATABASE_ID } from '../local/persistence_test_helpers';

interface TargetMap {
[targetId: string]: TargetData;
Expand Down Expand Up @@ -110,11 +111,11 @@ describe('RemoteEvent', () => {
targetIds.push(targetId);
});
}

const aggregator = new WatchChangeAggregator({
getRemoteKeysForTarget: () => options.existingKeys || documentKeySet(),
getTargetDataForTarget: targetId =>
options.targets ? options.targets[targetId] : null
options.targets ? options.targets[targetId] : null,
getDatabaseId: () => TEST_DATABASE_ID
});

if (options.outstandingResponses) {
Expand Down Expand Up @@ -155,6 +156,7 @@ describe('RemoteEvent', () => {
version(options.snapshotVersion)
);
}

it('will accumulate document added and removed events', () => {
const targets = listens(1, 2, 3, 4, 5, 6);

Expand Down
12 changes: 8 additions & 4 deletions packages/firestore/test/util/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,8 @@ export function noChangeEvent(
const aggregator = new WatchChangeAggregator({
getRemoteKeysForTarget: () => documentKeySet(),
getTargetDataForTarget: targetId =>
targetData(targetId, TargetPurpose.Listen, 'foo')
targetData(targetId, TargetPurpose.Listen, 'foo'),
getDatabaseId: () => TEST_DATABASE_ID
});
aggregator.handleTargetChange(
new WatchTargetChange(
Expand All @@ -439,7 +440,8 @@ export function existenceFilterEvent(
const aggregator = new WatchChangeAggregator({
getRemoteKeysForTarget: () => syncedKeys,
getTargetDataForTarget: targetId =>
targetData(targetId, TargetPurpose.Listen, 'foo')
targetData(targetId, TargetPurpose.Listen, 'foo'),
getDatabaseId: () => TEST_DATABASE_ID
});
aggregator.handleExistenceFilter(
new ExistenceFilterChange(
Expand Down Expand Up @@ -476,7 +478,8 @@ export function docAddedRemoteEvent(
} else {
return null;
}
}
},
getDatabaseId: () => TEST_DATABASE_ID
});

let version = SnapshotVersion.min();
Expand Down Expand Up @@ -523,7 +526,8 @@ export function docUpdateRemoteEvent(
? TargetPurpose.LimboResolution
: TargetPurpose.Listen;
return targetData(targetId, purpose, doc.key.toString());
}
},
getDatabaseId: () => TEST_DATABASE_ID
});
aggregator.handleDocumentChange(docChange);
return aggregator.createRemoteEvent(doc.version);
Expand Down
Loading