diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/metadatadomain/JsonMetadataDomain.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/metadatadomain/JsonMetadataDomain.java
new file mode 100644
index 0000000000..86ca4733bf
--- /dev/null
+++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/metadatadomain/JsonMetadataDomain.java
@@ -0,0 +1,112 @@
+/*
+ * Copyright (2024) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.delta.kernel.internal.metadatadomain;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.*;
+import io.delta.kernel.exceptions.KernelException;
+import io.delta.kernel.internal.SnapshotImpl;
+import io.delta.kernel.internal.actions.DomainMetadata;
+import io.delta.kernel.internal.rowtracking.RowTrackingMetadataDomain;
+import java.util.Optional;
+
+/**
+ * Abstract class representing a JSON metadata domain, whose configuration string is a JSON
+ * serialization of a domain object. This class provides methods to serialize and deserialize a
+ * metadata domain to and from JSON. Concrete implementations, such as {@link
+ * RowTrackingMetadataDomain}, should extend this class to define a specific metadata domain.
+ */
+public abstract class JsonMetadataDomain {
+  // Strict mapper: deserialization fails on missing creator properties and unknown properties
+  private static final ObjectMapper OBJECT_MAPPER =
+      new ObjectMapper()
+          .configure(DeserializationFeature.FAIL_ON_MISSING_CREATOR_PROPERTIES, true)
+          .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, true);
+
+  /**
+   * Deserializes a JSON string into an instance of the specified metadata domain class.
+   *
+   * @param json the JSON string to deserialize
+   * @param clazz the concrete class of the metadata domain object to deserialize into
+   * @param <T> the type of the object
+   * @return the deserialized object
+   * @throws KernelException if the JSON cannot be parsed, or has missing or unknown properties
+   */
+  protected static <T> T fromJsonConfiguration(String json, Class<T> clazz) {
+    try {
+      return OBJECT_MAPPER.readValue(json, clazz);
+    } catch (JsonProcessingException e) {
+      throw new KernelException(
+          String.format(
+              "Failed to parse JSON string into a %s instance. JSON content: %s",
+              clazz.getSimpleName(), json),
+          e);
+    }
+  }
+
+  /**
+   * Retrieves the domain metadata from a snapshot for a given domain, and deserializes it into an
+   * instance of the specified metadata domain class.
+   *
+   * @param snapshot the snapshot to read from
+   * @param clazz the metadata domain class of the object to deserialize into
+   * @param domainName the name of the domain
+   * @param <T> the type of the metadata domain object
+   * @return an Optional containing the deserialized object if the domain metadata is found,
+   *     otherwise an empty Optional
+   */
+  protected static <T> Optional<T> fromSnapshot(
+      SnapshotImpl snapshot, Class<T> clazz, String domainName) {
+    return Optional.ofNullable(snapshot.getDomainMetadataMap().get(domainName))
+        .map(domainMetadata -> fromJsonConfiguration(domainMetadata.getConfiguration(), clazz));
+  }
+
+  /**
+   * Returns the name of the domain. Annotated so that it is left out of the JSON configuration.
+   *
+   * @return the domain name
+   */
+  @JsonIgnore
+  public abstract String getDomainName();
+
+  /**
+   * Serializes this object into a JSON string.
+   *
+   * @return the JSON string representation of this object
+   * @throws KernelException if the object cannot be serialized
+   */
+  public String toJsonConfiguration() {
+    try {
+      return OBJECT_MAPPER.writeValueAsString(this);
+    } catch (JsonProcessingException e) {
+      throw new KernelException(
+          String.format(
+              "Could not serialize %s (domain: %s) to JSON",
+              this.getClass().getSimpleName(), getDomainName()),
+          e);
+    }
+  }
+
+  /**
+   * Generates a {@link DomainMetadata} action for this domain from its JSON configuration.
+   *
+   * @return the DomainMetadata action instance
+   */
+  public DomainMetadata toDomainMetadata() {
+    return new DomainMetadata(getDomainName(), toJsonConfiguration(), false);
+  }
+}
diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/rowtracking/RowTrackingMetadataDomain.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/rowtracking/RowTrackingMetadataDomain.java
new file mode 100644
index 0000000000..5cfeb61b4c
--- /dev/null
+++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/rowtracking/RowTrackingMetadataDomain.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright (2024) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.delta.kernel.internal.rowtracking;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.delta.kernel.internal.SnapshotImpl;
+import io.delta.kernel.internal.metadatadomain.JsonMetadataDomain;
+import java.util.Optional;
+
+/** Represents the metadata domain for row tracking. */
+public class RowTrackingMetadataDomain extends JsonMetadataDomain {
+
+  public static final String DOMAIN_NAME = "delta.rowTracking";
+
+  /** The highest assigned fresh row ID for the table. */
+  private long rowIdHighWaterMark;
+
+  /**
+   * Constructs a RowTrackingMetadataDomain with the specified row ID high water mark.
+   *
+   * @param rowIdHighWaterMark the row ID high water mark
+   */
+  @JsonCreator
+  public RowTrackingMetadataDomain(@JsonProperty("rowIdHighWaterMark") long rowIdHighWaterMark) {
+    this.rowIdHighWaterMark = rowIdHighWaterMark;
+  }
+
+  @Override
+  public String getDomainName() {
+    return DOMAIN_NAME;
+  }
+
+  public long getRowIdHighWaterMark() {
+    return rowIdHighWaterMark;
+  }
+
+  public void setRowIdHighWaterMark(long rowIdHighWaterMark) {
+    this.rowIdHighWaterMark = rowIdHighWaterMark;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) return true;
+    if (obj == null || getClass() != obj.getClass()) return false;
+    RowTrackingMetadataDomain that = (RowTrackingMetadataDomain) obj;
+    return rowIdHighWaterMark == that.rowIdHighWaterMark;
+  }
+
+  @Override
+  public int hashCode() {
+    return java.util.Objects.hash(DOMAIN_NAME, rowIdHighWaterMark);
+  }
+
+  @Override
+  public String toString() {
+    return "RowTrackingMetadataDomain{" + "rowIdHighWaterMark=" + rowIdHighWaterMark + '}';
+  }
+
+  /**
+   * Creates an instance of {@link RowTrackingMetadataDomain} from a JSON configuration string.
+   *
+   * @param json the JSON configuration string
+   * @return an instance of {@link RowTrackingMetadataDomain}
+   */
+  public static RowTrackingMetadataDomain fromJsonConfiguration(String json) {
+    return JsonMetadataDomain.fromJsonConfiguration(json, RowTrackingMetadataDomain.class);
+  }
+
+  /**
+   * Looks up the {@link #DOMAIN_NAME} domain in a {@link SnapshotImpl} and deserializes it.
+   *
+   * @param snapshot the snapshot instance
+   * @return an {@link Optional} containing the {@link RowTrackingMetadataDomain} if present
+   */
+  public static Optional<RowTrackingMetadataDomain> fromSnapshot(SnapshotImpl snapshot) {
+    return JsonMetadataDomain.fromSnapshot(snapshot, RowTrackingMetadataDomain.class, DOMAIN_NAME);
+  }
+}
diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DomainMetadataSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DomainMetadataSuite.scala
index ff097c7c5d..f9db4a5183 100644
--- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DomainMetadataSuite.scala
+++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DomainMetadataSuite.scala
@@ -24,6 +24,7 @@ import io.delta.kernel.internal.{SnapshotImpl, TableImpl, TransactionBuilderImpl
 import io.delta.kernel.internal.actions.{DomainMetadata, Protocol, SingleAction}
 import io.delta.kernel.internal.util.Utils.toCloseableIterator
 import io.delta.kernel.internal.TableConfig.CHECKPOINT_INTERVAL
+import io.delta.kernel.internal.rowtracking.RowTrackingMetadataDomain
 import io.delta.kernel.utils.CloseableIterable.{emptyIterable, inMemoryIterable}
 import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.delta.DeltaLog
@@ -540,4 +541,73 @@ class DomainMetadataSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBase
       }
     }
   }
+
+  test(
+    "RowTrackingMetadataDomain is serializable and deserializable"
+  ) {
+    withTempDirAndEngine((tablePath, engine) => {
+      // Create a RowTrackingMetadataDomain
+      val rowTrackingMetadataDomain = new RowTrackingMetadataDomain(10)
+
+ // Convert the domain object into a DomainMetadata action. + val dm = rowTrackingMetadataDomain.toDomainMetadata + // The action's configuration must be the JSON serialization of rowTrackingMetadataDomain + assert(dm.getDomain === rowTrackingMetadataDomain.getDomainName) + assert(dm.getConfiguration === """{"rowIdHighWaterMark":10}""") + + // Round-trip: the configuration string must deserialize back into an equal domain object + val deserializedDomain = RowTrackingMetadataDomain.fromJsonConfiguration(dm.getConfiguration) + assert(deserializedDomain === rowTrackingMetadataDomain) + + // Verify the domain metadata can be committed to a table and read back + createTableWithDomainMetadataSupported(engine, tablePath) + // Commit the domain metadata and verify it against the expected domain name -> action map + commitDomainMetadataAndVerify( + engine, + tablePath, + domainMetadatas = Seq(dm), + expectedValue = Map(rowTrackingMetadataDomain.getDomainName -> dm) + ) + + // Read the domain metadata back from the latest table snapshot + val table = Table.forPath(engine, tablePath) + val snapshot = table.getLatestSnapshot(engine).asInstanceOf[SnapshotImpl] + val rowTrackingMetadataDomainFromSnapshot = + RowTrackingMetadataDomain.fromSnapshot(snapshot).get + + // The domain object rebuilt from the snapshot must equal the one that was committed + assert(rowTrackingMetadataDomain === rowTrackingMetadataDomainFromSnapshot) + }) + } + + test( + "RowTrackingMetadataDomain deserialization fails if there are missing/extra fields" + ) { + // Create a RowTrackingMetadataDomain and check its serialized JSON configuration + val rowTrackingMetadataDomain = new RowTrackingMetadataDomain(10) + val config = rowTrackingMetadataDomain.toJsonConfiguration + assert(config === """{"rowIdHighWaterMark":10}""") + + // Deserialization must fail with a KernelException when the configuration has extra fields + val configExtraField = """{"rowIdHighWaterMark":10, "extraField": "extra"}""" + val e1 = intercept[KernelException] { + RowTrackingMetadataDomain.fromJsonConfiguration(configExtraField) + } + assert( +
e1.getMessage.contains( + "Failed to parse JSON string into a RowTrackingMetadataDomain instance" + ) + ) + + // Deserialization must fail with a KernelException when a required field is missing + val configMissingField = """{}""" + val e2 = intercept[KernelException] { + RowTrackingMetadataDomain.fromJsonConfiguration(configMissingField) + } + assert( + e2.getMessage.contains( + "Failed to parse JSON string into a RowTrackingMetadataDomain instance" + ) + ) + } }