Merged
Changes from 30 commits
Commits
35 commits
87d54c0
Integrate CSM into readModifyWriteRow
danieljbruce Jul 28, 2025
26822d8
Add onOperationAndAttemptComplete method
danieljbruce Jul 29, 2025
66569cb
Fix Authorized view tests
danieljbruce Jul 29, 2025
150cd92
Merge branch 'main' of https://github.com/googleapis/nodejs-bigtable …
danieljbruce Jul 30, 2025
febccf7
Change test runs
danieljbruce Jul 30, 2025
85e3949
Add some readModifyWriteRow tests
danieljbruce Jul 30, 2025
9b7d9dd
Fix the column family error
danieljbruce Jul 30, 2025
3502229
Add handler tests for readModifyWriteRow
danieljbruce Jul 30, 2025
e5bbd03
Use handler only for readModifyWriteRow
danieljbruce Jul 30, 2025
e3dda80
Use handler for RMWR only
danieljbruce Jul 30, 2025
14a775e
Revert firstResponse latency changes
danieljbruce Jul 30, 2025
74dbce6
Need onOperationAndAttemptComplete in mutateRows
danieljbruce Jul 30, 2025
4b9fe1f
Move the handle to only for readRows
danieljbruce Jul 30, 2025
f54e794
Remove onlys
danieljbruce Jul 30, 2025
9b2c50c
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Jul 30, 2025
ba7a145
Don’t skip TimedStream tests
danieljbruce Jul 30, 2025
7bf964e
Merge branch '359913994-readModifyWriteRow' of https://github.com/goo…
danieljbruce Jul 30, 2025
d69497b
First response latencies for readRows only
danieljbruce Jul 30, 2025
bcb4cba
Remove only
danieljbruce Jul 30, 2025
21b37fd
call onAttempt start removed
danieljbruce Jul 30, 2025
84c5c5a
Do the check in the metrics handler instead
danieljbruce Jul 31, 2025
262e8c1
Added a comment
danieljbruce Jul 31, 2025
278d3c5
fix the tests to always expect a value for
danieljbruce Jul 31, 2025
c949dca
Fix assertion checks
danieljbruce Jul 31, 2025
5f0d3ae
withMetricInterceptors rename
danieljbruce Jul 31, 2025
a2a79f7
Move interceptor file
danieljbruce Jul 31, 2025
2ae3f0a
Add err argument
danieljbruce Jul 31, 2025
cd553b2
Inline application latency
danieljbruce Jul 31, 2025
0be9ba7
Merge branch 'first-response-latency-readrows-only' of https://github…
danieljbruce Jul 31, 2025
f25711c
Use withMetricInterceptors
danieljbruce Jul 31, 2025
f6e285d
Only have an onOperationComplete method
danieljbruce Aug 1, 2025
01853f9
Merge branch 'main' of https://github.com/googleapis/nodejs-bigtable …
danieljbruce Aug 1, 2025
52a6ce7
delete handler
danieljbruce Aug 1, 2025
775fb0c
Reintroduce the parameter documentation
danieljbruce Aug 1, 2025
c18a303
Merge branch 'main' into 359913994-readModifyWriteRow
danieljbruce Aug 5, 2025
15 changes: 11 additions & 4 deletions src/client-side-metrics/gcp-metrics-handler.ts
@@ -31,6 +31,7 @@ const {
 } = require('@opentelemetry/sdk-metrics');
 import * as os from 'os';
 import * as crypto from 'crypto';
+import {MethodName} from './client-side-metrics-attributes';
 
 /**
  * Generates a unique client identifier string.
@@ -251,10 +252,16 @@ export class GCPMetricsHandler implements IMetricsHandler {
       status: data.status,
       ...commonAttributes,
     });
-    otelInstruments.firstResponseLatencies.record(data.firstResponseLatency, {
-      status: data.status,
-      ...commonAttributes,
-    });
+    if (
+      data.metricsCollectorData.method === MethodName.READ_ROWS ||
+      data.metricsCollectorData.method === MethodName.READ_ROW
+    ) {
+      otelInstruments.firstResponseLatencies.record(data.firstResponseLatency, {
+        status: data.status,
+        ...commonAttributes,
+      });
+    }
+
     if (data.applicationLatency) {
       otelInstruments.applicationBlockingLatencies.record(
         data.applicationLatency,
src/client-side-metrics/metric-interceptor.ts
@@ -13,11 +13,11 @@
 // limitations under the License.
 
 import {CallOptions} from 'google-gax';
-import {OperationMetricsCollector} from './client-side-metrics/operation-metrics-collector';
+import {OperationMetricsCollector} from './operation-metrics-collector';
 
 // Mock Server Implementation
 import * as grpcJs from '@grpc/grpc-js';
 import {status as GrpcStatus} from '@grpc/grpc-js';
 

Check warning on line 20 in src/client-side-metrics/metric-interceptor.ts (GitHub Actions / lint): 'GrpcStatus' is defined but never used

 export type ServerStatus = {
   metadata: {internalRepr: Map<string, Uint8Array[]>; options: {}};
@@ -49,16 +49,21 @@
             collector.onStatusMetadataReceived(
               status as unknown as ServerStatus,
             );
+            collector.onAttemptComplete(status.code);
             nextStat(status);
           },
         };
         next(metadata, newListener);
       },
+      sendMessage: function (message, next) {
+        collector.onAttemptStart();
+        next(message);
+      },
     });
   };
 }
 
-export function withInterceptors(
+export function withMetricInterceptors(
   gaxOptions: CallOptions,
   metricsCollector?: OperationMetricsCollector,
 ) {
21 changes: 15 additions & 6 deletions src/client-side-metrics/operation-metrics-collector.ts
@@ -181,10 +181,7 @@ export class OperationMetricsCollector {
         }) => {
           this.onStatusMetadataReceived(status);
         },
-      )
-      .on('data', () => {
-        this.onResponse();
-      });
Contributor Author

We only want to collect firstResponseLatency for readRows now so this is no longer needed.
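
A minimal sketch of the behaviour this comment describes: record first-response latency only for read methods, and only on the first response. `FirstResponseTimer`, `SketchMethod`, and the method-name strings are hypothetical and are not the library's API; timestamps are passed in explicitly to keep the example deterministic.

```typescript
// Hypothetical helper, not part of nodejs-bigtable.
type SketchMethod = 'readRows' | 'readRow' | 'readModifyWriteRow';

class FirstResponseTimer {
  private readonly method: SketchMethod;
  private readonly startMs: number;
  private firstResponseMs: number | null = null;

  constructor(method: SketchMethod, startMs: number) {
    this.method = method;
    this.startMs = startMs;
  }

  // Intended to be called on every 'data' event. Only the first call for a
  // read method records a latency; every other call is a no-op.
  onResponse(nowMs: number): void {
    const isRead = this.method === 'readRows' || this.method === 'readRow';
    if (isRead && this.firstResponseMs === null) {
      this.firstResponseMs = nowMs - this.startMs;
    }
  }

  // Latency in milliseconds, or null when nothing was recorded.
  get latency(): number | null {
    return this.firstResponseMs;
  }
}
```

With this shape, a unary method such as readModifyWriteRow never produces a first-response value, so the handler has nothing to record for it.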

+      );
   }
 
   /**
@@ -285,11 +282,23 @@ export class OperationMetricsCollector {
    * @param {grpc.status} finalOperationStatus Information about the completed operation.
    * @param {number} applicationLatency The application latency measurement.

is this meant to be removed?

Contributor Author

No. Thanks for catching this.

    */
-  onOperationComplete(
+  onOperationAndAttemptComplete(

I think you could just make this implicit:

  onOperationComplete(
    finalOperationStatus: grpc.status,
    applicationLatency?: number,
  ) {
    withMetricsDebug(() => {
      if (
        this.state === MetricsCollectorState.OPERATION_STARTED_ATTEMPT_IN_PROGRESS_NO_ROWS_YET ||
        this.state === MetricsCollectorState.OPERATION_STARTED_ATTEMPT_IN_PROGRESS_SOME_ROWS_RECEIVED
      ) {
        // finalize any active attempts
        this.onAttemptComplete(finalOperationStatus);
      }
      checkState(this.state, [
        MetricsCollectorState.OPERATION_STARTED_ATTEMPT_NOT_IN_PROGRESS,
      ]);

Creating an extra method feels a bit unnecessary, because this seems like the reasonable thing for onOperationComplete to do if it were called in this state anyway. But I'm not too picky about how this is implemented.

Contributor Author

With the current implementation, we need onOperationComplete for unary calls and onOperationAndAttemptComplete for other calls, because for unary calls onAttemptComplete is already called in the interceptor. If there is a way to make only the onOperationAndAttemptComplete functionality necessary, I'm happy to hear it, but that change is contingent on how we handle attempt start and completion in the interceptor.

I think you must have misunderstood what I'm suggesting.

Before this PR, you had to finalize all attempts before calling onOperationComplete. If you call onOperationComplete() when there's an active attempt, it raises an error.

To address this, you added the new onOperationAndAttemptComplete(), which does work when there's an active attempt, but fails if all attempts were already completed.

With this change, we have two distinct methods for ending an operation. You have to choose the right one depending on the state you're in. If you choose the wrong one for the state, you get an error. There's no overlap.

I'm just suggesting merging these two into a single onOperationComplete() that works in either state. To accomplish this, we just need to add that extra state check to the top of the method. That way, the single onOperationComplete makes the two redundant, and will "just work" no matter what the attempt state is.

Does that make sense?

Contributor Author

Ok. I see what you are saying.

So we just have onOperationComplete and when onOperationComplete is called:

  1. If the attempt has not been marked as complete then call onAttemptComplete
  2. If the attempt has been marked complete already with an onAttemptComplete call then do not call onAttemptComplete again and just continue as usual with the rest of the onOperationComplete code

Is that what you are saying? Seems fine to me.
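
The two cases agreed on above can be sketched as a small state machine. This is only an illustration under simplified assumptions: `SketchCollector` and its state names are hypothetical and do not mirror `OperationMetricsCollector` or `MetricsCollectorState` exactly.

```typescript
// Hypothetical, simplified collector used only to illustrate the idea.
type SketchState =
  | 'OPERATION_NOT_STARTED'
  | 'ATTEMPT_NOT_IN_PROGRESS'
  | 'ATTEMPT_IN_PROGRESS'
  | 'OPERATION_COMPLETE';

class SketchCollector {
  state: SketchState = 'OPERATION_NOT_STARTED';
  attemptsCompleted = 0;
  operationsCompleted = 0;

  // Throws when the collector is not in the expected state.
  private expectState(expected: SketchState): void {
    if (this.state !== expected) {
      throw new Error(`Invalid state: ${this.state}, expected ${expected}`);
    }
  }

  onOperationStart(): void {
    this.expectState('OPERATION_NOT_STARTED');
    this.state = 'ATTEMPT_NOT_IN_PROGRESS';
  }

  onAttemptStart(): void {
    this.expectState('ATTEMPT_NOT_IN_PROGRESS');
    this.state = 'ATTEMPT_IN_PROGRESS';
  }

  onAttemptComplete(): void {
    this.expectState('ATTEMPT_IN_PROGRESS');
    this.attemptsCompleted += 1;
    this.state = 'ATTEMPT_NOT_IN_PROGRESS';
  }

  // Works in either attempt state: finalizes an active attempt if there is
  // one, then completes the operation.
  onOperationComplete(): void {
    if (this.state === 'ATTEMPT_IN_PROGRESS') {
      this.onAttemptComplete();
    }
    this.expectState('ATTEMPT_NOT_IN_PROGRESS');
    this.operationsCompleted += 1;
    this.state = 'OPERATION_COMPLETE';
  }
}
```

In this sketch, a caller whose attempt was already finalized elsewhere (for example by an interceptor) and a caller with a still-active attempt both end up with exactly one completed attempt and one completed operation.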

finalOperationStatus: grpc.status,
applicationLatency?: number,
) {
this.onAttemptComplete(finalOperationStatus);
this.onOperationComplete(finalOperationStatus, applicationLatency);
}

/**
* Called when an operation completes (successfully or unsuccessfully).
* Records operation latencies, retry counts, and connectivity error counts.
* @param {grpc.status} finalOperationStatus Information about the completed operation.
*/
onOperationComplete(
finalOperationStatus: grpc.status,
applicationLatency?: number,
) {
withMetricsDebug(() => {
checkState(this.state, [
MetricsCollectorState.OPERATION_STARTED_ATTEMPT_NOT_IN_PROGRESS,
@@ -310,7 +319,7 @@ export class OperationMetricsCollector {
           client_name: `nodejs-bigtable/${version}`,
           operationLatency: totalMilliseconds,
           retryCount: this.attemptCount - 1,
-          firstResponseLatency: this.firstResponseLatency ?? undefined,
+          firstResponseLatency: this.firstResponseLatency ?? 0,
           applicationLatency: applicationLatency ?? 0,
         });
       }
28 changes: 26 additions & 2 deletions src/row-data-utils.ts
@@ -12,6 +12,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+import {OperationMetricsCollector} from './client-side-metrics/operation-metrics-collector';
+
 const dotProp = require('dot-prop');
 import {Filter, RawFilter} from './filter';
 import {
@@ -30,6 +32,11 @@ import {TabularApiSurface} from './tabular-api-surface';
 import arrify = require('arrify');
 import {Bigtable} from './index';
 import {CallOptions} from 'google-gax';
+import {
+  MethodName,
+  StreamingState,
+} from './client-side-metrics/client-side-metrics-attributes';
+import {withMetricInterceptors} from './client-side-metrics/metric-interceptor';
 
 interface TabularApiSurfaceRequest {
   tableName?: string;
@@ -193,14 +200,31 @@ class RowDataUtils {
       properties.reqOpts,
     );
     properties.requestData.data = {};
+    // 1. Create a metrics collector.
+    const metricsCollector = new OperationMetricsCollector(
+      properties.requestData.table,
+      MethodName.READ_MODIFY_WRITE_ROW,
+      StreamingState.UNARY,
+      (
+        properties.requestData.table as any
+      ).bigtable._metricsConfigManager!.metricsHandlers,
+    );
+    // 2. Tell the metrics collector an attempt has been started.
+    metricsCollector.onOperationStart();
+    // 3. Make a unary call with gax options that include interceptors. The
+    // interceptors are built from a method that hooks them up to the
+    // metrics collector
     properties.requestData.bigtable.request<google.bigtable.v2.IReadModifyWriteRowResponse>(
       {
         client: 'BigtableClient',
         method: 'readModifyWriteRow',
         reqOpts,
-        gaxOpts: gaxOptions,
+        gaxOpts: withMetricInterceptors(gaxOptions, metricsCollector),
       },
+      (err, ...args) => {
+        metricsCollector.onOperationComplete(err ? err.code : 0);
+        callback(err, ...args);
+      },
-      callback,
     );
   }
 
2 changes: 1 addition & 1 deletion src/tabular-api-surface.ts
@@ -358,7 +358,7 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
       // communicate a code to the metrics collector.
       //
       const code = originalError ? originalError.code : 0;
-      metricsCollector.onOperationComplete(code);
+      metricsCollector.onOperationAndAttemptComplete(code);
       callback(err, apiResponse);
     };
 
18 changes: 10 additions & 8 deletions src/utils/createReadStreamInternal.ts
@@ -26,10 +26,6 @@ import {
 } from '../chunktransformer';
 import {TableUtils} from './table';
 import {Duplex, PassThrough, Transform} from 'stream';
-import {
-  MethodName,
-  StreamingState,
-} from '../client-side-metrics/client-side-metrics-attributes';
 import {google} from '../../protos/protos';
 const pumpify = require('pumpify');
 import {grpc, ServiceError} from 'google-gax';
@@ -324,6 +320,10 @@ export function createReadStreamInternal(
       gaxOpts,
       retryOpts,
     });
+    requestStream.on('data', () => {
+      // This handler is necessary for recording firstResponseLatencies.
+      metricsCollector.onResponse();
+    });
 
     activeRequestStream = requestStream!;
 
@@ -363,6 +363,9 @@ export function createReadStreamInternal(
     rowStream = pumpify.obj([requestStream, chunkTransformer, toRowStream]);
 
     metricsCollector.wrapRequest(requestStream);
+    requestStream.on('data', () => {
+      metricsCollector.onResponse();
+    });

@daniel-sanche daniel-sanche Jul 31, 2025

Doesn't this conflict with #1658? (It seems like the same change, but different location.)

Contributor Author

Yes, different location. Perhaps I should pull those changes into this branch again before the next review.

It looks like it might be duplicated now
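
To illustrate why the duplication matters: Node's `EventEmitter` does not deduplicate listeners, so a handler registered twice for `'data'` runs twice per event. The sketch below uses a plain `EventEmitter` as a stand-in for the request stream, and `onResponse` here is a hypothetical counter rather than the collector's method. Whether the extra call is harmful in the PR depends on whether the real `onResponse` only acts on the first call, which this excerpt does not show.

```typescript
import {EventEmitter} from 'node:events';

// Stand-in for the request stream; a plain EventEmitter keeps the sketch
// synchronous and self-contained.
const requestStream = new EventEmitter();

// Hypothetical handler that just counts how often it is invoked.
let onResponseCalls = 0;
const onResponse = (): void => {
  onResponseCalls += 1;
};

// The same handler registered in two places, as in the duplicated hunks.
requestStream.on('data', onResponse);
requestStream.on('data', onResponse);

// One 'data' event triggers both registrations.
requestStream.emit('data', {});
console.log(`onResponse was called ${onResponseCalls} times for one event`);
```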

     rowStream
       .on('error', (error: ServiceError) => {
         rowStreamUnpipe(rowStream, userStream);
@@ -371,7 +374,7 @@ export function createReadStreamInternal(
           // We ignore the `cancelled` "error", since we are the ones who cause
           // it when the user calls `.abort()`.
           userStream.end();
-          metricsCollector.onOperationComplete(
+          metricsCollector.onOperationAndAttemptComplete(
             error.code,
             userStream.getTotalDurationMs(),
           );
@@ -406,7 +409,7 @@ export function createReadStreamInternal(
           //
           error.code = grpc.status.CANCELLED;
         }
-        metricsCollector.onOperationComplete(
+        metricsCollector.onOperationAndAttemptComplete(
           error.code,
           userStream.getTotalDurationMs(),
         );
@@ -420,8 +423,7 @@ export function createReadStreamInternal(
       })
       .on('end', () => {
         activeRequestStream = null;
-        const applicationLatency = userStream.getTotalDurationMs();
-        metricsCollector.onOperationComplete(
+        metricsCollector.onOperationAndAttemptComplete(
           grpc.status.OK,
           userStream.getTotalDurationMs(),
         );