beanProperties) {
+
+ if (fieldPatterns.isEmpty()) {
+ return beanProperties;
+ }
+
+ return beanProperties.stream()
+ .map(writer -> shouldRedactField(writer.getName()) ? createRedactingWriter(writer) : writer)
+ .collect(Collectors.toList());
+ }
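+
+ // Wiring sketch (for orientation only; the actual registration lives in
+ // AwsCloudWatchEventListener.configureRedaction later in this change):
+ //   SimpleModule module = new SimpleModule();
+ //   module.setSerializerModifier(new FieldRedactionSerializerModifier(additionalRedactedFields));
+ //   mapper.registerModule(module);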
+
+ /**
+ * Checks if a field name matches any of the configured redaction patterns.
+ *
+ * @param fieldName the field name to check
+ * @return true if the field should be redacted
+ */
+ private boolean shouldRedactField(String fieldName) {
+ return fieldPatterns.stream().anyMatch(pattern -> pattern.matcher(fieldName).matches());
+ }
+
+ /**
+ * Creates a new BeanPropertyWriter that redacts the field value.
+ *
+ * @param original the original BeanPropertyWriter
+ * @return a new BeanPropertyWriter that redacts the value
+ */
+ private BeanPropertyWriter createRedactingWriter(BeanPropertyWriter original) {
+ return new BeanPropertyWriter(original) {
+ @Override
+ public void serializeAsField(Object bean, JsonGenerator gen, SerializerProvider prov)
+ throws Exception {
+ gen.writeFieldName(_name);
+ gen.writeString(RedactingSerializer.getRedactedMarker());
+ }
+ };
+ }
+}
+
diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/json/RedactingSerializer.java b/runtime/service/src/main/java/org/apache/polaris/service/events/json/RedactingSerializer.java
new file mode 100644
index 0000000000..3011e4d3f6
--- /dev/null
+++ b/runtime/service/src/main/java/org/apache/polaris/service/events/json/RedactingSerializer.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events.json;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import java.io.IOException;
+
+/**
+ * A Jackson serializer that redacts string values by replacing them with a redaction marker.
+ *
+ * This serializer is used to prevent sensitive information from being included in serialized
+ * JSON output, particularly for event logging to external systems like CloudWatch.
+ */
+public class RedactingSerializer extends JsonSerializer<Object> {
+ private static final String REDACTED_MARKER = "***REDACTED***";
+
+ @Override
+ public void serialize(Object value, JsonGenerator gen, SerializerProvider serializers)
+ throws IOException {
+ gen.writeString(REDACTED_MARKER);
+ }
+
+ /**
+ * Returns the marker string used to indicate redacted values.
+ *
+ * @return the redaction marker string
+ */
+ public static String getRedactedMarker() {
+ return REDACTED_MARKER;
+ }
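+
+ // Usage sketch: this serializer is applied declaratively through mixins rather than registered
+ // directly, e.g.
+ //   @JsonSerialize(using = RedactingSerializer.class)
+ //   public abstract String location();
+ // as done in RedactionMixins.TableMetadataFullRedactionMixin in this change.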
+}
+
diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/json/mixins/IcebergMixins.java b/runtime/service/src/main/java/org/apache/polaris/service/events/json/mixins/IcebergMixins.java
new file mode 100644
index 0000000000..7f9dcd1f52
--- /dev/null
+++ b/runtime/service/src/main/java/org/apache/polaris/service/events/json/mixins/IcebergMixins.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events.json.mixins;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.iceberg.catalog.Namespace;
+
+/** Mixins for Iceberg classes we don't control, to keep JSON concise. */
+public final class IcebergMixins {
+
+ // Private constructor to prevent instantiation
+ private IcebergMixins() {}
+
+ /** Serializes Namespace as an object like: "namespace": ["sales", "north.america"] */
+ public abstract static class NamespaceMixin {
+ @JsonProperty("namespace")
+ public abstract String[] levels();
+ }
+
+ /**
+ * Serializes TableIdentifier as an object like: {"namespace": ["sales", "north.america"],
+ * "name": "transactions"}
+ */
+ public abstract static class TableIdentifierMixin {
+ @JsonProperty("namespace")
+ public abstract Namespace namespace();
+
+ @JsonProperty("name")
+ public abstract String name();
+ }
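+
+ // Registration sketch (assumption: the ObjectMapper customizer wires these mixins, e.g.):
+ //   mapper.addMixIn(Namespace.class, IcebergMixins.NamespaceMixin.class);
+ //   mapper.addMixIn(TableIdentifier.class, IcebergMixins.TableIdentifierMixin.class);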
+}
diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/json/mixins/PolarisEventBaseMixin.java b/runtime/service/src/main/java/org/apache/polaris/service/events/json/mixins/PolarisEventBaseMixin.java
new file mode 100644
index 0000000000..7ea183e3c1
--- /dev/null
+++ b/runtime/service/src/main/java/org/apache/polaris/service/events/json/mixins/PolarisEventBaseMixin.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events.json.mixins;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.databind.PropertyNamingStrategies;
+import com.fasterxml.jackson.databind.annotation.JsonNaming;
+
+@JsonInclude(JsonInclude.Include.NON_NULL)
+@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
+public abstract class PolarisEventBaseMixin {}
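+
+// Effect sketch: with this mixin applied to event types, a property such as tableIdentifier is
+// emitted as "table_identifier", and null-valued properties are omitted from the JSON output.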
diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/json/mixins/RedactionMixins.java b/runtime/service/src/main/java/org/apache/polaris/service/events/json/mixins/RedactionMixins.java
new file mode 100644
index 0000000000..4e4c2e8a3a
--- /dev/null
+++ b/runtime/service/src/main/java/org/apache/polaris/service/events/json/mixins/RedactionMixins.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events.json.mixins;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import java.util.Map;
+import org.apache.polaris.service.events.json.RedactingSerializer;
+
+/**
+ * Jackson mixins for redacting sensitive information from event payloads.
+ *
+ * These mixins are applied to various Iceberg and Polaris types to prevent sensitive data from
+ * being included in serialized events sent to external systems like CloudWatch.
+ *
+ *
+ * <p>The mixins support different redaction modes:
+ *
+ * <ul>
+ * <li>PARTIAL - Redact credentials and secrets, but keep metadata like locations
+ * <li>FULL - Redact all potentially sensitive fields including locations and properties
+ * </ul>
+ */
+public class RedactionMixins {
+
+ /**
+ * Mixin for partial redaction of TableMetadata.
+ *
+ * In PARTIAL mode, we keep the location but redact properties that may contain secrets.
+ */
+ public abstract static class TableMetadataPartialRedactionMixin {
+ @JsonIgnore
+ public abstract Map<String, String> properties();
+ }
+
+ /**
+ * Mixin for full redaction of TableMetadata.
+ *
+ * In FULL mode, we redact both location and properties.
+ */
+ public abstract static class TableMetadataFullRedactionMixin {
+ @JsonSerialize(using = RedactingSerializer.class)
+ public abstract String location();
+
+ @JsonIgnore
+ public abstract Map<String, String> properties();
+ }
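+
+ // Selection note: configureRedaction in AwsCloudWatchEventListener picks the partial or full
+ // TableMetadata/ViewMetadata mixin based on the configured RedactionMode (PARTIAL vs. FULL).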
+
+ /**
+ * Mixin for partial redaction of ViewMetadata.
+ *
+ * In PARTIAL mode, we keep the location but redact properties that may contain secrets.
+ */
+ public abstract static class ViewMetadataPartialRedactionMixin {
+ @JsonIgnore
+ public abstract Map<String, String> properties();
+ }
+
+ /**
+ * Mixin for full redaction of ViewMetadata.
+ *
+ * In FULL mode, we redact both location and properties.
+ */
+ public abstract static class ViewMetadataFullRedactionMixin {
+ @JsonSerialize(using = RedactingSerializer.class)
+ public abstract String location();
+
+ @JsonIgnore
+ public abstract Map<String, String> properties();
+ }
+
+ /**
+ * Mixin for redacting LoadTableResponse/LoadTableResult.
+ *
+ * Redacts the config map and storage-credentials array which contain sensitive credential
+ * information.
+ */
+ public abstract static class LoadTableResponseRedactionMixin {
+ @JsonIgnore
+ public abstract Map<String, String> config();
+
+ @JsonIgnore
+ public abstract Object storageCredentials();
+ }
+
+ /**
+ * Mixin for redacting LoadViewResponse/LoadViewResult.
+ *
+ * Redacts the config map which may contain sensitive configuration.
+ */
+ public abstract static class LoadViewResponseRedactionMixin {
+ @JsonIgnore
+ public abstract Map<String, String> config();
+ }
+
+ /**
+ * Mixin for redacting ConfigResponse.
+ *
+ * Redacts both defaults and overrides maps which may contain sensitive configuration like
+ * credentials, tokens, or internal endpoints.
+ *
+ * <p>Note: Since ConfigResponse only has defaults and overrides, ignoring both would result in
+ * an empty object that Jackson can't serialize. Instead, we ignore the entire configResponse
+ * field in AfterGetConfigEvent.
+ */
+ public abstract static class ConfigResponseRedactionMixin {
+ @JsonIgnore
+ public abstract Map<String, String> defaults();
+
+ @JsonIgnore
+ public abstract Map<String, String> overrides();
+ }
+
+ /**
+ * Mixin for redacting AfterGetConfigEvent.
+ *
+ * Completely omits the configResponse field which contains sensitive configuration.
+ */
+ public abstract static class AfterGetConfigEventRedactionMixin {
+ @JsonIgnore
+ public abstract Object configResponse();
+ }
+
+ /**
+ * Mixin for redacting CreateNamespaceRequest.
+ *
+ * <p>Redacts properties that may contain secrets or sensitive configuration.
+ */
+ public abstract static class CreateNamespaceRequestRedactionMixin {
+ @JsonIgnore
+ public abstract Map<String, String> properties();
+ }
+
+ /**
+ * Mixin for redacting UpdateNamespacePropertiesRequest.
+ *
+ * Redacts property updates and removals that may contain or reference secrets.
+ */
+ public abstract static class UpdateNamespacePropertiesRequestRedactionMixin {
+ @JsonIgnore
+ public abstract Map<String, String> updates();
+ }
+}
+
diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/jsonEventListener/PropertyMapEventListener.java b/runtime/service/src/main/java/org/apache/polaris/service/events/jsonEventListener/PropertyMapEventListener.java
deleted file mode 100644
index 63311fc035..0000000000
--- a/runtime/service/src/main/java/org/apache/polaris/service/events/jsonEventListener/PropertyMapEventListener.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.polaris.service.events.jsonEventListener;
-
-import java.util.HashMap;
-import org.apache.polaris.service.events.IcebergRestCatalogEvents;
-import org.apache.polaris.service.events.listeners.PolarisEventListener;
-
-/**
- * This class provides a common framework for transforming Polaris events into a HashMap, which can
- * be used to transform the event further, such as transforming into a JSON string, and send them to
- * various destinations. Concrete implementations should override the
- * {{@code @link#transformAndSendEvent(HashMap)}} method to define how the event data should be
- * transformed into a JSON string, transmitted, and/or stored.
- */
-public abstract class PropertyMapEventListener implements PolarisEventListener {
- protected abstract void transformAndSendEvent(HashMap<String, Object> properties);
-
- @Override
- public void onAfterRefreshTable(IcebergRestCatalogEvents.AfterRefreshTableEvent event) {
- HashMap<String, Object> properties = new HashMap<>();
- properties.put("event_type", event.getClass().getSimpleName());
- properties.put("table_identifier", event.tableIdentifier().toString());
- transformAndSendEvent(properties);
- }
-}
diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/jsonEventListener/aws/cloudwatch/AwsCloudWatchEventListener.java b/runtime/service/src/main/java/org/apache/polaris/service/events/jsonEventListener/aws/cloudwatch/AwsCloudWatchEventListener.java
deleted file mode 100644
index 87cf70a2f1..0000000000
--- a/runtime/service/src/main/java/org/apache/polaris/service/events/jsonEventListener/aws/cloudwatch/AwsCloudWatchEventListener.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.polaris.service.events.jsonEventListener.aws.cloudwatch;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import io.smallrye.common.annotation.Identifier;
-import jakarta.annotation.PostConstruct;
-import jakarta.annotation.PreDestroy;
-import jakarta.enterprise.context.ApplicationScoped;
-import jakarta.inject.Inject;
-import jakarta.ws.rs.core.Context;
-import jakarta.ws.rs.core.SecurityContext;
-import java.time.Clock;
-import java.util.HashMap;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.function.Supplier;
-import org.apache.polaris.core.auth.PolarisPrincipal;
-import org.apache.polaris.core.context.CallContext;
-import org.apache.polaris.service.config.PolarisIcebergObjectMapperCustomizer;
-import org.apache.polaris.service.events.jsonEventListener.PropertyMapEventListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import software.amazon.awssdk.regions.Region;
-import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsAsyncClient;
-import software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogGroupRequest;
-import software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogStreamRequest;
-import software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogGroupsRequest;
-import software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogStreamsRequest;
-import software.amazon.awssdk.services.cloudwatchlogs.model.InputLogEvent;
-import software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsRequest;
-import software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsResponse;
-
-@ApplicationScoped
-@Identifier("aws-cloudwatch")
-public class AwsCloudWatchEventListener extends PropertyMapEventListener {
- private static final Logger LOGGER = LoggerFactory.getLogger(AwsCloudWatchEventListener.class);
- final ObjectMapper objectMapper = new ObjectMapper();
-
- private CloudWatchLogsAsyncClient client;
-
- private final String logGroup;
- private final String logStream;
- private final Region region;
- private final boolean synchronousMode;
- private final Clock clock;
-
- @Inject CallContext callContext;
-
- @Context SecurityContext securityContext;
-
- @Inject
- public AwsCloudWatchEventListener(
- AwsCloudWatchConfiguration config,
- Clock clock,
- PolarisIcebergObjectMapperCustomizer customizer) {
- this.logStream = config.awsCloudWatchLogStream();
- this.logGroup = config.awsCloudWatchLogGroup();
- this.region = Region.of(config.awsCloudWatchRegion());
- this.synchronousMode = config.synchronousMode();
- this.clock = clock;
- customizer.customize(this.objectMapper);
- }
-
- @PostConstruct
- void start() {
- this.client = createCloudWatchAsyncClient();
- ensureLogGroupAndStream();
- }
-
- protected CloudWatchLogsAsyncClient createCloudWatchAsyncClient() {
- return CloudWatchLogsAsyncClient.builder().region(region).build();
- }
-
- private void ensureLogGroupAndStream() {
- ensureResourceExists(
- () ->
- client
- .describeLogGroups(
- DescribeLogGroupsRequest.builder().logGroupNamePrefix(logGroup).build())
- .join()
- .logGroups()
- .stream()
- .anyMatch(g -> g.logGroupName().equals(logGroup)),
- () ->
- client
- .createLogGroup(CreateLogGroupRequest.builder().logGroupName(logGroup).build())
- .join(),
- "group",
- logGroup);
- ensureResourceExists(
- () ->
- client
- .describeLogStreams(
- DescribeLogStreamsRequest.builder()
- .logGroupName(logGroup)
- .logStreamNamePrefix(logStream)
- .build())
- .join()
- .logStreams()
- .stream()
- .anyMatch(s -> s.logStreamName().equals(logStream)),
- () ->
- client
- .createLogStream(
- CreateLogStreamRequest.builder()
- .logGroupName(logGroup)
- .logStreamName(logStream)
- .build())
- .join(),
- "stream",
- logStream);
- }
-
- private static void ensureResourceExists(
- Supplier<Boolean> existsCheck,
- Runnable createAction,
- String resourceType,
- String resourceName) {
- if (existsCheck.get()) {
- LOGGER.debug("Log {} [{}] already exists", resourceType, resourceName);
- } else {
- LOGGER.debug("Attempting to create log {}: {}", resourceType, resourceName);
- createAction.run();
- }
- }
-
- @PreDestroy
- void shutdown() {
- if (client != null) {
- client.close();
- client = null;
- }
- }
-
- @Override
- protected void transformAndSendEvent(HashMap<String, Object> properties) {
- properties.put("realm_id", callContext.getRealmContext().getRealmIdentifier());
- properties.put("principal", securityContext.getUserPrincipal().getName());
- properties.put(
- "activated_roles", ((PolarisPrincipal) securityContext.getUserPrincipal()).getRoles());
- // TODO: Add request ID when it is available
- String eventAsJson;
- try {
- eventAsJson = objectMapper.writeValueAsString(properties);
- } catch (JsonProcessingException e) {
- LOGGER.error("Error processing event into JSON string: ", e);
- LOGGER.debug("Failed to convert the following object into JSON string: {}", properties);
- return;
- }
- InputLogEvent inputLogEvent =
- InputLogEvent.builder().message(eventAsJson).timestamp(clock.millis()).build();
- PutLogEventsRequest.Builder requestBuilder =
- PutLogEventsRequest.builder()
- .logGroupName(logGroup)
- .logStreamName(logStream)
- .logEvents(List.of(inputLogEvent));
- CompletableFuture<PutLogEventsResponse> future =
- client
- .putLogEvents(requestBuilder.build())
- .whenComplete(
- (resp, err) -> {
- if (err != null) {
- LOGGER.error(
- "Error writing log to CloudWatch. Event: {}, Error: ", inputLogEvent, err);
- }
- });
- if (synchronousMode) {
- future.join();
- }
- }
-}
diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/AllEventsForwardingListener.java b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/AllEventsForwardingListener.java
new file mode 100644
index 0000000000..7aaf7604b1
--- /dev/null
+++ b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/AllEventsForwardingListener.java
@@ -0,0 +1,854 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events.listeners;
+
+import org.apache.polaris.service.events.AfterAttemptTaskEvent;
+import org.apache.polaris.service.events.BeforeAttemptTaskEvent;
+import org.apache.polaris.service.events.BeforeLimitRequestRateEvent;
+import org.apache.polaris.service.events.CatalogGenericTableServiceEvents;
+import org.apache.polaris.service.events.CatalogPolicyServiceEvents;
+import org.apache.polaris.service.events.CatalogsServiceEvents;
+import org.apache.polaris.service.events.IcebergRestCatalogEvents;
+import org.apache.polaris.service.events.PolarisEvent;
+import org.apache.polaris.service.events.PrincipalRolesServiceEvents;
+import org.apache.polaris.service.events.PrincipalsServiceEvents;
+
+/**
+ * Base class for event listeners that wish to generically forward all {@link PolarisEvent
+ * PolarisEvents} to an external sink.
+ *
+ * This design follows the Template Method pattern, centralizing shared control flow in the base
+ * class while allowing subclasses to supply the event-specific behavior.
+ */
+public abstract class AllEventsForwardingListener implements PolarisEventListener {
+
+ /** Subclasses implement the actual logic once, generically. */
+ protected abstract void handle(PolarisEvent event);
+
+ /** Optional filter (config-based). Default: handle all. */
+ protected boolean shouldHandle(PolarisEvent event) {
+ return true;
+ }
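+
+ // Subclass sketch: AwsCloudWatchEventListener (in this change) overrides handle(event) to
+ // serialize the event and ship it to CloudWatch, and overrides shouldHandle(event) to filter
+ // by the configured event types.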
+
+ @Override
+ public void onAfterGetCatalog(CatalogsServiceEvents.AfterGetCatalogEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeCreateCatalog(CatalogsServiceEvents.BeforeCreateCatalogEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterCreateCatalog(CatalogsServiceEvents.AfterCreateCatalogEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeDeleteCatalog(CatalogsServiceEvents.BeforeDeleteCatalogEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterDeleteCatalog(CatalogsServiceEvents.AfterDeleteCatalogEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeGetCatalog(CatalogsServiceEvents.BeforeGetCatalogEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeUpdateCatalog(CatalogsServiceEvents.BeforeUpdateCatalogEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterUpdateCatalog(CatalogsServiceEvents.AfterUpdateCatalogEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeListCatalogs(CatalogsServiceEvents.BeforeListCatalogsEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterListCatalogs(CatalogsServiceEvents.AfterListCatalogsEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeCreatePrincipal(PrincipalsServiceEvents.BeforeCreatePrincipalEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterCreatePrincipal(PrincipalsServiceEvents.AfterCreatePrincipalEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeDeletePrincipal(PrincipalsServiceEvents.BeforeDeletePrincipalEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterDeletePrincipal(PrincipalsServiceEvents.AfterDeletePrincipalEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeGetPrincipal(PrincipalsServiceEvents.BeforeGetPrincipalEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterGetPrincipal(PrincipalsServiceEvents.AfterGetPrincipalEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeUpdatePrincipal(PrincipalsServiceEvents.BeforeUpdatePrincipalEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterUpdatePrincipal(PrincipalsServiceEvents.AfterUpdatePrincipalEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeRotateCredentials(
+ PrincipalsServiceEvents.BeforeRotateCredentialsEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterRotateCredentials(PrincipalsServiceEvents.AfterRotateCredentialsEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeListPrincipals(PrincipalsServiceEvents.BeforeListPrincipalsEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterListPrincipals(PrincipalsServiceEvents.AfterListPrincipalsEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeResetCredentials(PrincipalsServiceEvents.BeforeResetCredentialsEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterResetCredentials(PrincipalsServiceEvents.AfterResetCredentialsEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeAssignPrincipalRole(
+ PrincipalsServiceEvents.BeforeAssignPrincipalRoleEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterAssignPrincipalRole(
+ PrincipalsServiceEvents.AfterAssignPrincipalRoleEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeRevokePrincipalRole(
+ PrincipalsServiceEvents.BeforeRevokePrincipalRoleEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterRevokePrincipalRole(
+ PrincipalsServiceEvents.AfterRevokePrincipalRoleEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeListAssignedPrincipalRoles(
+ PrincipalsServiceEvents.BeforeListAssignedPrincipalRolesEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterListAssignedPrincipalRoles(
+ PrincipalsServiceEvents.AfterListAssignedPrincipalRolesEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeCreatePrincipalRole(
+ PrincipalRolesServiceEvents.BeforeCreatePrincipalRoleEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterCreatePrincipalRole(
+ PrincipalRolesServiceEvents.AfterCreatePrincipalRoleEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeDeletePrincipalRole(
+ PrincipalRolesServiceEvents.BeforeDeletePrincipalRoleEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterDeletePrincipalRole(
+ PrincipalRolesServiceEvents.AfterDeletePrincipalRoleEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeGetPrincipalRole(
+ PrincipalRolesServiceEvents.BeforeGetPrincipalRoleEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterGetPrincipalRole(
+ PrincipalRolesServiceEvents.AfterGetPrincipalRoleEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeUpdatePrincipalRole(
+ PrincipalRolesServiceEvents.BeforeUpdatePrincipalRoleEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterUpdatePrincipalRole(
+ PrincipalRolesServiceEvents.AfterUpdatePrincipalRoleEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeListPrincipalRoles(
+ PrincipalRolesServiceEvents.BeforeListPrincipalRolesEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterListPrincipalRoles(
+ PrincipalRolesServiceEvents.AfterListPrincipalRolesEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeCreateCatalogRole(CatalogsServiceEvents.BeforeCreateCatalogRoleEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterCreateCatalogRole(CatalogsServiceEvents.AfterCreateCatalogRoleEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeDeleteCatalogRole(CatalogsServiceEvents.BeforeDeleteCatalogRoleEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterDeleteCatalogRole(CatalogsServiceEvents.AfterDeleteCatalogRoleEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeGetCatalogRole(CatalogsServiceEvents.BeforeGetCatalogRoleEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterGetCatalogRole(CatalogsServiceEvents.AfterGetCatalogRoleEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeUpdateCatalogRole(CatalogsServiceEvents.BeforeUpdateCatalogRoleEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterUpdateCatalogRole(CatalogsServiceEvents.AfterUpdateCatalogRoleEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeListCatalogRoles(CatalogsServiceEvents.BeforeListCatalogRolesEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterListCatalogRoles(CatalogsServiceEvents.AfterListCatalogRolesEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeAssignCatalogRoleToPrincipalRole(
+ PrincipalRolesServiceEvents.BeforeAssignCatalogRoleToPrincipalRoleEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterAssignCatalogRoleToPrincipalRole(
+ PrincipalRolesServiceEvents.AfterAssignCatalogRoleToPrincipalRoleEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeRevokeCatalogRoleFromPrincipalRole(
+ PrincipalRolesServiceEvents.BeforeRevokeCatalogRoleFromPrincipalRoleEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterRevokeCatalogRoleFromPrincipalRole(
+ PrincipalRolesServiceEvents.AfterRevokeCatalogRoleFromPrincipalRoleEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeListAssigneePrincipalsForPrincipalRole(
+ PrincipalRolesServiceEvents.BeforeListAssigneePrincipalsForPrincipalRoleEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterListAssigneePrincipalsForPrincipalRole(
+ PrincipalRolesServiceEvents.AfterListAssigneePrincipalsForPrincipalRoleEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeListCatalogRolesForPrincipalRole(
+ PrincipalRolesServiceEvents.BeforeListCatalogRolesForPrincipalRoleEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterListCatalogRolesForPrincipalRole(
+ PrincipalRolesServiceEvents.AfterListCatalogRolesForPrincipalRoleEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeAddGrantToCatalogRole(
+ CatalogsServiceEvents.BeforeAddGrantToCatalogRoleEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterAddGrantToCatalogRole(
+ CatalogsServiceEvents.AfterAddGrantToCatalogRoleEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeRevokeGrantFromCatalogRole(
+ CatalogsServiceEvents.BeforeRevokeGrantFromCatalogRoleEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterRevokeGrantFromCatalogRole(
+ CatalogsServiceEvents.AfterRevokeGrantFromCatalogRoleEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeListAssigneePrincipalRolesForCatalogRole(
+ CatalogsServiceEvents.BeforeListAssigneePrincipalRolesForCatalogRoleEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterListAssigneePrincipalRolesForCatalogRole(
+ CatalogsServiceEvents.AfterListAssigneePrincipalRolesForCatalogRoleEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeListGrantsForCatalogRole(
+ CatalogsServiceEvents.BeforeListGrantsForCatalogRoleEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterListGrantsForCatalogRole(
+ CatalogsServiceEvents.AfterListGrantsForCatalogRoleEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeCreateNamespace(IcebergRestCatalogEvents.BeforeCreateNamespaceEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterCreateNamespace(IcebergRestCatalogEvents.AfterCreateNamespaceEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeListNamespaces(IcebergRestCatalogEvents.BeforeListNamespacesEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterListNamespaces(IcebergRestCatalogEvents.AfterListNamespacesEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeLoadNamespaceMetadata(
+ IcebergRestCatalogEvents.BeforeLoadNamespaceMetadataEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterLoadNamespaceMetadata(
+ IcebergRestCatalogEvents.AfterLoadNamespaceMetadataEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeCheckExistsNamespace(
+ IcebergRestCatalogEvents.BeforeCheckExistsNamespaceEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterCheckExistsNamespace(
+ IcebergRestCatalogEvents.AfterCheckExistsNamespaceEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeDropNamespace(IcebergRestCatalogEvents.BeforeDropNamespaceEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterDropNamespace(IcebergRestCatalogEvents.AfterDropNamespaceEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeUpdateNamespaceProperties(
+ IcebergRestCatalogEvents.BeforeUpdateNamespacePropertiesEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterUpdateNamespaceProperties(
+ IcebergRestCatalogEvents.AfterUpdateNamespacePropertiesEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeCreateTable(IcebergRestCatalogEvents.BeforeCreateTableEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterCreateTable(IcebergRestCatalogEvents.AfterCreateTableEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeCommitTable(IcebergRestCatalogEvents.BeforeCommitTableEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterCommitTable(IcebergRestCatalogEvents.AfterCommitTableEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeRefreshTable(IcebergRestCatalogEvents.BeforeRefreshTableEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterRefreshTable(IcebergRestCatalogEvents.AfterRefreshTableEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeListTables(IcebergRestCatalogEvents.BeforeListTablesEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterListTables(IcebergRestCatalogEvents.AfterListTablesEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeLoadTable(IcebergRestCatalogEvents.BeforeLoadTableEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterLoadTable(IcebergRestCatalogEvents.AfterLoadTableEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeCheckExistsTable(IcebergRestCatalogEvents.BeforeCheckExistsTableEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterCheckExistsTable(IcebergRestCatalogEvents.AfterCheckExistsTableEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeDropTable(IcebergRestCatalogEvents.BeforeDropTableEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterDropTable(IcebergRestCatalogEvents.AfterDropTableEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeRegisterTable(IcebergRestCatalogEvents.BeforeRegisterTableEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterRegisterTable(IcebergRestCatalogEvents.AfterRegisterTableEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeRenameTable(IcebergRestCatalogEvents.BeforeRenameTableEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterRenameTable(IcebergRestCatalogEvents.AfterRenameTableEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeUpdateTable(IcebergRestCatalogEvents.BeforeUpdateTableEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterUpdateTable(IcebergRestCatalogEvents.AfterUpdateTableEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeCreateView(IcebergRestCatalogEvents.BeforeCreateViewEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterCreateView(IcebergRestCatalogEvents.AfterCreateViewEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeCommitView(IcebergRestCatalogEvents.BeforeCommitViewEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterCommitView(IcebergRestCatalogEvents.AfterCommitViewEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeRefreshView(IcebergRestCatalogEvents.BeforeRefreshViewEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterRefreshView(IcebergRestCatalogEvents.AfterRefreshViewEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeListViews(IcebergRestCatalogEvents.BeforeListViewsEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterListViews(IcebergRestCatalogEvents.AfterListViewsEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeLoadView(IcebergRestCatalogEvents.BeforeLoadViewEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterLoadView(IcebergRestCatalogEvents.AfterLoadViewEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeCheckExistsView(IcebergRestCatalogEvents.BeforeCheckExistsViewEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterCheckExistsView(IcebergRestCatalogEvents.AfterCheckExistsViewEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeDropView(IcebergRestCatalogEvents.BeforeDropViewEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterDropView(IcebergRestCatalogEvents.AfterDropViewEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeRenameView(IcebergRestCatalogEvents.BeforeRenameViewEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterRenameView(IcebergRestCatalogEvents.AfterRenameViewEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeReplaceView(IcebergRestCatalogEvents.BeforeReplaceViewEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterReplaceView(IcebergRestCatalogEvents.AfterReplaceViewEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeLoadCredentials(IcebergRestCatalogEvents.BeforeLoadCredentialsEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterLoadCredentials(IcebergRestCatalogEvents.AfterLoadCredentialsEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeCommitTransaction(
+ IcebergRestCatalogEvents.BeforeCommitTransactionEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterCommitTransaction(IcebergRestCatalogEvents.AfterCommitTransactionEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeSendNotification(IcebergRestCatalogEvents.BeforeSendNotificationEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterSendNotification(IcebergRestCatalogEvents.AfterSendNotificationEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeGetConfig(IcebergRestCatalogEvents.BeforeGetConfigEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterGetConfig(IcebergRestCatalogEvents.AfterGetConfigEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeCreatePolicy(CatalogPolicyServiceEvents.BeforeCreatePolicyEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterCreatePolicy(CatalogPolicyServiceEvents.AfterCreatePolicyEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeListPolicies(CatalogPolicyServiceEvents.BeforeListPoliciesEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterListPolicies(CatalogPolicyServiceEvents.AfterListPoliciesEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeLoadPolicy(CatalogPolicyServiceEvents.BeforeLoadPolicyEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterLoadPolicy(CatalogPolicyServiceEvents.AfterLoadPolicyEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeUpdatePolicy(CatalogPolicyServiceEvents.BeforeUpdatePolicyEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterUpdatePolicy(CatalogPolicyServiceEvents.AfterUpdatePolicyEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeDropPolicy(CatalogPolicyServiceEvents.BeforeDropPolicyEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterDropPolicy(CatalogPolicyServiceEvents.AfterDropPolicyEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeAttachPolicy(CatalogPolicyServiceEvents.BeforeAttachPolicyEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterAttachPolicy(CatalogPolicyServiceEvents.AfterAttachPolicyEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeDetachPolicy(CatalogPolicyServiceEvents.BeforeDetachPolicyEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterDetachPolicy(CatalogPolicyServiceEvents.AfterDetachPolicyEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeGetApplicablePolicies(
+ CatalogPolicyServiceEvents.BeforeGetApplicablePoliciesEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterGetApplicablePolicies(
+ CatalogPolicyServiceEvents.AfterGetApplicablePoliciesEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeCreateGenericTable(
+ CatalogGenericTableServiceEvents.BeforeCreateGenericTableEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterCreateGenericTable(
+ CatalogGenericTableServiceEvents.AfterCreateGenericTableEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeDropGenericTable(
+ CatalogGenericTableServiceEvents.BeforeDropGenericTableEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterDropGenericTable(
+ CatalogGenericTableServiceEvents.AfterDropGenericTableEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeListGenericTables(
+ CatalogGenericTableServiceEvents.BeforeListGenericTablesEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterListGenericTables(
+ CatalogGenericTableServiceEvents.AfterListGenericTablesEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeLoadGenericTable(
+ CatalogGenericTableServiceEvents.BeforeLoadGenericTableEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterLoadGenericTable(
+ CatalogGenericTableServiceEvents.AfterLoadGenericTableEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeAttemptTask(BeforeAttemptTaskEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onAfterAttemptTask(AfterAttemptTaskEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+
+ @Override
+ public void onBeforeLimitRequestRate(BeforeLimitRequestRateEvent event) {
+ if (shouldHandle(event)) handle(event);
+ }
+}
diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/jsonEventListener/aws/cloudwatch/AwsCloudWatchConfiguration.java b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/aws/cloudwatch/AwsCloudWatchConfiguration.java
similarity index 63%
rename from runtime/service/src/main/java/org/apache/polaris/service/events/jsonEventListener/aws/cloudwatch/AwsCloudWatchConfiguration.java
rename to runtime/service/src/main/java/org/apache/polaris/service/events/listeners/aws/cloudwatch/AwsCloudWatchConfiguration.java
index e511f13a0c..771f78af03 100644
--- a/runtime/service/src/main/java/org/apache/polaris/service/events/jsonEventListener/aws/cloudwatch/AwsCloudWatchConfiguration.java
+++ b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/aws/cloudwatch/AwsCloudWatchConfiguration.java
@@ -16,13 +16,16 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.polaris.service.events.jsonEventListener.aws.cloudwatch;
+package org.apache.polaris.service.events.listeners.aws.cloudwatch;
import io.quarkus.runtime.annotations.StaticInitSafe;
import io.smallrye.config.ConfigMapping;
import io.smallrye.config.WithDefault;
import io.smallrye.config.WithName;
import jakarta.enterprise.context.ApplicationScoped;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.polaris.service.events.PolarisEvent;
/** Configuration interface for AWS CloudWatch event listener integration. */
@StaticInitSafe
@@ -86,4 +89,54 @@ public interface AwsCloudWatchConfiguration {
@WithName("synchronous-mode")
@WithDefault("false")
boolean synchronousMode();
+
+ @WithName("event-types")
+ Optional<Set<Class<? extends PolarisEvent>>>
+ eventTypes(); // defaults to empty option i.e. process all events
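+
+ // Illustrative setting (the property prefix is taken from the Javadoc below; the nested-class
+ // name uses standard binary-name syntax and is an assumption):
+ //   polaris.event-listener.aws-cloudwatch.event-types=\
+ //     org.apache.polaris.service.events.IcebergRestCatalogEvents$AfterCommitTableEvent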
+
+ /**
+ * Returns the redaction mode for sensitive data in events.
+ *
+ * Controls how sensitive information is handled when serializing events to CloudWatch:
+ *
+ * <ul>
+ * <li>NONE - No redaction, all data is sent as-is (use with caution)
+ * <li>PARTIAL - Redact highly sensitive fields like credentials, but keep metadata locations
+ * and properties
+ * <li>FULL - Redact all potentially sensitive fields including locations, properties, and
+ * credentials
+ * </ul>
+ *
+ * Configuration property: {@code polaris.event-listener.aws-cloudwatch.redaction-mode}
+ *
+ * @return the redaction mode, defaults to PARTIAL for security
+ */
+ @WithName("redaction-mode")
+ @WithDefault("PARTIAL")
+ RedactionMode redactionMode();
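+
+ // Illustrative settings combining the options above (property prefix per the Javadoc):
+ //   polaris.event-listener.aws-cloudwatch.redaction-mode=FULL
+ //   polaris.event-listener.aws-cloudwatch.additional-redacted-fields=properties.custom-secret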
+
+ /**
+ * Returns additional field names to redact beyond the defaults.
+ *
+ *
+ * <p>Allows customization of which fields should be redacted. Field names can use dot notation
+ * for nested fields (e.g., "properties.custom-secret"). These fields are redacted in addition to
+ * the default redacted fields based on the redaction mode.
+ *
+ *
+ * <p>Configuration property: {@code
+ * polaris.event-listener.aws-cloudwatch.additional-redacted-fields}
+ *
+ * @return a set of additional field names to redact, empty by default
+ */
+ @WithName("additional-redacted-fields")
+ Optional<Set<String>> additionalRedactedFields();
+
+ /** Enum defining redaction modes for sensitive data. */
+ enum RedactionMode {
+ /** No redaction - all data sent as-is. Use with extreme caution. */
+ NONE,
+ /** Partial redaction - redact credentials and secrets, keep metadata. */
+ PARTIAL,
+ /** Full redaction - redact all potentially sensitive fields. */
+ FULL
+ }
}
diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/aws/cloudwatch/AwsCloudWatchEventListener.java b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/aws/cloudwatch/AwsCloudWatchEventListener.java
new file mode 100644
index 0000000000..3979838d40
--- /dev/null
+++ b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/aws/cloudwatch/AwsCloudWatchEventListener.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events.listeners.aws.cloudwatch;
+
+import com.fasterxml.jackson.annotation.JsonUnwrapped;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.PropertyNamingStrategies;
+import com.fasterxml.jackson.databind.annotation.JsonNaming;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import jakarta.ws.rs.core.Context;
+import jakarta.ws.rs.core.SecurityContext;
+import java.time.Clock;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.rest.requests.CreateNamespaceRequest;
+import org.apache.iceberg.rest.requests.UpdateNamespacePropertiesRequest;
+import org.apache.iceberg.rest.responses.ConfigResponse;
+import org.apache.iceberg.rest.responses.LoadTableResponse;
+import org.apache.iceberg.rest.responses.LoadViewResponse;
+import org.apache.iceberg.view.ViewMetadata;
+import org.apache.polaris.core.auth.PolarisPrincipal;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.service.events.IcebergRestCatalogEvents;
+import org.apache.polaris.service.events.PolarisEvent;
+import org.apache.polaris.service.events.json.FieldRedactionSerializerModifier;
+import org.apache.polaris.service.events.json.mixins.RedactionMixins;
+import org.apache.polaris.service.events.listeners.AllEventsForwardingListener;
+import org.apache.polaris.service.events.listeners.aws.cloudwatch.AwsCloudWatchConfiguration.RedactionMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsAsyncClient;
+import software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogGroupRequest;
+import software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogStreamRequest;
+import software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogGroupsRequest;
+import software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogStreamsRequest;
+import software.amazon.awssdk.services.cloudwatchlogs.model.InputLogEvent;
+import software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsRequest;
+import software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsResponse;
+
+@ApplicationScoped
+@Identifier("aws-cloudwatch")
+public class AwsCloudWatchEventListener extends AllEventsForwardingListener {
+ private static final Logger LOGGER = LoggerFactory.getLogger(AwsCloudWatchEventListener.class);
+
+ final ObjectMapper objectMapper;
+ private CloudWatchLogsAsyncClient client;
+
+ private final String logGroup;
+ private final String logStream;
+ private final Region region;
+ private final boolean synchronousMode;
+ private final Clock clock;
+ private final Set<Class<? extends PolarisEvent>> allowedEventTypes;
+ private final boolean listenToAllEvents;
+
+ @Inject CallContext callContext;
+
+ @Context SecurityContext securityContext;
+
+ @Inject
+ public AwsCloudWatchEventListener(
+ AwsCloudWatchConfiguration config, Clock clock, ObjectMapper mapper) {
+ this.logStream = config.awsCloudWatchLogStream();
+ this.logGroup = config.awsCloudWatchLogGroup();
+ this.region = Region.of(config.awsCloudWatchRegion());
+ this.synchronousMode = config.synchronousMode();
+ this.clock = clock;
+ this.objectMapper =
+ configureRedaction(
+ mapper.copy(),
+ config.redactionMode(),
+ config.additionalRedactedFields().orElse(Set.of()));
+ this.allowedEventTypes = config.eventTypes().orElse(Set.of());
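+ // An empty set, or an entry for the base PolarisEvent class itself, means every event is forwarded.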
+ this.listenToAllEvents =
+ allowedEventTypes.isEmpty()
+ || allowedEventTypes.stream().anyMatch(c -> c == PolarisEvent.class);
+ }
+
+ /**
+ * Configures the ObjectMapper with redaction mixins based on the specified redaction mode.
+ *
+ * @param mapper the ObjectMapper to configure (should be a copy)
+ * @param redactionMode the redaction mode to apply
+ * @param additionalRedactedFields additional field names to redact beyond the defaults
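+ *     (exact names or simple wildcard patterns such as "password", "*Key", or "api*")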
+ * @return the configured ObjectMapper
+ */
+ private static ObjectMapper configureRedaction(
+ ObjectMapper mapper, RedactionMode redactionMode, Set<String> additionalRedactedFields) {
+ LOGGER.debug("Configuring redaction with mode: {}", redactionMode);
+ LOGGER.debug("Additional redacted fields: {}", additionalRedactedFields);
+
+ if (redactionMode == RedactionMode.NONE && additionalRedactedFields.isEmpty()) {
+ // No redaction - return mapper as-is
+ return mapper;
+ }
+
+ // Apply field-level redaction patterns if configured
+ if (!additionalRedactedFields.isEmpty()) {
+ SimpleModule module = new SimpleModule();
+ module.setSerializerModifier(new FieldRedactionSerializerModifier(additionalRedactedFields));
+ mapper.registerModule(module);
+ LOGGER.debug("Registered field-level redaction for {} patterns", additionalRedactedFields.size());
+ }
+
+ // Skip mixin-based redaction if mode is NONE
+ if (redactionMode == RedactionMode.NONE) {
+ return mapper;
+ }
+
+ // Always redact these highly sensitive types regardless of mode
+ mapper.addMixIn(LoadTableResponse.class, RedactionMixins.LoadTableResponseRedactionMixin.class);
+ mapper.addMixIn(LoadViewResponse.class, RedactionMixins.LoadViewResponseRedactionMixin.class);
+ // Redact the entire AfterGetConfigEvent.configResponse field
+ mapper.addMixIn(
+ IcebergRestCatalogEvents.AfterGetConfigEvent.class,
+ RedactionMixins.AfterGetConfigEventRedactionMixin.class);
+ LOGGER.debug("Registered AfterGetConfigEvent mixin");
+ mapper.addMixIn(
+ CreateNamespaceRequest.class, RedactionMixins.CreateNamespaceRequestRedactionMixin.class);
+ mapper.addMixIn(
+ UpdateNamespacePropertiesRequest.class,
+ RedactionMixins.UpdateNamespacePropertiesRequestRedactionMixin.class);
+
+ if (redactionMode == RedactionMode.PARTIAL) {
+ // Partial redaction - keep locations, redact properties
+ mapper.addMixIn(
+ TableMetadata.class, RedactionMixins.TableMetadataPartialRedactionMixin.class);
+ mapper.addMixIn(ViewMetadata.class, RedactionMixins.ViewMetadataPartialRedactionMixin.class);
+ } else if (redactionMode == RedactionMode.FULL) {
+ // Full redaction - redact both locations and properties
+ mapper.addMixIn(TableMetadata.class, RedactionMixins.TableMetadataFullRedactionMixin.class);
+ mapper.addMixIn(ViewMetadata.class, RedactionMixins.ViewMetadataFullRedactionMixin.class);
+ }
+
+ return mapper;
+ }
+
+ @Override
+ protected boolean shouldHandle(PolarisEvent event) {
+ if (event == null) {
+ return false;
+ }
+
+ if (this.listenToAllEvents) {
+ return true;
+ }
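+ // A configured type also covers its subtypes (isAssignableFrom), so listing a
+ // parent event class matches all of its subclasses.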
+ Class<? extends PolarisEvent> actualType = event.getClass();
+ return allowedEventTypes.stream().anyMatch(cfg -> cfg.isAssignableFrom(actualType));
+ }
+
+ @Override
+ protected void handle(PolarisEvent event) {
+ transformAndSendEvent(event);
+ }
+
+ @PostConstruct
+ void start() {
+ this.client = createCloudWatchAsyncClient();
+ ensureLogGroupAndStream();
+ }
+
+ protected CloudWatchLogsAsyncClient createCloudWatchAsyncClient() {
+ return CloudWatchLogsAsyncClient.builder().region(region).build();
+ }
+
+ private void ensureLogGroupAndStream() {
+ ensureResourceExists(
+ () ->
+ client
+ .describeLogGroups(
+ DescribeLogGroupsRequest.builder().logGroupNamePrefix(logGroup).build())
+ .join()
+ .logGroups()
+ .stream()
+ .anyMatch(g -> g.logGroupName().equals(logGroup)),
+ () ->
+ client
+ .createLogGroup(CreateLogGroupRequest.builder().logGroupName(logGroup).build())
+ .join(),
+ "group",
+ logGroup);
+ ensureResourceExists(
+ () ->
+ client
+ .describeLogStreams(
+ DescribeLogStreamsRequest.builder()
+ .logGroupName(logGroup)
+ .logStreamNamePrefix(logStream)
+ .build())
+ .join()
+ .logStreams()
+ .stream()
+ .anyMatch(s -> s.logStreamName().equals(logStream)),
+ () ->
+ client
+ .createLogStream(
+ CreateLogStreamRequest.builder()
+ .logGroupName(logGroup)
+ .logStreamName(logStream)
+ .build())
+ .join(),
+ "stream",
+ logStream);
+ }
+
+ private static void ensureResourceExists(
+ Supplier<Boolean> existsCheck,
+ Runnable createAction,
+ String resourceType,
+ String resourceName) {
+ if (existsCheck.get()) {
+ LOGGER.debug("Log {} [{}] already exists", resourceType, resourceName);
+ } else {
+ LOGGER.debug("Attempting to create log {}: {}", resourceType, resourceName);
+ createAction.run();
+ }
+ }
+
+ @PreDestroy
+ void shutdown() {
+ if (client != null) {
+ client.close();
+ client = null;
+ }
+ }
+
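+ /**
+ * Envelope for a single CloudWatch log line: caller identity plus the event itself. Record fields
+ * are written in snake_case and the wrapped event's own fields are flattened into the same JSON
+ * object. A rough illustrative shape (assumed, not an exact contract):
+ * {"principal": "...", "realm_id": "...", "activated_roles": [...],
+ * "event_type": "AfterRefreshTableEvent", ...flattened event fields...}
+ */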
+ @JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
+ public record CloudWatchEvent(
+ String principal,
+ String realmId,
+ Collection<String> activatedRoles,
+ String eventType,
+ @JsonUnwrapped PolarisEvent event // flatten
+ ) {}
+
+ protected void transformAndSendEvent(PolarisEvent event) {
+
+ CloudWatchEvent payload =
+ new CloudWatchEvent(
+ securityContext.getUserPrincipal().getName(),
+ callContext.getRealmContext().getRealmIdentifier(),
+ ((PolarisPrincipal) securityContext.getUserPrincipal()).getRoles(),
+ event.getClass().getSimpleName(),
+ event);
+
+ String eventAsJson;
+
+ try {
+ eventAsJson = objectMapper.writeValueAsString(payload);
+ } catch (JsonProcessingException ex) {
+ LOGGER.error("Error serializing CloudWatch payload: ", ex);
+ LOGGER.debug("Failed to convert the following object into JSON string: {}", payload);
+ return;
+ }
+
+ InputLogEvent inputLogEvent =
+ InputLogEvent.builder().message(eventAsJson).timestamp(clock.millis()).build();
+
+ PutLogEventsRequest.Builder requestBuilder =
+ PutLogEventsRequest.builder()
+ .logGroupName(logGroup)
+ .logStreamName(logStream)
+ .logEvents(List.of(inputLogEvent));
+
+ CompletableFuture<PutLogEventsResponse> future =
+ client
+ .putLogEvents(requestBuilder.build())
+ .whenComplete(
+ (resp, err) -> {
+ if (err != null) {
+ LOGGER.error(
+ "Error writing log to CloudWatch. Event: {}, Error: ", inputLogEvent, err);
+ }
+ });
+
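+ // In synchronous mode, block until CloudWatch acknowledges the write; otherwise the
+ // call above is fire-and-forget, with failures logged in the whenComplete callback.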
+ if (synchronousMode) {
+ future.join();
+ }
+ }
+}
diff --git a/runtime/service/src/test/java/org/apache/polaris/service/events/json/FieldRedactionSerializerModifierTest.java b/runtime/service/src/test/java/org/apache/polaris/service/events/json/FieldRedactionSerializerModifierTest.java
new file mode 100644
index 0000000000..cee236a043
--- /dev/null
+++ b/runtime/service/src/test/java/org/apache/polaris/service/events/json/FieldRedactionSerializerModifierTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events.json;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import java.util.Set;
+import org.junit.jupiter.api.Test;
+
+class FieldRedactionSerializerModifierTest {
+
+ private static final String REDACTED_MARKER = RedactingSerializer.getRedactedMarker();
+
+ static class TestBean {
+ public String password;
+ public String username;
+ public String apiKey;
+ public String publicData;
+
+ public TestBean(String password, String username, String apiKey, String publicData) {
+ this.password = password;
+ this.username = username;
+ this.apiKey = apiKey;
+ this.publicData = publicData;
+ }
+ }
+
+ @Test
+ void shouldRedactExactFieldNames() throws Exception {
+ ObjectMapper mapper = new ObjectMapper();
+ SimpleModule module = new SimpleModule();
+ module.setSerializerModifier(
+ new FieldRedactionSerializerModifier(Set.of("password", "apiKey")));
+ mapper.registerModule(module);
+
+ TestBean bean = new TestBean("secret123", "john_doe", "key-12345", "public info");
+ String json = mapper.writeValueAsString(bean);
+ JsonNode node = mapper.readTree(json);
+
+ assertThat(node.get("password").asText()).isEqualTo(REDACTED_MARKER);
+ assertThat(node.get("apiKey").asText()).isEqualTo(REDACTED_MARKER);
+ assertThat(node.get("username").asText()).isEqualTo("john_doe");
+ assertThat(node.get("publicData").asText()).isEqualTo("public info");
+ }
+
+ @Test
+ void shouldRedactFieldsWithWildcardPrefix() throws Exception {
+ ObjectMapper mapper = new ObjectMapper();
+ SimpleModule module = new SimpleModule();
+ module.setSerializerModifier(new FieldRedactionSerializerModifier(Set.of("*Key")));
+ mapper.registerModule(module);
+
+ TestBean bean = new TestBean("secret123", "john_doe", "key-12345", "public info");
+ String json = mapper.writeValueAsString(bean);
+ JsonNode node = mapper.readTree(json);
+
+ assertThat(node.get("apiKey").asText()).isEqualTo(REDACTED_MARKER);
+ assertThat(node.get("password").asText()).isEqualTo("secret123");
+ assertThat(node.get("username").asText()).isEqualTo("john_doe");
+ assertThat(node.get("publicData").asText()).isEqualTo("public info");
+ }
+
+ @Test
+ void shouldRedactFieldsWithWildcardSuffix() throws Exception {
+ ObjectMapper mapper = new ObjectMapper();
+ SimpleModule module = new SimpleModule();
+ module.setSerializerModifier(new FieldRedactionSerializerModifier(Set.of("api*")));
+ mapper.registerModule(module);
+
+ TestBean bean = new TestBean("secret123", "john_doe", "key-12345", "public info");
+ String json = mapper.writeValueAsString(bean);
+ JsonNode node = mapper.readTree(json);
+
+ assertThat(node.get("apiKey").asText()).isEqualTo(REDACTED_MARKER);
+ assertThat(node.get("password").asText()).isEqualTo("secret123");
+ assertThat(node.get("username").asText()).isEqualTo("john_doe");
+ assertThat(node.get("publicData").asText()).isEqualTo("public info");
+ }
+
+ @Test
+ void shouldRedactFieldsWithMultiplePatterns() throws Exception {
+ ObjectMapper mapper = new ObjectMapper();
+ SimpleModule module = new SimpleModule();
+ module.setSerializerModifier(
+ new FieldRedactionSerializerModifier(Set.of("*word", "api*", "username")));
+ mapper.registerModule(module);
+
+ TestBean bean = new TestBean("secret123", "john_doe", "key-12345", "public info");
+ String json = mapper.writeValueAsString(bean);
+ JsonNode node = mapper.readTree(json);
+
+ assertThat(node.get("password").asText()).isEqualTo(REDACTED_MARKER); // matches *word
+ assertThat(node.get("apiKey").asText()).isEqualTo(REDACTED_MARKER); // matches api*
+ assertThat(node.get("username").asText()).isEqualTo(REDACTED_MARKER); // exact match
+ assertThat(node.get("publicData").asText()).isEqualTo("public info");
+ }
+
+ @Test
+ void shouldNotRedactWhenNoPatterns() throws Exception {
+ ObjectMapper mapper = new ObjectMapper();
+ SimpleModule module = new SimpleModule();
+ module.setSerializerModifier(new FieldRedactionSerializerModifier(Set.of()));
+ mapper.registerModule(module);
+
+ TestBean bean = new TestBean("secret123", "john_doe", "key-12345", "public info");
+ String json = mapper.writeValueAsString(bean);
+ JsonNode node = mapper.readTree(json);
+
+ assertThat(node.get("password").asText()).isEqualTo("secret123");
+ assertThat(node.get("apiKey").asText()).isEqualTo("key-12345");
+ assertThat(node.get("username").asText()).isEqualTo("john_doe");
+ assertThat(node.get("publicData").asText()).isEqualTo("public info");
+ }
+
+ @Test
+ void shouldRedactFieldsMatchingDataSuffixPattern() throws Exception {
+ ObjectMapper mapper = new ObjectMapper();
+ SimpleModule module = new SimpleModule();
+ module.setSerializerModifier(new FieldRedactionSerializerModifier(Set.of("*Data")));
+ mapper.registerModule(module);
+
+ TestBean bean = new TestBean("secret123", "john_doe", "key-12345", "public info");
+ String json = mapper.writeValueAsString(bean);
+ JsonNode node = mapper.readTree(json);
+
+ assertThat(node.get("publicData").asText()).isEqualTo(REDACTED_MARKER);
+ assertThat(node.get("password").asText()).isEqualTo("secret123");
+ assertThat(node.get("apiKey").asText()).isEqualTo("key-12345");
+ assertThat(node.get("username").asText()).isEqualTo("john_doe");
+ }
+}
+
diff --git a/runtime/service/src/test/java/org/apache/polaris/service/events/json/RedactingSerializerTest.java b/runtime/service/src/test/java/org/apache/polaris/service/events/json/RedactingSerializerTest.java
new file mode 100644
index 0000000000..ead921a12d
--- /dev/null
+++ b/runtime/service/src/test/java/org/apache/polaris/service/events/json/RedactingSerializerTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events.json;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.junit.jupiter.api.Test;
+
+class RedactingSerializerTest {
+
+ private static final String REDACTED_MARKER = RedactingSerializer.getRedactedMarker();
+
+ static class TestBean {
+ @JsonSerialize(using = RedactingSerializer.class)
+ public String sensitiveField;
+
+ public String normalField;
+
+ public TestBean(String sensitiveField, String normalField) {
+ this.sensitiveField = sensitiveField;
+ this.normalField = normalField;
+ }
+ }
+
+ @Test
+ void shouldRedactAnnotatedField() throws Exception {
+ ObjectMapper mapper = new ObjectMapper();
+ TestBean bean = new TestBean("secret-value", "public-value");
+
+ String json = mapper.writeValueAsString(bean);
+ JsonNode node = mapper.readTree(json);
+
+ assertThat(node.get("sensitiveField").asText()).isEqualTo(REDACTED_MARKER);
+ assertThat(node.get("normalField").asText()).isEqualTo("public-value");
+ }
+
+ @Test
+ void shouldRedactNullValues() throws Exception {
+ ObjectMapper mapper = new ObjectMapper();
+ TestBean bean = new TestBean(null, "public-value");
+
+ String json = mapper.writeValueAsString(bean);
+ JsonNode node = mapper.readTree(json);
+
+ // Note: Jackson doesn't call custom serializers for null values by default
+ // The field will be serialized as null unless we configure the mapper differently
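+ // (If null redaction were desired, @JsonSerialize(nullsUsing = RedactingSerializer.class)
+ // would be one possible approach.)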
+ assertThat(node.get("sensitiveField").isNull()).isTrue();
+ assertThat(node.get("normalField").asText()).isEqualTo("public-value");
+ }
+
+ @Test
+ void shouldRedactNumericValues() throws Exception {
+ ObjectMapper mapper = new ObjectMapper();
+
+ class NumericBean {
+ @JsonSerialize(using = RedactingSerializer.class)
+ public Integer sensitiveNumber;
+
+ public NumericBean(Integer sensitiveNumber) {
+ this.sensitiveNumber = sensitiveNumber;
+ }
+ }
+
+ NumericBean bean = new NumericBean(12345);
+ String json = mapper.writeValueAsString(bean);
+ JsonNode node = mapper.readTree(json);
+
+ assertThat(node.get("sensitiveNumber").asText()).isEqualTo(REDACTED_MARKER);
+ }
+
+ @Test
+ void shouldRedactComplexObjects() throws Exception {
+ ObjectMapper mapper = new ObjectMapper();
+
+ class ComplexBean {
+ @JsonSerialize(using = RedactingSerializer.class)
+ public Object complexObject;
+
+ public ComplexBean(Object complexObject) {
+ this.complexObject = complexObject;
+ }
+ }
+
+ ComplexBean bean = new ComplexBean(new TestBean("nested-secret", "nested-public"));
+ String json = mapper.writeValueAsString(bean);
+ JsonNode node = mapper.readTree(json);
+
+ assertThat(node.get("complexObject").asText()).isEqualTo(REDACTED_MARKER);
+ }
+
+ @Test
+ void shouldReturnCorrectRedactedMarker() {
+ assertThat(RedactingSerializer.getRedactedMarker()).isEqualTo("***REDACTED***");
+ }
+
+ @Test
+ void shouldRedactEmptyStrings() throws Exception {
+ ObjectMapper mapper = new ObjectMapper();
+ TestBean bean = new TestBean("", "public-value");
+
+ String json = mapper.writeValueAsString(bean);
+ JsonNode node = mapper.readTree(json);
+
+ assertThat(node.get("sensitiveField").asText()).isEqualTo(REDACTED_MARKER);
+ assertThat(node.get("normalField").asText()).isEqualTo("public-value");
+ }
+
+ @Test
+ void shouldRedactLongStrings() throws Exception {
+ ObjectMapper mapper = new ObjectMapper();
+ String longSecret = "a".repeat(10000);
+ TestBean bean = new TestBean(longSecret, "public-value");
+
+ String json = mapper.writeValueAsString(bean);
+ JsonNode node = mapper.readTree(json);
+
+ assertThat(node.get("sensitiveField").asText()).isEqualTo(REDACTED_MARKER);
+ assertThat(json.length()).isLessThan(longSecret.length());
+ }
+}
+
diff --git a/runtime/service/src/test/java/org/apache/polaris/service/events/json/mixins/RedactionMixinsTest.java b/runtime/service/src/test/java/org/apache/polaris/service/events/json/mixins/RedactionMixinsTest.java
new file mode 100644
index 0000000000..241b2e78e7
--- /dev/null
+++ b/runtime/service/src/test/java/org/apache/polaris/service/events/json/mixins/RedactionMixinsTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events.json.mixins;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.jupiter.api.Test;
+
+/**
+ * Unit tests for RedactionMixins.
+ *
+ * This test verifies that all the mixin classes exist and can be referenced. The actual
+ * redaction behavior is tested in the integration tests (AwsCloudWatchEventListenerTest) where the
+ * mixins are applied to real objects and serialized.
+ */
+class RedactionMixinsTest {
+
+ @Test
+ void shouldVerifyTableMetadataMixinsExist() {
+ // Verify that the TableMetadata mixins exist
+ assertThat(RedactionMixins.TableMetadataPartialRedactionMixin.class).isNotNull();
+ assertThat(RedactionMixins.TableMetadataFullRedactionMixin.class).isNotNull();
+ }
+
+ @Test
+ void shouldVerifyViewMetadataMixinsExist() {
+ // Verify that the ViewMetadata mixins exist
+ assertThat(RedactionMixins.ViewMetadataPartialRedactionMixin.class).isNotNull();
+ assertThat(RedactionMixins.ViewMetadataFullRedactionMixin.class).isNotNull();
+ }
+
+ @Test
+ void shouldVerifyResponseMixinsExist() {
+ // Verify that the response mixins exist
+ assertThat(RedactionMixins.LoadTableResponseRedactionMixin.class).isNotNull();
+ assertThat(RedactionMixins.LoadViewResponseRedactionMixin.class).isNotNull();
+ assertThat(RedactionMixins.ConfigResponseRedactionMixin.class).isNotNull();
+ }
+
+ @Test
+ void shouldVerifyEventMixinsExist() {
+ // Verify that the event mixins exist
+ assertThat(RedactionMixins.AfterGetConfigEventRedactionMixin.class).isNotNull();
+ }
+
+ @Test
+ void shouldVerifyRequestMixinsExist() {
+ // Verify that the request mixins exist
+ assertThat(RedactionMixins.CreateNamespaceRequestRedactionMixin.class).isNotNull();
+ assertThat(RedactionMixins.UpdateNamespacePropertiesRequestRedactionMixin.class).isNotNull();
+ }
+}
+
diff --git a/runtime/service/src/test/java/org/apache/polaris/service/events/jsonEventListener/aws/cloudwatch/AwsCloudWatchEventListenerTest.java b/runtime/service/src/test/java/org/apache/polaris/service/events/jsonEventListener/aws/cloudwatch/AwsCloudWatchEventListenerTest.java
deleted file mode 100644
index 3aac097b17..0000000000
--- a/runtime/service/src/test/java/org/apache/polaris/service/events/jsonEventListener/aws/cloudwatch/AwsCloudWatchEventListenerTest.java
+++ /dev/null
@@ -1,347 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.polaris.service.events.jsonEventListener.aws.cloudwatch;
-
-import static org.apache.polaris.containerspec.ContainerSpecHelper.containerSpecHelper;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.Mockito.when;
-
-import com.fasterxml.jackson.databind.PropertyNamingStrategies;
-import io.quarkus.runtime.configuration.MemorySize;
-import jakarta.ws.rs.core.SecurityContext;
-import java.math.BigInteger;
-import java.time.Clock;
-import java.time.Duration;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.polaris.core.PolarisCallContext;
-import org.apache.polaris.core.auth.PolarisPrincipal;
-import org.apache.polaris.core.context.CallContext;
-import org.apache.polaris.core.context.RealmContext;
-import org.apache.polaris.service.config.PolarisIcebergObjectMapperCustomizer;
-import org.apache.polaris.service.events.IcebergRestCatalogEvents;
-import org.awaitility.Awaitility;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.MockitoAnnotations;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.localstack.LocalStackContainer;
-import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
-import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
-import software.amazon.awssdk.regions.Region;
-import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsAsyncClient;
-import software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogGroupRequest;
-import software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogStreamRequest;
-import software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogGroupsRequest;
-import software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogGroupsResponse;
-import software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogStreamsRequest;
-import software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogStreamsResponse;
-import software.amazon.awssdk.services.cloudwatchlogs.model.GetLogEventsRequest;
-import software.amazon.awssdk.services.cloudwatchlogs.model.GetLogEventsResponse;
-
-class AwsCloudWatchEventListenerTest {
- private static final Logger LOGGER =
- LoggerFactory.getLogger(AwsCloudWatchEventListenerTest.class);
-
- private static final LocalStackContainer localStack =
- new LocalStackContainer(
- containerSpecHelper("localstack", AwsCloudWatchEventListenerTest.class)
- .dockerImageName(null))
- .withServices("logs");
-
- private static final String LOG_GROUP = "test-log-group";
- private static final String LOG_STREAM = "test-log-stream";
- private static final String REALM = "test-realm";
- private static final String TEST_USER = "test-user";
- private static final Clock clock = Clock.systemUTC();
- private static final BigInteger MAX_BODY_SIZE = BigInteger.valueOf(1024 * 1024);
- private static final PolarisIcebergObjectMapperCustomizer customizer =
- new PolarisIcebergObjectMapperCustomizer(new MemorySize(MAX_BODY_SIZE));
-
- @Mock private AwsCloudWatchConfiguration config;
-
- private ExecutorService executorService;
- private AutoCloseable mockitoContext;
-
- @BeforeEach
- void setUp() {
- mockitoContext = MockitoAnnotations.openMocks(this);
- executorService = Executors.newSingleThreadExecutor();
-
- // Configure the mocks
- when(config.awsCloudWatchLogGroup()).thenReturn(LOG_GROUP);
- when(config.awsCloudWatchLogStream()).thenReturn(LOG_STREAM);
- when(config.awsCloudWatchRegion()).thenReturn("us-east-1");
- when(config.synchronousMode()).thenReturn(false); // Default to async mode
- }
-
- @AfterEach
- void tearDown() throws Exception {
- if (mockitoContext != null) {
- mockitoContext.close();
- }
- if (executorService != null) {
- executorService.shutdownNow();
- if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
- LOGGER.warn("ExecutorService did not terminate in time");
- }
- }
- if (localStack.isRunning()) {
- localStack.stop();
- }
- }
-
- private CloudWatchLogsAsyncClient createCloudWatchAsyncClient() {
- if (!localStack.isRunning()) {
- localStack.start();
- }
- return CloudWatchLogsAsyncClient.builder()
- .endpointOverride(localStack.getEndpoint())
- .credentialsProvider(
- StaticCredentialsProvider.create(
- AwsBasicCredentials.create(localStack.getAccessKey(), localStack.getSecretKey())))
- .region(Region.of(localStack.getRegion()))
- .build();
- }
-
- private AwsCloudWatchEventListener createListener(CloudWatchLogsAsyncClient client) {
- AwsCloudWatchEventListener listener =
- new AwsCloudWatchEventListener(config, clock, customizer) {
- @Override
- protected CloudWatchLogsAsyncClient createCloudWatchAsyncClient() {
- return client;
- }
- };
-
- // Set up call context and security context
- CallContext callContext = Mockito.mock(CallContext.class);
- PolarisCallContext polarisCallContext = Mockito.mock(PolarisCallContext.class);
- RealmContext realmContext = Mockito.mock(RealmContext.class);
- SecurityContext securityContext = Mockito.mock(SecurityContext.class);
- PolarisPrincipal principal = Mockito.mock(PolarisPrincipal.class);
- when(callContext.getRealmContext()).thenReturn(realmContext);
- when(callContext.getPolarisCallContext()).thenReturn(polarisCallContext);
- when(realmContext.getRealmIdentifier()).thenReturn(REALM);
- when(securityContext.getUserPrincipal()).thenReturn(principal);
- when(principal.getName()).thenReturn(TEST_USER);
- when(principal.getRoles()).thenReturn(Set.of("role1", "role2"));
- listener.callContext = callContext;
- listener.securityContext = securityContext;
-
- return listener;
- }
-
- @Test
- void shouldCreateLogGroupAndStream() {
- CloudWatchLogsAsyncClient client = createCloudWatchAsyncClient();
- AwsCloudWatchEventListener listener = createListener(client);
-
- // Start the listener which should create the log group and stream
- listener.start();
-
- try {
- verifyLogGroupAndStreamExist(client);
- } finally {
- client.close();
- listener.shutdown();
- }
- }
-
- @Test
- void shouldAcceptPreviouslyCreatedLogGroupAndStream() {
- CloudWatchLogsAsyncClient client = createCloudWatchAsyncClient();
- client.createLogGroup(CreateLogGroupRequest.builder().logGroupName(LOG_GROUP).build()).join();
- client
- .createLogStream(
- CreateLogStreamRequest.builder()
- .logGroupName(LOG_GROUP)
- .logStreamName(LOG_STREAM)
- .build())
- .join();
- verifyLogGroupAndStreamExist(client);
-
- AwsCloudWatchEventListener listener = createListener(client);
- listener.start();
- try {
- verifyLogGroupAndStreamExist(client);
- } finally {
- client.close();
- listener.shutdown();
- }
- }
-
- @Test
- void shouldSendEventToCloudWatch() {
- CloudWatchLogsAsyncClient client = createCloudWatchAsyncClient();
- AwsCloudWatchEventListener listener = createListener(client);
- listener.start();
- try {
- // Create and send a test event
- TableIdentifier testTable = TableIdentifier.of("test_namespace", "test_table");
- listener.onAfterRefreshTable(
- new IcebergRestCatalogEvents.AfterRefreshTableEvent("test_catalog", testTable));
-
- Awaitility.await("expected amount of records should be sent to CloudWatch")
- .atMost(Duration.ofSeconds(30))
- .pollDelay(Duration.ofMillis(100))
- .pollInterval(Duration.ofMillis(100))
- .untilAsserted(
- () -> {
- GetLogEventsResponse resp =
- client
- .getLogEvents(
- GetLogEventsRequest.builder()
- .logGroupName(LOG_GROUP)
- .logStreamName(LOG_STREAM)
- .build())
- .join();
- assertThat(resp.events().size()).isGreaterThan(0);
- });
- GetLogEventsResponse logEvents =
- client
- .getLogEvents(
- GetLogEventsRequest.builder()
- .logGroupName(LOG_GROUP)
- .logStreamName(LOG_STREAM)
- .build())
- .join();
-
- assertThat(logEvents.events())
- .hasSize(1)
- .first()
- .satisfies(
- logEvent -> {
- String message = logEvent.message();
- assertThat(message).contains(REALM);
- assertThat(message)
- .contains(
- IcebergRestCatalogEvents.AfterRefreshTableEvent.class.getSimpleName());
- assertThat(message).contains(TEST_USER);
- assertThat(message).contains(testTable.toString());
- });
- } finally {
- // Clean up
- listener.shutdown();
- client.close();
- }
- }
-
- @Test
- void shouldSendEventInSynchronousMode() {
- CloudWatchLogsAsyncClient client = createCloudWatchAsyncClient();
-
- // Test synchronous mode
- when(config.synchronousMode()).thenReturn(true);
- AwsCloudWatchEventListener syncListener = createListener(client);
- syncListener.start();
- try {
- // Create and send a test event synchronously
- TableIdentifier syncTestTable = TableIdentifier.of("test_namespace", "test_table_sync");
- syncListener.onAfterRefreshTable(
- new IcebergRestCatalogEvents.AfterRefreshTableEvent("test_catalog", syncTestTable));
-
- Awaitility.await("expected amount of records should be sent to CloudWatch")
- .atMost(Duration.ofSeconds(30))
- .pollDelay(Duration.ofMillis(100))
- .pollInterval(Duration.ofMillis(100))
- .untilAsserted(
- () -> {
- GetLogEventsResponse resp =
- client
- .getLogEvents(
- GetLogEventsRequest.builder()
- .logGroupName(LOG_GROUP)
- .logStreamName(LOG_STREAM)
- .build())
- .join();
- assertThat(resp.events().size()).isGreaterThan(0);
- });
-
- GetLogEventsResponse logEvents =
- client
- .getLogEvents(
- GetLogEventsRequest.builder()
- .logGroupName(LOG_GROUP)
- .logStreamName(LOG_STREAM)
- .build())
- .join();
-
- assertThat(logEvents.events()).hasSize(1);
-
- // Verify sync event
- assertThat(logEvents.events())
- .anySatisfy(
- logEvent -> {
- String message = logEvent.message();
- assertThat(message).contains("test_table_sync");
- assertThat(message).contains("AfterRefreshTableEvent");
- });
- } finally {
- // Clean up
- syncListener.shutdown();
- client.close();
- }
- }
-
- @Test
- void ensureObjectMapperCustomizerIsApplied() {
- AwsCloudWatchEventListener listener = createListener(createCloudWatchAsyncClient());
- listener.start();
-
- assertThat(listener.objectMapper.getPropertyNamingStrategy())
- .isInstanceOf(PropertyNamingStrategies.KebabCaseStrategy.class);
- assertThat(listener.objectMapper.getFactory().streamReadConstraints().getMaxDocumentLength())
- .isEqualTo(MAX_BODY_SIZE.longValue());
- }
-
- private void verifyLogGroupAndStreamExist(CloudWatchLogsAsyncClient client) {
- // Verify log group exists
- DescribeLogGroupsResponse groups =
- client
- .describeLogGroups(
- DescribeLogGroupsRequest.builder().logGroupNamePrefix(LOG_GROUP).build())
- .join();
- assertThat(groups.logGroups())
- .hasSize(1)
- .first()
- .satisfies(group -> assertThat(group.logGroupName()).isEqualTo(LOG_GROUP));
-
- // Verify log stream exists
- DescribeLogStreamsResponse streams =
- client
- .describeLogStreams(
- DescribeLogStreamsRequest.builder()
- .logGroupName(LOG_GROUP)
- .logStreamNamePrefix(LOG_STREAM)
- .build())
- .join();
- assertThat(streams.logStreams())
- .hasSize(1)
- .first()
- .satisfies(stream -> assertThat(stream.logStreamName()).isEqualTo(LOG_STREAM));
- }
-}
diff --git a/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/aws/cloudwatch/AwsCloudWatchEventListenerTest.java b/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/aws/cloudwatch/AwsCloudWatchEventListenerTest.java
new file mode 100644
index 0000000000..4b47f203e9
--- /dev/null
+++ b/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/aws/cloudwatch/AwsCloudWatchEventListenerTest.java
@@ -0,0 +1,748 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events.listeners.aws.cloudwatch;
+
+import static org.apache.polaris.containerspec.ContainerSpecHelper.containerSpecHelper;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.when;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.PropertyNamingStrategies;
+import io.quarkus.runtime.configuration.MemorySize;
+import jakarta.ws.rs.core.SecurityContext;
+import java.math.BigInteger;
+import java.time.Clock;
+import java.time.Duration;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.polaris.core.PolarisCallContext;
+import org.apache.polaris.core.auth.PolarisPrincipal;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.service.config.PolarisIcebergObjectMapperCustomizer;
+import org.apache.polaris.service.events.IcebergRestCatalogEvents;
+import org.apache.polaris.service.events.PolarisEvent;
+import org.apache.polaris.service.events.json.mixins.IcebergMixins;
+import org.apache.polaris.service.events.json.mixins.PolarisEventBaseMixin;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.localstack.LocalStackContainer;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsAsyncClient;
+import software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogGroupRequest;
+import software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogStreamRequest;
+import software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogGroupsRequest;
+import software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogGroupsResponse;
+import software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogStreamsRequest;
+import software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogStreamsResponse;
+import software.amazon.awssdk.services.cloudwatchlogs.model.GetLogEventsRequest;
+import software.amazon.awssdk.services.cloudwatchlogs.model.GetLogEventsResponse;
+
+class AwsCloudWatchEventListenerTest {
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(AwsCloudWatchEventListenerTest.class);
+
+ private static final LocalStackContainer localStack =
+ new LocalStackContainer(
+ containerSpecHelper("localstack", AwsCloudWatchEventListenerTest.class)
+ .dockerImageName(null))
+ .withServices("logs");
+
+ private static final String LOG_GROUP = "test-log-group";
+ private static final String LOG_STREAM = "test-log-stream";
+ private static final String REALM = "test-realm";
+ private static final String TEST_USER = "test-user";
+ private static final Clock clock = Clock.systemUTC();
+ private static final BigInteger MAX_BODY_SIZE = BigInteger.valueOf(1024 * 1024);
+ private static final PolarisIcebergObjectMapperCustomizer customizer =
+ new PolarisIcebergObjectMapperCustomizer(new MemorySize(MAX_BODY_SIZE));
+
+ @Mock private AwsCloudWatchConfiguration config;
+
+ private ExecutorService executorService;
+ private AutoCloseable mockitoContext;
+ private static final ObjectMapper objectMapper = new ObjectMapper();
+
+ @BeforeEach
+ void setUp() {
+ mockitoContext = MockitoAnnotations.openMocks(this);
+ executorService = Executors.newSingleThreadExecutor();
+
+ customizer.customize(objectMapper);
+
+ // Configure the mocks
+ when(config.awsCloudWatchLogGroup()).thenReturn(LOG_GROUP);
+ when(config.awsCloudWatchLogStream()).thenReturn(LOG_STREAM);
+ when(config.awsCloudWatchRegion()).thenReturn("us-east-1");
+ when(config.synchronousMode()).thenReturn(false); // default async
+ when(config.eventTypes()).thenReturn(java.util.Optional.empty()); // handle all events
+ }
+
+ @AfterEach
+ void tearDown() throws Exception {
+ if (mockitoContext != null) {
+ mockitoContext.close();
+ }
+ if (executorService != null) {
+ executorService.shutdownNow();
+ if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
+ LOGGER.warn("ExecutorService did not terminate in time");
+ }
+ }
+ if (localStack.isRunning()) {
+ localStack.stop();
+ }
+ }
+
+ private CloudWatchLogsAsyncClient createCloudWatchAsyncClient() {
+ if (!localStack.isRunning()) {
+ localStack.start();
+ }
+ return CloudWatchLogsAsyncClient.builder()
+ .endpointOverride(localStack.getEndpoint())
+ .credentialsProvider(
+ StaticCredentialsProvider.create(
+ AwsBasicCredentials.create(localStack.getAccessKey(), localStack.getSecretKey())))
+ .region(Region.of(localStack.getRegion()))
+ .build();
+ }
+
+ private AwsCloudWatchEventListener createListener(CloudWatchLogsAsyncClient client) {
+ AwsCloudWatchEventListener listener =
+ new AwsCloudWatchEventListener(config, clock, objectMapper) {
+ @Override
+ protected CloudWatchLogsAsyncClient createCloudWatchAsyncClient() {
+ return client;
+ }
+ };
+
+ // Set up call context and security context
+ CallContext callContext = Mockito.mock(CallContext.class);
+ PolarisCallContext polarisCallContext = Mockito.mock(PolarisCallContext.class);
+ RealmContext realmContext = Mockito.mock(RealmContext.class);
+ SecurityContext securityContext = Mockito.mock(SecurityContext.class);
+ PolarisPrincipal principal = Mockito.mock(PolarisPrincipal.class);
+ when(callContext.getRealmContext()).thenReturn(realmContext);
+ when(callContext.getPolarisCallContext()).thenReturn(polarisCallContext);
+ when(realmContext.getRealmIdentifier()).thenReturn(REALM);
+ when(securityContext.getUserPrincipal()).thenReturn(principal);
+ when(principal.getName()).thenReturn(TEST_USER);
+ when(principal.getRoles()).thenReturn(Set.of("role1", "role2"));
+ listener.callContext = callContext;
+ listener.securityContext = securityContext;
+
+ return listener;
+ }
+
+ @Test
+ void shouldCreateLogGroupAndStream() {
+ CloudWatchLogsAsyncClient client = createCloudWatchAsyncClient();
+ AwsCloudWatchEventListener listener = createListener(client);
+
+ // Start the listener which should create the log group and stream
+ listener.start();
+
+ try {
+ verifyLogGroupAndStreamExist(client);
+ } finally {
+ client.close();
+ listener.shutdown();
+ }
+ }
+
+ @Test
+ void shouldAcceptPreviouslyCreatedLogGroupAndStream() {
+ CloudWatchLogsAsyncClient client = createCloudWatchAsyncClient();
+ client.createLogGroup(CreateLogGroupRequest.builder().logGroupName(LOG_GROUP).build()).join();
+ client
+ .createLogStream(
+ CreateLogStreamRequest.builder()
+ .logGroupName(LOG_GROUP)
+ .logStreamName(LOG_STREAM)
+ .build())
+ .join();
+ verifyLogGroupAndStreamExist(client);
+
+ AwsCloudWatchEventListener listener = createListener(client);
+ listener.start();
+ try {
+ verifyLogGroupAndStreamExist(client);
+ } finally {
+ client.close();
+ listener.shutdown();
+ }
+ }
+
+ @Test
+ void shouldSendEventToCloudWatch() {
+ CloudWatchLogsAsyncClient client = createCloudWatchAsyncClient();
+ AwsCloudWatchEventListener listener = createListener(client);
+ listener.start();
+ try {
+ // Create and send a test event
+ Namespace namespaceTest = Namespace.of("test_namespace.test1", "test1a");
+ TableIdentifier testTable = TableIdentifier.of(namespaceTest, "test_table");
+ listener.onAfterRefreshTable(
+ new IcebergRestCatalogEvents.AfterRefreshTableEvent("test_catalog", testTable));
+
+ Awaitility.await("expected amount of records should be sent to CloudWatch")
+ .atMost(Duration.ofSeconds(30))
+ .pollDelay(Duration.ofMillis(100))
+ .pollInterval(Duration.ofMillis(100))
+ .untilAsserted(
+ () -> {
+ GetLogEventsResponse resp =
+ client
+ .getLogEvents(
+ GetLogEventsRequest.builder()
+ .logGroupName(LOG_GROUP)
+ .logStreamName(LOG_STREAM)
+ .build())
+ .join();
+ assertThat(resp.events().size()).isGreaterThan(0);
+ });
+ GetLogEventsResponse logEvents =
+ client
+ .getLogEvents(
+ GetLogEventsRequest.builder()
+ .logGroupName(LOG_GROUP)
+ .logStreamName(LOG_STREAM)
+ .build())
+ .join();
+
+ assertThat(logEvents.events())
+ .hasSize(1)
+ .first()
+ .satisfies(
+ logEvent -> {
+ String message = logEvent.message();
+ JsonNode root = objectMapper.readTree(message);
+ JsonNode event = root.path("event").isMissingNode() ? root : root.path("event");
+ assertThat(message).contains(REALM);
+ assertThat(message)
+ .contains(
+ IcebergRestCatalogEvents.AfterRefreshTableEvent.class.getSimpleName());
+ assertThat(message).contains(TEST_USER);
+ // table_identifier object
+ JsonNode tableId = event.path("table_identifier");
+ assertThat(tableId.isObject()).isTrue();
+ assertThat(tableId.path("name").asText()).isEqualTo("test_table");
+ assertThat(tableId.path("namespace").isArray()).isTrue();
+ });
+ } finally {
+ // Clean up
+ listener.shutdown();
+ client.close();
+ }
+ }
+
+ @Test
+ void shouldSendEventInSynchronousMode() {
+ CloudWatchLogsAsyncClient client = createCloudWatchAsyncClient();
+
+ // Test synchronous mode
+ when(config.synchronousMode()).thenReturn(true);
+ AwsCloudWatchEventListener syncListener = createListener(client);
+ syncListener.start();
+ try {
+ // Create and send a test event synchronously
+ TableIdentifier syncTestTable = TableIdentifier.of("test_namespace", "test_table_sync");
+ syncListener.onAfterRefreshTable(
+ new IcebergRestCatalogEvents.AfterRefreshTableEvent("test_catalog", syncTestTable));
+
+ Awaitility.await("expected amount of records should be sent to CloudWatch")
+ .atMost(Duration.ofSeconds(30))
+ .pollDelay(Duration.ofMillis(100))
+ .pollInterval(Duration.ofMillis(100))
+ .untilAsserted(
+ () -> {
+ GetLogEventsResponse resp =
+ client
+ .getLogEvents(
+ GetLogEventsRequest.builder()
+ .logGroupName(LOG_GROUP)
+ .logStreamName(LOG_STREAM)
+ .build())
+ .join();
+ assertThat(resp.events().size()).isGreaterThan(0);
+ });
+
+ GetLogEventsResponse logEvents =
+ client
+ .getLogEvents(
+ GetLogEventsRequest.builder()
+ .logGroupName(LOG_GROUP)
+ .logStreamName(LOG_STREAM)
+ .build())
+ .join();
+
+ assertThat(logEvents.events()).hasSize(1);
+
+ // Verify sync event
+ assertThat(logEvents.events())
+ .anySatisfy(
+ logEvent -> {
+ String message = logEvent.message();
+ assertThat(message).contains("test_table_sync");
+ assertThat(message).contains("AfterRefreshTableEvent");
+ });
+ } finally {
+ // Clean up
+ syncListener.shutdown();
+ client.close();
+ }
+ }
+
+ @Test
+ void ensureObjectMapperCustomizerIsApplied() {
+
+ AwsCloudWatchEventListener listener =
+ new AwsCloudWatchEventListener(config, clock, objectMapper);
+
+ assertThat(listener.objectMapper.getPropertyNamingStrategy())
+ .isInstanceOf(PropertyNamingStrategies.KebabCaseStrategy.class);
+ assertThat(listener.objectMapper.getFactory().streamReadConstraints().getMaxDocumentLength())
+ .isEqualTo(MAX_BODY_SIZE.longValue());
+
+ assertThat(objectMapper.findMixInClassFor(Namespace.class))
+ .as("Namespace mixin should be registered")
+ .isEqualTo(IcebergMixins.NamespaceMixin.class);
+
+ assertThat(objectMapper.findMixInClassFor(TableIdentifier.class))
+ .as("TableIdentifier mixin should be registered")
+ .isEqualTo(IcebergMixins.TableIdentifierMixin.class);
+
+ assertThat(objectMapper.findMixInClassFor(PolarisEvent.class))
+ .as("Namespace mixin should be registered")
+ .isEqualTo(PolarisEventBaseMixin.class);
+ }
+
+ @Test
+ void shouldListenToAllEventTypesWhenConfigNotProvided() {
+ // given: config.eventTypes() is empty → listen to all events
+ when(config.eventTypes()).thenReturn(java.util.Optional.empty());
+
+ AwsCloudWatchEventListener listener =
+ new AwsCloudWatchEventListener(config, clock, objectMapper);
+
+ // Any PolarisEvent will do here; if the listener accepts all types,
+ // shouldHandle(event) should return true
+ PolarisEvent randomEvent =
+ new IcebergRestCatalogEvents.AfterRefreshTableEvent(
+ "test_catalog", TableIdentifier.of("db", "table"));
+
+ boolean shouldHandle = listener.shouldHandle(randomEvent);
+ assertThat(shouldHandle)
+ .as("Listener should handle all events when no eventTypes are configured")
+ .isTrue();
+ }
+
+ private void verifyLogGroupAndStreamExist(CloudWatchLogsAsyncClient client) {
+ DescribeLogGroupsResponse groups =
+ client
+ .describeLogGroups(
+ DescribeLogGroupsRequest.builder().logGroupNamePrefix(LOG_GROUP).build())
+ .join();
+ assertThat(groups.logGroups())
+ .hasSize(1)
+ .first()
+ .satisfies(group -> assertThat(group.logGroupName()).isEqualTo(LOG_GROUP));
+
+ // Verify log stream exists
+ DescribeLogStreamsResponse streams =
+ client
+ .describeLogStreams(
+ DescribeLogStreamsRequest.builder()
+ .logGroupName(LOG_GROUP)
+ .logStreamNamePrefix(LOG_STREAM)
+ .build())
+ .join();
+ assertThat(streams.logStreams())
+ .hasSize(1)
+ .first()
+ .satisfies(stream -> assertThat(stream.logStreamName()).isEqualTo(LOG_STREAM));
+ }
+
+ @Test
+ void shouldRedactConfigResponseDirectly() throws Exception {
+ // Test that the mixin works when serializing ConfigResponse directly
+ ObjectMapper testMapper = new ObjectMapper();
+ testMapper.addMixIn(
+ org.apache.iceberg.rest.responses.ConfigResponse.class,
+ org.apache.polaris.service.events.json.mixins.RedactionMixins
+ .ConfigResponseRedactionMixin.class);
+
+ org.apache.iceberg.rest.responses.ConfigResponse configResponse =
+ org.apache.iceberg.rest.responses.ConfigResponse.builder()
+ .withDefault("s3.access-key-id", "AKIAIOSFODNN7EXAMPLE")
+ .withDefault("s3.secret-access-key", "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
+ .withDefault("token", "some-bearer-token")
+ .withOverride("prefix", "test-catalog")
+ .build();
+
+ String json = testMapper.writeValueAsString(configResponse);
+ System.out.println("Direct ConfigResponse JSON: " + json);
+
+ // Verify that defaults and overrides are not in the JSON
+ assertThat(json).doesNotContain("defaults");
+ assertThat(json).doesNotContain("overrides");
+ assertThat(json).doesNotContain("AKIAIOSFODNN7EXAMPLE");
+ }
+
+ @Test
+ void shouldRedactSensitiveFieldsInPartialMode() throws Exception {
+ // Configure for PARTIAL redaction mode
+ when(config.redactionMode())
+ .thenReturn(AwsCloudWatchConfiguration.RedactionMode.PARTIAL);
+
+ CloudWatchLogsAsyncClient client = createCloudWatchAsyncClient();
+ AwsCloudWatchEventListener listener = createListener(client);
+ listener.start();
+
+ try {
+ // Create an event with sensitive data
+ org.apache.iceberg.rest.responses.ConfigResponse configResponse =
+ org.apache.iceberg.rest.responses.ConfigResponse.builder()
+ .withDefault("s3.access-key-id", "AKIAIOSFODNN7EXAMPLE")
+ .withDefault("s3.secret-access-key", "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
+ .withDefault("token", "some-bearer-token")
+ .withOverride("prefix", "test-catalog")
+ .build();
+
+ listener.onAfterGetConfig(
+ new IcebergRestCatalogEvents.AfterGetConfigEvent(configResponse));
+
+ // Wait for event to be sent
+ Awaitility.await("event should be sent to CloudWatch")
+ .atMost(Duration.ofSeconds(30))
+ .pollDelay(Duration.ofMillis(100))
+ .pollInterval(Duration.ofMillis(100))
+ .untilAsserted(
+ () -> {
+ GetLogEventsResponse resp =
+ client
+ .getLogEvents(
+ GetLogEventsRequest.builder()
+ .logGroupName(LOG_GROUP)
+ .logStreamName(LOG_STREAM)
+ .build())
+ .join();
+ assertThat(resp.events().size()).isGreaterThan(0);
+ });
+
+ // Retrieve and verify the logged event
+ GetLogEventsResponse logEvents =
+ client
+ .getLogEvents(
+ GetLogEventsRequest.builder()
+ .logGroupName(LOG_GROUP)
+ .logStreamName(LOG_STREAM)
+ .build())
+ .join();
+
+ assertThat(logEvents.events()).hasSize(1);
+ String message = logEvents.events().get(0).message();
+ System.out.println("REDACTED MESSAGE: " + message);
+ JsonNode eventJson = objectMapper.readTree(message);
+
+ // Verify that the entire configResponse field is redacted (omitted from JSON)
+ assertThat(eventJson.has("config_response"))
+ .as("config_response field should be redacted")
+ .isFalse();
+
+ // Verify that the original values are NOT present
+ assertThat(message).doesNotContain("AKIAIOSFODNN7EXAMPLE");
+ assertThat(message).doesNotContain("wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY");
+ assertThat(message).doesNotContain("some-bearer-token");
+ } finally {
+ client.close();
+ listener.shutdown();
+ }
+ }
+
+ @Test
+ void shouldNotRedactInNoneMode() throws Exception {
+ // Configure for NONE redaction mode
+ when(config.redactionMode()).thenReturn(AwsCloudWatchConfiguration.RedactionMode.NONE);
+
+ CloudWatchLogsAsyncClient client = createCloudWatchAsyncClient();
+ AwsCloudWatchEventListener listener = createListener(client);
+ listener.start();
+
+ try {
+ // Create an event with data that would normally be redacted
+ org.apache.iceberg.rest.responses.ConfigResponse configResponse =
+ org.apache.iceberg.rest.responses.ConfigResponse.builder()
+ .withDefault("test-key", "test-value")
+ .withOverride("prefix", "test-catalog")
+ .build();
+
+ listener.onAfterGetConfig(
+ new IcebergRestCatalogEvents.AfterGetConfigEvent(configResponse));
+
+ // Wait for event to be sent
+ Awaitility.await("event should be sent to CloudWatch")
+ .atMost(Duration.ofSeconds(30))
+ .pollDelay(Duration.ofMillis(100))
+ .pollInterval(Duration.ofMillis(100))
+ .untilAsserted(
+ () -> {
+ GetLogEventsResponse resp =
+ client
+ .getLogEvents(
+ GetLogEventsRequest.builder()
+ .logGroupName(LOG_GROUP)
+ .logStreamName(LOG_STREAM)
+ .build())
+ .join();
+ assertThat(resp.events().size()).isGreaterThan(0);
+ });
+
+ // Retrieve and verify the logged event
+ GetLogEventsResponse logEvents =
+ client
+ .getLogEvents(
+ GetLogEventsRequest.builder()
+ .logGroupName(LOG_GROUP)
+ .logStreamName(LOG_STREAM)
+ .build())
+ .join();
+
+ assertThat(logEvents.events()).hasSize(1);
+ String message = logEvents.events().get(0).message();
+ System.out.println("NON-REDACTED MESSAGE: " + message);
+ JsonNode eventJson = objectMapper.readTree(message);
+
+ // Verify that values are NOT redacted (config_response field is present)
+ assertThat(eventJson.has("config_response"))
+ .as("config_response field should be present")
+ .isTrue();
+ assertThat(message).contains("test-key");
+ assertThat(message).contains("test-value");
+ } finally {
+ client.close();
+ listener.shutdown();
+ }
+ }
+
+ @Test
+ void shouldRedactAdditionalFieldsWhenConfigured() throws Exception {
+ // Configure additional fields to redact
+ when(config.redactionMode()).thenReturn(AwsCloudWatchConfiguration.RedactionMode.NONE);
+ when(config.additionalRedactedFields())
+ .thenReturn(java.util.Optional.of(Set.of("catalogName", "namespace")));
+
+ CloudWatchLogsAsyncClient client = createCloudWatchAsyncClient();
+ AwsCloudWatchEventListener listener = createListener(client);
+ listener.start();
+
+ try {
+ // Create and send a test event
+ Namespace namespaceTest = Namespace.of("sensitive_namespace");
+ TableIdentifier testTable = TableIdentifier.of(namespaceTest, "test_table");
+ listener.onAfterRefreshTable(
+ new IcebergRestCatalogEvents.AfterRefreshTableEvent("sensitive_catalog", testTable));
+
+ // Wait for the event to be sent
+ Awaitility.await()
+ .atMost(Duration.ofSeconds(10))
+ .pollInterval(Duration.ofMillis(100))
+ .until(
+ () -> {
+ GetLogEventsResponse response =
+ client
+ .getLogEvents(
+ GetLogEventsRequest.builder()
+ .logGroupName(LOG_GROUP)
+ .logStreamName(LOG_STREAM)
+ .build())
+ .join();
+ return !response.events().isEmpty();
+ });
+
+ GetLogEventsResponse logEvents =
+ client
+ .getLogEvents(
+ GetLogEventsRequest.builder()
+ .logGroupName(LOG_GROUP)
+ .logStreamName(LOG_STREAM)
+ .build())
+ .join();
+
+ assertThat(logEvents.events()).hasSize(1);
+ String message = logEvents.events().get(0).message();
+ System.out.println("FIELD-REDACTED MESSAGE: " + message);
+ JsonNode eventJson = objectMapper.readTree(message);
+
+ // Verify that configured fields are redacted
+ assertThat(eventJson.get("catalog_name").asText())
+ .isEqualTo(org.apache.polaris.service.events.json.RedactingSerializer.getRedactedMarker());
+ assertThat(eventJson.get("namespace").asText())
+ .isEqualTo(org.apache.polaris.service.events.json.RedactingSerializer.getRedactedMarker());
+
+ // Verify that other fields are NOT redacted
+ assertThat(eventJson.has("table_identifier")).isTrue();
+ assertThat(message).doesNotContain("sensitive_catalog");
+ assertThat(message).doesNotContain("sensitive_namespace");
+ } finally {
+ client.close();
+ listener.shutdown();
+ }
+ }
+
+ @Test
+ void shouldRedactFieldsWithWildcardPatterns() throws Exception {
+ // Configure wildcard patterns to redact
+ when(config.redactionMode()).thenReturn(AwsCloudWatchConfiguration.RedactionMode.NONE);
+ when(config.additionalRedactedFields())
+ .thenReturn(java.util.Optional.of(Set.of("*Name", "table*")));
+
+ CloudWatchLogsAsyncClient client = createCloudWatchAsyncClient();
+ AwsCloudWatchEventListener listener = createListener(client);
+ listener.start();
+
+ try {
+ // Create and send a test event
+ Namespace namespaceTest = Namespace.of("test_namespace");
+ TableIdentifier testTable = TableIdentifier.of(namespaceTest, "test_table");
+ listener.onAfterRefreshTable(
+ new IcebergRestCatalogEvents.AfterRefreshTableEvent("test_catalog", testTable));
+
+ // Wait for the event to be sent
+ Awaitility.await()
+ .atMost(Duration.ofSeconds(10))
+ .pollInterval(Duration.ofMillis(100))
+ .until(
+ () -> {
+ GetLogEventsResponse response =
+ client
+ .getLogEvents(
+ GetLogEventsRequest.builder()
+ .logGroupName(LOG_GROUP)
+ .logStreamName(LOG_STREAM)
+ .build())
+ .join();
+ return !response.events().isEmpty();
+ });
+
+ GetLogEventsResponse logEvents =
+ client
+ .getLogEvents(
+ GetLogEventsRequest.builder()
+ .logGroupName(LOG_GROUP)
+ .logStreamName(LOG_STREAM)
+ .build())
+ .join();
+
+ assertThat(logEvents.events()).hasSize(1);
+ String message = logEvents.events().get(0).message();
+ System.out.println("WILDCARD-REDACTED MESSAGE: " + message);
+ JsonNode eventJson = objectMapper.readTree(message);
+
+ // Verify that fields matching wildcard patterns are redacted
+ // *Name should match catalogName
+ assertThat(eventJson.get("catalog_name").asText())
+ .isEqualTo(org.apache.polaris.service.events.json.RedactingSerializer.getRedactedMarker());
+
+ // table* should match tableIdentifier
+ assertThat(eventJson.get("table_identifier").asText())
+ .isEqualTo(org.apache.polaris.service.events.json.RedactingSerializer.getRedactedMarker());
+ } finally {
+ client.close();
+ listener.shutdown();
+ }
+ }
+
+ @Test
+ void shouldCombineRedactionModeWithAdditionalFields() throws Exception {
+ // Configure PARTIAL mode with additional fields
+ when(config.redactionMode()).thenReturn(AwsCloudWatchConfiguration.RedactionMode.PARTIAL);
+ when(config.additionalRedactedFields())
+ .thenReturn(java.util.Optional.of(Set.of("catalogName")));
+
+ CloudWatchLogsAsyncClient client = createCloudWatchAsyncClient();
+ AwsCloudWatchEventListener listener = createListener(client);
+ listener.start();
+
+ try {
+ // Create and send a test event
+ Namespace namespaceTest = Namespace.of("test_namespace");
+ TableIdentifier testTable = TableIdentifier.of(namespaceTest, "test_table");
+ listener.onAfterRefreshTable(
+ new IcebergRestCatalogEvents.AfterRefreshTableEvent("test_catalog", testTable));
+
+ // Wait for the event to be sent
+ Awaitility.await()
+ .atMost(Duration.ofSeconds(10))
+ .pollInterval(Duration.ofMillis(100))
+ .until(
+ () -> {
+ GetLogEventsResponse response =
+ client
+ .getLogEvents(
+ GetLogEventsRequest.builder()
+ .logGroupName(LOG_GROUP)
+ .logStreamName(LOG_STREAM)
+ .build())
+ .join();
+ return !response.events().isEmpty();
+ });
+
+ GetLogEventsResponse logEvents =
+ client
+ .getLogEvents(
+ GetLogEventsRequest.builder()
+ .logGroupName(LOG_GROUP)
+ .logStreamName(LOG_STREAM)
+ .build())
+ .join();
+
+ assertThat(logEvents.events()).hasSize(1);
+ String message = logEvents.events().get(0).message();
+ System.out.println("COMBINED-REDACTED MESSAGE: " + message);
+ JsonNode eventJson = objectMapper.readTree(message);
+
+ // Verify that catalogName is redacted (from additional fields)
+ assertThat(eventJson.get("catalog_name").asText())
+ .isEqualTo(org.apache.polaris.service.events.json.RedactingSerializer.getRedactedMarker());
+
+ // Verify that default PARTIAL mode redactions are also applied
+ // (This would be tested with events that have properties/config fields)
+ } finally {
+ client.close();
+ listener.shutdown();
+ }
+ }
+}
diff --git a/runtime/service/src/test/resources/org/apache/polaris/service/events/jsonEventListener/aws/cloudwatch/Dockerfile-localstack-version b/runtime/service/src/test/resources/org/apache/polaris/service/events/listeners/aws/cloudwatch/Dockerfile-localstack-version
similarity index 100%
rename from runtime/service/src/test/resources/org/apache/polaris/service/events/jsonEventListener/aws/cloudwatch/Dockerfile-localstack-version
rename to runtime/service/src/test/resources/org/apache/polaris/service/events/listeners/aws/cloudwatch/Dockerfile-localstack-version