Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
fe5b621
Add metrics collector hooks in the right places
danieljbruce Jul 9, 2025
f74cee8
Move readrows tests over
danieljbruce Jul 9, 2025
7b45ceb
Group ReadRows under separate describe block
danieljbruce Jul 9, 2025
bf9a52f
Add mutateRows tests
danieljbruce Jul 9, 2025
20167af
Add onResponse to mutateRows collection
danieljbruce Jul 9, 2025
21f96e9
Eliminate the extra mutateRows calls
danieljbruce Jul 10, 2025
689efad
Change the test frame to work without inserting
danieljbruce Jul 10, 2025
9d40bf7
Remove console traces
danieljbruce Jul 10, 2025
2bb1a59
Remove the error console log
danieljbruce Jul 10, 2025
079a607
Inserts will conflate results for readRows too
danieljbruce Jul 10, 2025
168e132
Remove only
danieljbruce Jul 10, 2025
cdbdc20
Remove only
danieljbruce Jul 10, 2025
ee12cea
Merge branch 'main' into 359913994-mutate-rows
danieljbruce Jul 10, 2025
d56ddf3
Remove the extra onResponse call
danieljbruce Jul 10, 2025
9b6ed02
Include onOperationComplete in the callback
danieljbruce Jul 14, 2025
bc70978
Remove the onOperationComplete call
danieljbruce Jul 14, 2025
76e50e7
Merge branch 'main' into 359913994-mutate-rows
danieljbruce Jul 15, 2025
5022a7a
Get rid of error code fragment
danieljbruce Jul 18, 2025
9aa6ce1
onResponse handler moved into metrics collector
danieljbruce Jul 18, 2025
04e4e77
Rename handleStatusAndMetadata
danieljbruce Jul 21, 2025
524eb47
Add comments, shorten snippet
danieljbruce Jul 21, 2025
285651f
Add the wrapRequest method to the mock
danieljbruce Jul 21, 2025
42366c9
Pass null along instead
danieljbruce Jul 21, 2025
1514d0c
Add retries comment
danieljbruce Jul 21, 2025
c1f273f
Merge branch 'main' into 359913994-mutate-rows
danieljbruce Jul 28, 2025
3c05e26
Use the same setup table code as before
danieljbruce Jul 28, 2025
e4d2c28
Rename method to setupBigtableWithInsert
danieljbruce Jul 28, 2025
33f449a
Keep setupBigtable name as setupBigtable
danieljbruce Jul 28, 2025
926c480
Eliminate unused import
danieljbruce Jul 28, 2025
a82647b
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Jul 28, 2025
4d74ab4
Merge branch '359913994-mutate-rows' of https://github.com/googleapis…
gcf-owl-bot[bot] Jul 28, 2025
a0de09b
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Jul 28, 2025
bbea829
Merge branch '359913994-mutate-rows' of https://github.com/googleapis…
gcf-owl-bot[bot] Jul 28, 2025
9b13fba
Run the linter
danieljbruce Jul 28, 2025
f83727b
Merge branch '359913994-mutate-rows' of https://github.com/googleapis…
danieljbruce Jul 28, 2025
1594239
Make sure the table is set up properly, but don’t
danieljbruce Jul 28, 2025
eb3fefe
Remove only
danieljbruce Jul 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions src/client-side-metrics/operation-metrics-collector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ export class OperationMetricsCollector {
*
* @param stream
*/
handleStatusAndMetadata(stream: AbortableDuplex) {
wrapRequest(stream: AbortableDuplex) {
stream
.on(
'metadata',
Expand All @@ -183,7 +183,10 @@ export class OperationMetricsCollector {
}) => {
this.onStatusMetadataReceived(status);
},
);
)
.on('data', () => {
this.onResponse();
});
}

/**
Expand Down
52 changes: 45 additions & 7 deletions src/tabular-api-surface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,33 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
(a, b) => a.concat(b),
[],
);
const collectMetricsCallback = (
originalError: ServiceError | null,
err: ServiceError | PartialFailureError | null,
apiResponse?: google.protobuf.Empty,
) => {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add a comment describing the difference between originalError and err?

The refactor looks better. But I don't completely understand that part

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. I added comment fully explaining this.

// originalError is the error that was sent from the gapic layer. The
// compiler guarantees that it contains a code which needs to be
// provided when an operation is marked complete.
//
// err is the error we intend to send back to the user. Often it is the
// same as originalError, but in one case we construct a
// PartialFailureError and send that back to the user instead. In this
// case, we still need to pass the originalError into the method
// because the PartialFailureError doesn't have a code, but we need to
// communicate a code to the metrics collector.
//
const code = originalError ? originalError.code : 0;
metricsCollector.onOperationComplete(code);
callback(err, apiResponse);
};

const metricsCollector =
this.bigtable._metricsConfigManager.createOperation(
MethodName.MUTATE_ROWS,
StreamingState.STREAMING,
this,
);
/*
The following line of code sets the timeout if it was provided while
creating the client. This will be used to determine if the client should
Expand Down Expand Up @@ -387,18 +413,23 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
const onBatchResponse = (err: ServiceError | null) => {
// Return if the error happened before a request was made
if (numRequestsMade === 0) {
callback(err);
collectMetricsCallback(err, err);
return;
}

const timeoutExceeded = !!(
timeout && timeout < new Date().getTime() - callTimeMillis
);
if (isRetryable(err, timeoutExceeded)) {
// If the timeout or max retries is exceeded or if there are no
// pending indices left then the client doesn't retry.
// Otherwise, the client will retry if there is no error or if the
// error has a retryable status code.
const backOffSettings =
options.gaxOptions?.retry?.backoffSettings ||
DEFAULT_BACKOFF_SETTINGS;
const nextDelay = getNextDelay(numRequestsMade, backOffSettings);
metricsCollector.onAttemptComplete(err ? err.code : 0);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should know that err exists at this point, right? It seems like error code 0 should never happen

If we need a null check, I'd say we should put it beside isRetryable(err). But I assume that's already part of isRetryable

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should know that err exists at this point, right?

No. In fact if the error doesn't exist then most of the time this if block will be entered because isRetryable will be true. See the comment:

// If the error is empty but there are still outstanding mutations,
// it means that there are retryable errors in the mutate response
// even when the RPC succeeded
return !err || RETRYABLE_STATUS_CODES.has(err.code);
image

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I know this is out of scope of this PR, but some comments here would be very useful. A docstring for isRetryable, and/or a comment at the start of this block describing what makes it retryable would help a lot

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The more important question though is: Do we record the attempt as successful if some mutations failed? or do we pass through the error code of a failed mutation?

In Python, it looks like I was attaching the status code of the exception at the top of the error list. I can't remember if that came from the product team or not though. Have you discussed this with Mattie? Or looked at Java? We should verify this before moving forward

Copy link
Contributor Author

@danieljbruce danieljbruce Jul 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gemini says we record OK for PartialFailureError results in java. I'll verify that in the code next and ask Bigtable, but I am not too familiar with that codebase so may take time.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like we send back the OK code here so for the time being I'll change this to pass in code 0.

@mutianf In case of a PartialFailureError, what error code do we want to record for client side metrics on a completed attempt?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And just to provide more background, I verified that stream ending (no error) corresponds to the case where we would expect a PartialFailureError this test. So in this case, if we are inside this if block and the err is null then we can conclude that we are retrying because some of the indicies are pending.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After discussing with Mattie, she said Java records a status of success when a mutation fails, but the rpc succeeds. So we can do the same here

setTimeout(makeNextBatchRequest, nextDelay);
return;
}
Expand All @@ -411,7 +442,10 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);

const mutationErrors = Array.from(mutationErrorsByEntryIndex.values());
if (mutationErrorsByEntryIndex.size !== 0) {
callback(new PartialFailureError(mutationErrors, err));
collectMetricsCallback(
err,
new PartialFailureError(mutationErrors, err),
);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it also seems weird to me to expect a null error here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This err ? err.code : 0 code has now been moved inside collectMetricsCallback anyways to reduce the repetition.

return;
}
if (err) {
Expand All @@ -425,13 +459,15 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
.filter(index => !mutationErrorsByEntryIndex.has(index))
.map(() => err),
);
callback(err);
collectMetricsCallback(err, err);
return;
}
callback(err);
collectMetricsCallback(null, null);
};

metricsCollector.onOperationStart();
const makeNextBatchRequest = () => {
metricsCollector.onAttemptStart();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these scattered onAttemptStart and onAttemptComplete lines make me a bit nervous. Especially since we have to repeat them across rpcs. Is there a way to encapsulate these better?

Maybe wrap the stream object itself? And wrap the callbacks?

Let me know if this makes sense to you. I can try to come up with some ideas if needed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some ideas:

  1. We could make the first call to onAttemptStart automatically call onOperationStart right away.
  2. I like the idea of wrapping the callbacks so that it calls onOperationComplete just before we call the callback. I think that would clean things up.
  3. We could consider implicitly calling onAttemptComplete at the start of onAttemptStart to implicitly end the last attempt, but then I think the attempt latency would be off because it would include the retry delay so maybe this is not such a great option.
  4. All calls flow through the request method. We could consider allowing request to accept hooks that get called when an attempt starts or when an attempt ends, but I'm not sure I like this idea. It makes this method even more complex and this is a public facing method. Even though users shouldn't really be using this method directly, it would still change a public API surface. At most I would consider taking this as a backlog item and doing it later.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I only ended up adopting change #2. I think #1 has the downside that we lose flexibility and we only save 1 line of code.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, those changes look a lot better. I think there are still improvements to be had, but this is a great start

const entryBatch = entries.filter((entry: Entry, index: number) => {
return pendingEntryIndices.has(index);
});
Expand Down Expand Up @@ -469,14 +505,16 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
options.gaxOptions,
);

this.bigtable
.request<google.bigtable.v2.MutateRowsResponse>({
const requestStream =
this.bigtable.request<google.bigtable.v2.MutateRowsResponse>({
client: 'BigtableClient',
method: 'mutateRows',
reqOpts,
gaxOpts: options.gaxOptions,
retryOpts,
})
});
metricsCollector.wrapRequest(requestStream);
requestStream
.on('error', (err: ServiceError) => {
onBatchResponse(err);
})
Expand Down
3 changes: 1 addition & 2 deletions src/utils/createReadStreamInternal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ export function createReadStreamInternal(

rowStream = pumpify.obj([requestStream, chunkTransformer, toRowStream]);

metricsCollector.handleStatusAndMetadata(requestStream);
metricsCollector.wrapRequest(requestStream);
rowStream
.on('error', (error: ServiceError) => {
rowStreamUnpipe(rowStream, userStream);
Expand Down Expand Up @@ -413,7 +413,6 @@ export function createReadStreamInternal(
// Reset error count after a successful read so the backoff
// time won't keep increasing when as stream had multiple errors
numConsecutiveErrors = 0;
metricsCollector.onResponse();
})
.on('end', () => {
activeRequestStream = null;
Expand Down
Loading
Loading