diff --git a/src/client-side-metrics/metric-interceptor.ts b/src/client-side-metrics/metric-interceptor.ts index 81b244d53..89b23d4a3 100644 --- a/src/client-side-metrics/metric-interceptor.ts +++ b/src/client-side-metrics/metric-interceptor.ts @@ -63,7 +63,20 @@ function createMetricsInterceptorProvider( }; } -export function withMetricInterceptors( +/** + * Attaches a metrics interceptor to unary calls for collecting client-side metrics. + * + * This method modifies the given `gaxOptions` to include an interceptor that + * will be triggered during the execution of a unary gRPC call. The interceptor + * uses the provided `OperationMetricsCollector` to record various metrics + * related to the call, such as latency, retries, and errors. + * + * @param {CallOptions} gaxOptions The existing GAX call options to modify. + * @param {OperationMetricsCollector} metricsCollector The metrics collector + * for the operation. + * @returns {CallOptions} The modified `gaxOptions` with the interceptor attached. + */ +export function createMetricsUnaryInterceptorProvider( gaxOptions: CallOptions, metricsCollector?: OperationMetricsCollector, ) { diff --git a/src/row-data-utils.ts b/src/row-data-utils.ts index 54d9ccd6c..65191057d 100644 --- a/src/row-data-utils.ts +++ b/src/row-data-utils.ts @@ -36,7 +36,7 @@ import { MethodName, StreamingState, } from './client-side-metrics/client-side-metrics-attributes'; -import {withMetricInterceptors} from './client-side-metrics/metric-interceptor'; +import {createMetricsUnaryInterceptorProvider} from './client-side-metrics/metric-interceptor'; interface TabularApiSurfaceRequest { tableName?: string; @@ -89,14 +89,32 @@ class RowDataUtils { properties.reqOpts, ); properties.requestData.data = {}; + // 1. Create a metrics collector. + const metricsCollector = new OperationMetricsCollector( + properties.requestData.table, + MethodName.CHECK_AND_MUTATE_ROW, + StreamingState.UNARY, + ( + properties.requestData.table as any + ).bigtable._metricsConfigManager!.metricsHandlers, + ); + // 2. Tell the metrics collector an attempt has been started. + metricsCollector.onOperationStart(); + // 3. Make a unary call with gax options that include interceptors. The + // interceptors are built from a method that hooks them up to the + // metrics collector properties.requestData.bigtable.request( { client: 'BigtableClient', method: 'checkAndMutateRow', reqOpts, - gaxOpts: config.gaxOptions, + gaxOpts: createMetricsUnaryInterceptorProvider( + config.gaxOptions ?? {}, + metricsCollector, + ), }, (err, apiResponse) => { + metricsCollector.onOperationComplete(err ? err.code : 0); if (err) { callback(err, null, apiResponse); return; @@ -219,7 +237,10 @@ class RowDataUtils { client: 'BigtableClient', method: 'readModifyWriteRow', reqOpts, - gaxOpts: withMetricInterceptors(gaxOptions, metricsCollector), + gaxOpts: createMetricsUnaryInterceptorProvider( + gaxOptions, + metricsCollector, + ), }, (err, ...args) => { metricsCollector.onOperationComplete(err ? err.code : 0); diff --git a/system-test/client-side-metrics-all-methods.ts b/system-test/client-side-metrics-all-methods.ts index f17475d06..9dc6a64dd 100644 --- a/system-test/client-side-metrics-all-methods.ts +++ b/system-test/client-side-metrics-all-methods.ts @@ -22,7 +22,7 @@ import {ResourceMetrics} from '@opentelemetry/sdk-metrics'; import * as assert from 'assert'; import {GCPMetricsHandler} from '../src/client-side-metrics/gcp-metrics-handler'; import * as proxyquire from 'proxyquire'; -import {Bigtable} from '../src'; +import {Bigtable, RawFilter} from '../src'; import {Mutation} from '../src/mutation'; import {Row} from '../src/row'; import { @@ -191,6 +191,18 @@ function readRowsAssertionCheck( }); } +function checkCheckAndMutateCall( + projectId: string, + requestsHandled: (OnOperationCompleteData | OnAttemptCompleteData)[] = [], +) { + readRowsAssertionCheck( + projectId, + requestsHandled, + 'Bigtable.CheckAndMutateRow', + 'false', + ); +} + function checkMultiRowCall( projectId: string, requestsHandled: (OnOperationCompleteData | OnAttemptCompleteData)[] = [], @@ -275,6 +287,18 @@ const mutation = { method: Mutation.methods.INSERT, }; +const filter: RawFilter = { + family: 'cf1', + value: 'alincoln', +}; + +const mutations = [ + { + method: 'delete', + data: ['cf1:alincoln'], + }, +]; + const rules = [ { column: 'cf1:column', @@ -862,6 +886,91 @@ describe('Bigtable/ClientSideMetricsAllMethods', () => { }); }); }); + describe('CheckAndMutateRow', () => { + it('should send the metrics to Google Cloud Monitoring for a CheckAndMutateRow call', done => { + (async () => { + try { + const bigtable = await mockBigtable(defaultProjectId, done); + for (const instanceId of [instanceId1, instanceId2]) { + await setupBigtableWithInsert( + bigtable, + columnFamilyId, + instanceId, + [tableId1, tableId2], + ); + const instance = bigtable.instance(instanceId); + const table = instance.table(tableId1); + const row = table.row(columnFamilyId); + await row.filter(filter, {onMatch: mutations}); + const table2 = instance.table(tableId2); + const row2 = table2.row(columnFamilyId); + await row2.filter(filter, {onMatch: mutations}); + } + } catch (e) { + done(new Error('An error occurred while running the script')); + done(e); + } + })().catch(err => { + throw err; + }); + }); + it('should send the metrics to Google Cloud Monitoring for a custom endpoint', done => { + (async () => { + try { + const bigtable = await mockBigtable( + defaultProjectId, + done, + 'bogus-endpoint', + ); + const instance = bigtable.instance(instanceId1); + const table = instance.table(tableId1); + try { + // This call will fail because we are trying to hit a bogus endpoint. + // The idea here is that we just want to record at least one metric + // so that the exporter gets executed. + const row = table.row(columnFamilyId); + await row.filter(filter, {onMatch: mutations}); + } catch (e: unknown) { + // Try blocks just need a catch/finally block. + } + } catch (e) { + done(new Error('An error occurred while running the script')); + done(e); + } + })().catch(err => { + throw err; + }); + }); + it('should send the metrics to Google Cloud Monitoring for a CheckAndMutateRow call with a second project', done => { + (async () => { + try { + // This is the second project the test is configured to work with: + const projectId = SECOND_PROJECT_ID; + const bigtable = await mockBigtable(projectId, done); + for (const instanceId of [instanceId1, instanceId2]) { + await setupBigtableWithInsert( + bigtable, + columnFamilyId, + instanceId, + [tableId1, tableId2], + ); + const instance = bigtable.instance(instanceId); + const table = instance.table(tableId1); + const row = table.row(columnFamilyId); + await row.filter(filter, {onMatch: mutations}); + const table2 = instance.table(tableId2); + const row2 = table2.row(columnFamilyId); + await row2.filter(filter, {onMatch: mutations}); + } + } catch (e) { + done(new Error('An error occurred while running the script')); + done(e); + } + })().catch(err => { + throw err; + }); + }); + }); }); describe('Bigtable/ClientSideMetricsToGCMTimeout', () => { // This test suite simulates a situation where the user creates multiple @@ -1430,6 +1539,111 @@ describe('Bigtable/ClientSideMetricsAllMethods', () => { }); }); }); + describe('CheckAndMutateRow', () => { + it('should send the metrics to Google Cloud Monitoring for a CheckAndMutateRow call', done => { + let testFinished = false; + /* + We need to create a timeout here because if we don't then mocha shuts down + the test as it is sleeping before the GCPMetricsHandler has a chance to + export the data. When the timeout is finished, if there were no export + errors then the test passes. + */ + setTimeout(() => { + testFinished = true; + done(); + }, 120000); + (async () => { + try { + const bigtable1 = await mockBigtable(defaultProjectId, done); + const bigtable2 = await mockBigtable(defaultProjectId, done); + for (const bigtable of [bigtable1, bigtable2]) { + for (const instanceId of [instanceId1, instanceId2]) { + await setupBigtableWithInsert( + bigtable, + columnFamilyId, + instanceId, + [tableId1, tableId2], + ); + const instance = bigtable.instance(instanceId); + const table = instance.table(tableId1); + const row = table.row(columnFamilyId); + await row.filter(filter, {onMatch: mutations}); + const table2 = instance.table(tableId2); + const row2 = table2.row(columnFamilyId); + await row2.filter(filter, {onMatch: mutations}); + } + } + } catch (e) { + done(new Error('An error occurred while running the script')); + done(e); + } + })().catch(err => { + throw err; + }); + }); + it('should send the metrics to Google Cloud Monitoring for a single CheckAndMutateRow call with thirty clients', done => { + /* + We need to create a timeout here because if we don't then mocha shuts down + the test as it is sleeping before the GCPMetricsHandler has a chance to + export the data. When the timeout is finished, if there were no export + errors then the test passes. + */ + const testTimeout = setTimeout(() => { + done(new Error('The test timed out')); + }, 480000); + let testComplete = false; + const numClients = 30; + (async () => { + try { + const bigtableList = []; + const completedSet = new Set(); + for ( + let bigtableCount = 0; + bigtableCount < numClients; + bigtableCount++ + ) { + const currentCount = bigtableCount; + const onExportSuccess = () => { + completedSet.add(currentCount); + if (completedSet.size === numClients) { + // If every client has completed the export then pass the test. + clearTimeout(testTimeout); + if (!testComplete) { + testComplete = true; + done(); + } + } + }; + bigtableList.push( + await mockBigtable(defaultProjectId, done, onExportSuccess), + ); + } + for (const bigtable of bigtableList) { + for (const instanceId of [instanceId1, instanceId2]) { + await setupBigtableWithInsert( + bigtable, + columnFamilyId, + instanceId, + [tableId1, tableId2], + ); + const instance = bigtable.instance(instanceId); + const table = instance.table(tableId1); + const row = table.row(columnFamilyId); + await row.filter(filter, {onMatch: mutations}); + const table2 = instance.table(tableId2); + const row2 = table2.row(columnFamilyId); + await row2.filter(filter, {onMatch: mutations}); + } + } + } catch (e) { + done(e); + done(new Error('An error occurred while running the script')); + } + })().catch(err => { + throw err; + }); + }); + }); }); describe('Bigtable/ClientSideMetricsToMetricsHandler', () => { async function getFakeBigtableWithHandler( @@ -1689,5 +1903,25 @@ describe('Bigtable/ClientSideMetricsAllMethods', () => { }); }); }); + describe('CheckAndMutateRow', () => { + it('should send the metrics to the metrics handler for a CheckAndMutateRow call for a single point', done => { + (async () => { + const bigtable = await mockBigtableWithNoInserts( + defaultProjectId, + done, + checkCheckAndMutateCall, + ); + const instance = bigtable.instance(instanceId1); + const table = instance.table(tableId1); + const row = table.row(columnFamilyId); + await row.filter(filter, {onMatch: mutations}); + const table2 = instance.table(tableId2); + const row2 = table2.row(columnFamilyId); + await row2.filter(filter, {onMatch: mutations}); + })().catch(err => { + throw err; + }); + }); + }); }); }); diff --git a/system-test/read-modify-write-row-interceptors.ts b/system-test/read-modify-write-row-interceptors.ts index 68972aab6..96d847fe8 100644 --- a/system-test/read-modify-write-row-interceptors.ts +++ b/system-test/read-modify-write-row-interceptors.ts @@ -28,7 +28,7 @@ import { } from '../src/client-side-metrics/client-side-metrics-attributes'; import * as assert from 'assert'; import {status as GrpcStatus} from '@grpc/grpc-js'; -import {withMetricInterceptors} from '../src/client-side-metrics/metric-interceptor'; +import {createMetricsUnaryInterceptorProvider} from '../src/client-side-metrics/metric-interceptor'; const INSTANCE_ID = 'isolated-rmw-instance'; const TABLE_ID = 'isolated-rmw-table'; @@ -209,7 +209,10 @@ describe('Bigtable/ReadModifyWriteRowInterceptorMetrics', () => { ], appProfileId: undefined, }, - gaxOpts: withMetricInterceptors({}, metricsCollector), + gaxOpts: createMetricsUnaryInterceptorProvider( + {}, + metricsCollector, + ), }, (err: ServiceError | null, resp?: any) => { if (err) { diff --git a/test/authorized-views.ts b/test/authorized-views.ts index d6ac71f47..a36eefd43 100644 --- a/test/authorized-views.ts +++ b/test/authorized-views.ts @@ -434,6 +434,7 @@ describe('Bigtable/AuthorizedViews', () => { method: 'checkAndMutateRow', gaxOpts: { maxRetries: 4, + otherArgs: {}, }, reqOpts: Object.assign( { diff --git a/test/row.ts b/test/row.ts index 8544107c2..ca1f444cf 100644 --- a/test/row.ts +++ b/test/row.ts @@ -962,8 +962,14 @@ describe('Bigtable/Row', () => { config.reqOpts.falseMutations, fakeMutations.mutations, ); - - assert.strictEqual(config.gaxOpts, undefined); + config.gaxOpts.otherArgs.options.interceptors = []; + assert.deepStrictEqual(config.gaxOpts, { + otherArgs: { + options: { + interceptors: [], + }, + }, + }); assert.strictEqual(FakeMutation.parse.callCount, 2); assert.strictEqual(FakeMutation.parse.getCall(0).args[0], mutations[0]); assert.strictEqual(FakeMutation.parse.getCall(1).args[0], mutations[0]);