diff --git a/CHANGELOG.md b/CHANGELOG.md index fd4c06d3fd..f6fba77e7c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -61,6 +61,7 @@ request adding CHANGELOG notes for breaking (!) changes and possibly other secti - Added `topologySpreadConstraints` support in Helm chart. - Added `priorityClassName` support in Helm chart. - Added support for including principal name in subscoped credentials. `INCLUDE_PRINCIPAL_NAME_IN_SUBSCOPED_CREDENTIAL` (default: false) can be used to toggle this feature. If enabled, cached credentials issued to one principal will no longer be available for others. +- Added support for including OpenTelemetry trace IDs in AWS STS session tags. This requires session tags to be enabled via `INCLUDE_SESSION_TAGS_IN_SUBSCOPED_CREDENTIAL` and can be toggled with `INCLUDE_TRACE_ID_IN_SESSION_TAGS` (default: false). Note: enabling trace IDs disables credential caching (each request has a unique trace ID), which may increase STS calls and latency. - Added support for [Kubernetes Gateway API](https://gateway-api.sigs.k8s.io/) to the Helm Chart. ### Changes diff --git a/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java b/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java index 5c6858ceb1..1eb121c498 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java @@ -104,6 +104,20 @@ public static void enforceFeatureEnabledOrThrow( .defaultValue(false) .buildFeatureConfiguration(); + public static final FeatureConfiguration INCLUDE_TRACE_ID_IN_SESSION_TAGS = + PolarisConfiguration.builder() + .key("INCLUDE_TRACE_ID_IN_SESSION_TAGS") + .description( + "If set to true (and INCLUDE_SESSION_TAGS_IN_SUBSCOPED_CREDENTIAL is also true), the OpenTelemetry\n" + + "trace ID will be included as a session tag in AWS STS AssumeRole requests. This enables\n" + + "end-to-end correlation between catalog operations (Polaris events), credential vending (CloudTrail),\n" + + "and metrics reports from compute engines.\n" + + "WARNING: Enabling this feature completely disables credential caching because every request\n" + + "has a unique trace ID. This may significantly increase latency and STS API costs.\n" + + "Consider leaving this disabled unless end-to-end tracing correlation is critical.") + .defaultValue(false) + .buildFeatureConfiguration(); + public static final FeatureConfiguration ALLOW_SETTING_S3_ENDPOINTS = PolarisConfiguration.builder() .key("ALLOW_SETTING_S3_ENDPOINTS") diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/CredentialVendingContext.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/CredentialVendingContext.java index 718062c414..40c2627266 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/storage/CredentialVendingContext.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/CredentialVendingContext.java @@ -33,10 +33,11 @@ *
  • {@code namespace} - The namespace/database being accessed (e.g., "db.schema") *
  • {@code tableName} - The name of the table being accessed *
  • {@code activatedRoles} - Comma-separated list of activated principal roles + *
  • {@code traceId} - OpenTelemetry trace ID for end-to-end correlation * * - *

    These values appear in cloud provider audit logs (e.g., AWS CloudTrail), enabling correlation - * between catalog operations and data access events. + *

    These values appear in cloud provider audit logs (e.g., AWS CloudTrail), enabling + * deterministic correlation between catalog operations and data access events. */ @PolarisImmutable public interface CredentialVendingContext { @@ -48,6 +49,7 @@ public interface CredentialVendingContext { String TAG_KEY_TABLE = "polaris:table"; String TAG_KEY_PRINCIPAL = "polaris:principal"; String TAG_KEY_ROLES = "polaris:roles"; + String TAG_KEY_TRACE_ID = "polaris:trace_id"; /** The name of the catalog that is vending credentials. */ Optional catalogName(); @@ -67,6 +69,20 @@ public interface CredentialVendingContext { */ Optional activatedRoles(); + /** + * The OpenTelemetry trace ID for end-to-end correlation. This enables correlation between + * credential vending (CloudTrail), catalog operations (Polaris events), and metrics reports from + * compute engines. + * + *

    This field is only populated when the {@code INCLUDE_TRACE_ID_IN_SESSION_TAGS} feature flag + * is enabled. When populated, the trace ID is included in AWS STS session tags and becomes part + * of the credential cache key (since it affects the vended credentials). + * + *

    When the flag is disabled (default), this field is left empty ({@code Optional.empty()}), + * which allows efficient credential caching across requests with different trace IDs. + */ + Optional traceId(); + /** * Creates a new builder for CredentialVendingContext. * @@ -95,6 +111,8 @@ interface Builder { Builder activatedRoles(Optional activatedRoles); + Builder traceId(Optional traceId); + CredentialVendingContext build(); } } diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsCredentialsStorageIntegration.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsCredentialsStorageIntegration.java index d595682529..8fce267afb 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsCredentialsStorageIntegration.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsCredentialsStorageIntegration.java @@ -125,7 +125,9 @@ public StorageAccessConfig getSubscopedCreds( .toJson()) .durationSeconds(storageCredentialDurationSeconds); - // Add session tags when the feature is enabled + // Add session tags when the feature is enabled. + // Note: The trace ID is controlled at the source (StorageAccessConfigProvider). + // If INCLUDE_TRACE_ID_IN_SESSION_TAGS is enabled, the context will contain the trace ID. if (includeSessionTags) { List sessionTags = buildSessionTags(polarisPrincipal.getName(), credentialVendingContext); diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsSessionTagsBuilder.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsSessionTagsBuilder.java index 7403ca9f12..7931c7204f 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsSessionTagsBuilder.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsSessionTagsBuilder.java @@ -43,8 +43,13 @@ private AwsSessionTagsBuilder() { * Builds a list of AWS STS session tags from the principal name and credential vending context. * These tags will appear in CloudTrail events for correlation purposes. * + *

    The trace ID tag is only included if {@link CredentialVendingContext#traceId()} is present. + * This is controlled at the source (StorageAccessConfigProvider) based on the + * INCLUDE_TRACE_ID_IN_SESSION_TAGS feature flag. + * * @param principalName the name of the principal requesting credentials - * @param context the credential vending context containing catalog, namespace, table, and roles + * @param context the credential vending context containing catalog, namespace, table, roles, and + * optionally trace ID * @return a list of STS Tags to attach to the AssumeRole request */ public static List buildSessionTags(String principalName, CredentialVendingContext context) { @@ -78,6 +83,19 @@ public static List buildSessionTags(String principalName, CredentialVending .value(truncateTagValue(context.tableName().orElse(TAG_VALUE_UNKNOWN))) .build()); + // Only include trace ID if it's present in the context. + // The context's traceId is only populated when INCLUDE_TRACE_ID_IN_SESSION_TAGS is enabled. + // This allows efficient credential caching when trace IDs are not needed in session tags. + context + .traceId() + .ifPresent( + traceId -> + tags.add( + Tag.builder() + .key(CredentialVendingContext.TAG_KEY_TRACE_ID) + .value(truncateTagValue(traceId)) + .build())); + return tags; } diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCache.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCache.java index a2ce3b2b03..125ab8ea3b 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCache.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCache.java @@ -132,10 +132,14 @@ public StorageAccessConfig getOrGenerateSubScopeCreds( // When session tags are enabled, the cache key needs to include: // 1. The credential vending context to avoid returning cached credentials with different - // session tags (catalog/namespace/table/roles) + // session tags (catalog/namespace/table/roles/traceId) // 2. The principal, because the polaris:principal session tag is included in AWS credentials // and we must not serve credentials tagged for principal A to principal B // When session tags are disabled, we only include principal if explicitly configured. + // + // Note: The trace ID is controlled at the source (StorageAccessConfigProvider). When + // INCLUDE_TRACE_ID_IN_SESSION_TAGS is disabled, the context's traceId is left empty, + // which allows efficient caching across requests with different trace IDs. boolean includePrincipalInCacheKey = includePrincipalNameInSubscopedCredential || includeSessionTags; // When session tags are disabled, use empty context to ensure consistent cache key behavior diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCacheKey.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCacheKey.java index b062d1a78d..1e46c2357d 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCacheKey.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCacheKey.java @@ -58,12 +58,19 @@ public interface StorageCredentialCacheKey { /** * The credential vending context for session tags. When session tags are enabled, this contains - * the catalog, namespace, table, and roles information. When session tags are disabled, this - * should be {@link CredentialVendingContext#empty()} to ensure consistent cache key behavior. + * the catalog, namespace, table, roles, and optionally trace ID information. When session tags + * are disabled, this should be {@link CredentialVendingContext#empty()} to ensure consistent + * cache key behavior. + * + *

    The trace ID in the context is only populated when the {@code + * INCLUDE_TRACE_ID_IN_SESSION_TAGS} feature flag is enabled. When populated, it becomes part of + * the cache key comparison (since it affects the vended credentials via session tags). When + * empty, credentials can be cached efficiently across requests with different trace IDs. */ @Value.Parameter(order = 9) CredentialVendingContext credentialVendingContext(); + /** Creates a cache key from the provided parameters. */ static StorageCredentialCacheKey of( String realmId, PolarisEntity entity, diff --git a/polaris-core/src/test/java/org/apache/polaris/service/storage/aws/AwsCredentialsStorageIntegrationTest.java b/polaris-core/src/test/java/org/apache/polaris/service/storage/aws/AwsCredentialsStorageIntegrationTest.java index 254aea4d32..082eed3ec8 100644 --- a/polaris-core/src/test/java/org/apache/polaris/service/storage/aws/AwsCredentialsStorageIntegrationTest.java +++ b/polaris-core/src/test/java/org/apache/polaris/service/storage/aws/AwsCredentialsStorageIntegrationTest.java @@ -1014,7 +1014,9 @@ public String getConfiguration(@Nonnull RealmContext ctx, String configName) { ArgumentCaptor.forClass(AssumeRoleRequest.class); Mockito.when(stsClient.assumeRole(requestCaptor.capture())).thenReturn(ASSUME_ROLE_RESPONSE); - // Roles are included in context (not extracted from principal) to be part of cache key + // Roles are included in context (not extracted from principal) to be part of cache key. + // Note: traceId is NOT set because INCLUDE_TRACE_ID_IN_SESSION_TAGS is disabled (default). + // In production, StorageAccessConfigProvider only populates traceId when that flag is enabled. CredentialVendingContext context = CredentialVendingContext.builder() .catalogName(Optional.of("test-catalog")) @@ -1040,7 +1042,8 @@ public String getConfiguration(@Nonnull RealmContext ctx, String configName) { context); AssumeRoleRequest capturedRequest = requestCaptor.getValue(); - Assertions.assertThat(capturedRequest.tags()).isNotEmpty(); + // 5 tags are included when session tags enabled but trace_id not in context + Assertions.assertThat(capturedRequest.tags()).hasSize(5); Assertions.assertThat(capturedRequest.tags()) .anyMatch(tag -> tag.key().equals("polaris:catalog") && tag.value().equals("test-catalog")); Assertions.assertThat(capturedRequest.tags()) @@ -1053,8 +1056,11 @@ public String getConfiguration(@Nonnull RealmContext ctx, String configName) { // Roles are sorted alphabetically and joined with comma Assertions.assertThat(capturedRequest.tags()) .anyMatch(tag -> tag.key().equals("polaris:roles") && tag.value().equals("admin,reader")); + // Verify trace_id is NOT included when not present in context + Assertions.assertThat(capturedRequest.tags()) + .noneMatch(tag -> tag.key().equals("polaris:trace_id")); - // Verify transitive tag keys are set + // Verify transitive tag keys are set (without trace_id) Assertions.assertThat(capturedRequest.transitiveTagKeys()) .containsExactlyInAnyOrder( "polaris:catalog", @@ -1064,6 +1070,96 @@ public String getConfiguration(@Nonnull RealmContext ctx, String configName) { "polaris:roles"); } + @Test + public void testSessionTagsWithTraceIdWhenBothFlagsEnabled() { + StsClient stsClient = Mockito.mock(StsClient.class); + String roleARN = "arn:aws:iam::012345678901:role/jdoe"; + String externalId = "externalId"; + String bucket = "bucket"; + String warehouseKeyPrefix = "path/to/warehouse"; + + // Create a realm config with both session tags AND trace_id enabled + RealmConfig sessionTagsAndTraceIdEnabledConfig = + new RealmConfigImpl( + new PolarisConfigurationStore() { + @SuppressWarnings("unchecked") + @Override + public String getConfiguration(@Nonnull RealmContext ctx, String configName) { + if (configName.equals( + FeatureConfiguration.INCLUDE_SESSION_TAGS_IN_SUBSCOPED_CREDENTIAL.key())) { + return "true"; + } + if (configName.equals( + FeatureConfiguration.INCLUDE_TRACE_ID_IN_SESSION_TAGS.key())) { + return "true"; + } + return null; + } + }, + () -> "realm"); + + ArgumentCaptor requestCaptor = + ArgumentCaptor.forClass(AssumeRoleRequest.class); + Mockito.when(stsClient.assumeRole(requestCaptor.capture())).thenReturn(ASSUME_ROLE_RESPONSE); + + // When INCLUDE_TRACE_ID_IN_SESSION_TAGS is enabled, StorageAccessConfigProvider populates + // traceId in the context. The presence of traceId in the context determines whether it's + // included in session tags (and in the cache key, since it's a normal field). + CredentialVendingContext context = + CredentialVendingContext.builder() + .catalogName(Optional.of("test-catalog")) + .namespace(Optional.of("db.schema")) + .tableName(Optional.of("my_table")) + .activatedRoles(Optional.of("admin,reader")) + .traceId(Optional.of("abc123def456")) + .build(); + + new AwsCredentialsStorageIntegration( + AwsStorageConfigurationInfo.builder() + .addAllowedLocation(s3Path(bucket, warehouseKeyPrefix)) + .roleARN(roleARN) + .externalId(externalId) + .build(), + stsClient) + .getSubscopedCreds( + sessionTagsAndTraceIdEnabledConfig, + true, + Set.of(s3Path(bucket, warehouseKeyPrefix)), + Set.of(s3Path(bucket, warehouseKeyPrefix)), + POLARIS_PRINCIPAL, + Optional.empty(), + context); + + AssumeRoleRequest capturedRequest = requestCaptor.getValue(); + // All 6 tags are included when both session tags AND trace_id are enabled + Assertions.assertThat(capturedRequest.tags()).hasSize(6); + Assertions.assertThat(capturedRequest.tags()) + .anyMatch(tag -> tag.key().equals("polaris:catalog") && tag.value().equals("test-catalog")); + Assertions.assertThat(capturedRequest.tags()) + .anyMatch(tag -> tag.key().equals("polaris:namespace") && tag.value().equals("db.schema")); + Assertions.assertThat(capturedRequest.tags()) + .anyMatch(tag -> tag.key().equals("polaris:table") && tag.value().equals("my_table")); + Assertions.assertThat(capturedRequest.tags()) + .anyMatch( + tag -> tag.key().equals("polaris:principal") && tag.value().equals("test-principal")); + Assertions.assertThat(capturedRequest.tags()) + .anyMatch(tag -> tag.key().equals("polaris:roles") && tag.value().equals("admin,reader")); + // Verify trace_id IS included when INCLUDE_TRACE_ID_IN_SESSION_TAGS is true + Assertions.assertThat(capturedRequest.tags()) + .anyMatch( + tag -> tag.key().equals("polaris:trace_id") && tag.value().equals("abc123def456")); + + // Verify transitive tag keys include trace_id + Assertions.assertThat(capturedRequest.transitiveTagKeys()) + .containsExactlyInAnyOrder( + "polaris:catalog", + "polaris:namespace", + "polaris:table", + "polaris:principal", + "polaris:roles", + "polaris:trace_id"); + } + @Test public void testSessionTagsNotIncludedWhenFeatureDisabled() { StsClient stsClient = Mockito.mock(StsClient.class); @@ -1154,7 +1250,7 @@ public String getConfiguration(@Nonnull RealmContext ctx, String configName) { context); AssumeRoleRequest capturedRequest = requestCaptor.getValue(); - // All 5 tags are always included; missing values use "unknown" placeholder + // 5 tags are included when session tags enabled but trace_id disabled (default) Assertions.assertThat(capturedRequest.tags()).hasSize(5); Assertions.assertThat(capturedRequest.tags()) .anyMatch(tag -> tag.key().equals("polaris:catalog") && tag.value().equals("test-catalog")); @@ -1168,6 +1264,9 @@ public String getConfiguration(@Nonnull RealmContext ctx, String configName) { .anyMatch(tag -> tag.key().equals("polaris:table") && tag.value().equals("unknown")); Assertions.assertThat(capturedRequest.tags()) .anyMatch(tag -> tag.key().equals("polaris:roles") && tag.value().equals("unknown")); + // trace_id is NOT included when INCLUDE_TRACE_ID_IN_SESSION_TAGS is not set (defaults to false) + Assertions.assertThat(capturedRequest.tags()) + .noneMatch(tag -> tag.key().equals("polaris:trace_id")); } @Test @@ -1276,7 +1375,7 @@ public String getConfiguration(@Nonnull RealmContext ctx, String configName) { CredentialVendingContext.empty()); AssumeRoleRequest capturedRequest = requestCaptor.getValue(); - // All 5 tags are always included; missing values use "unknown" placeholder + // 5 tags are included when session tags enabled but trace_id disabled (default) Assertions.assertThat(capturedRequest.tags()).hasSize(5); Assertions.assertThat(capturedRequest.tags()) .anyMatch( @@ -1290,6 +1389,9 @@ public String getConfiguration(@Nonnull RealmContext ctx, String configName) { .anyMatch(tag -> tag.key().equals("polaris:table") && tag.value().equals("unknown")); Assertions.assertThat(capturedRequest.tags()) .anyMatch(tag -> tag.key().equals("polaris:roles") && tag.value().equals("unknown")); + // trace_id is NOT included when INCLUDE_TRACE_ID_IN_SESSION_TAGS is not set (defaults to false) + Assertions.assertThat(capturedRequest.tags()) + .noneMatch(tag -> tag.key().equals("polaris:trace_id")); } /** diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/StorageAccessConfigProvider.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/StorageAccessConfigProvider.java index c92204fe73..76f5a53f92 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/StorageAccessConfigProvider.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/StorageAccessConfigProvider.java @@ -19,6 +19,8 @@ package org.apache.polaris.service.catalog.io; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanContext; import jakarta.annotation.Nonnull; import jakarta.enterprise.context.RequestScoped; import jakarta.inject.Inject; @@ -187,6 +189,32 @@ private CredentialVendingContext buildCredentialVendingContext( builder.activatedRoles(Optional.of(rolesString)); } + // Only include trace ID when the feature flag is enabled. + // When enabled, trace IDs are included in AWS STS session tags and become part of the + // credential cache key (since they affect the vended credentials). + // When disabled (default), trace IDs are not included, allowing efficient credential + // caching across requests with different trace IDs. + boolean includeTraceIdInSessionTags = + storageCredentialsVendor + .getRealmConfig() + .getConfig(FeatureConfiguration.INCLUDE_TRACE_ID_IN_SESSION_TAGS); + if (includeTraceIdInSessionTags) { + builder.traceId(getCurrentTraceId()); + } + return builder.build(); } + + /** + * Extracts the current OpenTelemetry trace ID from the active span context. + * + * @return the trace ID if a valid span context exists, empty otherwise + */ + private Optional getCurrentTraceId() { + SpanContext spanContext = Span.current().getSpanContext(); + if (spanContext.isValid()) { + return Optional.of(spanContext.getTraceId()); + } + return Optional.empty(); + } }