Skip to content

Commit

Permalink
Add JsonMetadataDomain & RowTrackingMetadataDomain
Browse files Browse the repository at this point in the history
  • Loading branch information
qiyuandong-db committed Nov 25, 2024
1 parent 700bdaf commit 0ec7a0c
Show file tree
Hide file tree
Showing 3 changed files with 274 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.internal.metadatadomain;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.*;
import io.delta.kernel.exceptions.KernelException;
import io.delta.kernel.internal.SnapshotImpl;
import io.delta.kernel.internal.actions.DomainMetadata;
import io.delta.kernel.internal.rowtracking.RowTrackingMetadataDomain;
import java.util.Optional;

/**
* Abstract class representing a JSON metadata domain, whose configuration string is a JSON
* serialization of a domain object. This class provides methods to serialize and deserialize a
* metadata domain to and from JSON. Concrete implementations, such as {@link
* RowTrackingMetadataDomain}, should extend this class to define a specific metadata domain.
*/
public abstract class JsonMetadataDomain {
// We explicitly set the ObjectMapper to fail on missing properties and unknown properties
private static final ObjectMapper OBJECT_MAPPER =
new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_MISSING_CREATOR_PROPERTIES, true)
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, true);

/**
* Deserializes a JSON string into an instance of the specified metadata domain.
*
* @param json the JSON string to deserialize
* @param clazz the concrete class of the metadata domain object to deserialize into
* @param <T> the type of the object
* @return the deserialized object
* @throws KernelException if the JSON string cannot be parsed
*/
protected static <T> T fromJsonConfiguration(String json, Class<T> clazz) {
try {
return OBJECT_MAPPER.readValue(json, clazz);
} catch (JsonProcessingException e) {
throw new KernelException(
String.format(
"Failed to parse JSON string into a %s instance. JSON content: %s",
clazz.getSimpleName(), json),
e);
}
}

/**
* Retrieves the domain metadata from a snapshot for a given domain, and deserializes it into an
* instance of the specified metadata domain class.
*
* @param snapshot the snapshot to read from
* @param clazz the metadata domain class of the object to deserialize into
* @param domainName the name of the domain
* @param <T> the type of the metadata domain object
* @return an Optional containing the deserialized object if the domain metadata is found,
* otherwise an empty Optional
*/
protected static <T> Optional<T> fromSnapshot(
SnapshotImpl snapshot, Class<T> clazz, String domainName) {
return Optional.ofNullable(snapshot.getDomainMetadataMap().get(domainName))
.map(domainMetadata -> fromJsonConfiguration(domainMetadata.getConfiguration(), clazz));
}

/**
* Returns the name of the domain.
*
* @return the domain name
*/
@JsonIgnore
public abstract String getDomainName();

/**
* Serializes this object into a JSON string.
*
* @return the JSON string representation of this object
* @throws KernelException if the object cannot be serialized
*/
public String toJsonConfiguration() {
try {
return OBJECT_MAPPER.writeValueAsString(this);
} catch (JsonProcessingException e) {
throw new KernelException(
String.format(
"Could not serialize %s (domain: %s) to JSON",
this.getClass().getSimpleName(), getDomainName()),
e);
}
}

/**
* Generate a {@link DomainMetadata} action from this metadata domain.
*
* @return the DomainMetadata action instance
*/
public DomainMetadata toDomainMetadata() {
return new DomainMetadata(getDomainName(), toJsonConfiguration(), false);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.internal.rowtracking;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.delta.kernel.internal.SnapshotImpl;
import io.delta.kernel.internal.metadatadomain.JsonMetadataDomain;
import java.util.Optional;

/** Represents the metadata domain for row tracking. */
public class RowTrackingMetadataDomain extends JsonMetadataDomain {

public static final String DOMAIN_NAME = "delta.rowTracking";

/** The highest assigned fresh row id for the table */
private long rowIdHighWaterMark;

/**
* Constructs a RowTrackingMetadataDomain with the specified row ID high water mark.
*
* @param rowIdHighWaterMark the row ID high water mark
*/
@JsonCreator
public RowTrackingMetadataDomain(@JsonProperty("rowIdHighWaterMark") long rowIdHighWaterMark) {
this.rowIdHighWaterMark = rowIdHighWaterMark;
}

@Override
public String getDomainName() {
return DOMAIN_NAME;
}

public long getRowIdHighWaterMark() {
return rowIdHighWaterMark;
}

public void setRowIdHighWaterMark(long rowIdHighWaterMark) {
this.rowIdHighWaterMark = rowIdHighWaterMark;
}

@Override
public boolean equals(Object obj) {
if (this == obj) return true;
if (obj == null || getClass() != obj.getClass()) return false;
RowTrackingMetadataDomain that = (RowTrackingMetadataDomain) obj;
return rowIdHighWaterMark == that.rowIdHighWaterMark;
}

@Override
public int hashCode() {
return java.util.Objects.hash(DOMAIN_NAME, rowIdHighWaterMark);
}

@Override
public String toString() {
return "RowTrackingMetadataDomain{" + "rowIdHighWaterMark=" + rowIdHighWaterMark + '}';
}

/**
* Creates an instance of {@link RowTrackingMetadataDomain} from a JSON configuration string.
*
* @param json the JSON configuration string
* @return an instance of {@link RowTrackingMetadataDomain}
*/
public static RowTrackingMetadataDomain fromJsonConfiguration(String json) {
return JsonMetadataDomain.fromJsonConfiguration(json, RowTrackingMetadataDomain.class);
}

/**
* Creates an instance of {@link RowTrackingMetadataDomain} from a {@link SnapshotImpl}.
*
* @param snapshot the snapshot instance
* @return an {@link Optional} containing the {@link RowTrackingMetadataDomain} if present
*/
public static Optional<RowTrackingMetadataDomain> fromSnapshot(SnapshotImpl snapshot) {
return JsonMetadataDomain.fromSnapshot(snapshot, RowTrackingMetadataDomain.class, DOMAIN_NAME);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import io.delta.kernel.internal.{SnapshotImpl, TableImpl, TransactionBuilderImpl
import io.delta.kernel.internal.actions.{DomainMetadata, Protocol, SingleAction}
import io.delta.kernel.internal.util.Utils.toCloseableIterator
import io.delta.kernel.internal.TableConfig.CHECKPOINT_INTERVAL
import io.delta.kernel.internal.rowtracking.RowTrackingMetadataDomain
import io.delta.kernel.utils.CloseableIterable.{emptyIterable, inMemoryIterable}
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.delta.DeltaLog
Expand Down Expand Up @@ -540,4 +541,73 @@ class DomainMetadataSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBase
}
}
}

test(
"RowTrackingMetadataDomain is serializable and deserializable"
) {
withTempDirAndEngine((tablePath, engine) => {
// Create a RowTrackingMetadataDomain
val rowTrackingMetadataDomain = new RowTrackingMetadataDomain(10)

// Generate a DomainMetadata action from it.
val dm = rowTrackingMetadataDomain.toDomainMetadata
// The configuration string should be a JSON serialization of the rowTrackingMetadataDomain
assert(dm.getDomain === rowTrackingMetadataDomain.getDomainName)
assert(dm.getConfiguration === """{"rowIdHighWaterMark":10}""")

// Verify the deserialization from DomainMetadata action into concrete domain object
val deserializedDomain = RowTrackingMetadataDomain.fromJsonConfiguration(dm.getConfiguration)
assert(deserializedDomain === rowTrackingMetadataDomain)

// Verify the domainMetadata can be committed and read back
createTableWithDomainMetadataSupported(engine, tablePath)
// Commit the domain metadata and verify
commitDomainMetadataAndVerify(
engine,
tablePath,
domainMetadatas = Seq(dm),
expectedValue = Map(rowTrackingMetadataDomain.getDomainName -> dm)
)

// Read the domain metadata back from the table snapshot
val table = Table.forPath(engine, tablePath)
val snapshot = table.getLatestSnapshot(engine).asInstanceOf[SnapshotImpl]
val rowTrackingMetadataDomainFromSnapshot =
RowTrackingMetadataDomain.fromSnapshot(snapshot).get

// Verify the domain metadata read back from the snapshot
assert(rowTrackingMetadataDomain === rowTrackingMetadataDomainFromSnapshot)
})
}

test(
"RowTrackingMetadataDomain deserialization fails if there are missing/extra fields"
) {
// Create a RowTrackingMetadataDomain and serialize it to a JSON configuration
val rowTrackingMetadataDomain = new RowTrackingMetadataDomain(10)
val config = rowTrackingMetadataDomain.toJsonConfiguration
assert(config === """{"rowIdHighWaterMark":10}""")

// Deserialization should fail if the configuration string has extra fields
val configExtraField = """{"rowIdHighWaterMark":10, "extraField": "extra"}"""
val e1 = intercept[KernelException] {
RowTrackingMetadataDomain.fromJsonConfiguration(configExtraField)
}
assert(
e1.getMessage.contains(
"Failed to parse JSON string into a RowTrackingMetadataDomain instance"
)
)

// Deserialization should fail if the configuration string has missing fields
val configMissingField = """{}"""
val e2 = intercept[KernelException] {
RowTrackingMetadataDomain.fromJsonConfiguration(configMissingField)
}
assert(
e2.getMessage.contains(
"Failed to parse JSON string into a RowTrackingMetadataDomain instance"
)
)
}
}

0 comments on commit 0ec7a0c

Please sign in to comment.