Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
d69497b
First response latencies for readRows only
danieljbruce Jul 30, 2025
bcb4cba
Remove only
danieljbruce Jul 30, 2025
84c5c5a
Do the check in the metrics handler instead
danieljbruce Jul 31, 2025
262e8c1
Added a comment
danieljbruce Jul 31, 2025
278d3c5
fix the tests to always expect a value for
danieljbruce Jul 31, 2025
c949dca
Fix assertion checks
danieljbruce Jul 31, 2025
89623bb
Merge branch 'main' into first-response-latency-readrows-only
danieljbruce Aug 1, 2025
b94b59f
Add send message and onAttempComplete
danieljbruce Aug 1, 2025
260d69c
Merge branch 'first-response-latency-readrows-only' of https://github…
danieljbruce Aug 1, 2025
38139d8
Implicit onAttemptComplete
danieljbruce Aug 1, 2025
ad536e7
Add code to attach metrics to checkAndMutate calls
danieljbruce Aug 1, 2025
90cd4c6
Fix authorized views check and mutate test
danieljbruce Aug 1, 2025
b60c379
Add CheckAndMutateRow tests
danieljbruce Aug 1, 2025
0f82a79
Use the exact column family
danieljbruce Aug 1, 2025
d10d082
Change expected method to checkAndMutateRow
danieljbruce Aug 1, 2025
554846d
Removing only
danieljbruce Aug 1, 2025
e2dfc6a
withMetricInterceptors change
danieljbruce Aug 1, 2025
e34be96
Unit test needs adjustment
danieljbruce Aug 1, 2025
0c2373e
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Aug 1, 2025
0152398
Merge branch 'main' of https://github.com/googleapis/nodejs-bigtable …
danieljbruce Aug 5, 2025
6b68a6d
Remove the import diff
danieljbruce Aug 5, 2025
8f4d9a0
Generate documentation for withMetricsInterceptors
danieljbruce Aug 5, 2025
462c9f5
rename createMetricsUnaryInterceptorProvider
danieljbruce Aug 5, 2025
96b8cdf
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Aug 5, 2025
6021cf2
Merge branch 'main' into 359913994-checkAndMutateRow
danieljbruce Aug 6, 2025
d4905bf
Merge branch 'main' into 359913994-checkAndMutateRow
danieljbruce Aug 7, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion src/client-side-metrics/metric-interceptor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

// Mock Server Implementation
import * as grpcJs from '@grpc/grpc-js';
import {status as GrpcStatus} from '@grpc/grpc-js';

Check warning on line 20 in src/client-side-metrics/metric-interceptor.ts

View workflow job for this annotation

GitHub Actions / lint

'GrpcStatus' is defined but never used

export type ServerStatus = {
metadata: {internalRepr: Map<string, Uint8Array[]>; options: {}};
Expand Down Expand Up @@ -63,7 +63,20 @@
};
}

export function withMetricInterceptors(
/**
* Attaches a metrics interceptor to unary calls for collecting client-side metrics.
*
* This method modifies the given `gaxOptions` to include an interceptor that
* will be triggered during the execution of a unary gRPC call. The interceptor
* uses the provided `OperationMetricsCollector` to record various metrics
* related to the call, such as latency, retries, and errors.
*
* @param {CallOptions} gaxOptions The existing GAX call options to modify.
* @param {OperationMetricsCollector} metricsCollector The metrics collector
* for the operation.
* @returns {CallOptions} The modified `gaxOptions` with the interceptor attached.
*/
export function createMetricsUnaryInterceptorProvider(
gaxOptions: CallOptions,
metricsCollector?: OperationMetricsCollector,
) {
Expand Down
27 changes: 24 additions & 3 deletions src/row-data-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import {
MethodName,
StreamingState,
} from './client-side-metrics/client-side-metrics-attributes';
import {withMetricInterceptors} from './client-side-metrics/metric-interceptor';
import {createMetricsUnaryInterceptorProvider} from './client-side-metrics/metric-interceptor';

interface TabularApiSurfaceRequest {
tableName?: string;
Expand Down Expand Up @@ -89,14 +89,32 @@ class RowDataUtils {
properties.reqOpts,
);
properties.requestData.data = {};
// 1. Create a metrics collector.
const metricsCollector = new OperationMetricsCollector(
properties.requestData.table,
MethodName.CHECK_AND_MUTATE_ROW,
StreamingState.UNARY,
(
properties.requestData.table as any
).bigtable._metricsConfigManager!.metricsHandlers,
);
// 2. Tell the metrics collector an attempt has been started.
metricsCollector.onOperationStart();
// 3. Make a unary call with gax options that include interceptors. The
// interceptors are built from a method that hooks them up to the
// metrics collector
properties.requestData.bigtable.request<google.bigtable.v2.ICheckAndMutateRowResponse>(
{
client: 'BigtableClient',
method: 'checkAndMutateRow',
reqOpts,
gaxOpts: config.gaxOptions,
gaxOpts: createMetricsUnaryInterceptorProvider(
config.gaxOptions ?? {},
metricsCollector,
),
},
(err, apiResponse) => {
metricsCollector.onOperationComplete(err ? err.code : 0);
if (err) {
callback(err, null, apiResponse);
return;
Expand Down Expand Up @@ -219,7 +237,10 @@ class RowDataUtils {
client: 'BigtableClient',
method: 'readModifyWriteRow',
reqOpts,
gaxOpts: withMetricInterceptors(gaxOptions, metricsCollector),
gaxOpts: createMetricsUnaryInterceptorProvider(
gaxOptions,
metricsCollector,
),
},
(err, ...args) => {
metricsCollector.onOperationComplete(err ? err.code : 0);
Expand Down
236 changes: 235 additions & 1 deletion system-test/client-side-metrics-all-methods.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import {ResourceMetrics} from '@opentelemetry/sdk-metrics';
import * as assert from 'assert';
import {GCPMetricsHandler} from '../src/client-side-metrics/gcp-metrics-handler';
import * as proxyquire from 'proxyquire';
import {Bigtable} from '../src';
import {Bigtable, RawFilter} from '../src';
import {Mutation} from '../src/mutation';
import {Row} from '../src/row';
import {
Expand Down Expand Up @@ -191,6 +191,18 @@ function readRowsAssertionCheck(
});
}

/**
 * Runs the shared metric assertions for a CheckAndMutateRow operation.
 *
 * Thin wrapper over `readRowsAssertionCheck` that pins the expected method
 * attribute to 'Bigtable.CheckAndMutateRow'. The final 'false' argument is
 * presumably the expected streaming attribute (CheckAndMutateRow is unary) —
 * confirm against `readRowsAssertionCheck`'s signature.
 *
 * @param projectId The project the recorded metrics should be tagged with.
 * @param requestsHandled The metric events captured by the test handler.
 */
function checkCheckAndMutateCall(
  projectId: string,
  requestsHandled: (OnOperationCompleteData | OnAttemptCompleteData)[] = [],
) {
  const expectedMethod = 'Bigtable.CheckAndMutateRow';
  const expectedStreaming = 'false';
  readRowsAssertionCheck(
    projectId,
    requestsHandled,
    expectedMethod,
    expectedStreaming,
  );
}

function checkMultiRowCall(
projectId: string,
requestsHandled: (OnOperationCompleteData | OnAttemptCompleteData)[] = [],
Expand Down Expand Up @@ -275,6 +287,18 @@ const mutation = {
method: Mutation.methods.INSERT,
};

// Filter used by the CheckAndMutateRow tests: targets column family 'cf1'
// with value 'alincoln'. NOTE(review): exact match semantics depend on
// RawFilter — confirm against the client's filter documentation.
const filter: RawFilter = {
  family: 'cf1',
  value: 'alincoln',
};

// Mutations applied when the filter above matches: delete the
// 'cf1:alincoln' column from the row.
const mutations = [
  {
    method: 'delete',
    data: ['cf1:alincoln'],
  },
];

const rules = [
{
column: 'cf1:column',
Expand Down Expand Up @@ -862,6 +886,91 @@ describe('Bigtable/ClientSideMetricsAllMethods', () => {
});
});
});
// CheckAndMutateRow: verifies that client-side metrics for the unary
// CheckAndMutateRow RPC (driven via row.filter) reach Google Cloud Monitoring.
describe('CheckAndMutateRow', () => {
  it('should send the metrics to Google Cloud Monitoring for a CheckAndMutateRow call', done => {
    (async () => {
      try {
        const bigtable = await mockBigtable(defaultProjectId, done);
        for (const instanceId of [instanceId1, instanceId2]) {
          await setupBigtableWithInsert(
            bigtable,
            columnFamilyId,
            instanceId,
            [tableId1, tableId2],
          );
          const instance = bigtable.instance(instanceId);
          const table = instance.table(tableId1);
          const row = table.row(columnFamilyId);
          await row.filter(filter, {onMatch: mutations});
          const table2 = instance.table(tableId2);
          const row2 = table2.row(columnFamilyId);
          await row2.filter(filter, {onMatch: mutations});
        }
      } catch (e) {
        // Report the real failure exactly once. Calling done() twice (as the
        // previous code did with a generic Error followed by the caught one)
        // makes mocha fail with "done() called multiple times" and hides the
        // underlying cause.
        done(e);
      }
    })().catch(err => {
      throw err;
    });
  });
  it('should send the metrics to Google Cloud Monitoring for a custom endpoint', done => {
    (async () => {
      try {
        const bigtable = await mockBigtable(
          defaultProjectId,
          done,
          'bogus-endpoint',
        );
        const instance = bigtable.instance(instanceId1);
        const table = instance.table(tableId1);
        try {
          // This call will fail because we are trying to hit a bogus endpoint.
          // The idea here is that we just want to record at least one metric
          // so that the exporter gets executed.
          const row = table.row(columnFamilyId);
          await row.filter(filter, {onMatch: mutations});
        } catch (e: unknown) {
          // Expected: the bogus endpoint makes the RPC fail; the metric has
          // already been recorded by this point.
        }
      } catch (e) {
        // Single done() call with the real error (see note in the first test).
        done(e);
      }
    })().catch(err => {
      throw err;
    });
  });
  it('should send the metrics to Google Cloud Monitoring for a CheckAndMutateRow call with a second project', done => {
    (async () => {
      try {
        // This is the second project the test is configured to work with:
        const projectId = SECOND_PROJECT_ID;
        const bigtable = await mockBigtable(projectId, done);
        for (const instanceId of [instanceId1, instanceId2]) {
          await setupBigtableWithInsert(
            bigtable,
            columnFamilyId,
            instanceId,
            [tableId1, tableId2],
          );
          const instance = bigtable.instance(instanceId);
          const table = instance.table(tableId1);
          const row = table.row(columnFamilyId);
          await row.filter(filter, {onMatch: mutations});
          const table2 = instance.table(tableId2);
          const row2 = table2.row(columnFamilyId);
          await row2.filter(filter, {onMatch: mutations});
        }
      } catch (e) {
        // Single done() call with the real error (see note in the first test).
        done(e);
      }
    })().catch(err => {
      throw err;
    });
  });
});
});
describe('Bigtable/ClientSideMetricsToGCMTimeout', () => {
// This test suite simulates a situation where the user creates multiple
Expand Down Expand Up @@ -1430,6 +1539,111 @@ describe('Bigtable/ClientSideMetricsAllMethods', () => {
});
});
});
// CheckAndMutateRow: verifies export behavior when multiple clients share the
// process. Tests pass once every client has exported without error.
describe('CheckAndMutateRow', () => {
  it('should send the metrics to Google Cloud Monitoring for a CheckAndMutateRow call', done => {
    let testFinished = false;
    /*
    We need to create a timeout here because if we don't then mocha shuts down
    the test as it is sleeping before the GCPMetricsHandler has a chance to
    export the data. When the timeout is finished, if there were no export
    errors then the test passes.
    */
    const testTimeout = setTimeout(() => {
      testFinished = true;
      done();
    }, 120000);
    (async () => {
      try {
        const bigtable1 = await mockBigtable(defaultProjectId, done);
        const bigtable2 = await mockBigtable(defaultProjectId, done);
        for (const bigtable of [bigtable1, bigtable2]) {
          for (const instanceId of [instanceId1, instanceId2]) {
            await setupBigtableWithInsert(
              bigtable,
              columnFamilyId,
              instanceId,
              [tableId1, tableId2],
            );
            const instance = bigtable.instance(instanceId);
            const table = instance.table(tableId1);
            const row = table.row(columnFamilyId);
            await row.filter(filter, {onMatch: mutations});
            const table2 = instance.table(tableId2);
            const row2 = table2.row(columnFamilyId);
            await row2.filter(filter, {onMatch: mutations});
          }
        }
      } catch (e) {
        // Guard against double-completion: the previous code called done()
        // twice here and never cleared the timeout, so the pass-path done()
        // could still fire after a failure ("done() called multiple times").
        if (!testFinished) {
          testFinished = true;
          clearTimeout(testTimeout);
          done(e);
        }
      }
    })().catch(err => {
      throw err;
    });
  });
  it('should send the metrics to Google Cloud Monitoring for a single CheckAndMutateRow call with thirty clients', done => {
    /*
    We need to create a timeout here because if we don't then mocha shuts down
    the test as it is sleeping before the GCPMetricsHandler has a chance to
    export the data. When the timeout is finished, if there were no export
    errors then the test passes.
    */
    const testTimeout = setTimeout(() => {
      done(new Error('The test timed out'));
    }, 480000);
    let testComplete = false;
    const numClients = 30;
    (async () => {
      try {
        const bigtableList = [];
        const completedSet = new Set();
        for (
          let bigtableCount = 0;
          bigtableCount < numClients;
          bigtableCount++
        ) {
          const currentCount = bigtableCount;
          const onExportSuccess = () => {
            completedSet.add(currentCount);
            if (completedSet.size === numClients) {
              // If every client has completed the export then pass the test.
              clearTimeout(testTimeout);
              if (!testComplete) {
                testComplete = true;
                done();
              }
            }
          };
          bigtableList.push(
            await mockBigtable(defaultProjectId, done, onExportSuccess),
          );
        }
        for (const bigtable of bigtableList) {
          for (const instanceId of [instanceId1, instanceId2]) {
            await setupBigtableWithInsert(
              bigtable,
              columnFamilyId,
              instanceId,
              [tableId1, tableId2],
            );
            const instance = bigtable.instance(instanceId);
            const table = instance.table(tableId1);
            const row = table.row(columnFamilyId);
            await row.filter(filter, {onMatch: mutations});
            const table2 = instance.table(tableId2);
            const row2 = table2.row(columnFamilyId);
            await row2.filter(filter, {onMatch: mutations});
          }
        }
      } catch (e) {
        // Single, guarded completion with the real error. The previous code
        // called done(e) and then done(new Error(...)), which both masked the
        // cause and tripped mocha's multiple-done detection; it also left the
        // timeout running.
        if (!testComplete) {
          testComplete = true;
          clearTimeout(testTimeout);
          done(e);
        }
      }
    })().catch(err => {
      throw err;
    });
  });
});
});
describe('Bigtable/ClientSideMetricsToMetricsHandler', () => {
async function getFakeBigtableWithHandler(
Expand Down Expand Up @@ -1689,5 +1903,25 @@ describe('Bigtable/ClientSideMetricsAllMethods', () => {
});
});
});
// CheckAndMutateRow: verifies that a single CheckAndMutateRow data point is
// delivered to the in-process metrics handler (checkCheckAndMutateCall runs
// the assertions and completes the test via `done`).
describe('CheckAndMutateRow', () => {
  it('should send the metrics to the metrics handler for a CheckAndMutateRow call for a single point', done => {
    (async () => {
      const bigtable = await mockBigtableWithNoInserts(
        defaultProjectId,
        done,
        checkCheckAndMutateCall,
      );
      const instance = bigtable.instance(instanceId1);
      const table = instance.table(tableId1);
      const row = table.row(columnFamilyId);
      await row.filter(filter, {onMatch: mutations});
      const table2 = instance.table(tableId2);
      const row2 = table2.row(columnFamilyId);
      await row2.filter(filter, {onMatch: mutations});
    })().catch(err => {
      // Route async failures to mocha instead of rethrowing inside the
      // promise's catch handler, which would surface as an unhandled
      // rejection not attributed to this test.
      done(err);
    });
  });
});
});
});
7 changes: 5 additions & 2 deletions system-test/read-modify-write-row-interceptors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import {
} from '../src/client-side-metrics/client-side-metrics-attributes';
import * as assert from 'assert';
import {status as GrpcStatus} from '@grpc/grpc-js';
import {withMetricInterceptors} from '../src/client-side-metrics/metric-interceptor';
import {createMetricsUnaryInterceptorProvider} from '../src/client-side-metrics/metric-interceptor';

const INSTANCE_ID = 'isolated-rmw-instance';
const TABLE_ID = 'isolated-rmw-table';
Expand Down Expand Up @@ -209,7 +209,10 @@ describe('Bigtable/ReadModifyWriteRowInterceptorMetrics', () => {
],
appProfileId: undefined,
},
gaxOpts: withMetricInterceptors({}, metricsCollector),
gaxOpts: createMetricsUnaryInterceptorProvider(
{},
metricsCollector,
),
},
(err: ServiceError | null, resp?: any) => {
if (err) {
Expand Down
1 change: 1 addition & 0 deletions test/authorized-views.ts
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,7 @@ describe('Bigtable/AuthorizedViews', () => {
method: 'checkAndMutateRow',
gaxOpts: {
maxRetries: 4,
otherArgs: {},
},
reqOpts: Object.assign(
{
Expand Down
10 changes: 8 additions & 2 deletions test/row.ts
Original file line number Diff line number Diff line change
Expand Up @@ -962,8 +962,14 @@ describe('Bigtable/Row', () => {
config.reqOpts.falseMutations,
fakeMutations.mutations,
);

assert.strictEqual(config.gaxOpts, undefined);
config.gaxOpts.otherArgs.options.interceptors = [];
assert.deepStrictEqual(config.gaxOpts, {
otherArgs: {
options: {
interceptors: [],
},
},
});
assert.strictEqual(FakeMutation.parse.callCount, 2);
assert.strictEqual(FakeMutation.parse.getCall(0).args[0], mutations[0]);
assert.strictEqual(FakeMutation.parse.getCall(1).args[0], mutations[0]);
Expand Down
Loading