Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Patching Mechanism with AWS SDK telemetry improvements #13

Merged
merged 11 commits into from
Aug 15, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,17 @@
"prewatch": "npm run precompile",
"prepublishOnly": "npm run compile",
"tdd": "yarn test -- --watch-extensions ts --watch",
"test": "nyc ts-mocha --timeout 10000 -p tsconfig.json 'test/**/*.ts'",
"test": "nyc ts-mocha --timeout 10000 -p tsconfig.json --require '@opentelemetry/contrib-test-utils' 'test/**/*.ts'",
"watch": "tsc -w"
},
"bugs": {
"url": "https://github.com/aws-observability/aws-otel-js-instrumentation/issues"
},
"devDependencies": {
"@aws-sdk/client-kinesis": "3.85.0",
"@aws-sdk/client-s3": "3.85.0",
"@aws-sdk/client-sqs": "3.85.0",
"@opentelemetry/contrib-test-utils": "^0.40.0",
"@types/mocha": "7.0.2",
"@types/node": "18.6.5",
"@types/sinon": "10.0.18",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

import { SEMATTRS_AWS_DYNAMODB_TABLE_NAMES } from '@opentelemetry/semantic-conventions';

// Utility class holding attribute keys with special meaning to AWS components
export const AWS_ATTRIBUTE_KEYS: { [key: string]: string } = {
AWS_SPAN_KIND: 'aws.span.kind',
Expand All @@ -19,14 +21,10 @@ export const AWS_ATTRIBUTE_KEYS: { [key: string]: string } = {
// Used for JavaScript workaround - attribute for pre-calculated value of isLocalRoot
AWS_IS_LOCAL_ROOT: 'aws.is.local.root',

// Divergence from Java/Python
// TODO: Audit this: These will most definitely be different in JavaScript.
// For example:
// - `messaging.url` for AWS_QUEUE_URL
// - `aws.dynamodb.table_names` for AWS_TABLE_NAME
AWS_BUCKET_NAME: 'aws.bucket.name',
AWS_QUEUE_URL: 'aws.queue.url',
AWS_QUEUE_NAME: 'aws.queue.name',
AWS_STREAM_NAME: 'aws.stream.name',
AWS_TABLE_NAME: 'aws.table.name',
// Naming divergence from Java/Python
thpierce marked this conversation as resolved.
Show resolved Hide resolved
AWS_S3_BUCKET: 'aws.s3.bucket',
AWS_SQS_QUEUE_URL: 'aws.sqs.queue.url',
AWS_SQS_QUEUE_NAME: 'aws.sqs.queue.name',
AWS_KINESIS_STREAM_NAME: 'aws.kinesis.stream.name',
AWS_DYNAMODB_TABLE_NAMES: SEMATTRS_AWS_DYNAMODB_TABLE_NAMES,
};
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

import { Attributes, AttributeValue, diag, SpanKind } from '@opentelemetry/api';
import { Resource, defaultServiceName } from '@opentelemetry/resources';
import { defaultServiceName, Resource } from '@opentelemetry/resources';
import { ReadableSpan } from '@opentelemetry/sdk-trace-base';
import {
SEMATTRS_DB_CONNECTION_STRING,
Expand Down Expand Up @@ -340,30 +340,33 @@ export class AwsMetricAttributeGenerator implements MetricAttributeGenerator {
let remoteResourceIdentifier: AttributeValue | undefined;

if (AwsSpanProcessingUtil.isAwsSDKSpan(span)) {
if (AwsSpanProcessingUtil.isKeyPresent(span, AWS_ATTRIBUTE_KEYS.AWS_TABLE_NAME)) {
const awsTableNames: AttributeValue | undefined = span.attributes[AWS_ATTRIBUTE_KEYS.AWS_DYNAMODB_TABLE_NAMES];
if (
AwsSpanProcessingUtil.isKeyPresent(span, AWS_ATTRIBUTE_KEYS.AWS_DYNAMODB_TABLE_NAMES) &&
Array.isArray(awsTableNames) &&
awsTableNames.length === 1
) {
remoteResourceType = NORMALIZED_DYNAMO_DB_SERVICE_NAME + '::Table';
remoteResourceIdentifier = AwsMetricAttributeGenerator.escapeDelimiters(
span.attributes[AWS_ATTRIBUTE_KEYS.AWS_TABLE_NAME]
);
} else if (AwsSpanProcessingUtil.isKeyPresent(span, AWS_ATTRIBUTE_KEYS.AWS_STREAM_NAME)) {
remoteResourceIdentifier = AwsMetricAttributeGenerator.escapeDelimiters(awsTableNames[0]);
} else if (AwsSpanProcessingUtil.isKeyPresent(span, AWS_ATTRIBUTE_KEYS.AWS_KINESIS_STREAM_NAME)) {
remoteResourceType = NORMALIZED_KINESIS_SERVICE_NAME + '::Stream';
remoteResourceIdentifier = AwsMetricAttributeGenerator.escapeDelimiters(
span.attributes[AWS_ATTRIBUTE_KEYS.AWS_STREAM_NAME]
span.attributes[AWS_ATTRIBUTE_KEYS.AWS_KINESIS_STREAM_NAME]
);
} else if (AwsSpanProcessingUtil.isKeyPresent(span, AWS_ATTRIBUTE_KEYS.AWS_BUCKET_NAME)) {
} else if (AwsSpanProcessingUtil.isKeyPresent(span, AWS_ATTRIBUTE_KEYS.AWS_S3_BUCKET)) {
remoteResourceType = NORMALIZED_S3_SERVICE_NAME + '::Bucket';
remoteResourceIdentifier = AwsMetricAttributeGenerator.escapeDelimiters(
span.attributes[AWS_ATTRIBUTE_KEYS.AWS_BUCKET_NAME]
span.attributes[AWS_ATTRIBUTE_KEYS.AWS_S3_BUCKET]
);
} else if (AwsSpanProcessingUtil.isKeyPresent(span, AWS_ATTRIBUTE_KEYS.AWS_QUEUE_NAME)) {
} else if (AwsSpanProcessingUtil.isKeyPresent(span, AWS_ATTRIBUTE_KEYS.AWS_SQS_QUEUE_NAME)) {
remoteResourceType = NORMALIZED_SQS_SERVICE_NAME + '::Queue';
remoteResourceIdentifier = AwsMetricAttributeGenerator.escapeDelimiters(
span.attributes[AWS_ATTRIBUTE_KEYS.AWS_QUEUE_NAME]
span.attributes[AWS_ATTRIBUTE_KEYS.AWS_SQS_QUEUE_NAME]
);
} else if (AwsSpanProcessingUtil.isKeyPresent(span, AWS_ATTRIBUTE_KEYS.AWS_QUEUE_URL)) {
} else if (AwsSpanProcessingUtil.isKeyPresent(span, AWS_ATTRIBUTE_KEYS.AWS_SQS_QUEUE_URL)) {
remoteResourceType = NORMALIZED_SQS_SERVICE_NAME + '::Queue';
remoteResourceIdentifier = SqsUrlParser.getQueueName(
AwsMetricAttributeGenerator.escapeDelimiters(span.attributes[AWS_ATTRIBUTE_KEYS.AWS_QUEUE_URL])
AwsMetricAttributeGenerator.escapeDelimiters(span.attributes[AWS_ATTRIBUTE_KEYS.AWS_SQS_QUEUE_URL])
);
}
} else if (AwsSpanProcessingUtil.isDBSpan(span)) {
Expand Down Expand Up @@ -479,8 +482,8 @@ export class AwsMetricAttributeGenerator implements MetricAttributeGenerator {
return AwsMetricAttributeGenerator.escapeDelimiters(address) + (port !== '' ? '|' + port : '');
}

private static escapeDelimiters(input: string | AttributeValue | undefined): string | undefined {
if (input === undefined) {
private static escapeDelimiters(input: string | AttributeValue | undefined | null): string | undefined {
if (typeof input !== 'string') {
return undefined;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@

import { TextMapPropagator, diag } from '@opentelemetry/api';
import { getPropagator } from '@opentelemetry/auto-configuration-propagators';
import {
getNodeAutoInstrumentations,
getResourceDetectors as getResourceDetectorsFromEnv,
} from '@opentelemetry/auto-instrumentations-node';
import { getResourceDetectors as getResourceDetectorsFromEnv } from '@opentelemetry/auto-instrumentations-node';
import { ENVIRONMENT, TracesSamplerValues, getEnv, getEnvWithoutDefaults } from '@opentelemetry/core';
import { OTLPMetricExporter as OTLPGrpcOTLPMetricExporter } from '@opentelemetry/exporter-metrics-otlp-grpc';
import {
Expand Down Expand Up @@ -85,7 +82,18 @@ export class AwsOpentelemetryConfigurator {
private spanProcessors: SpanProcessor[];
private propagator: TextMapPropagator;

constructor() {
/**
* The constructor will setup the AwsOpentelemetryConfigurator object to be able to provide a
* configuration for ADOT JavaScript Auto-Instrumentation.
*
* The `instrumentations` are the desired Node auto-instrumentations to be used when using ADOT JavaScript.
* The auto-Instrumentions are usually populated from OTel's `getNodeAutoInstrumentations()` method from the
* `@opentelemetry/auto-instrumentations-node` NPM package, and may have instrumentation patching applied.
*
* @constructor
* @param {Instrumentation[]} instrumentations - Auto-Instrumentations to be added to the ADOT Config
*/
public constructor(instrumentations: Instrumentation[]) {
/*
* Set and Detect Resources via Resource Detectors
*
Expand Down Expand Up @@ -135,7 +143,7 @@ export class AwsOpentelemetryConfigurator {
autoResource = autoResource.merge(detectResourcesSync(internalConfig));
this.resource = autoResource;

this.instrumentations = getNodeAutoInstrumentations();
this.instrumentations = instrumentations;
this.propagator = getPropagator();

// TODO: Consider removing AWSXRayIdGenerator as it is not needed
Expand Down
thpierce marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

import { Attributes, SpanKind } from '@opentelemetry/api';
import { AwsSdkInstrumentationConfig, NormalizedRequest } from '@opentelemetry/instrumentation-aws-sdk';
import { AWS_ATTRIBUTE_KEYS } from '../../../aws-attribute-keys';
import { RequestMetadata, ServiceExtension } from '../../../third-party/otel/aws/services/ServiceExtension';

/*
This file's contents are being contributed to upstream
- https://github.com/open-telemetry/opentelemetry-js-contrib/pull/2361

This class is a service extension to be used for the AWS JavaScript SDK instrumentation patch for Kinesis.
The instrumentation patch adds this extension to the upstream's Map of known extension for Kinesis.
Extensions allow for custom logic for adding service-specific information to spans, such as attributes.
Specifically, we are adding logic to add the `aws.kinesis.stream.name` attribute, to be used to generate
RemoteTarget and achieve parity with the Java/Python instrumentation.
*/
export class KinesisServiceExtension implements ServiceExtension {
thpierce marked this conversation as resolved.
Show resolved Hide resolved
requestPreSpanHook(request: NormalizedRequest, _config: AwsSdkInstrumentationConfig): RequestMetadata {
const streamName = request.commandInput?.StreamName;

const spanKind: SpanKind = SpanKind.CLIENT;
let spanName: string | undefined;

const spanAttributes: Attributes = {};

if (streamName) {
spanAttributes[AWS_ATTRIBUTE_KEYS.AWS_KINESIS_STREAM_NAME] = streamName;
}

const isIncoming = false;
thpierce marked this conversation as resolved.
Show resolved Hide resolved

return {
isIncoming,
spanAttributes,
spanKind,
spanName,
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

import { Attributes, SpanKind } from '@opentelemetry/api';
import { AwsSdkInstrumentationConfig, NormalizedRequest } from '@opentelemetry/instrumentation-aws-sdk';
import { AWS_ATTRIBUTE_KEYS } from '../../../aws-attribute-keys';
import { RequestMetadata, ServiceExtension } from '../../../third-party/otel/aws/services/ServiceExtension';

/*
This file's contents are being contributed to upstream
- https://github.com/open-telemetry/opentelemetry-js-contrib/pull/2361

This class is a service extension to be used for the AWS JavaScript SDK instrumentation patch for S3.
The instrumentation patch adds this extension to the upstream's Map of known extension for S3.
Extensions allow for custom logic for adding service-specific information to spans, such as attributes.
Specifically, we are adding logic to add the `aws.s3.bucket` attribute, to be used to generate
RemoteTarget and achieve parity with the Java/Python instrumentation.
*/
export class S3ServiceExtension implements ServiceExtension {
requestPreSpanHook(request: NormalizedRequest, _config: AwsSdkInstrumentationConfig): RequestMetadata {
const bucketName = request.commandInput?.Bucket;

const spanKind: SpanKind = SpanKind.CLIENT;
let spanName: string | undefined;

const spanAttributes: Attributes = {};

if (bucketName) {
spanAttributes[AWS_ATTRIBUTE_KEYS.AWS_S3_BUCKET] = bucketName;
}

const isIncoming = false;

return {
isIncoming,
spanAttributes,
spanKind,
spanName,
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

import { AttributeValue } from '@opentelemetry/api';
import { Instrumentation } from '@opentelemetry/instrumentation';
import { AwsSdkInstrumentationConfig, NormalizedRequest } from '@opentelemetry/instrumentation-aws-sdk';
import { SEMATTRS_MESSAGING_URL } from '@opentelemetry/semantic-conventions';
import { AWS_ATTRIBUTE_KEYS } from '../aws-attribute-keys';
import { SqsUrlParser } from '../sqs-url-parser';
import { RequestMetadata } from '../third-party/otel/aws/services/ServiceExtension';
import { KinesisServiceExtension } from './aws/services/kinesis';
import { S3ServiceExtension } from './aws/services/s3';

export function applyInstrumentationPatches(instrumentations: Instrumentation[]): void {
/*
Apply patches to upstream instrumentation libraries.

This method is invoked to apply changes to upstream instrumentation libraries, typically when changes to upstream
are required on a timeline that cannot wait for upstream release. Generally speaking, patches should be short-term
local solutions that are comparable to long-term upstream solutions.

Where possible, automated testing should be run to catch upstream changes resulting in broken patches
*/
instrumentations.forEach(instrumentation => {
if (instrumentation.instrumentationName === '@opentelemetry/instrumentation-aws-sdk') {
// Access private property servicesExtensions of AwsInstrumentation
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
const services: Map<string, ServiceExtension> | undefined = (instrumentation as any).servicesExtensions?.services;
if (services) {
services.set('S3', new S3ServiceExtension());
services.set('Kinesis', new KinesisServiceExtension());
const sqsServiceExtension: any = services.get('SQS');
// It is not expected that `sqsServiceExtension` is undefined
if (sqsServiceExtension) {
const requestPreSpanHook = sqsServiceExtension.requestPreSpanHook;
// Save original `requestPreSpanHook` under a similar name, to be invoked by the patched hook
sqsServiceExtension._requestPreSpanHook = requestPreSpanHook;
// The patched hook will populate the 'aws.sqs.queue.url' and 'aws.sqs.queue.name' attributes according to spec
// from the 'messaging.url' attribute
const patchedRequestPreSpanHook = (
request: NormalizedRequest,
_config: AwsSdkInstrumentationConfig
): RequestMetadata => {
const requestMetadata: RequestMetadata = sqsServiceExtension._requestPreSpanHook(request, _config);
// It is not expected that `requestMetadata.spanAttributes` can possibly be undefined, but still be careful anyways
if (requestMetadata.spanAttributes) {
const queueUrl: AttributeValue | undefined = requestMetadata.spanAttributes[SEMATTRS_MESSAGING_URL];
requestMetadata.spanAttributes[AWS_ATTRIBUTE_KEYS.AWS_SQS_QUEUE_URL] = queueUrl;
requestMetadata.spanAttributes[AWS_ATTRIBUTE_KEYS.AWS_SQS_QUEUE_NAME] =
SqsUrlParser.getQueueName(queueUrl);
thpierce marked this conversation as resolved.
Show resolved Hide resolved
}
return requestMetadata;
};
sqsServiceExtension.requestPreSpanHook = patchedRequestPreSpanHook;
}
thpierce marked this conversation as resolved.
Show resolved Hide resolved
}
}
});
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@
// Modifications Copyright The OpenTelemetry Authors. Licensed under the Apache License 2.0 License.

import { DiagConsoleLogger, diag } from '@opentelemetry/api';
import { getNodeAutoInstrumentations } from '@opentelemetry/auto-instrumentations-node';
import { Instrumentation } from '@opentelemetry/instrumentation';
import * as opentelemetry from '@opentelemetry/sdk-node';
import { AwsOpentelemetryConfigurator } from './aws-opentelemetry-configurator';
import { applyInstrumentationPatches } from './patches/instrumentation-patch';

diag.setLogger(new DiagConsoleLogger(), opentelemetry.core.getEnv().OTEL_LOG_LEVEL);

Expand Down Expand Up @@ -40,7 +43,12 @@ export function setAwsDefaultEnvironmentVariables(): void {
}
setAwsDefaultEnvironmentVariables();

const configurator: AwsOpentelemetryConfigurator = new AwsOpentelemetryConfigurator();
const instrumentations: Instrumentation[] = getNodeAutoInstrumentations();

// Apply instrumentation patches
applyInstrumentationPatches(instrumentations);

const configurator: AwsOpentelemetryConfigurator = new AwsOpentelemetryConfigurator(instrumentations);
const configuration: Partial<opentelemetry.NodeSDKConfiguration> = configurator.configure();

const sdk: opentelemetry.NodeSDK = new opentelemetry.NodeSDK(configuration);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

import { AttributeValue } from '@opentelemetry/api';

const HTTP_SCHEMA: string = 'http://';
const HTTPS_SCHEMA: string = 'https://';

Expand All @@ -16,8 +18,8 @@ export class SqsUrlParser {
* /'s (excluding schema), the second part should be a 12-digit account id, and the third part
* should be a valid queue name, per SQS naming conventions.
*/
public static getQueueName(url: string | undefined): string | undefined {
if (url === undefined) {
public static getQueueName(url: AttributeValue | undefined): string | undefined {
if (typeof url !== 'string') {
return undefined;
}
url = url.replace(HTTP_SCHEMA, '').replace(HTTPS_SCHEMA, '');
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { DiagLogger, Span, SpanAttributes, SpanKind, Tracer } from '@opentelemetry/api';
import {
AwsSdkInstrumentationConfig,
NormalizedRequest,
NormalizedResponse,
} from '@opentelemetry/instrumentation-aws-sdk';

export interface RequestMetadata {
// isIncoming - if true, then the operation callback / promise should be bind with the operation's span
isIncoming: boolean;
spanAttributes?: SpanAttributes;
spanKind?: SpanKind;
spanName?: string;
}

export interface ServiceExtension {
// called before request is sent, and before span is started
requestPreSpanHook: (
request: NormalizedRequest,
config: AwsSdkInstrumentationConfig,
diag: DiagLogger
) => RequestMetadata;

// called before request is sent, and after span is started
requestPostSpanHook?: (request: NormalizedRequest) => void;

responseHook?: (
response: NormalizedResponse,
span: Span,
tracer: Tracer,
config: AwsSdkInstrumentationConfig
) => void;
}
Loading
Loading