diff --git a/src/row.ts b/src/row.ts
index 39af7a32b..94155211d 100644
--- a/src/row.ts
+++ b/src/row.ts
@@ -36,6 +36,7 @@ import {
   MethodName,
   StreamingState,
 } from './client-side-metrics/client-side-metrics-attributes';
+import {mutateInternal} from './utils/mutateInternal';
 
 export interface Rule {
   column: string;
@@ -830,7 +831,19 @@ export class Row {
       method: Mutation.methods.INSERT,
     } as {} as Entry;
     this.data = {};
-    this.table.mutate(mutation, gaxOptions as {} as MutateOptions, callback);
+    const metricsCollector =
+      this.bigtable._metricsConfigManager.createOperation(
+        MethodName.MUTATE_ROW,
+        StreamingState.UNARY,
+        this.table,
+      );
+    mutateInternal(
+      this.table,
+      metricsCollector,
+      mutation,
+      gaxOptions as {} as MutateOptions,
+      callback,
+    );
   }
 }
 
diff --git a/src/tabular-api-surface.ts b/src/tabular-api-surface.ts
index 2efacd1c9..b91eb9192 100644
--- a/src/tabular-api-surface.ts
+++ b/src/tabular-api-surface.ts
@@ -29,7 +29,6 @@ import {BackoffSettings} from 'google-gax/build/src/gax';
 import {google} from '../protos/protos';
 import {CallOptions, grpc, ServiceError} from 'google-gax';
 import {Transform} from 'stream';
-import * as is from 'is';
 import {GoogleInnerError} from './table';
 import {createReadStreamInternal} from './utils/createReadStreamInternal';
 import {getRowsInternal} from './utils/getRowsInternal';
@@ -37,6 +36,7 @@ import {
   MethodName,
   StreamingState,
 } from './client-side-metrics/client-side-metrics-attributes';
+import {mutateInternal} from './utils/mutateInternal';
 
 // See protos/google/rpc/code.proto
 // (4=DEADLINE_EXCEEDED, 8=RESOURCE_EXHAUSTED, 10=ABORTED, 14=UNAVAILABLE)
@@ -333,216 +333,13 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
     optionsOrCallback?: MutateOptions | MutateCallback,
     cb?: MutateCallback,
   ): void | Promise<void> {
-    const callback =
-      typeof optionsOrCallback === 'function' ? optionsOrCallback : cb!;
-    const options =
-      typeof optionsOrCallback === 'object' ? optionsOrCallback : {};
-    const entries: Entry[] = (arrify(entriesRaw) as Entry[]).reduce(
-      (a, b) => a.concat(b),
-      [],
-    );
-    const collectMetricsCallback = (
-      originalError: ServiceError | null,
-      err: ServiceError | PartialFailureError | null,
-      apiResponse?: google.protobuf.Empty,
-    ) => {
-      // originalError is the error that was sent from the gapic layer. The
-      // compiler guarantees that it contains a code which needs to be
-      // provided when an operation is marked complete.
-      //
-      // err is the error we intend to send back to the user. Often it is the
-      // same as originalError, but in one case we construct a
-      // PartialFailureError and send that back to the user instead. In this
-      // case, we still need to pass the originalError into the method
-      // because the PartialFailureError doesn't have a code, but we need to
-      // communicate a code to the metrics collector.
-      //
-      const code = originalError ? originalError.code : 0;
-      metricsCollector.onOperationComplete(code);
-      callback(err, apiResponse);
-    };
-
     const metricsCollector =
       this.bigtable._metricsConfigManager.createOperation(
         MethodName.MUTATE_ROWS,
         StreamingState.STREAMING,
         this,
       );
-    /*
-    The following line of code sets the timeout if it was provided while
-    creating the client. This will be used to determine if the client should
-    retry on errors. Eventually, this will be handled downstream in google-gax.
- */ - const timeout = - options?.gaxOptions?.timeout || - (this?.bigtable?.options?.BigtableClient?.clientConfig?.interfaces && - this?.bigtable?.options?.BigtableClient?.clientConfig?.interfaces[ - 'google.bigtable.v2.Bigtable' - ]?.methods['MutateRows']?.timeout_millis); - const callTimeMillis = new Date().getTime(); - - let numRequestsMade = 0; - - const maxRetries = is.number(this.maxRetries) ? this.maxRetries! : 3; - const pendingEntryIndices = new Set( - entries.map((entry: Entry, index: number) => index), - ); - const entryToIndex = new Map( - entries.map((entry: Entry, index: number) => [entry, index]), - ); - const mutationErrorsByEntryIndex = new Map(); - - const isRetryable = ( - err: ServiceError | null, - timeoutExceeded: boolean, - ) => { - if (timeoutExceeded) { - // If the timeout has been exceeded then do not retry. - return false; - } - // Don't retry if there are no more entries or retry attempts - if (pendingEntryIndices.size === 0 || numRequestsMade >= maxRetries + 1) { - return false; - } - // If the error is empty but there are still outstanding mutations, - // it means that there are retryable errors in the mutate response - // even when the RPC succeeded - return !err || RETRYABLE_STATUS_CODES.has(err.code); - }; - - const onBatchResponse = (err: ServiceError | null) => { - // Return if the error happened before a request was made - if (numRequestsMade === 0) { - collectMetricsCallback(err, err); - return; - } - - const timeoutExceeded = !!( - timeout && timeout < new Date().getTime() - callTimeMillis - ); - if (isRetryable(err, timeoutExceeded)) { - // If the timeout or max retries is exceeded or if there are no - // pending indices left then the client doesn't retry. - // Otherwise, the client will retry if there is no error or if the - // error has a retryable status code. - const backOffSettings = - options.gaxOptions?.retry?.backoffSettings || - DEFAULT_BACKOFF_SETTINGS; - const nextDelay = getNextDelay(numRequestsMade, backOffSettings); - metricsCollector.onAttemptComplete(err ? err.code : 0); - setTimeout(makeNextBatchRequest, nextDelay); - return; - } - - // If there's no more pending mutations, set the error - // to null - if (pendingEntryIndices.size === 0) { - err = null; - } - - const mutationErrors = Array.from(mutationErrorsByEntryIndex.values()); - if (mutationErrorsByEntryIndex.size !== 0) { - collectMetricsCallback( - err, - new PartialFailureError(mutationErrors, err), - ); - return; - } - if (err) { - /* If there's an RPC level failure and the mutation entries don't have - a status code, the RPC level failure error code will be used as the - entry failure code. - */ - (err as ServiceError & {errors?: ServiceError[]}).errors = - mutationErrors.concat( - [...pendingEntryIndices] - .filter(index => !mutationErrorsByEntryIndex.has(index)) - .map(() => err), - ); - collectMetricsCallback(err, err); - return; - } - collectMetricsCallback(null, null); - }; - - metricsCollector.onOperationStart(); - const makeNextBatchRequest = () => { - metricsCollector.onAttemptStart(); - const entryBatch = entries.filter((entry: Entry, index: number) => { - return pendingEntryIndices.has(index); - }); - - // If the viewName is provided then request will be made for an - // authorized view. Otherwise, the request is made for a table. - const baseReqOpts = ( - this.viewName - ? 
{ - authorizedViewName: `${this.name}/authorizedViews/${this.viewName}`, - } - : { - tableName: this.name, - } - ) as google.bigtable.v2.IReadRowsRequest; - const reqOpts = Object.assign(baseReqOpts, { - appProfileId: this.bigtable.appProfileId, - entries: options.rawMutation - ? entryBatch - : entryBatch.map(Mutation.parse), - }); - - const retryOpts = { - currentRetryAttempt: numRequestsMade, - // Handling retries in this client. Specify the retry options to - // make sure nothing is retried in retry-request. - noResponseRetries: 0, - shouldRetryFn: (_: any) => { - return false; - }, - }; - - options.gaxOptions = populateAttemptHeader( - numRequestsMade, - options.gaxOptions, - ); - - const requestStream = - this.bigtable.request({ - client: 'BigtableClient', - method: 'mutateRows', - reqOpts, - gaxOpts: options.gaxOptions, - retryOpts, - }); - metricsCollector.wrapRequest(requestStream); - requestStream - .on('error', (err: ServiceError) => { - onBatchResponse(err); - }) - .on('data', (obj: google.bigtable.v2.IMutateRowsResponse) => { - obj.entries!.forEach(entry => { - const originalEntry = entryBatch[entry.index as number]; - const originalEntriesIndex = entryToIndex.get(originalEntry)!; - - // Mutation was successful. - if (entry.status!.code === 0) { - pendingEntryIndices.delete(originalEntriesIndex); - mutationErrorsByEntryIndex.delete(originalEntriesIndex); - return; - } - if (!RETRYABLE_STATUS_CODES.has(entry.status!.code!)) { - pendingEntryIndices.delete(originalEntriesIndex); - } - const errorDetails = entry.status; - // eslint-disable-next-line @typescript-eslint/no-explicit-any - (errorDetails as any).entry = originalEntry; - mutationErrorsByEntryIndex.set(originalEntriesIndex, errorDetails); - }); - }) - .on('end', onBatchResponse); - numRequestsMade++; - }; - - makeNextBatchRequest(); + mutateInternal(this, metricsCollector, entriesRaw, optionsOrCallback, cb); } /** diff --git a/src/utils/createReadStreamInternal.ts b/src/utils/createReadStreamInternal.ts index 5da12a353..73d6f1f99 100644 --- a/src/utils/createReadStreamInternal.ts +++ b/src/utils/createReadStreamInternal.ts @@ -26,10 +26,6 @@ import { } from '../chunktransformer'; import {TableUtils} from './table'; import {Duplex, PassThrough, Transform} from 'stream'; -import { - MethodName, - StreamingState, -} from '../client-side-metrics/client-side-metrics-attributes'; import {google} from '../../protos/protos'; const pumpify = require('pumpify'); import {grpc, ServiceError} from 'google-gax'; diff --git a/src/utils/mutateInternal.ts b/src/utils/mutateInternal.ts new file mode 100644 index 000000000..f068543cf --- /dev/null +++ b/src/utils/mutateInternal.ts @@ -0,0 +1,239 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
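+
+// Shared implementation of the MutateRows request, retry, and metrics logic.
+// It was previously inlined in TabularApiSurface#mutate and is extracted here
+// so that Row#save can reuse it with a MethodName.MUTATE_ROW collector while
+// Table#mutate continues to record MethodName.MUTATE_ROWS.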
+ +import { + Entry, + MutateCallback, + MutateOptions, + PartialFailureError, +} from '../table'; +import {OperationMetricsCollector} from '../client-side-metrics/operation-metrics-collector'; +import { + DEFAULT_BACKOFF_SETTINGS, + getNextDelay, + populateAttemptHeader, + RETRYABLE_STATUS_CODES, + TabularApiSurface, +} from '../tabular-api-surface'; +import {ServiceError} from 'google-gax'; +import {google} from '../../protos/protos'; +import * as is from 'is'; +import {Mutation} from '../mutation'; +import arrify = require('arrify'); + +export function mutateInternal( + table: TabularApiSurface, + metricsCollector: OperationMetricsCollector, + entriesRaw: Entry | Entry[], + optionsOrCallback?: MutateOptions | MutateCallback, + cb?: MutateCallback, +) { + const callback = + typeof optionsOrCallback === 'function' ? optionsOrCallback : cb!; + const options = + typeof optionsOrCallback === 'object' ? optionsOrCallback : {}; + const entries: Entry[] = (arrify(entriesRaw) as Entry[]).reduce( + (a, b) => a.concat(b), + [], + ); + const collectMetricsCallback = ( + originalError: ServiceError | null, + err: ServiceError | PartialFailureError | null, + apiResponse?: google.protobuf.Empty, + ) => { + // originalError is the error that was sent from the gapic layer. The + // compiler guarantees that it contains a code which needs to be + // provided when an operation is marked complete. + // + // err is the error we intend to send back to the user. Often it is the + // same as originalError, but in one case we construct a + // PartialFailureError and send that back to the user instead. In this + // case, we still need to pass the originalError into the method + // because the PartialFailureError doesn't have a code, but we need to + // communicate a code to the metrics collector. + // + const code = originalError ? originalError.code : 0; + metricsCollector.onOperationComplete(code); + callback(err, apiResponse); + }; + + /* + The following line of code sets the timeout if it was provided while + creating the client. This will be used to determine if the client should + retry on errors. Eventually, this will be handled downstream in google-gax. + */ + const timeout = + options?.gaxOptions?.timeout || + (table?.bigtable?.options?.BigtableClient?.clientConfig?.interfaces && + table?.bigtable?.options?.BigtableClient?.clientConfig?.interfaces[ + 'google.bigtable.v2.Bigtable' + ]?.methods['MutateRows']?.timeout_millis); + const callTimeMillis = new Date().getTime(); + + let numRequestsMade = 0; + + const maxRetries = is.number(table.maxRetries) ? table.maxRetries! : 3; + const pendingEntryIndices = new Set( + entries.map((entry: Entry, index: number) => index), + ); + const entryToIndex = new Map( + entries.map((entry: Entry, index: number) => [entry, index]), + ); + const mutationErrorsByEntryIndex = new Map(); + + const isRetryable = (err: ServiceError | null, timeoutExceeded: boolean) => { + if (timeoutExceeded) { + // If the timeout has been exceeded then do not retry. 
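+      // The overall operation deadline has passed, so the current error is
+      // surfaced to the caller instead of scheduling another attempt.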
+      return false;
+    }
+    // Don't retry if there are no pending entries or no retry attempts left
+    if (pendingEntryIndices.size === 0 || numRequestsMade >= maxRetries + 1) {
+      return false;
+    }
+    // If the error is empty but there are still outstanding mutations,
+    // it means that there are retryable errors in the mutate response
+    // even when the RPC succeeded.
+    return !err || RETRYABLE_STATUS_CODES.has(err.code);
+  };
+
+  const onBatchResponse = (err: ServiceError | null) => {
+    // Return if the error happened before a request was made
+    if (numRequestsMade === 0) {
+      collectMetricsCallback(err, err);
+      return;
+    }
+
+    const timeoutExceeded = !!(
+      timeout && timeout < new Date().getTime() - callTimeMillis
+    );
+    if (isRetryable(err, timeoutExceeded)) {
+      // If the timeout or max retries is exceeded or if there are no
+      // pending indices left then the client doesn't retry.
+      // Otherwise, the client will retry if there is no error or if the
+      // error has a retryable status code.
+      const backOffSettings =
+        options.gaxOptions?.retry?.backoffSettings || DEFAULT_BACKOFF_SETTINGS;
+      const nextDelay = getNextDelay(numRequestsMade, backOffSettings);
+      metricsCollector.onAttemptComplete(err ? err.code : 0);
+      setTimeout(makeNextBatchRequest, nextDelay);
+      return;
+    }
+
+    // If there are no more pending mutations, set the error
+    // to null.
+    if (pendingEntryIndices.size === 0) {
+      err = null;
+    }
+
+    const mutationErrors = Array.from(mutationErrorsByEntryIndex.values());
+    if (mutationErrorsByEntryIndex.size !== 0) {
+      collectMetricsCallback(err, new PartialFailureError(mutationErrors, err));
+      return;
+    }
+    if (err) {
+      /* If there's an RPC level failure and the mutation entries don't have
+      a status code, the RPC level failure error code will be used as the
+      entry failure code.
+      */
+      (err as ServiceError & {errors?: ServiceError[]}).errors =
+        mutationErrors.concat(
+          [...pendingEntryIndices]
+            .filter(index => !mutationErrorsByEntryIndex.has(index))
+            .map(() => err),
+        );
+      collectMetricsCallback(err, err);
+      return;
+    }
+    collectMetricsCallback(null, null);
+  };
+
+  metricsCollector.onOperationStart();
+  const makeNextBatchRequest = () => {
+    metricsCollector.onAttemptStart();
+    const entryBatch = entries.filter((entry: Entry, index: number) => {
+      return pendingEntryIndices.has(index);
+    });
+
+    // If the viewName is provided, the request will be made for an
+    // authorized view. Otherwise, the request is made for a table.
+    const baseReqOpts = (
+      table.viewName
+        ? {
+            authorizedViewName: `${table.name}/authorizedViews/${table.viewName}`,
+          }
+        : {
+            tableName: table.name,
+          }
+    ) as google.bigtable.v2.IMutateRowsRequest;
+    const reqOpts = Object.assign(baseReqOpts, {
+      appProfileId: table.bigtable.appProfileId,
+      entries: options.rawMutation
+        ? entryBatch
+        : entryBatch.map(Mutation.parse),
+    });
+
+    const retryOpts = {
+      currentRetryAttempt: numRequestsMade,
+      // Handling retries in this client. Specify the retry options to
+      // make sure nothing is retried in retry-request.
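+      // Backoff scheduling and re-sending of still-pending entries are
+      // handled by onBatchResponse above.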
+      noResponseRetries: 0,
+      shouldRetryFn: (_: any) => {
+        return false;
+      },
+    };
+
+    options.gaxOptions = populateAttemptHeader(
+      numRequestsMade,
+      options.gaxOptions,
+    );
+
+    const requestStream =
+      table.bigtable.request({
+        client: 'BigtableClient',
+        method: 'mutateRows',
+        reqOpts,
+        gaxOpts: options.gaxOptions,
+        retryOpts,
+      });
+    metricsCollector.wrapRequest(requestStream);
+    requestStream
+      .on('error', (err: ServiceError) => {
+        onBatchResponse(err);
+      })
+      .on('data', (obj: google.bigtable.v2.IMutateRowsResponse) => {
+        obj.entries!.forEach(entry => {
+          const originalEntry = entryBatch[entry.index as number];
+          const originalEntriesIndex = entryToIndex.get(originalEntry)!;
+
+          // Mutation was successful.
+          if (entry.status!.code === 0) {
+            pendingEntryIndices.delete(originalEntriesIndex);
+            mutationErrorsByEntryIndex.delete(originalEntriesIndex);
+            return;
+          }
+          if (!RETRYABLE_STATUS_CODES.has(entry.status!.code!)) {
+            pendingEntryIndices.delete(originalEntriesIndex);
+          }
+          const errorDetails = entry.status;
+          // eslint-disable-next-line @typescript-eslint/no-explicit-any
+          (errorDetails as any).entry = originalEntry;
+          mutationErrorsByEntryIndex.set(originalEntriesIndex, errorDetails);
+        });
+      })
+      .on('end', onBatchResponse);
+    numRequestsMade++;
+  };
+
+  makeNextBatchRequest();
+}
diff --git a/system-test/client-side-metrics-all-methods.ts b/system-test/client-side-metrics-all-methods.ts
index cb42909aa..8b64746a1 100644
--- a/system-test/client-side-metrics-all-methods.ts
+++ b/system-test/client-side-metrics-all-methods.ts
@@ -191,6 +191,18 @@ function checkMutateRowsCall(
   );
 }
 
+function checkMutateRowCall(
+  projectId: string,
+  requestsHandled: (OnOperationCompleteData | OnAttemptCompleteData)[] = [],
+) {
+  readRowsAssertionCheck(
+    projectId,
+    requestsHandled,
+    'Bigtable.MutateRow',
+    'false',
+  );
+}
+
 function checkSingleRowCall(
   projectId: string,
   requestsHandled: (OnOperationCompleteData | OnAttemptCompleteData)[] = [],
@@ -203,13 +215,15 @@ function checkSingleRowCall(
   );
 }
 
+const entry = {
+  cf1: {
+    column: 1,
+  },
+};
+
 const mutation = {
   key: 'rowId',
-  data: {
-    cf1: {
-      column: 1,
-    },
-  },
+  data: entry,
   method: Mutation.methods.INSERT,
 };
 
@@ -544,6 +558,91 @@ describe('Bigtable/ClientSideMetrics', () => {
       });
     });
   });
+  describe('MutateRow', () => {
+    it('should send the metrics to Google Cloud Monitoring for a single point MutateRow call', done => {
+      (async () => {
+        try {
+          const bigtable = await mockBigtable(defaultProjectId, done);
+          for (const instanceId of [instanceId1, instanceId2]) {
+            await setupBigtableWithInsert(
+              bigtable,
+              columnFamilyId,
+              instanceId,
+              [tableId1, tableId2],
+            );
+            const instance = bigtable.instance(instanceId);
+            const table = instance.table(tableId1);
+            const row = table.row('gwashington');
+            await row.save(entry);
+            const table2 = instance.table(tableId2);
+            const row2 = table2.row('gwashington');
+            await row2.save(entry);
+          }
+        } catch (e) {
+          done(new Error('An error occurred while running the script'));
+          done(e);
+        }
+      })().catch(err => {
+        throw err;
+      });
+    });
+    it('should send the metrics to Google Cloud Monitoring for a custom endpoint', done => {
+      (async () => {
+        try {
+          const bigtable = await mockBigtable(
+            defaultProjectId,
+            done,
+            'bogus-endpoint',
+          );
+          const instance = bigtable.instance(instanceId1);
+          const table = instance.table(tableId1);
+          try {
+            // This call will fail because we are trying to hit a bogus endpoint.
+            // The idea here is that we just want to record at least one metric
+            // so that the exporter gets executed.
+            const row = table.row('gwashington');
+            await row.save(entry);
+          } catch (e: unknown) {
+            // Try blocks just need a catch/finally block.
+          }
+        } catch (e) {
+          done(new Error('An error occurred while running the script'));
+          done(e);
+        }
+      })().catch(err => {
+        throw err;
+      });
+    });
+    it('should send the metrics to Google Cloud Monitoring for a MutateRow call with a second project and a single point', done => {
+      (async () => {
+        try {
+          // This is the second project the test is configured to work with:
+          const projectId = SECOND_PROJECT_ID;
+          const bigtable = await mockBigtable(projectId, done);
+          for (const instanceId of [instanceId1, instanceId2]) {
+            await setupBigtableWithInsert(
+              bigtable,
+              columnFamilyId,
+              instanceId,
+              [tableId1, tableId2],
+            );
+            const instance = bigtable.instance(instanceId);
+            const table = instance.table(tableId1);
+            const row = table.row('gwashington');
+            await row.save(entry);
+            const table2 = instance.table(tableId2);
+            const row2 = table2.row('gwashington');
+            await row2.save(entry);
+          }
+        } catch (e) {
+          done(new Error('An error occurred while running the script'));
+          done(e);
+        }
+      })().catch(err => {
+        throw err;
+      });
+    });
+  });
 });
 describe('Bigtable/ClientSideMetricsToGCMTimeout', () => {
   // This test suite simulates a situation where the user creates multiple
@@ -801,6 +900,111 @@
       });
     });
   });
+  describe('MutateRow', () => {
+    it('should send the metrics to Google Cloud Monitoring for a MutateRow call for a single point', done => {
+      let testFinished = false;
+      /*
+      We need to create a timeout here because if we don't then mocha shuts down
+      the test as it is sleeping before the GCPMetricsHandler has a chance to
+      export the data. When the timeout is finished, if there were no export
+      errors then the test passes.
+      */
+      setTimeout(() => {
+        testFinished = true;
+        done();
+      }, 120000);
+      (async () => {
+        try {
+          const bigtable1 = await mockBigtable(defaultProjectId, done);
+          const bigtable2 = await mockBigtable(defaultProjectId, done);
+          for (const bigtable of [bigtable1, bigtable2]) {
+            for (const instanceId of [instanceId1, instanceId2]) {
+              await setupBigtableWithInsert(
+                bigtable,
+                columnFamilyId,
+                instanceId,
+                [tableId1, tableId2],
+              );
+              const instance = bigtable.instance(instanceId);
+              const table = instance.table(tableId1);
+              const row = table.row('gwashington');
+              await row.save(entry);
+              const table2 = instance.table(tableId2);
+              const row2 = table2.row('gwashington');
+              await row2.save(entry);
+            }
+          }
+        } catch (e) {
+          done(new Error('An error occurred while running the script'));
+          done(e);
+        }
+      })().catch(err => {
+        throw err;
+      });
+    });
+    it('should send the metrics to Google Cloud Monitoring for a single MutateRow call with thirty clients', done => {
+      /*
+      We need to create a timeout here because if we don't then mocha shuts down
+      the test as it is sleeping before the GCPMetricsHandler has a chance to
+      export the data. When the timeout is finished, if there were no export
+      errors then the test passes.
+      */
+      const testTimeout = setTimeout(() => {
+        done(new Error('The test timed out'));
+      }, 480000);
+      let testComplete = false;
+      const numClients = 30;
+      (async () => {
+        try {
+          const bigtableList = [];
+          const completedSet = new Set();
+          for (
+            let bigtableCount = 0;
+            bigtableCount < numClients;
+            bigtableCount++
+          ) {
+            const currentCount = bigtableCount;
+            const onExportSuccess = () => {
+              completedSet.add(currentCount);
+              if (completedSet.size === numClients) {
+                // If every client has completed the export then pass the test.
+                clearTimeout(testTimeout);
+                if (!testComplete) {
+                  testComplete = true;
+                  done();
+                }
+              }
+            };
+            bigtableList.push(
+              await mockBigtable(defaultProjectId, done, onExportSuccess),
+            );
+          }
+          for (const bigtable of bigtableList) {
+            for (const instanceId of [instanceId1, instanceId2]) {
+              await setupBigtableWithInsert(
+                bigtable,
+                columnFamilyId,
+                instanceId,
+                [tableId1, tableId2],
+              );
+              const instance = bigtable.instance(instanceId);
+              const table = instance.table(tableId1);
+              const row = table.row('gwashington');
+              await row.save(entry);
+              const table2 = instance.table(tableId2);
+              const row2 = table2.row('gwashington');
+              await row2.save(entry);
+            }
+          }
+        } catch (e) {
+          done(e);
+          done(new Error('An error occurred while running the script'));
+        }
+      })().catch(err => {
+        throw err;
+      });
+    });
+  });
 });
 describe('Bigtable/ClientSideMetricsToMetricsHandler', () => {
   async function getFakeBigtableWithHandler(
@@ -971,5 +1175,43 @@
         });
       });
     });
+    describe('MutateRow', () => {
+      it('should send the metrics to the metrics handler for a MutateRow call for a single point', done => {
+        (async () => {
+          const bigtable = await mockBigtableWithNoInserts(
+            defaultProjectId,
+            done,
+            checkMutateRowCall,
+          );
+          const instance = bigtable.instance(instanceId1);
+          const table = instance.table(tableId1);
+          const row = table.row('gwashington');
+          await row.save(entry);
+          const table2 = instance.table(tableId2);
+          const row2 = table2.row('gwashington');
+          await row2.save(entry);
+        })().catch(err => {
+          throw err;
+        });
+      });
+      it('should pass the projectId to the metrics handler properly for a single mutateRow point', done => {
+        (async () => {
+          const bigtable = await mockBigtableWithNoInserts(
+            defaultProjectId,
+            done,
+            checkMutateRowCall,
+          );
+          const instance = bigtable.instance(instanceId1);
+          const table = instance.table(tableId1);
+          const row = table.row('gwashington');
+          await row.save(entry);
+          const table2 = instance.table(tableId2);
+          const row2 = table2.row('gwashington');
+          await row2.save(entry);
+        })().catch(err => {
+          throw err;
+        });
+      });
+    });
   });
 });
diff --git a/test/row.ts b/test/row.ts
index 9e4833239..8544107c2 100644
--- a/test/row.ts
+++ b/test/row.ts
@@ -25,6 +25,8 @@ import {
   GetRowsOptions,
   GetRowsCallback,
   GetRowsResponse,
+  MutateOptions,
+  MutateCallback,
 } from '../src/table.js';
 import {Chunk} from '../src/chunktransformer.js';
 import {CallOptions, ServiceError} from 'google-gax';
@@ -32,6 +34,8 @@ import {ClientSideMetricsConfigManager} from '../src/client-side-metrics/metrics
 import {Bigtable} from '../src/';
 import {getRowsInternal} from '../src/utils/getRowsInternal';
 import {TabularApiSurface} from '../src/tabular-api-surface';
+import * as pumpify from 'pumpify';
+import {OperationMetricsCollector} from '../src/client-side-metrics/operation-metrics-collector';
 
 const sandbox = sinon.createSandbox();
 
@@ -88,6 +92,23 @@ describe('Bigtable/Row', () => {
   let RowError: typeof rw.RowError;
   let row: rw.Row;
 
+  function getFakeMutateRow(
+    fn: (
+      table: TabularApiSurface,
+      metricsCollector: OperationMetricsCollector,
+      entry: Entry | Entry[],
+      gaxOptions_: MutateOptions | MutateCallback,
+      callback: Function,
+    ) => void | Promise<void>,
+  ) {
+    const Fake = proxyquire('../src/row.js', {
+      '../src/utils/mutateInternal': {
+        mutateInternal: fn,
+      },
+    });
+    return Fake;
+  }
+
   function getFakeRow(
     getRowsInternal: (
       table: TabularApiSurface,
@@ -1513,34 +1534,52 @@ describe('Bigtable/Row', () => {
     };
 
     it('should insert an object', done => {
-      (row.table.mutate as Function) = (
+      const fn = (
+        table: TabularApiSurface,
+        metricsCollector: OperationMetricsCollector,
         // eslint-disable-next-line @typescript-eslint/no-explicit-any
-        entry: any,
+        entry: Entry | Entry[],
         gaxOptions: {},
         callback: Function,
       ) => {
         assert.strictEqual(entry.data, data);
         callback(); // done()
       };
-      row.save(data, done);
+      const SavedRow = getFakeMutateRow(fn).Row;
+      const savedRow = new SavedRow(TABLE, ROW_ID);
+      savedRow.save(data, done);
     });
 
     it('should accept gaxOptions', done => {
       const gaxOptions = {};
-      sandbox.stub(row.table, 'mutate').callsFake((entry, gaxOptions_) => {
+      const fn = (
+        table: TabularApiSurface,
+        metricsCollector: OperationMetricsCollector,
+        entry: Entry | Entry[],
+        gaxOptions_: MutateOptions | MutateCallback,
+      ) => {
         assert.strictEqual(gaxOptions_, gaxOptions);
         done();
-      });
-      row.save(data, gaxOptions, assert.ifError);
+      };
+      const SavedRow = getFakeMutateRow(fn).Row;
+      const savedRow = new SavedRow(TABLE, ROW_ID);
+      savedRow.save(data, gaxOptions, assert.ifError);
     });
 
     it('should remove existing data', done => {
       const gaxOptions = {};
-      sandbox.stub(row.table, 'mutate').callsFake((entry, gaxOptions_) => {
+      const fn = (
+        table: TabularApiSurface,
+        metricsCollector: OperationMetricsCollector,
+        entry: Entry | Entry[],
+        gaxOptions_: MutateOptions | MutateCallback,
+      ) => {
         assert.strictEqual(gaxOptions_, gaxOptions);
         done();
-      });
-      row.save(data, gaxOptions, assert.ifError);
+      };
+      const SavedRow = getFakeMutateRow(fn).Row;
+      const savedRow = new SavedRow(TABLE, ROW_ID);
+      savedRow.save(data, gaxOptions, assert.ifError);
       assert.strictEqual(row.data, undefined);
     });
   });
diff --git a/test/table.ts b/test/table.ts
index b0eb41891..c10e89145 100644
--- a/test/table.ts
+++ b/test/table.ts
@@ -35,6 +35,7 @@ import {OperationMetricsCollector} from '../src/client-side-metrics/operation-me
 import {SinonSpy} from 'sinon';
 import {TabularApiSurface} from '../src/tabular-api-surface';
 import {GetRowsOptions} from '../src/table';
+import {mutateInternal} from '../src/utils/mutateInternal';
 
 const sandbox = sinon.createSandbox();
 const noop = () => {};
@@ -135,6 +136,13 @@ function getTableMock(
       createReadStreamInternal: createReadStreamInternal,
     },
   });
+  const FakeMutateInternal = proxyquire('../src/utils/mutateInternal.js', {
+    '../row.js': {Row: FakeRow},
+    '../chunktransformer.js': {ChunkTransformer: FakeChunkTransformer},
+    '../filter.js': {Filter: FakeFilter},
+    '../mutation.js': {Mutation: FakeMutation},
+    pumpify,
+  }).mutateInternal;
   const FakeTabularApiSurface = proxyquire('../src/tabular-api-surface.js', {
     '@google-cloud/promisify': fakePromisify,
     './family.js': {Family: FakeFamily},
@@ -146,6 +154,9 @@ function getTableMock(
     './utils/createReadStreamInternal': {
      createReadStreamInternal,
    },
+    './utils/mutateInternal': {
+      mutateInternal: FakeMutateInternal,
+    },
     './utils/getRowsInternal': {
       getRowsInternal: FakeGetRows.getRowsInternal,
     },
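
A minimal sketch of the two call paths this refactor produces (illustrative
only: the instance and table names are placeholders, and the client is assumed
to be configured with default credentials):

    import {Bigtable} from '@google-cloud/bigtable';

    async function main() {
      const bigtable = new Bigtable();
      const table = bigtable.instance('my-instance').table('my-table');

      // Row#save builds a single INSERT mutation and now routes it through
      // mutateInternal with a MethodName.MUTATE_ROW / UNARY collector, so the
      // write is recorded as a Bigtable.MutateRow operation.
      await table.row('gwashington').save({cf1: {column: 1}});

      // Table#mutate accepts one or more entries and records the same call
      // path as a Bigtable.MutateRows / STREAMING operation.
      await table.mutate({
        key: 'rowId',
        data: {cf1: {column: 1}},
        method: 'insert',
      });
    }

    main().catch(console.error);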