From 41ed1f70c5b0d76885f087f3fe3d7aa5f92397ae Mon Sep 17 00:00:00 2001 From: Russell Spitzer Date: Fri, 14 Aug 2020 13:14:34 -0500 Subject: [PATCH 1/2] Static Table Operations Allows for a Table Operations which references a specific metadata version file. This operation will not change even if the base table it was derived from is changed. This enables it to act like a ReadOnly view of the table's state at a given time. --- .../apache/iceberg/StaticTableOperations.java | 72 +++++++++++++++++ .../apache/iceberg/hadoop/HadoopTables.java | 38 +++++---- .../iceberg/hadoop/TestStaticTable.java | 79 +++++++++++++++++++ 3 files changed, 173 insertions(+), 16 deletions(-) create mode 100644 core/src/main/java/org/apache/iceberg/StaticTableOperations.java create mode 100644 core/src/test/java/org/apache/iceberg/hadoop/TestStaticTable.java diff --git a/core/src/main/java/org/apache/iceberg/StaticTableOperations.java b/core/src/main/java/org/apache/iceberg/StaticTableOperations.java new file mode 100644 index 000000000000..09d5571015f0 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/StaticTableOperations.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +package org.apache.iceberg; + +import java.nio.file.Paths; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.LocationProvider; + +/** + * Representation of an immutable snapshot of Table State that can be used in + * other Iceberg Functions. + */ +public class StaticTableOperations implements TableOperations { + private final TableMetadata staticMetadata; + private final FileIO io; + + /** + * Creates a StaticTableOperations tied to a specific static version of the TableMetadata + */ + public StaticTableOperations(String location, FileIO io) { + this.io = io; + this.staticMetadata = TableMetadataParser.read(io, location); + } + + @Override + public TableMetadata current() { + return staticMetadata; + } + + @Override + public TableMetadata refresh() { + return staticMetadata; + } + + @Override + public void commit(TableMetadata base, TableMetadata metadata) { + throw new UnsupportedOperationException("This TableOperations is static, it cannot be modified"); + } + + @Override + public FileIO io() { + return this.io; + } + + @Override + public String metadataFileLocation(String fileName) { + throw new UnsupportedOperationException("New files cannot be created in a Static Table Operations"); + } + + @Override + public LocationProvider locationProvider() { + throw new UnsupportedOperationException("New files cannot be created in a Static Table Operations"); + } +} diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTables.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTables.java index 79d0df516cd9..18a75778e331 100644 --- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTables.java +++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTables.java @@ -22,6 +22,7 @@ import java.util.Map; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.iceberg.AllDataFilesTable; import 
org.apache.iceberg.AllEntriesTable; @@ -36,6 +37,7 @@ import org.apache.iceberg.PartitionsTable; import org.apache.iceberg.Schema; import org.apache.iceberg.SnapshotsTable; +import org.apache.iceberg.StaticTableOperations; import org.apache.iceberg.Table; import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableOperations; @@ -50,6 +52,7 @@ * to store metadata and manifests. */ public class HadoopTables implements Tables, Configurable { + private static final String METADATA_JSON = "metadata.json"; private Configuration conf; public HadoopTables() { @@ -68,27 +71,26 @@ public HadoopTables(Configuration conf) { */ @Override public Table load(String location) { - TableOperations ops = newTableOps(location); - if (ops.current() == null) { + //Possibly Load a Metadata Table + if (location.contains("#") && !location.endsWith("#")) { // try to resolve a metadata table, which we encode as URI fragments // e.g. hdfs:///warehouse/my_table#snapshots int hashIndex = location.lastIndexOf('#'); - if (hashIndex != -1 && location.length() - 1 != hashIndex) { - // we found char '#', and it is not the last char of location - String baseTable = location.substring(0, hashIndex); - String metaTable = location.substring(hashIndex + 1); - MetadataTableType type = MetadataTableType.from(metaTable); - if (type != null) { - return loadMetadataTable(baseTable, type); - } else { - throw new NoSuchTableException("Table does not exist at location: " + location); - } - } else { - throw new NoSuchTableException("Table does not exist at location: " + location); + String baseTable = location.substring(0, hashIndex); + String metaTable = location.substring(hashIndex + 1); + MetadataTableType type = MetadataTableType.from(metaTable); + if (type != null) { + return loadMetadataTable(baseTable, type); } } - return new BaseTable(ops, location); + //Normal Table Load if we haven't loaded a MetadataTable + TableOperations ops = newTableOps(location); + if (ops.current() != null) { + return 
new BaseTable(ops, location); + } else { + throw new NoSuchTableException("Table does not exist at location: " + location); + } } private Table loadMetadataTable(String location, MetadataTableType type) { @@ -152,7 +154,11 @@ public Table create(Schema schema, PartitionSpec spec, Map prope } private TableOperations newTableOps(String location) { - return new HadoopTableOperations(new Path(location), conf); + if (location.contains(METADATA_JSON)) { + return new StaticTableOperations(location, new HadoopFileIO(conf)); + } else { + return new HadoopTableOperations(new Path(location), conf); + } } @Override diff --git a/core/src/test/java/org/apache/iceberg/hadoop/TestStaticTable.java b/core/src/test/java/org/apache/iceberg/hadoop/TestStaticTable.java new file mode 100644 index 000000000000..942100f715ec --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/hadoop/TestStaticTable.java @@ -0,0 +1,79 @@ +package org.apache.iceberg.hadoop; + +import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; +import org.apache.iceberg.HasTableOperations; +import org.apache.iceberg.MetadataTableType; +import org.apache.iceberg.StaticTableOperations; +import org.apache.iceberg.Table; +import org.junit.Assert; +import org.junit.Test; + +public class TestStaticTable extends HadoopTableTestBase { + + private Table getStaticTable() { + return TABLES.load(((HasTableOperations) table).operations().current().metadataFileLocation()); + } + + private Table getStaticTable(MetadataTableType type) { + return TABLES.load(((HasTableOperations) table).operations().current().metadataFileLocation() + "#" + type); + } + + @Test + public void testLoadFromMetadata() { + Table staticTable = getStaticTable(); + Assert.assertTrue("Loading a metadata file based table should return StaticTableOperations", + ((HasTableOperations) staticTable).operations() instanceof StaticTableOperations); + } + + @Test(expected = UnsupportedOperationException.class) + public void 
testCannotBeAddedTo(){ + Table staticTable = getStaticTable(); + staticTable.newOverwrite().addFile(FILE_A).commit(); + } + + @Test(expected = UnsupportedOperationException.class) + public void testCannotBeDeletedFrom(){ + table.newAppend().appendFile(FILE_A).commit(); + Table staticTable = getStaticTable(); + staticTable.newDelete().deleteFile(FILE_A).commit(); + } + + @Test + public void testHasSameProperties(){ + table.newAppend().appendFile(FILE_A).commit(); + table.newAppend().appendFile(FILE_B).commit(); + table.newOverwrite().deleteFile(FILE_B).addFile(FILE_C).commit(); + Table staticTable = getStaticTable(); + Assert.assertTrue("Same history?", + table.history().containsAll(staticTable.history())); + Assert.assertTrue("Same snapshot?", + table.currentSnapshot().snapshotId() == staticTable.currentSnapshot().snapshotId()); + Assert.assertTrue("Same properties?", + Maps.difference(table.properties(), staticTable.properties()).areEqual()); + } + + @Test + public void testImmutable() { + table.newAppend().appendFile(FILE_A).commit(); + Table staticTable = getStaticTable(); + long originalSnapshot = table.currentSnapshot().snapshotId(); + + table.newAppend().appendFile(FILE_B).commit(); + table.newOverwrite().deleteFile(FILE_B).addFile(FILE_C).commit(); + + Assert.assertEquals("Snapshot unchanged after table modified", + staticTable.currentSnapshot().snapshotId(), originalSnapshot); + } + + @Test + public void testMetadataTables() { + for (MetadataTableType type: MetadataTableType.values()) { + String enumName = type.name().replace("_","").toLowerCase(); + Assert.assertTrue("Should be able to get MetadataTable of type : " + type, + getStaticTable(type).getClass().getName().toLowerCase().contains(enumName)); + } + } + + +} From c2beab7759f02296a9411f2697b949c076ee3090 Mon Sep 17 00:00:00 2001 From: Russell Spitzer Date: Fri, 14 Aug 2020 16:36:40 -0500 Subject: [PATCH 2/2] Reviewer Comments Cleanup various style mistakes Slightly redo logic around parsing 
MetadataTableNames Fix test cases which were missing refreshes --- .../apache/iceberg/StaticTableOperations.java | 16 +++---- .../apache/iceberg/hadoop/HadoopTables.java | 44 ++++++++++++------- .../iceberg/hadoop/TestStaticTable.java | 44 ++++++++++++++----- 3 files changed, 69 insertions(+), 35 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/StaticTableOperations.java b/core/src/main/java/org/apache/iceberg/StaticTableOperations.java index 09d5571015f0..1e061efde8c7 100644 --- a/core/src/main/java/org/apache/iceberg/StaticTableOperations.java +++ b/core/src/main/java/org/apache/iceberg/StaticTableOperations.java @@ -20,13 +20,13 @@ package org.apache.iceberg; -import java.nio.file.Paths; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.LocationProvider; /** - * Representation of an immutable snapshot of Table State that can be used in - * other Iceberg Functions. + * TableOperations implementation that provides access to metadata for a Table at some point in time, using a + * table metadata location. It will never refer to a different Metadata object than the one it was created with + * and cannot be used to create or delete files. 
*/ public class StaticTableOperations implements TableOperations { private final TableMetadata staticMetadata; @@ -35,9 +35,9 @@ public class StaticTableOperations implements TableOperations { /** * Creates a StaticTableOperations tied to a specific static version of the TableMetadata */ - public StaticTableOperations(String location, FileIO io) { + public StaticTableOperations(String metadataFileLocation, FileIO io) { this.io = io; - this.staticMetadata = TableMetadataParser.read(io, location); + this.staticMetadata = TableMetadataParser.read(io, metadataFileLocation); } @Override @@ -52,7 +52,7 @@ public TableMetadata refresh() { @Override public void commit(TableMetadata base, TableMetadata metadata) { - throw new UnsupportedOperationException("This TableOperations is static, it cannot be modified"); + throw new UnsupportedOperationException("Cannot modify a static table"); } @Override @@ -62,11 +62,11 @@ public FileIO io() { @Override public String metadataFileLocation(String fileName) { - throw new UnsupportedOperationException("New files cannot be created in a Static Table Operations"); + throw new UnsupportedOperationException("Cannot modify a static table"); } @Override public LocationProvider locationProvider() { - throw new UnsupportedOperationException("New files cannot be created in a Static Table Operations"); + throw new UnsupportedOperationException("Cannot modify a static table"); } } diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTables.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTables.java index 18a75778e331..77aaf2cd47f7 100644 --- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTables.java +++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTables.java @@ -22,7 +22,6 @@ import java.util.Map; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.iceberg.AllDataFilesTable; import 
org.apache.iceberg.AllEntriesTable; @@ -46,6 +45,7 @@ import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.util.Pair; /** * Implementation of Iceberg tables that uses the Hadoop FileSystem @@ -71,25 +71,37 @@ public HadoopTables(Configuration conf) { */ @Override public Table load(String location) { - //Possibly Load a Metadata Table - if (location.contains("#") && !location.endsWith("#")) { - // try to resolve a metadata table, which we encode as URI fragments - // e.g. hdfs:///warehouse/my_table#snapshots - int hashIndex = location.lastIndexOf('#'); - String baseTable = location.substring(0, hashIndex); - String metaTable = location.substring(hashIndex + 1); - MetadataTableType type = MetadataTableType.from(metaTable); - if (type != null) { - return loadMetadataTable(baseTable, type); + Pair<String, MetadataTableType> parsedMetadataType = parseMetadataType(location); + + if (parsedMetadataType != null) { + // Load a metadata table + return loadMetadataTable(parsedMetadataType.first(), parsedMetadataType.second()); + } else { + // Load a normal table + TableOperations ops = newTableOps(location); + if (ops.current() != null) { + return new BaseTable(ops, location); + } else { + throw new NoSuchTableException("Table does not exist at location: " + location); } } + } - //Normal Table Load if we haven't loaded a MetadataTable - TableOperations ops = newTableOps(location); - if (ops.current() != null) { - return new BaseTable(ops, location); + /** + * Try to resolve a metadata table, which we encode as URI fragments + * e.g. 
hdfs:///warehouse/my_table#snapshots + * @param location Path to parse + * @return A base table name and MetadataTableType if a type is found, null if not + */ + private Pair<String, MetadataTableType> parseMetadataType(String location) { + int hashIndex = location.lastIndexOf('#'); + if (hashIndex != -1 && !location.endsWith("#")) { + String baseTable = location.substring(0, hashIndex); + String metaTable = location.substring(hashIndex + 1); + MetadataTableType type = MetadataTableType.from(metaTable); + return (type == null) ? null : Pair.of(baseTable, type); } else { - throw new NoSuchTableException("Table does not exist at location: " + location); + return null; } } diff --git a/core/src/test/java/org/apache/iceberg/hadoop/TestStaticTable.java b/core/src/test/java/org/apache/iceberg/hadoop/TestStaticTable.java index 942100f715ec..16a5ddd582fe 100644 --- a/core/src/test/java/org/apache/iceberg/hadoop/TestStaticTable.java +++ b/core/src/test/java/org/apache/iceberg/hadoop/TestStaticTable.java @@ -1,11 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + package org.apache.iceberg.hadoop; -import com.google.common.collect.Iterables; -import com.google.common.collect.Maps; +import org.apache.iceberg.AssertHelpers; import org.apache.iceberg.HasTableOperations; import org.apache.iceberg.MetadataTableType; import org.apache.iceberg.StaticTableOperations; import org.apache.iceberg.Table; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.junit.Assert; import org.junit.Test; @@ -26,21 +45,23 @@ public void testLoadFromMetadata() { ((HasTableOperations) staticTable).operations() instanceof StaticTableOperations); } - @Test(expected = UnsupportedOperationException.class) - public void testCannotBeAddedTo(){ + @Test + public void testCannotBeAddedTo() { Table staticTable = getStaticTable(); - staticTable.newOverwrite().addFile(FILE_A).commit(); + AssertHelpers.assertThrows("Cannot modify a static table", UnsupportedOperationException.class, + () -> staticTable.newOverwrite().addFile(FILE_A).commit()); } - @Test(expected = UnsupportedOperationException.class) - public void testCannotBeDeletedFrom(){ + @Test + public void testCannotBeDeletedFrom() { table.newAppend().appendFile(FILE_A).commit(); Table staticTable = getStaticTable(); - staticTable.newDelete().deleteFile(FILE_A).commit(); + AssertHelpers.assertThrows("Cannot modify a static table", UnsupportedOperationException.class, + () -> staticTable.newDelete().deleteFile(FILE_A).commit()); } @Test - public void testHasSameProperties(){ + public void testHasSameProperties() { table.newAppend().appendFile(FILE_A).commit(); table.newAppend().appendFile(FILE_B).commit(); table.newOverwrite().deleteFile(FILE_B).addFile(FILE_C).commit(); @@ -61,6 +82,7 @@ public void testImmutable() { table.newAppend().appendFile(FILE_B).commit(); table.newOverwrite().deleteFile(FILE_B).addFile(FILE_C).commit(); + staticTable.refresh(); Assert.assertEquals("Snapshot unchanged after table modified", staticTable.currentSnapshot().snapshotId(), 
originalSnapshot); @@ -68,8 +90,8 @@ public void testImmutable() { @Test public void testMetadataTables() { - for (MetadataTableType type: MetadataTableType.values()) { - String enumName = type.name().replace("_","").toLowerCase(); + for (MetadataTableType type : MetadataTableType.values()) { + String enumName = type.name().replace("_", "").toLowerCase(); Assert.assertTrue("Should be able to get MetadataTable of type : " + type, getStaticTable(type).getClass().getName().toLowerCase().contains(enumName)); }