Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ request adding CHANGELOG notes for breaking (!) changes and possibly other secti
- Added `topologySpreadConstraints` support in Helm chart.
- Added `priorityClassName` support in Helm chart.
- Added support for including principal name in subscoped credentials. `INCLUDE_PRINCIPAL_NAME_IN_SUBSCOPED_CREDENTIAL` (default: false) can be used to toggle this feature. If enabled, cached credentials issued to one principal will no longer be available for others.
- Added support for including OpenTelemetry trace IDs in AWS STS session tags. This requires session tags to be enabled via `INCLUDE_SESSION_TAGS_IN_SUBSCOPED_CREDENTIAL` and can be toggled with `INCLUDE_TRACE_ID_IN_SESSION_TAGS` (default: false). Note: enabling trace IDs disables credential caching (each request has a unique trace ID), which may increase STS calls and latency.
- Added support for [Kubernetes Gateway API](https://gateway-api.sigs.k8s.io/) to the Helm Chart.

### Changes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,20 @@ public static void enforceFeatureEnabledOrThrow(
.defaultValue(false)
.buildFeatureConfiguration();

public static final FeatureConfiguration<Boolean> INCLUDE_TRACE_ID_IN_SESSION_TAGS =
PolarisConfiguration.<Boolean>builder()
.key("INCLUDE_TRACE_ID_IN_SESSION_TAGS")
Copy link
Contributor

@dimas-b dimas-b Jan 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: since we're adding a feature flag, I'd prefer to mention it in CHANGELOG.md right away.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will add it shortly. Can you please see @adutra 's comment about request_id? If you remember, in the previous PR about session tags, we had explicitly removed request_id because we couldn't find a reliable way to get the request_id.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From my personal POV, I do not know of real use cases for "request IDs", but I suppose some people use it.

I'd be ok leaving it out for now and adding later if concrete use cases for propagating it to STS arise.

@adutra : WDYT?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you are able to compute things like the principal or the otel context, it's because there is an active request context when the tags are computed. So, I'm a bit surprised that it's not possible to get the request ID as well. Have you tried this approach:

private Optional<String> getRequestId() {
// See org.jboss.resteasy.reactive.server.injection.ContextProducers
ResteasyReactiveRequestContext context = CurrentRequestManager.get();
if (context != null) {
ContainerRequestContextImpl request = context.getContainerRequestContext();
String requestId = (String) request.getProperty(RequestIdFilter.REQUEST_ID_KEY);
return Optional.ofNullable(requestId);
}
return Optional.empty();
}

I'm not saying we absolutely must include the request id here, but I do remember that for events, not including the request ID was a friction point for some contributors.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @adutra. I am on it.

Copy link
Contributor Author

@obelix74 obelix74 Jan 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@adutra and @dimas-b does the aforementioned code snippet handle non-HTTP clients or programmatic clients as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have pushed a commit adding support for request_id, but my understanding is that the code that fetches the request_id will only work for HTTP requests - this was the primary reason we removed request_id from the previous PR that added request_id.

I request @dimas-b @adutra and @singhpk234 to review this and let me know if we should keep this as part of this PR or remove it.

The request_id code follows the same design suggested by @dimas-b for the trace_id. There is one change. Since the request_id is controlled by the end-user and may contain non-STS friendly characters, I added code to sanitize it - if this doesn't happen, vended credentials may fail to be issued.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did not review the latest changes yet, posting proactively: STS may be involved in non-HTTP requests, so if request ID is part of STS tags we need to find a way to access / generate it for async tasks too (cf. TaskManagerImpl)... I'll review actual code later ⏳

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dimas-b thank you. Currently the code handles the absence of request-id by omitting it (request-id is optional, just like trace-id is). I will wait for your review.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

commented on related CDI aspects separately.

.description(
"If set to true (and INCLUDE_SESSION_TAGS_IN_SUBSCOPED_CREDENTIAL is also true), the OpenTelemetry\n"
+ "trace ID will be included as a session tag in AWS STS AssumeRole requests. This enables\n"
+ "end-to-end correlation between catalog operations (Polaris events), credential vending (CloudTrail),\n"
+ "and metrics reports from compute engines.\n"
+ "WARNING: Enabling this feature completely disables credential caching because every request\n"
+ "has a unique trace ID. This may significantly increase latency and STS API costs.\n"
+ "Consider leaving this disabled unless end-to-end tracing correlation is critical.")
.defaultValue(false)
.buildFeatureConfiguration();

public static final FeatureConfiguration<Boolean> ALLOW_SETTING_S3_ENDPOINTS =
PolarisConfiguration.<Boolean>builder()
.key("ALLOW_SETTING_S3_ENDPOINTS")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,11 @@
* <li>{@code namespace} - The namespace/database being accessed (e.g., "db.schema")
* <li>{@code tableName} - The name of the table being accessed
* <li>{@code activatedRoles} - Comma-separated list of activated principal roles
* <li>{@code traceId} - OpenTelemetry trace ID for end-to-end correlation
* </ul>
*
* <p>These values appear in cloud provider audit logs (e.g., AWS CloudTrail), enabling correlation
* between catalog operations and data access events.
* <p>These values appear in cloud provider audit logs (e.g., AWS CloudTrail), enabling
* deterministic correlation between catalog operations and data access events.
*/
@PolarisImmutable
public interface CredentialVendingContext {
Expand All @@ -48,6 +49,7 @@ public interface CredentialVendingContext {
String TAG_KEY_TABLE = "polaris:table";
String TAG_KEY_PRINCIPAL = "polaris:principal";
String TAG_KEY_ROLES = "polaris:roles";
String TAG_KEY_TRACE_ID = "polaris:trace_id";

/** The name of the catalog that is vending credentials. */
Optional<String> catalogName();
Expand All @@ -67,6 +69,20 @@ public interface CredentialVendingContext {
*/
Optional<String> activatedRoles();

/**
* The OpenTelemetry trace ID for end-to-end correlation. This enables correlation between
* credential vending (CloudTrail), catalog operations (Polaris events), and metrics reports from
* compute engines.
*
* <p>This field is only populated when the {@code INCLUDE_TRACE_ID_IN_SESSION_TAGS} feature flag
* is enabled. When populated, the trace ID is included in AWS STS session tags and becomes part
* of the credential cache key (since it affects the vended credentials).
*
* <p>When the flag is disabled (default), this field is left empty ({@code Optional.empty()}),
* which allows efficient credential caching across requests with different trace IDs.
*/
Optional<String> traceId();

/**
* Creates a new builder for CredentialVendingContext.
*
Expand Down Expand Up @@ -95,6 +111,8 @@ interface Builder {

Builder activatedRoles(Optional<String> activatedRoles);

Builder traceId(Optional<String> traceId);

CredentialVendingContext build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,9 @@ public StorageAccessConfig getSubscopedCreds(
.toJson())
.durationSeconds(storageCredentialDurationSeconds);

// Add session tags when the feature is enabled
// Add session tags when the feature is enabled.
// Note: The trace ID is controlled at the source (StorageAccessConfigProvider).
// If INCLUDE_TRACE_ID_IN_SESSION_TAGS is enabled, the context will contain the trace ID.
if (includeSessionTags) {
List<Tag> sessionTags =
buildSessionTags(polarisPrincipal.getName(), credentialVendingContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,13 @@ private AwsSessionTagsBuilder() {
* Builds a list of AWS STS session tags from the principal name and credential vending context.
* These tags will appear in CloudTrail events for correlation purposes.
*
* <p>The trace ID tag is only included if {@link CredentialVendingContext#traceId()} is present.
* This is controlled at the source (StorageAccessConfigProvider) based on the
* INCLUDE_TRACE_ID_IN_SESSION_TAGS feature flag.
*
* @param principalName the name of the principal requesting credentials
* @param context the credential vending context containing catalog, namespace, table, and roles
* @param context the credential vending context containing catalog, namespace, table, roles, and
* optionally trace ID
* @return a list of STS Tags to attach to the AssumeRole request
*/
public static List<Tag> buildSessionTags(String principalName, CredentialVendingContext context) {
Expand Down Expand Up @@ -78,6 +83,19 @@ public static List<Tag> buildSessionTags(String principalName, CredentialVending
.value(truncateTagValue(context.tableName().orElse(TAG_VALUE_UNKNOWN)))
.build());

// Only include trace ID if it's present in the context.
// The context's traceId is only populated when INCLUDE_TRACE_ID_IN_SESSION_TAGS is enabled.
// This allows efficient credential caching when trace IDs are not needed in session tags.
context
.traceId()
.ifPresent(
traceId ->
tags.add(
Tag.builder()
.key(CredentialVendingContext.TAG_KEY_TRACE_ID)
.value(truncateTagValue(traceId))
.build()));

return tags;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,14 @@ public StorageAccessConfig getOrGenerateSubScopeCreds(

// When session tags are enabled, the cache key needs to include:
// 1. The credential vending context to avoid returning cached credentials with different
// session tags (catalog/namespace/table/roles)
// session tags (catalog/namespace/table/roles/traceId)
// 2. The principal, because the polaris:principal session tag is included in AWS credentials
// and we must not serve credentials tagged for principal A to principal B
// When session tags are disabled, we only include principal if explicitly configured.
//
// Note: The trace ID is controlled at the source (StorageAccessConfigProvider). When
// INCLUDE_TRACE_ID_IN_SESSION_TAGS is disabled, the context's traceId is left empty,
// which allows efficient caching across requests with different trace IDs.
boolean includePrincipalInCacheKey =
includePrincipalNameInSubscopedCredential || includeSessionTags;
// When session tags are disabled, use empty context to ensure consistent cache key behavior
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,19 @@ public interface StorageCredentialCacheKey {

/**
* The credential vending context for session tags. When session tags are enabled, this contains
* the catalog, namespace, table, and roles information. When session tags are disabled, this
* should be {@link CredentialVendingContext#empty()} to ensure consistent cache key behavior.
* the catalog, namespace, table, roles, and optionally trace ID information. When session tags
* are disabled, this should be {@link CredentialVendingContext#empty()} to ensure consistent
* cache key behavior.
*
* <p>The trace ID in the context is only populated when the {@code
* INCLUDE_TRACE_ID_IN_SESSION_TAGS} feature flag is enabled. When populated, it becomes part of
* the cache key comparison (since it affects the vended credentials via session tags). When
* empty, credentials can be cached efficiently across requests with different trace IDs.
*/
@Value.Parameter(order = 9)
CredentialVendingContext credentialVendingContext();

/** Creates a cache key from the provided parameters. */
static StorageCredentialCacheKey of(
String realmId,
PolarisEntity entity,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1014,7 +1014,9 @@ public String getConfiguration(@Nonnull RealmContext ctx, String configName) {
ArgumentCaptor.forClass(AssumeRoleRequest.class);
Mockito.when(stsClient.assumeRole(requestCaptor.capture())).thenReturn(ASSUME_ROLE_RESPONSE);

// Roles are included in context (not extracted from principal) to be part of cache key
// Roles are included in context (not extracted from principal) to be part of cache key.
// Note: traceId is NOT set because INCLUDE_TRACE_ID_IN_SESSION_TAGS is disabled (default).
// In production, StorageAccessConfigProvider only populates traceId when that flag is enabled.
CredentialVendingContext context =
CredentialVendingContext.builder()
.catalogName(Optional.of("test-catalog"))
Expand All @@ -1040,7 +1042,8 @@ public String getConfiguration(@Nonnull RealmContext ctx, String configName) {
context);

AssumeRoleRequest capturedRequest = requestCaptor.getValue();
Assertions.assertThat(capturedRequest.tags()).isNotEmpty();
// 5 tags are included when session tags enabled but trace_id not in context
Assertions.assertThat(capturedRequest.tags()).hasSize(5);
Assertions.assertThat(capturedRequest.tags())
.anyMatch(tag -> tag.key().equals("polaris:catalog") && tag.value().equals("test-catalog"));
Assertions.assertThat(capturedRequest.tags())
Expand All @@ -1053,8 +1056,11 @@ public String getConfiguration(@Nonnull RealmContext ctx, String configName) {
// Roles are sorted alphabetically and joined with comma
Assertions.assertThat(capturedRequest.tags())
.anyMatch(tag -> tag.key().equals("polaris:roles") && tag.value().equals("admin,reader"));
// Verify trace_id is NOT included when not present in context
Assertions.assertThat(capturedRequest.tags())
.noneMatch(tag -> tag.key().equals("polaris:trace_id"));

// Verify transitive tag keys are set
// Verify transitive tag keys are set (without trace_id)
Assertions.assertThat(capturedRequest.transitiveTagKeys())
.containsExactlyInAnyOrder(
"polaris:catalog",
Expand All @@ -1064,6 +1070,96 @@ public String getConfiguration(@Nonnull RealmContext ctx, String configName) {
"polaris:roles");
}

@Test
public void testSessionTagsWithTraceIdWhenBothFlagsEnabled() {
StsClient stsClient = Mockito.mock(StsClient.class);
String roleARN = "arn:aws:iam::012345678901:role/jdoe";
String externalId = "externalId";
String bucket = "bucket";
String warehouseKeyPrefix = "path/to/warehouse";

// Create a realm config with both session tags AND trace_id enabled
RealmConfig sessionTagsAndTraceIdEnabledConfig =
new RealmConfigImpl(
new PolarisConfigurationStore() {
@SuppressWarnings("unchecked")
@Override
public String getConfiguration(@Nonnull RealmContext ctx, String configName) {
if (configName.equals(
FeatureConfiguration.INCLUDE_SESSION_TAGS_IN_SUBSCOPED_CREDENTIAL.key())) {
return "true";
}
if (configName.equals(
FeatureConfiguration.INCLUDE_TRACE_ID_IN_SESSION_TAGS.key())) {
return "true";
}
return null;
}
},
() -> "realm");

ArgumentCaptor<AssumeRoleRequest> requestCaptor =
ArgumentCaptor.forClass(AssumeRoleRequest.class);
Mockito.when(stsClient.assumeRole(requestCaptor.capture())).thenReturn(ASSUME_ROLE_RESPONSE);

// When INCLUDE_TRACE_ID_IN_SESSION_TAGS is enabled, StorageAccessConfigProvider populates
// traceId in the context. The presence of traceId in the context determines whether it's
// included in session tags (and in the cache key, since it's a normal field).
CredentialVendingContext context =
CredentialVendingContext.builder()
.catalogName(Optional.of("test-catalog"))
.namespace(Optional.of("db.schema"))
.tableName(Optional.of("my_table"))
.activatedRoles(Optional.of("admin,reader"))
.traceId(Optional.of("abc123def456"))
.build();

new AwsCredentialsStorageIntegration(
AwsStorageConfigurationInfo.builder()
.addAllowedLocation(s3Path(bucket, warehouseKeyPrefix))
.roleARN(roleARN)
.externalId(externalId)
.build(),
stsClient)
.getSubscopedCreds(
sessionTagsAndTraceIdEnabledConfig,
true,
Set.of(s3Path(bucket, warehouseKeyPrefix)),
Set.of(s3Path(bucket, warehouseKeyPrefix)),
POLARIS_PRINCIPAL,
Optional.empty(),
context);

AssumeRoleRequest capturedRequest = requestCaptor.getValue();
// All 6 tags are included when both session tags AND trace_id are enabled
Assertions.assertThat(capturedRequest.tags()).hasSize(6);
Assertions.assertThat(capturedRequest.tags())
.anyMatch(tag -> tag.key().equals("polaris:catalog") && tag.value().equals("test-catalog"));
Assertions.assertThat(capturedRequest.tags())
.anyMatch(tag -> tag.key().equals("polaris:namespace") && tag.value().equals("db.schema"));
Assertions.assertThat(capturedRequest.tags())
.anyMatch(tag -> tag.key().equals("polaris:table") && tag.value().equals("my_table"));
Assertions.assertThat(capturedRequest.tags())
.anyMatch(
tag -> tag.key().equals("polaris:principal") && tag.value().equals("test-principal"));
Assertions.assertThat(capturedRequest.tags())
.anyMatch(tag -> tag.key().equals("polaris:roles") && tag.value().equals("admin,reader"));
// Verify trace_id IS included when INCLUDE_TRACE_ID_IN_SESSION_TAGS is true
Assertions.assertThat(capturedRequest.tags())
.anyMatch(
tag -> tag.key().equals("polaris:trace_id") && tag.value().equals("abc123def456"));

// Verify transitive tag keys include trace_id
Assertions.assertThat(capturedRequest.transitiveTagKeys())
.containsExactlyInAnyOrder(
"polaris:catalog",
"polaris:namespace",
"polaris:table",
"polaris:principal",
"polaris:roles",
"polaris:trace_id");
}

@Test
public void testSessionTagsNotIncludedWhenFeatureDisabled() {
StsClient stsClient = Mockito.mock(StsClient.class);
Expand Down Expand Up @@ -1154,7 +1250,7 @@ public String getConfiguration(@Nonnull RealmContext ctx, String configName) {
context);

AssumeRoleRequest capturedRequest = requestCaptor.getValue();
// All 5 tags are always included; missing values use "unknown" placeholder
// 5 tags are included when session tags enabled but trace_id disabled (default)
Assertions.assertThat(capturedRequest.tags()).hasSize(5);
Assertions.assertThat(capturedRequest.tags())
.anyMatch(tag -> tag.key().equals("polaris:catalog") && tag.value().equals("test-catalog"));
Expand All @@ -1168,6 +1264,9 @@ public String getConfiguration(@Nonnull RealmContext ctx, String configName) {
.anyMatch(tag -> tag.key().equals("polaris:table") && tag.value().equals("unknown"));
Assertions.assertThat(capturedRequest.tags())
.anyMatch(tag -> tag.key().equals("polaris:roles") && tag.value().equals("unknown"));
// trace_id is NOT included when INCLUDE_TRACE_ID_IN_SESSION_TAGS is not set (defaults to false)
Assertions.assertThat(capturedRequest.tags())
.noneMatch(tag -> tag.key().equals("polaris:trace_id"));
}

@Test
Expand Down Expand Up @@ -1276,7 +1375,7 @@ public String getConfiguration(@Nonnull RealmContext ctx, String configName) {
CredentialVendingContext.empty());

AssumeRoleRequest capturedRequest = requestCaptor.getValue();
// All 5 tags are always included; missing values use "unknown" placeholder
// 5 tags are included when session tags enabled but trace_id disabled (default)
Assertions.assertThat(capturedRequest.tags()).hasSize(5);
Assertions.assertThat(capturedRequest.tags())
.anyMatch(
Expand All @@ -1290,6 +1389,9 @@ public String getConfiguration(@Nonnull RealmContext ctx, String configName) {
.anyMatch(tag -> tag.key().equals("polaris:table") && tag.value().equals("unknown"));
Assertions.assertThat(capturedRequest.tags())
.anyMatch(tag -> tag.key().equals("polaris:roles") && tag.value().equals("unknown"));
// trace_id is NOT included when INCLUDE_TRACE_ID_IN_SESSION_TAGS is not set (defaults to false)
Assertions.assertThat(capturedRequest.tags())
.noneMatch(tag -> tag.key().equals("polaris:trace_id"));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.apache.polaris.service.catalog.io;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanContext;
import jakarta.annotation.Nonnull;
import jakarta.enterprise.context.RequestScoped;
import jakarta.inject.Inject;
Expand Down Expand Up @@ -187,6 +189,32 @@ private CredentialVendingContext buildCredentialVendingContext(
builder.activatedRoles(Optional.of(rolesString));
}

// Only include trace ID when the feature flag is enabled.
// When enabled, trace IDs are included in AWS STS session tags and become part of the
// credential cache key (since they affect the vended credentials).
// When disabled (default), trace IDs are not included, allowing efficient credential
// caching across requests with different trace IDs.
boolean includeTraceIdInSessionTags =
storageCredentialsVendor
.getRealmConfig()
.getConfig(FeatureConfiguration.INCLUDE_TRACE_ID_IN_SESSION_TAGS);
if (includeTraceIdInSessionTags) {
builder.traceId(getCurrentTraceId());
}

return builder.build();
}

/**
* Extracts the current OpenTelemetry trace ID from the active span context.
*
* @return the trace ID if a valid span context exists, empty otherwise
*/
private Optional<String> getCurrentTraceId() {
SpanContext spanContext = Span.current().getSpanContext();
if (spanContext.isValid()) {
return Optional.of(spanContext.getTraceId());
}
return Optional.empty();
}
}