diff --git a/api/src/test/java/org/apache/iceberg/TestHelpers.java b/api/src/test/java/org/apache/iceberg/TestHelpers.java index aa0306edf55b..fd4ea08036c4 100644 --- a/api/src/test/java/org/apache/iceberg/TestHelpers.java +++ b/api/src/test/java/org/apache/iceberg/TestHelpers.java @@ -102,6 +102,24 @@ public static void assertSameSchemaList(List list1, List list2) ); } + public static void assertSerializedMetadata(Table expected, Table actual) { + Assert.assertEquals("Name must match", expected.name(), actual.name()); + Assert.assertEquals("Location must match", expected.location(), actual.location()); + Assert.assertEquals("Props must match", expected.properties(), actual.properties()); + Assert.assertEquals("Schema must match", expected.schema().asStruct(), actual.schema().asStruct()); + Assert.assertEquals("Spec must match", expected.spec(), actual.spec()); + Assert.assertEquals("Sort order must match", expected.sortOrder(), actual.sortOrder()); + } + + public static void assertSerializedAndLoadedMetadata(Table expected, Table actual) { + assertSerializedMetadata(expected, actual); + Assert.assertEquals("Specs must match", expected.specs(), actual.specs()); + Assert.assertEquals("Sort orders must match", expected.sortOrders(), actual.sortOrders()); + Assert.assertEquals("Current snapshot must match", expected.currentSnapshot(), actual.currentSnapshot()); + Assert.assertEquals("Snapshots must match", expected.snapshots(), actual.snapshots()); + Assert.assertEquals("History must match", expected.history(), actual.history()); + } + private static class CheckReferencesBound extends ExpressionVisitors.ExpressionVisitor { private final String message; diff --git a/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java index 18006369a3c5..28a5a59089da 100644 --- a/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java +++ b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java @@ -219,37 +219,6 @@ public String toString() { } final Object writeReplace() { - String metadataLocation = ops.current().metadataFileLocation(); - return new TableProxy(io(), table().name(), name(), metadataLocation, metadataTableType(), locationProvider()); - } - - static class TableProxy implements Serializable { - private FileIO io; - private String baseTableName; - private String metadataTableName; - private String metadataLocation; - private MetadataTableType type; - private LocationProvider locationProvider; - - TableProxy(FileIO io, String baseTableName, String metadataTableName, String metadataLocation, - MetadataTableType type, LocationProvider locationProvider) { - this.io = io; - this.baseTableName = baseTableName; - this.metadataTableName = metadataTableName; - this.metadataLocation = metadataLocation; - this.type = type; - this.locationProvider = locationProvider; - } - - /** - * Returns a table with {@link StaticTableOperations} so after deserialization no Catalog related calls are - * needed for accessing the table snapshot data. 
- * @return The metadata Table object for reading the table data at the time of the serialization of the original - * object - */ - private Object readResolve() { - TableOperations ops = new StaticTableOperations(metadataLocation, io, locationProvider); - return MetadataTableUtils.createMetadataTableInstance(ops, baseTableName, metadataTableName, type); - } + return SerializableTable.copyOf(this); } } diff --git a/core/src/main/java/org/apache/iceberg/BaseTable.java b/core/src/main/java/org/apache/iceberg/BaseTable.java index 51c056f5e1dd..f7e7540a365a 100644 --- a/core/src/main/java/org/apache/iceberg/BaseTable.java +++ b/core/src/main/java/org/apache/iceberg/BaseTable.java @@ -225,30 +225,6 @@ public String toString() { } Object writeReplace() { - return new TableProxy(this); - } - - private static class TableProxy implements Serializable { - private FileIO io; - private String name; - private String metadataLocation; - private LocationProvider locationProvider; - - private TableProxy(BaseTable table) { - io = table.io(); - name = table.name(); - metadataLocation = table.operations().current().metadataFileLocation(); - locationProvider = table.locationProvider(); - } - - /** - * Returns a BaseTable with {@link StaticTableOperations} so after deserialization no Catalog related calls are - * needed for accessing the table snapshot data. - * @return The BaseTable object for reading the table data at the time of the serialization of the original - * BaseTable object - */ - private Object readResolve() { - return new BaseTable(new StaticTableOperations(metadataLocation, io, locationProvider), name); - } + return SerializableTable.copyOf(this); } } diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java b/core/src/main/java/org/apache/iceberg/BaseTransaction.java index 925f37f89955..36cf53bd0dbd 100644 --- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java +++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java @@ -19,6 +19,7 @@ package org.apache.iceberg; +import java.io.Serializable; import java.util.List; import java.util.Map; import java.util.Set; @@ -499,7 +500,7 @@ public long newSnapshotId() { } } - public class TransactionTable implements Table, HasTableOperations { + public class TransactionTable implements Table, HasTableOperations, Serializable { @Override public TableOperations operations() { @@ -679,6 +680,10 @@ public LocationProvider locationProvider() { public String toString() { return name(); } + + Object writeReplace() { + return SerializableTable.copyOf(this); + } } @VisibleForTesting diff --git a/core/src/main/java/org/apache/iceberg/SerializableTable.java b/core/src/main/java/org/apache/iceberg/SerializableTable.java new file mode 100644 index 000000000000..5b649f6eb1fa --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/SerializableTable.java @@ -0,0 +1,378 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.hadoop.HadoopFileIO; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.LocationProvider; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.util.SerializableMap; + +/** + * A read-only serializable table that can be sent to other nodes in a cluster. + *
+ * An instance of this class represents an immutable serializable copy of the table state and + * will not reflect any subsequent changes made to the original table. + *
+ * While this class captures the metadata file location that can be used to load the complete + * table metadata, it directly persists the current schema, spec, sort order, and table properties + * to avoid reading the metadata file on other nodes for frequently needed metadata. + *
+ * The implementation assumes the passed instances of {@link FileIO}, {@link EncryptionManager}, + * and {@link LocationProvider} are serializable. If the table is serialized with a custom + * serialization framework like Kryo, those instances of {@link FileIO}, {@link EncryptionManager}, + * and {@link LocationProvider} must also be supported by that serialization framework. + *
+ * Note: loading the complete metadata from a large number of nodes can overwhelm the storage. + */ +public class SerializableTable implements Table, Serializable { + + private final String name; + private final String location; + private final String metadataFileLocation; + private final Map properties; + private final String schemaAsJson; + private final String specAsJson; + private final String sortOrderAsJson; + private final FileIO io; + private final EncryptionManager encryption; + private final LocationProvider locationProvider; + + private transient volatile Table lazyTable = null; + private transient volatile Schema lazySchema = null; + private transient volatile PartitionSpec lazySpec = null; + private transient volatile SortOrder lazySortOrder = null; + + private SerializableTable(Table table) { + this.name = table.name(); + this.location = table.location(); + this.metadataFileLocation = metadataFileLocation(table); + this.properties = SerializableMap.copyOf(table.properties()); + this.schemaAsJson = SchemaParser.toJson(table.schema()); + this.specAsJson = PartitionSpecParser.toJson(table.spec()); + this.sortOrderAsJson = SortOrderParser.toJson(table.sortOrder()); + this.io = fileIO(table); + this.encryption = table.encryption(); + this.locationProvider = table.locationProvider(); + } + + /** + * Creates a read-only serializable table that can be sent to other nodes in a cluster. + * + * @param table the original table to copy the state from + * @return a read-only serializable table reflecting the current state of the original table + */ + public static Table copyOf(Table table) { + if (table instanceof BaseMetadataTable) { + return new SerializableMetadataTable((BaseMetadataTable) table); + } else { + return new SerializableTable(table); + } + } + + private String metadataFileLocation(Table table) { + if (table instanceof HasTableOperations) { + TableOperations ops = ((HasTableOperations) table).operations(); + return ops.current().metadataFileLocation(); + } else { + return null; + } + } + + private FileIO fileIO(Table table) { + if (table.io() instanceof HadoopFileIO) { + HadoopFileIO hadoopFileIO = (HadoopFileIO) table.io(); + SerializableConfiguration serializedConf = new SerializableConfiguration(hadoopFileIO.getConf()); + return new HadoopFileIO(serializedConf::get); + } else { + return table.io(); + } + } + + private Table lazyTable() { + if (lazyTable == null) { + synchronized (this) { + if (lazyTable == null) { + if (metadataFileLocation == null) { + throw new UnsupportedOperationException("Cannot load metadata: metadata file location is null"); + } + + TableOperations ops = new StaticTableOperations(metadataFileLocation, io, locationProvider); + this.lazyTable = newTable(ops, name); + } + } + } + + return lazyTable; + } + + protected Table newTable(TableOperations ops, String tableName) { + return new BaseTable(ops, tableName); + } + + @Override + public String name() { + return name; + } + + @Override + public String location() { + return location; + } + + @Override + public Map properties() { + return properties; + } + + @Override + public Schema schema() { + if (lazySchema == null) { + synchronized (this) { + if (lazySchema == null && lazyTable == null) { + // prefer parsing JSON as opposed to loading the metadata + this.lazySchema = SchemaParser.fromJson(schemaAsJson); + } else if (lazySchema == null) { + this.lazySchema = lazyTable.schema(); + } + } + } + + return lazySchema; + } + + @Override + public PartitionSpec spec() { + if (lazySpec == null) { + 
synchronized (this) { + if (lazySpec == null && lazyTable == null) { + // prefer parsing JSON as opposed to loading the metadata + this.lazySpec = PartitionSpecParser.fromJson(schema(), specAsJson); + } else if (lazySpec == null) { + this.lazySpec = lazyTable.spec(); + } + } + } + + return lazySpec; + } + + @Override + public Map specs() { + return lazyTable().specs(); + } + + @Override + public SortOrder sortOrder() { + if (lazySortOrder == null) { + synchronized (this) { + if (lazySortOrder == null && lazyTable == null) { + // prefer parsing JSON as opposed to loading the metadata + this.lazySortOrder = SortOrderParser.fromJson(schema(), sortOrderAsJson); + } else if (lazySortOrder == null) { + this.lazySortOrder = lazyTable.sortOrder(); + } + } + } + + return lazySortOrder; + } + + @Override + public Map sortOrders() { + return lazyTable().sortOrders(); + } + + @Override + public FileIO io() { + return io; + } + + @Override + public EncryptionManager encryption() { + return encryption; + } + + @Override + public LocationProvider locationProvider() { + return locationProvider; + } + + @Override + public void refresh() { + throw new UnsupportedOperationException(errorMsg("refresh")); + } + + @Override + public TableScan newScan() { + return lazyTable().newScan(); + } + + @Override + public Snapshot currentSnapshot() { + return lazyTable().currentSnapshot(); + } + + @Override + public Snapshot snapshot(long snapshotId) { + return lazyTable().snapshot(snapshotId); + } + + @Override + public Iterable snapshots() { + return lazyTable().snapshots(); + } + + @Override + public List history() { + return lazyTable().history(); + } + + @Override + public UpdateSchema updateSchema() { + throw new UnsupportedOperationException(errorMsg("updateSchema")); + } + + @Override + public UpdatePartitionSpec updateSpec() { + throw new UnsupportedOperationException(errorMsg("updateSpec")); + } + + @Override + public UpdateProperties updateProperties() { + throw new UnsupportedOperationException(errorMsg("updateProperties")); + } + + @Override + public ReplaceSortOrder replaceSortOrder() { + throw new UnsupportedOperationException(errorMsg("replaceSortOrder")); + } + + @Override + public UpdateLocation updateLocation() { + throw new UnsupportedOperationException(errorMsg("updateLocation")); + } + + @Override + public AppendFiles newAppend() { + throw new UnsupportedOperationException(errorMsg("newAppend")); + } + + @Override + public RewriteFiles newRewrite() { + throw new UnsupportedOperationException(errorMsg("newRewrite")); + } + + @Override + public RewriteManifests rewriteManifests() { + throw new UnsupportedOperationException(errorMsg("rewriteManifests")); + } + + @Override + public OverwriteFiles newOverwrite() { + throw new UnsupportedOperationException(errorMsg("newOverwrite")); + } + + @Override + public RowDelta newRowDelta() { + throw new UnsupportedOperationException(errorMsg("newRowDelta")); + } + + @Override + public ReplacePartitions newReplacePartitions() { + throw new UnsupportedOperationException(errorMsg("newReplacePartitions")); + } + + @Override + public DeleteFiles newDelete() { + throw new UnsupportedOperationException(errorMsg("newDelete")); + } + + @Override + public ExpireSnapshots expireSnapshots() { + throw new UnsupportedOperationException(errorMsg("expireSnapshots")); + } + + @Override + public Rollback rollback() { + throw new UnsupportedOperationException(errorMsg("rollback")); + } + + @Override + public ManageSnapshots manageSnapshots() { + throw new 
UnsupportedOperationException(errorMsg("manageSnapshots")); + } + + @Override + public Transaction newTransaction() { + throw new UnsupportedOperationException(errorMsg("newTransaction")); + } + + private String errorMsg(String operation) { + return String.format("Operation %s is not supported after the table is serialized", operation); + } + + private static class SerializableMetadataTable extends SerializableTable { + private final MetadataTableType type; + private final String baseTableName; + + SerializableMetadataTable(BaseMetadataTable metadataTable) { + super(metadataTable); + this.type = metadataTable.metadataTableType(); + this.baseTableName = metadataTable.table().name(); + } + + @Override + protected Table newTable(TableOperations ops, String tableName) { + return MetadataTableUtils.createMetadataTableInstance(ops, baseTableName, tableName, type); + } + } + + // captures the current state of a Hadoop configuration in a serializable manner + private static class SerializableConfiguration implements Serializable { + + private final Map confAsMap; + private transient volatile Configuration conf = null; + + SerializableConfiguration(Configuration conf) { + this.confAsMap = Maps.newHashMapWithExpectedSize(conf.size()); + conf.forEach(entry -> confAsMap.put(entry.getKey(), entry.getValue())); + } + + public Configuration get() { + if (conf == null) { + synchronized (this) { + if (conf == null) { + Configuration newConf = new Configuration(false); + confAsMap.forEach(newConf::set); + this.conf = newConf; + } + } + } + + return conf; + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/SortOrderParser.java b/core/src/main/java/org/apache/iceberg/SortOrderParser.java index 418486e1b879..a351e3a53a81 100644 --- a/core/src/main/java/org/apache/iceberg/SortOrderParser.java +++ b/core/src/main/java/org/apache/iceberg/SortOrderParser.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.databind.JsonNode; import java.io.IOException; +import java.io.StringWriter; import java.io.UncheckedIOException; import java.util.Iterator; import java.util.Locale; @@ -50,6 +51,26 @@ public static void toJson(SortOrder sortOrder, JsonGenerator generator) throws I generator.writeEndObject(); } + public static String toJson(SortOrder sortOrder) { + return toJson(sortOrder, false); + } + + public static String toJson(SortOrder sortOrder, boolean pretty) { + try { + StringWriter writer = new StringWriter(); + JsonGenerator generator = JsonUtil.factory().createGenerator(writer); + if (pretty) { + generator.useDefaultPrettyPrinter(); + } + toJson(sortOrder, generator); + generator.flush(); + return writer.toString(); + + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + private static String toJson(SortDirection direction) { return direction.toString().toLowerCase(Locale.ROOT); } diff --git a/core/src/test/java/org/apache/iceberg/hadoop/TestTableSerialization.java b/core/src/test/java/org/apache/iceberg/hadoop/TestTableSerialization.java index 58bae17d3a1d..1086ba014455 100644 --- a/core/src/test/java/org/apache/iceberg/hadoop/TestTableSerialization.java +++ b/core/src/test/java/org/apache/iceberg/hadoop/TestTableSerialization.java @@ -31,16 +31,70 @@ import org.apache.iceberg.HasTableOperations; import org.apache.iceberg.MetadataTableType; import org.apache.iceberg.Table; +import org.apache.iceberg.TestHelpers; +import org.apache.iceberg.Transaction; import org.apache.iceberg.io.CloseableIterable; import 
org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.Types; import org.junit.Assert; import org.junit.Test; public class TestTableSerialization extends HadoopTableTestBase { @Test - public void testSerializeBaseTable() throws IOException { + public void testSerializableTable() throws IOException, ClassNotFoundException { + table.replaceSortOrder() + .asc("id") + .commit(); + + table.updateProperties() + .set("k1", "v1") + .set("k2", "v2") + .commit(); + + table.updateSchema() + .addColumn("new_col", Types.IntegerType.get()) + .commit(); + + TestHelpers.assertSerializedAndLoadedMetadata(table, TestHelpers.roundTripSerialize(table)); + } + + @Test + public void testSerializableTxnTable() throws IOException, ClassNotFoundException { + table.replaceSortOrder() + .asc("id") + .commit(); + + table.updateProperties() + .set("k1", "v1") + .set("k2", "v2") + .commit(); + + table.updateSchema() + .addColumn("new_col", Types.IntegerType.get()) + .commit(); + + Transaction txn = table.newTransaction(); + + txn.updateProperties() + .set("k3", "v3") + .commit(); + + // txn tables have metadata locations as null so we check only serialized metadata + TestHelpers.assertSerializedMetadata(txn.table(), TestHelpers.roundTripSerialize(txn.table())); + } + + @Test + public void testSerializableMetadataTable() throws IOException, ClassNotFoundException { + for (MetadataTableType type : MetadataTableType.values()) { + Table metadataTable = getMetaDataTable(table, type); + TestHelpers.assertSerializedAndLoadedMetadata(metadataTable, TestHelpers.roundTripSerialize(metadataTable)); + } + } + + @Test + public void testSerializableTablePlanning() throws IOException { table.newAppend() .appendFile(FILE_A) .commit(); @@ -65,7 +119,7 @@ public void testSerializeBaseTable() throws IOException { } @Test - public void testMetadataTables() throws IOException { + public void testSerializableMetadataTablesPlanning() throws IOException { table.newAppend() .appendFile(FILE_A) .commit(); diff --git a/spark/src/test/java/org/apache/iceberg/KryoHelpers.java b/spark/src/test/java/org/apache/iceberg/KryoHelpers.java new file mode 100644 index 000000000000..ee0f0a73959a --- /dev/null +++ b/spark/src/test/java/org/apache/iceberg/KryoHelpers.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import org.apache.spark.SparkConf; +import org.apache.spark.serializer.KryoSerializer; + +public class KryoHelpers { + + private KryoHelpers() { + } + + @SuppressWarnings("unchecked") + public static T roundTripSerialize(T obj) throws IOException { + Kryo kryo = new KryoSerializer(new SparkConf()).newKryo(); + + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + + try (Output out = new Output(new ObjectOutputStream(bytes))) { + kryo.writeClassAndObject(out, obj); + } + + try (Input in = new Input(new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray())))) { + return (T) kryo.readClassAndObject(in); + } + } +} diff --git a/spark/src/test/java/org/apache/iceberg/TestFileIOSerialization.java b/spark/src/test/java/org/apache/iceberg/TestFileIOSerialization.java new file mode 100644 index 000000000000..7c14485ff9a0 --- /dev/null +++ b/spark/src/test/java/org/apache/iceberg/TestFileIOSerialization.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg; + +import java.io.File; +import java.io.IOException; +import java.util.Map; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.hadoop.HadoopFileIO; +import org.apache.iceberg.hadoop.HadoopTables; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Types; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; + +public class TestFileIOSerialization { + + private static final Configuration CONF = new Configuration(); + private static final HadoopTables TABLES = new HadoopTables(CONF); + + private static final Schema SCHEMA = new Schema( + required(1, "id", Types.LongType.get()), + optional(2, "data", Types.StringType.get()), + required(3, "date", Types.StringType.get()), + optional(4, "double", Types.DoubleType.get())); + + private static final PartitionSpec SPEC = PartitionSpec + .builderFor(SCHEMA) + .identity("date") + .build(); + + private static final SortOrder SORT_ORDER = SortOrder.builderFor(SCHEMA) + .asc("id") + .build(); + + static { + CONF.set("k1", "v1"); + CONF.set("k2", "v2"); + } + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + private Table table; + + @Before + public void initTable() throws IOException { + Map props = ImmutableMap.of("k1", "v1", "k2", "v2"); + + File tableLocation = temp.newFolder(); + Assert.assertTrue(tableLocation.delete()); + + this.table = TABLES.create(SCHEMA, SPEC, SORT_ORDER, props, tableLocation.toString()); + } + + @Test + public void testHadoopFileIOKryoSerialization() throws IOException { + FileIO io = table.io(); + Configuration expectedConf = ((HadoopFileIO) io).conf(); + + Table serializableTable = SerializableTable.copyOf(table); + FileIO deserializedIO = KryoHelpers.roundTripSerialize(serializableTable.io()); + Configuration actualConf = ((HadoopFileIO) deserializedIO).conf(); + + Assert.assertEquals("Conf pairs must match", toMap(expectedConf), toMap(actualConf)); + Assert.assertEquals("Conf values must be present", "v1", actualConf.get("k1")); + Assert.assertEquals("Conf values must be present", "v2", actualConf.get("k2")); + } + + @Test + public void testHadoopFileIOJavaSerialization() throws IOException, ClassNotFoundException { + FileIO io = table.io(); + Configuration expectedConf = ((HadoopFileIO) io).conf(); + + Table serializableTable = SerializableTable.copyOf(table); + FileIO deserializedIO = TestHelpers.roundTripSerialize(serializableTable.io()); + Configuration actualConf = ((HadoopFileIO) deserializedIO).conf(); + + Assert.assertEquals("Conf pairs must match", toMap(expectedConf), toMap(actualConf)); + Assert.assertEquals("Conf values must be present", "v1", actualConf.get("k1")); + Assert.assertEquals("Conf values must be present", "v2", actualConf.get("k2")); + } + + private Map toMap(Configuration conf) { + Map map = Maps.newHashMapWithExpectedSize(conf.size()); + conf.forEach(entry -> map.put(entry.getKey(), entry.getValue())); + return map; + } +} diff --git a/spark/src/test/java/org/apache/iceberg/TestTableSerialization.java b/spark/src/test/java/org/apache/iceberg/TestTableSerialization.java new file mode 100644 index 000000000000..88e30c0ece68 --- /dev/null +++ 
b/spark/src/test/java/org/apache/iceberg/TestTableSerialization.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg; + +import java.io.File; +import java.io.IOException; +import java.util.Map; +import org.apache.iceberg.hadoop.HadoopTables; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.types.Types; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; + +public class TestTableSerialization { + + private static final HadoopTables TABLES = new HadoopTables(); + + private static final Schema SCHEMA = new Schema( + required(1, "id", Types.LongType.get()), + optional(2, "data", Types.StringType.get()), + required(3, "date", Types.StringType.get()), + optional(4, "double", Types.DoubleType.get())); + + private static final PartitionSpec SPEC = PartitionSpec + .builderFor(SCHEMA) + .identity("date") + .build(); + + private static final SortOrder SORT_ORDER = SortOrder.builderFor(SCHEMA) + .asc("id") + .build(); + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + private Table table; + + @Before + public void initTable() throws IOException { + Map props = ImmutableMap.of("k1", "v1", "k2", "v2"); + + File tableLocation = temp.newFolder(); + Assert.assertTrue(tableLocation.delete()); + + this.table = TABLES.create(SCHEMA, SPEC, SORT_ORDER, props, tableLocation.toString()); + } + + @Test + public void testSerializableTableKryoSerialization() throws IOException { + Table serializableTable = SerializableTable.copyOf(table); + TestHelpers.assertSerializedAndLoadedMetadata(table, KryoHelpers.roundTripSerialize(serializableTable)); + } + + @Test + public void testSerializableMetadataTableKryoSerialization() throws IOException { + for (MetadataTableType type : MetadataTableType.values()) { + TableOperations ops = ((HasTableOperations) table).operations(); + Table metadataTable = MetadataTableUtils.createMetadataTableInstance(ops, table.name(), "meta", type); + Table serializableMetadataTable = SerializableTable.copyOf(metadataTable); + + TestHelpers.assertSerializedAndLoadedMetadata( + metadataTable, + KryoHelpers.roundTripSerialize(serializableMetadataTable)); + } + } + + @Test + public void testSerializableTransactionTableKryoSerialization() throws IOException { + Transaction txn = table.newTransaction(); + + txn.updateProperties() + .set("k1", "v1") + .commit(); + + Table txnTable = txn.table(); + Table serializableTxnTable = SerializableTable.copyOf(txnTable); + + 
TestHelpers.assertSerializedMetadata(txnTable, KryoHelpers.roundTripSerialize(serializableTxnTable)); + } +}
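
Usage sketch (reviewer note, not part of the diff): a minimal example of how the new SerializableTable introduced above is expected to be used. It relies only on APIs visible in this change (SerializableTable.copyOf and the read-only Table accessors); the warehouse path and the example class name are hypothetical.

package org.apache.iceberg.examples;

import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;

public class SerializableTableUsage {

  public static void main(String[] args) {
    // hypothetical table location; any Table implementation works as the source
    Table table = new HadoopTables().load("/tmp/warehouse/db/tbl");

    // copyOf captures an immutable, serializable snapshot of the current table state
    Table serializableTable = SerializableTable.copyOf(table);

    // schema, spec, sort order, and properties are answered from the embedded JSON copies,
    // so these calls never read the metadata file again
    Schema schema = serializableTable.schema();
    PartitionSpec spec = serializableTable.spec();

    // snapshot-level accessors lazily load the table through StaticTableOperations
    // using the captured metadata file location
    Snapshot currentSnapshot = serializableTable.currentSnapshot();

    System.out.println(schema.asStruct() + " " + spec + " " + currentSnapshot);

    // mutating operations are rejected after serialization, e.g.
    // serializableTable.newAppend() throws UnsupportedOperationException
  }
}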