feat: Add plumbing to support unary calls for client side metric collection #1631
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,86 @@ | ||
| // Copyright 2025 Google LLC | ||
| // | ||
| // Licensed under the Apache License, Version 2.0 (the "License"); | ||
| // you may not use this file except in compliance with the License. | ||
| // You may obtain a copy of the License at | ||
| // | ||
| // https://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, software | ||
| // distributed under the License is distributed on an "AS IS" BASIS, | ||
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| // See the License for the specific language governing permissions and | ||
| // limitations under the License. | ||
|
|
||
| import {CallOptions} from 'google-gax'; | ||
| import {OperationMetricsCollector} from './client-side-metrics/operation-metrics-collector'; | ||
|
|
||
| // Mock Server Implementation | ||
| import * as grpcJs from '@grpc/grpc-js'; | ||
| import {status as GrpcStatus} from '@grpc/grpc-js'; | ||
|
|
||
| export type ServerStatus = { | ||
| metadata: {internalRepr: Map<string, Uint8Array[]>; options: {}}; | ||
| code: number; | ||
| details: string; | ||
| }; | ||
|
|
||
| // Helper to create interceptor provider for OperationMetricsCollector | ||
| function createMetricsInterceptorProvider( | ||
| collector: OperationMetricsCollector, | ||
| ) { | ||
| return (options: grpcJs.InterceptorOptions, nextCall: grpcJs.NextCall) => { | ||
| // The interceptor keeps no state of its own; it only forwards received metadata and status to the collector. | ||
| // onOperationStart and onAttemptStart are called by the calling code (see `fakeReadModifyWriteRowMethod` in the test). | ||
| return new grpcJs.InterceptingCall(nextCall(options), { | ||
| start: (metadata, listener, next) => { | ||
| // AttemptStart is called by the orchestrating code | ||
| const newListener: grpcJs.Listener = { | ||
| onReceiveMetadata: (metadata, nextMd) => { | ||
| collector.onMetadataReceived( | ||
| metadata as unknown as { | ||
| internalRepr: Map<string, string[]>; | ||
| options: {}; | ||
| }, | ||
| ); | ||
| nextMd(metadata); | ||
| }, | ||
| onReceiveStatus: (status, nextStat) => { | ||
| collector.onStatusMetadataReceived( | ||
| status as unknown as ServerStatus, | ||
| ); | ||
| nextStat(status); | ||
| }, | ||
| }; | ||
| next(metadata, newListener); | ||
| }, | ||
| }); | ||
| }; | ||
| } | ||
|
|
||
| export function withInterceptors( | ||
| gaxOptions: CallOptions, | ||
| metricsCollector?: OperationMetricsCollector, | ||
| ) { | ||
| if (metricsCollector) { | ||
| const interceptor = createMetricsInterceptorProvider(metricsCollector); | ||
| if (!gaxOptions.otherArgs) { | ||
| gaxOptions.otherArgs = {}; | ||
| } | ||
| if (!gaxOptions.otherArgs.options) { | ||
| gaxOptions.otherArgs.options = {}; | ||
| } | ||
| if (!gaxOptions.otherArgs.options.interceptors) { | ||
| gaxOptions.otherArgs.options.interceptors = [interceptor]; | ||
| } else { | ||
| if (Array.isArray(gaxOptions.otherArgs.options.interceptors)) { | ||
|
nit: this would scan better as an |
||
| // We check that interceptors is an array so that the code has no | ||
| // chance of throwing an error. | ||
| // Then, if the interceptors is an array, make sure it also includes the | ||
| // client side metrics interceptor. | ||
| gaxOptions.otherArgs.options.interceptors.push(interceptor); | ||
| } | ||
| } | ||
|
don't you need to add the interceptor, even if other interceptors were added?
Contributor
Author
That really depends on how we want to design the system. Here are the pros and cons of adding the interceptor. Pros:
Cons:
Yeah, I guess the pros outweigh the cons so we can add this change.
Contributor
Author
Actually, I'm not sure I like this change anymore. If the interceptors are not an array (although they should be) then trying to push another interceptor onto interceptors will throw an error. I don't want users to experience that just in case they are using interceptors incorrectly. As you can see in the screenshot, the compiler doesn't guarantee any particular structure for the interceptors property.
Just to make sure I'm understanding this right: with the current code, if the user passes in their own interceptors, they would effectively be disabling CSM, right? What else could they be passing other than an array? There must be some docstring somewhere telling us what we need to support
Contributor
Author
Yes, unless they explicitly passed in the CSM interceptor.
Contributor
Author
this says "Additional arguments to be passed to the API calls." That's not helpful enough so I'll keep looking.
Contributor
Author
All the code examples pass interceptors in as an array. If you feel strongly about adding it to the existing interceptors then we can make that change, but I just don't like how the compiler doesn't guarantee that it is an array.
Can we check the type of
I really don't think we should silently disable CSM if the user passes in an interceptor. That would present as a bug, and it could be difficult to track down.
Contributor
Author
Sounds good. I made this change. |
||
| } | ||
| return gaxOptions; | ||
| } | ||
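
The outcome of the review thread above (append the metrics interceptor to whatever the caller already supplied, but never throw if `interceptors` has an unexpected shape) can be sketched in isolation. This is a simplified illustration, not the code under review: `Interceptor`, `SketchCallOptions`, and `appendInterceptor` are hypothetical names, and the types are stand-ins for the real gax and gRPC types.

```typescript
// Hypothetical stand-in for a gRPC interceptor; not the real library type.
type Interceptor = (options: unknown, nextCall: unknown) => unknown;

// Hypothetical, trimmed-down shape of the call options. `interceptors` is
// deliberately typed loosely, mirroring the point made in the thread that the
// compiler does not guarantee it is an array.
interface SketchCallOptions {
  otherArgs?: {
    options?: {
      interceptors?: unknown;
    };
  };
}

function appendInterceptor(
  callOptions: SketchCallOptions,
  interceptor: Interceptor,
): SketchCallOptions {
  // Create the nested containers if the caller did not supply them.
  const otherArgs = callOptions.otherArgs ?? {};
  callOptions.otherArgs = otherArgs;
  const options = otherArgs.options ?? {};
  otherArgs.options = options;

  if (options.interceptors === undefined || options.interceptors === null) {
    // No interceptors yet: start a new list.
    options.interceptors = [interceptor];
  } else if (Array.isArray(options.interceptors)) {
    // Caller-supplied interceptors are kept; ours is appended after them.
    options.interceptors.push(interceptor);
  }
  // Any other shape is left untouched so that this helper never throws.
  return callOptions;
}

// Usage with placeholder interceptors.
const userInterceptor: Interceptor = () => 'user';
const metricsInterceptor: Interceptor = () => 'metrics';

const merged = appendInterceptor(
  {otherArgs: {options: {interceptors: [userInterceptor]}}},
  metricsInterceptor,
);
const mergedList = merged.otherArgs?.options?.interceptors as Interceptor[];

const fresh = appendInterceptor({}, metricsInterceptor);
const freshList = fresh.otherArgs?.options?.interceptors as Interceptor[];

const odd = appendInterceptor(
  {otherArgs: {options: {interceptors: 'not-an-array'}}},
  metricsInterceptor,
);
const oddValue = odd.otherArgs?.options?.interceptors;
```

With this shape, a caller who passes their own interceptors no longer disables metrics collection, and a malformed `interceptors` value is passed through unchanged rather than causing an exception.
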
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,294 @@ | ||
| // Copyright 2025 Google LLC | ||
| // | ||
| // Licensed under the Apache License, Version 2.0 (the "License"); | ||
| // you may not use this file except in compliance with the License. | ||
| // You may obtain a copy of the License at | ||
| // | ||
| // https://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, software | ||
| // distributed under the License is distributed on an "AS IS" BASIS, | ||
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| // See the License for the specific language governing permissions and | ||
| // limitations under the License. | ||
|
|
||
| import {describe, it, before, after} from 'mocha'; | ||
| import {Bigtable} from '../src'; | ||
| import {ServiceError} from 'google-gax'; | ||
| import {ClientSideMetricsConfigManager} from '../src/client-side-metrics/metrics-config-manager'; | ||
| import {TestMetricsHandler} from '../test-common/test-metrics-handler'; | ||
| import { | ||
| OnAttemptCompleteData, | ||
| OnOperationCompleteData, | ||
| } from '../src/client-side-metrics/metrics-handler'; | ||
| import {OperationMetricsCollector} from '../src/client-side-metrics/operation-metrics-collector'; | ||
| import { | ||
| MethodName, | ||
| StreamingState, | ||
| } from '../src/client-side-metrics/client-side-metrics-attributes'; | ||
| import * as assert from 'assert'; | ||
| import {status as GrpcStatus} from '@grpc/grpc-js'; | ||
| import {withInterceptors} from '../src/interceptor'; | ||
|
|
||
| const INSTANCE_ID = 'isolated-rmw-instance'; | ||
| const TABLE_ID = 'isolated-rmw-table'; | ||
| const ZONE = 'us-central1-a'; | ||
| const CLUSTER = 'fake-cluster'; | ||
| const COLUMN_FAMILY = 'traits'; | ||
| const COLUMN_FAMILIES = [COLUMN_FAMILY]; | ||
| const ROW_KEY = 'gwashington'; | ||
| const COLUMN = 'teeth'; | ||
|
|
||
| /** | ||
| * Creates a Bigtable instance if it does not already exist. | ||
| * | ||
| * @param bigtable - The Bigtable client. | ||
| * @param instanceId - The ID of the instance to create. | ||
| * @param clusterId - The ID of the initial cluster in the instance. | ||
| * @param locationId - The location (zone) for the initial cluster. | ||
| * @returns The created instance object if successful, otherwise logs a message and returns the existing instance. | ||
| */ | ||
| async function createInstance( | ||
| bigtable: Bigtable, | ||
| instanceId: string, | ||
| clusterId: string, | ||
| locationId: string, | ||
| ) { | ||
| const instance = bigtable.instance(instanceId); | ||
|
|
||
| const [exists] = await instance.exists(); | ||
| if (exists) { | ||
| console.log(`Instance ${instanceId} already exists.`); | ||
| return instance; | ||
| } | ||
|
|
||
| const [i, operation] = await instance.create({ | ||
| clusters: [ | ||
| { | ||
| id: clusterId, | ||
| location: locationId, | ||
| nodes: 3, | ||
| }, | ||
| ], | ||
| labels: { | ||
| time_created: Date.now(), | ||
| }, | ||
| }); | ||
| await operation.promise(); | ||
| console.log(`Created instance ${instanceId}`); | ||
| return i; | ||
| } | ||
|
|
||
| /** | ||
| * Creates a Bigtable table if it does not already exist. | ||
| * | ||
| * @param bigtable - The Bigtable client. | ||
| * @param instanceId - The ID of the instance containing the table. | ||
| * @param tableId - The ID of the table to create. | ||
| * @param families - An array of column family names to create in the table. | ||
| * @returns A promise that resolves with the created Table object. | ||
| */ | ||
| async function createTable( | ||
| bigtable: Bigtable, | ||
| instanceId: string, | ||
| tableId: string, | ||
| families: string[], | ||
| ) { | ||
| const instance = bigtable.instance(instanceId); | ||
| const table = instance.table(tableId); | ||
|
|
||
| const [exists] = await table.exists(); | ||
| if (exists) { | ||
| console.log(`Table ${tableId} already exists.`); | ||
| return table; | ||
| } | ||
|
|
||
| const [t] = await table.create({ | ||
| families: families, | ||
| }); | ||
| const row = table.row(ROW_KEY); | ||
| await row.save({ | ||
| [COLUMN_FAMILY]: { | ||
| [COLUMN]: 'shiny', | ||
| }, | ||
| }); | ||
| console.log(`Created table ${tableId}`); | ||
| return t; | ||
| } | ||
|
|
||
| /** | ||
| * Creates and returns a TestMetricsHandler instance for testing purposes. | ||
| * | ||
| * @returns A TestMetricsHandler instance with the projectId set to 'test-project-id'. | ||
| */ | ||
| function getTestMetricsHandler() { | ||
| const testMetricsHandler = new TestMetricsHandler(); | ||
| testMetricsHandler.projectId = 'test-project-id'; | ||
| return testMetricsHandler; | ||
| } | ||
|
|
||
| /** | ||
| * Asynchronously retrieves the project ID associated with the Bigtable client. | ||
| * | ||
| * @param bigtable - The Bigtable client instance. | ||
| * @returns A promise that resolves with the project ID as a string. | ||
| * @throws An error if the project ID cannot be retrieved. | ||
| */ | ||
| async function getProjectIdFromClient(bigtable: Bigtable): Promise<string> { | ||
| return new Promise((resolve, reject) => { | ||
| bigtable.getProjectId_((err, projectId) => { | ||
| if (err) { | ||
| reject(err); | ||
| } else { | ||
| resolve(projectId!); | ||
| } | ||
| }); | ||
| }); | ||
| } | ||
|
|
||
| describe('Bigtable/ReadModifyWriteRowInterceptorMetrics', () => { | ||
| let bigtable: Bigtable; | ||
| let testMetricsHandler: TestMetricsHandler; | ||
|
|
||
| before(async () => { | ||
| bigtable = new Bigtable(); | ||
| await getProjectIdFromClient(bigtable); | ||
| await createInstance(bigtable, INSTANCE_ID, CLUSTER, ZONE); | ||
| await createTable(bigtable, INSTANCE_ID, TABLE_ID, COLUMN_FAMILIES); | ||
| testMetricsHandler = getTestMetricsHandler(); | ||
| bigtable._metricsConfigManager = new ClientSideMetricsConfigManager([ | ||
| testMetricsHandler, | ||
| ]); | ||
| }); | ||
|
|
||
| after(async () => { | ||
| const instance = bigtable.instance(INSTANCE_ID); | ||
| await instance.delete(); | ||
| }); | ||
|
|
||
| it('should record and export correct metrics for ReadModifyWriteRow via interceptors', async () => { | ||
| const instance = bigtable.instance(INSTANCE_ID); | ||
|
|
||
| const table = instance.table(TABLE_ID); | ||
|
|
||
| /* | ||
| fakeReadModifyWriteRowMethod is just a fake method on a table that makes a | ||
| call to the readModifyWriteRow grpc endpoint. It demonstrates what a method | ||
| might look like when trying to make a unary call while extracting | ||
| information from the headers and trailers that the server returns so that | ||
| the extracted information can be recorded in client side metrics. | ||
| */ | ||
| (table as any).fakeReadModifyWriteRowMethod = async () => { | ||
| // 1. Create a metrics collector. | ||
| const metricsCollector = new OperationMetricsCollector( | ||
| table, | ||
| MethodName.READ_MODIFY_WRITE_ROW, | ||
| StreamingState.UNARY, | ||
| (table as any).bigtable._metricsConfigManager!.metricsHandlers, | ||
| ); | ||
| // 2. Tell the metrics collector the operation and its first attempt have started. | ||
| metricsCollector.onOperationStart(); | ||
| metricsCollector.onAttemptStart(); | ||
| // 3. Make a unary call with gax options that include interceptors. The | ||
| // interceptors are built from a method that hooks them up to the | ||
| // metrics collector | ||
| const responseArray = await new Promise((resolve, reject) => { | ||
| bigtable.request( | ||
| { | ||
| client: 'BigtableClient', | ||
| method: 'readModifyWriteRow', | ||
| reqOpts: { | ||
| tableName: table.name, | ||
| rowKey: Buffer.from(ROW_KEY), | ||
| rules: [ | ||
| { | ||
| familyName: COLUMN_FAMILY, | ||
| columnQualifier: Buffer.from(COLUMN), | ||
| appendValue: Buffer.from('-wood'), | ||
| }, | ||
| ], | ||
| appProfileId: undefined, | ||
| }, | ||
| gaxOpts: withInterceptors({}, metricsCollector), | ||
| }, | ||
| (err: ServiceError | null, resp?: any) => { | ||
| if (err) { | ||
| reject(err); | ||
| } else { | ||
| resolve(resp); | ||
| } | ||
| }, | ||
| ); | ||
| }); | ||
| // 4. Tell the metrics collector the attempt is over | ||
| metricsCollector.onAttemptComplete(GrpcStatus.OK); | ||
| metricsCollector.onOperationComplete(GrpcStatus.OK); | ||
| // 5. Return results of method call to the user | ||
| return responseArray; | ||
| }; | ||
|
|
||
| await (table as any).fakeReadModifyWriteRowMethod(); | ||
|
|
||
| assert.strictEqual(testMetricsHandler.requestsHandled.length, 2); | ||
|
|
||
| const attemptCompleteData = testMetricsHandler.requestsHandled.find( | ||
| m => (m as {attemptLatency?: number}).attemptLatency !== undefined, | ||
| ) as OnAttemptCompleteData | undefined; | ||
| const operationCompleteData = testMetricsHandler.requestsHandled.find( | ||
| m => (m as {operationLatency?: number}).operationLatency !== undefined, | ||
| ) as OnOperationCompleteData | undefined; | ||
|
|
||
| assert.ok(attemptCompleteData, 'OnAttemptCompleteData should be present'); | ||
| assert.ok( | ||
| operationCompleteData, | ||
| 'OnOperationCompleteData should be present', | ||
| ); | ||
| if (!attemptCompleteData || !operationCompleteData) { | ||
| throw new Error('Metrics data is missing'); // Should be caught by asserts above | ||
| } | ||
| assert.strictEqual( | ||
| attemptCompleteData.metricsCollectorData.method, | ||
| MethodName.READ_MODIFY_WRITE_ROW, | ||
| ); | ||
| assert.strictEqual(attemptCompleteData.status, '0'); | ||
| assert.strictEqual( | ||
| attemptCompleteData.metricsCollectorData.table, | ||
| TABLE_ID, | ||
| ); | ||
| assert.strictEqual( | ||
| attemptCompleteData.metricsCollectorData.instanceId, | ||
| INSTANCE_ID, | ||
| ); | ||
| assert.ok(attemptCompleteData.attemptLatency >= 0); | ||
| assert(attemptCompleteData.serverLatency); | ||
| assert.ok(attemptCompleteData.serverLatency >= 0); | ||
|
Contributor
Author
This is where we prove we are getting the server latency from the metadata headers. |
||
| assert.strictEqual(attemptCompleteData.metricsCollectorData.zone, ZONE); | ||
| assert.strictEqual( | ||
| attemptCompleteData.metricsCollectorData.cluster, | ||
| CLUSTER, | ||
| ); | ||
| assert.strictEqual(attemptCompleteData.streaming, StreamingState.UNARY); | ||
|
|
||
| assert.strictEqual( | ||
| operationCompleteData.metricsCollectorData.method, | ||
| MethodName.READ_MODIFY_WRITE_ROW, | ||
| ); | ||
| assert.strictEqual(operationCompleteData.status, '0'); | ||
| assert.strictEqual( | ||
| operationCompleteData.metricsCollectorData.table, | ||
| TABLE_ID, | ||
| ); | ||
| assert.strictEqual( | ||
| operationCompleteData.metricsCollectorData.instanceId, | ||
| INSTANCE_ID, | ||
| ); | ||
| assert.ok(operationCompleteData.operationLatency >= 0); | ||
| assert.strictEqual(operationCompleteData.retryCount, 0); | ||
| assert.strictEqual(operationCompleteData.metricsCollectorData.zone, ZONE); | ||
| assert.strictEqual( | ||
| operationCompleteData.metricsCollectorData.cluster, | ||
| CLUSTER, | ||
| ); | ||
|
Contributor
Author
This is where we prove we are getting the Zone/Cluster from the trailer. |
||
| assert.strictEqual(operationCompleteData.streaming, StreamingState.UNARY); | ||
| }); | ||
| }); | ||
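
The test above picks the attempt and operation records out of `requestsHandled` by casting each entry and probing for a latency field. One possible alternative is a pair of type guards, sketched below. The record shapes here are hypothetical and much smaller than the real `OnAttemptCompleteData` and `OnOperationCompleteData`, and the guards use `in` rather than the `!== undefined` comparison in the test, so this is an illustration of the idea rather than a drop-in replacement.

```typescript
// Hypothetical, simplified record shapes for illustration only.
interface AttemptRecord {
  attemptLatency: number;
  status: string;
}

interface OperationRecord {
  operationLatency: number;
  retryCount: number;
  status: string;
}

type HandledRecord = AttemptRecord | OperationRecord;

// Type guards: each narrows the union by checking for the field that only
// one of the two record kinds carries.
function isAttemptRecord(r: HandledRecord): r is AttemptRecord {
  return 'attemptLatency' in r;
}

function isOperationRecord(r: HandledRecord): r is OperationRecord {
  return 'operationLatency' in r;
}

// Sample data standing in for testMetricsHandler.requestsHandled.
const handled: HandledRecord[] = [
  {attemptLatency: 12, status: '0'},
  {operationLatency: 15, retryCount: 0, status: '0'},
];

// Array.prototype.find accepts a type predicate, so the results are typed as
// AttemptRecord | undefined and OperationRecord | undefined without casts.
const attempt = handled.find(isAttemptRecord);
const operation = handled.find(isOperationRecord);
```
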

Can you show me how you expect to use this? Does something like this need to be called at the start of each request?
In a method call we write something like this:
See the tests below. The interceptors are added onto gax options.
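
For readers following the question above, the call sequence that the test's `fakeReadModifyWriteRowMethod` walks through can be sketched without any Bigtable or gRPC dependencies. Everything here is a hypothetical stand-in: `CollectorSketch` models only the four lifecycle calls the test uses, `unaryWithMetrics` is an invented helper name, and reporting the error's status code on failure is an assumption, since the test in this PR only exercises the OK path.

```typescript
// Hypothetical, simplified collector surface; the real
// OperationMetricsCollector has a richer API.
interface CollectorSketch {
  onOperationStart(): void;
  onAttemptStart(): void;
  onAttemptComplete(code: number): void;
  onOperationComplete(code: number): void;
}

type CallError = Error & {code?: number};
type UnaryCallback<T> = (err: CallError | null, resp?: T) => void;

const STATUS_OK = 0; // gRPC status code OK
const STATUS_UNKNOWN = 2; // gRPC status code UNKNOWN, used if an error has no code

function unaryWithMetrics<T>(
  collector: CollectorSketch,
  makeCall: (callback: UnaryCallback<T>) => void,
  done: UnaryCallback<T>,
): void {
  // Steps 1-2: mark the start of the operation and of its single attempt.
  collector.onOperationStart();
  collector.onAttemptStart();
  // Step 3: make the unary call. In the real code the gax options passed to
  // this call would carry the metrics interceptor.
  makeCall((err, resp) => {
    // Step 4: close the attempt and the operation.
    // Assumption: failures are reported with the error's own status code.
    const code = err ? (err.code ?? STATUS_UNKNOWN) : STATUS_OK;
    collector.onAttemptComplete(code);
    collector.onOperationComplete(code);
    // Step 5: hand the result back to the caller.
    done(err, resp);
  });
}

// Usage with a collector that just records the order of lifecycle calls.
const events: string[] = [];
const recordingCollector: CollectorSketch = {
  onOperationStart: () => {
    events.push('operationStart');
  },
  onAttemptStart: () => {
    events.push('attemptStart');
  },
  onAttemptComplete: code => {
    events.push(`attemptComplete:${code}`);
  },
  onOperationComplete: code => {
    events.push(`operationComplete:${code}`);
  },
};

const responses: Array<string | undefined> = [];
unaryWithMetrics<string>(
  recordingCollector,
  callback => callback(null, 'row-data'), // fake call that succeeds immediately
  (_err, resp) => {
    responses.push(resp);
  },
);
```

So yes, in this sketch the start calls happen at the top of each request, and the interceptor only fills in header and trailer data between the start and complete calls.
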