From 666d1fb21826457cb530f5acc44a4449924d9306 Mon Sep 17 00:00:00 2001
From: ajantha-bhat
Date: Thu, 2 Jun 2022 18:04:14 +0530
Subject: [PATCH 1/3] API: Support registering filesystem tables to hive catalog

---
 .../iceberg/BaseMetastoreTableOperations.java |  2 +-
 .../org/apache/iceberg/hive/HiveCatalog.java  | 15 +++++++
 .../apache/iceberg/hive/HiveTableTest.java    | 39 +++++++++++++++++++
 3 files changed, 55 insertions(+), 1 deletion(-)

diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java
index 6cd6798a0839..642bf4f7b1eb 100644
--- a/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java
+++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java
@@ -151,7 +151,7 @@ protected void disableRefresh() {
     this.shouldRefresh = false;
   }
 
-  protected String writeNewMetadata(TableMetadata metadata, int newVersion) {
+  public String writeNewMetadata(TableMetadata metadata, int newVersion) {
     String newTableMetadataFilePath = newTableMetadataFilePath(metadata, newVersion);
     OutputFile newMetadataLocation = io().newOutputFile(newTableMetadataFilePath);
 
diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
index 62c70438e6c9..abd4be967a81 100644
--- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
+++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
@@ -246,11 +246,26 @@ public org.apache.iceberg.Table registerTable(TableIdentifier identifier, String
     TableOperations ops = newTableOps(identifier);
     InputFile metadataFile = fileIO.newInputFile(metadataFileLocation);
     TableMetadata metadata = TableMetadataParser.read(ops.io(), metadataFile);
+
+    if (!isMetastoreTable(metadataFileLocation)) {
+      // Metastore tables and filesystem tables use different metadata file naming conventions per the spec.
+      // So, to support importing a filesystem table into a metastore catalog,
+      // write a new metadata file with the filesystem table's contents but a metastore-style file name.
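+      // For illustration (hypothetical names): a filesystem table's metadata file looks like
+      // "v3.metadata.json", while a metastore table's looks like "00003-<uuid>.metadata.json";
+      // isMetastoreTable() below keys off the '-' in the file name to tell the two apart.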
+      BaseMetastoreTableOperations baseOps = (BaseMetastoreTableOperations) ops;
+      String newFileName = baseOps.writeNewMetadata(metadata, baseOps.currentVersion() + 1);
+      metadataFile = fileIO.newInputFile(newFileName);
+      metadata = TableMetadataParser.read(ops.io(), metadataFile);
+    }
+
     ops.commit(null, metadata);
 
     return new BaseTable(ops, identifier.toString());
   }
 
+  private boolean isMetastoreTable(String metadataFileLocation) {
+    return metadataFileLocation.substring(metadataFileLocation.lastIndexOf("/") + 1).contains("-");
+  }
+
   @Override
   public void createNamespace(Namespace namespace, Map<String, String> meta) {
     Preconditions.checkArgument(
diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/HiveTableTest.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/HiveTableTest.java
index 2d987287cad9..a6842f9e0cd2 100644
--- a/hive-metastore/src/test/java/org/apache/iceberg/hive/HiveTableTest.java
+++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/HiveTableTest.java
@@ -21,14 +21,17 @@
 import java.io.File;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
@@ -42,6 +45,7 @@
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadataParser;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.avro.AvroSchemaUtil;
@@ -49,8 +53,10 @@
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.exceptions.NotFoundException;
 import org.apache.iceberg.hadoop.ConfigProperties;
+import org.apache.iceberg.hadoop.HadoopCatalog;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -68,6 +74,7 @@
 import static org.apache.iceberg.BaseMetastoreTableOperations.METADATA_LOCATION_PROP;
 import static org.apache.iceberg.BaseMetastoreTableOperations.PREVIOUS_METADATA_LOCATION_PROP;
 import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP;
+import static org.apache.iceberg.TableMetadataParser.getFileExtension;
 import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.apache.iceberg.types.Types.NestedField.required;
 
@@ -404,6 +411,38 @@ public void testRegisterTable() throws TException {
     Assert.assertEquals(originalTable.getSd(), newTable.getSd());
   }
 
+  @Test
+  public void testRegisterHadoopTableToHiveCatalog() throws IOException, TException {
+    // create a hadoop catalog
+    String tableLocation = tempFolder.newFolder().toString();
+    HadoopCatalog hadoopCatalog = new HadoopCatalog(new Configuration(), tableLocation);
+    // create table using hadoop catalog
+    TableIdentifier identifier = TableIdentifier.of(DB_NAME, "table1");
+    Table table = hadoopCatalog.createTable(identifier, schema, PartitionSpec.unpartitioned(), Maps.newHashMap());
+
+    // collect metadata file
+    List<String> metadataFiles =
+        Arrays.stream(new File(table.location() + "/metadata").listFiles())
+            .map(File::getAbsolutePath)
+            .filter(f -> f.endsWith(getFileExtension(TableMetadataParser.Codec.NONE)))
+            .collect(Collectors.toList());
+    Assert.assertEquals(1, metadataFiles.size());
+
+    AssertHelpers.assertThrows(
+        "Hive metastore should not have this table", NoSuchObjectException.class,
+        "table not found",
+        () -> metastoreClient.getTable(DB_NAME, "table1"));
+    AssertHelpers.assertThrows(
+        "Hive catalog should fail to load the table", NoSuchTableException.class,
+        "Table does not exist:",
+        () -> catalog.loadTable(identifier));
+    // register the table to hive catalog using the metadata file
+    catalog.registerTable(identifier, "file:" + metadataFiles.get(0));
+    Assert.assertNotNull(metastoreClient.getTable(DB_NAME, "table1"));
+    // load the table in hive catalog
+    Assert.assertNotNull(catalog.loadTable(identifier));
+  }
+
   @Test
   public void testRegisterExistingTable() throws TException {
     org.apache.hadoop.hive.metastore.api.Table originalTable = metastoreClient.getTable(DB_NAME, TABLE_NAME);

From 136d0b7fa1843a8660007c90cd4229caf92923ed Mon Sep 17 00:00:00 2001
From: ajantha-bhat
Date: Mon, 6 Jun 2022 11:04:03 +0530
Subject: [PATCH 2/3] Review comment fixes

---
 .../iceberg/BaseMetastoreTableOperations.java |  7 +-
 .../org/apache/iceberg/hive/HiveCatalog.java  | 15 ----
 .../apache/iceberg/hive/HiveTableTest.java    | 74 ++++++++++++-------
 3 files changed, 52 insertions(+), 44 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java
index 642bf4f7b1eb..aa00862a361f 100644
--- a/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java
+++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java
@@ -151,7 +151,7 @@ protected void disableRefresh() {
     this.shouldRefresh = false;
   }
 
-  public String writeNewMetadata(TableMetadata metadata, int newVersion) {
+  protected String writeNewMetadata(TableMetadata metadata, int newVersion) {
     String newTableMetadataFilePath = newTableMetadataFilePath(metadata, newVersion);
     OutputFile newMetadataLocation = io().newOutputFile(newTableMetadataFilePath);
 
@@ -336,7 +336,10 @@ private static int parseVersion(String metadataLocation) {
     int versionEnd = metadataLocation.indexOf('-', versionStart);
     try {
       return Integer.valueOf(metadataLocation.substring(versionStart, versionEnd));
-    } catch (NumberFormatException e) {
+    } catch (NumberFormatException | StringIndexOutOfBoundsException e) {
+      // Per the spec, metastore tables and filesystem tables
+      // use different naming schemes for metadata file names.
+      // So, a StringIndexOutOfBoundsException can occur when parsing a filesystem table's metadata file name.
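+      // For illustration (hypothetical name): "v3.metadata.json" contains no '-', so
+      // indexOf('-', versionStart) returns -1 and substring(versionStart, -1) throws.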
LOG.warn("Unable to parse version from metadata location: {}", metadataLocation, e); return -1; } diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java index abd4be967a81..62c70438e6c9 100644 --- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java @@ -246,26 +246,11 @@ public org.apache.iceberg.Table registerTable(TableIdentifier identifier, String TableOperations ops = newTableOps(identifier); InputFile metadataFile = fileIO.newInputFile(metadataFileLocation); TableMetadata metadata = TableMetadataParser.read(ops.io(), metadataFile); - - if (!isMetastoreTable(metadataFileLocation)) { - // Metastore table and filesystem tables will have different naming conventions as per the spec. - // So, to support importing filesystem tables to metastore catalog, - // create a new metadata file with contents of filesystem table but with a metastore table naming convention. - BaseMetastoreTableOperations baseOps = (BaseMetastoreTableOperations) ops; - String newFileName = baseOps.writeNewMetadata(metadata, baseOps.currentVersion() + 1); - metadataFile = fileIO.newInputFile(newFileName); - metadata = TableMetadataParser.read(ops.io(), metadataFile); - } - ops.commit(null, metadata); return new BaseTable(ops, identifier.toString()); } - private boolean isMetastoreTable(String metadataFileLocation) { - return metadataFileLocation.substring(metadataFileLocation.lastIndexOf("/") + 1).contains("-"); - } - @Override public void createNamespace(Namespace namespace, Map meta) { Preconditions.checkArgument( diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/HiveTableTest.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/HiveTableTest.java index a6842f9e0cd2..45811d10d12d 100644 --- a/hive-metastore/src/test/java/org/apache/iceberg/hive/HiveTableTest.java +++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/HiveTableTest.java @@ -39,6 +39,7 @@ import org.apache.iceberg.AssertHelpers; import org.apache.iceberg.DataFile; import org.apache.iceberg.DataFiles; +import org.apache.iceberg.FileScanTask; import org.apache.iceberg.Files; import org.apache.iceberg.HasTableOperations; import org.apache.iceberg.ManifestFile; @@ -144,30 +145,7 @@ public void testDrop() { public void testDropWithoutPurgeLeavesTableData() throws IOException { Table table = catalog.loadTable(TABLE_IDENTIFIER); - GenericRecordBuilder recordBuilder = new GenericRecordBuilder(AvroSchemaUtil.convert(schema, "test")); - List records = Lists.newArrayList( - recordBuilder.set("id", 1L).build(), - recordBuilder.set("id", 2L).build(), - recordBuilder.set("id", 3L).build() - ); - - String fileLocation = table.location().replace("file:", "") + "/data/file.avro"; - try (FileAppender writer = Avro.write(Files.localOutput(fileLocation)) - .schema(schema) - .named("test") - .build()) { - for (GenericData.Record rec : records) { - writer.add(rec); - } - } - - DataFile file = DataFiles.builder(table.spec()) - .withRecordCount(3) - .withPath(fileLocation) - .withFileSizeInBytes(Files.localInput(fileLocation).getLength()) - .build(); - - table.newAppend().appendFile(file).commit(); + String fileLocation = appendData(table, "file"); String manifestListLocation = table.currentSnapshot().manifestListLocation().replace("file:", ""); @@ -419,6 +397,10 @@ public void testRegisterHadoopTableToHiveCatalog() throws IOException, TExceptio // create 
     TableIdentifier identifier = TableIdentifier.of(DB_NAME, "table1");
     Table table = hadoopCatalog.createTable(identifier, schema, PartitionSpec.unpartitioned(), Maps.newHashMap());
+    // insert some data
+    appendData(table, "file1");
+    List<FileScanTask> tasks = Lists.newArrayList(table.newScan().planFiles());
+    Assert.assertEquals("Should scan 1 file", 1, tasks.size());
 
     // collect metadata file
     List<String> metadataFiles =
@@ -426,7 +408,7 @@ public void testRegisterHadoopTableToHiveCatalo
             .map(File::getAbsolutePath)
             .filter(f -> f.endsWith(getFileExtension(TableMetadataParser.Codec.NONE)))
             .collect(Collectors.toList());
-    Assert.assertEquals(1, metadataFiles.size());
+    Assert.assertEquals(2, metadataFiles.size());
 
     AssertHelpers.assertThrows(
         "Hive metastore should not have this table", NoSuchObjectException.class,
@@ -436,11 +418,49 @@ public void testRegisterHadoopTableToHiveCatalo
         "Hive catalog should fail to load the table", NoSuchTableException.class,
         "Table does not exist:",
         () -> catalog.loadTable(identifier));
-    // register the table to hive catalog using the metadata file
+
+    // register the table to hive catalog using the latest metadata file
+    metadataFiles.sort(Collections.reverseOrder());
     catalog.registerTable(identifier, "file:" + metadataFiles.get(0));
     Assert.assertNotNull(metastoreClient.getTable(DB_NAME, "table1"));
+
     // load the table in hive catalog
-    Assert.assertNotNull(catalog.loadTable(identifier));
+    table = catalog.loadTable(identifier);
+    Assert.assertNotNull(table);
+
+    // insert some data
+    appendData(table, "file2");
+    tasks = Lists.newArrayList(table.newScan().planFiles());
+    Assert.assertEquals("Should scan 2 files", 2, tasks.size());
+  }
+
+  private String appendData(Table table, String fileName) throws IOException {
+    GenericRecordBuilder recordBuilder = new GenericRecordBuilder(AvroSchemaUtil.convert(schema, "test"));
+    List<GenericData.Record> records = Lists.newArrayList(
+        recordBuilder.set("id", 1L).build(),
+        recordBuilder.set("id", 2L).build(),
+        recordBuilder.set("id", 3L).build()
+    );
+
+    String fileLocation = table.location().replace("file:", "") + "/data/" + fileName + ".avro";
+    try (FileAppender<GenericData.Record> writer = Avro.write(Files.localOutput(fileLocation))
+        .schema(schema)
+        .named("test")
+        .build()) {
+      for (GenericData.Record rec : records) {
+        writer.add(rec);
+      }
+    }
+
+    DataFile file = DataFiles.builder(table.spec())
+        .withRecordCount(3)
+        .withPath(fileLocation)
+        .withFileSizeInBytes(Files.localInput(fileLocation).getLength())
+        .build();
+
+    table.newAppend().appendFile(file).commit();
+
+    return fileLocation;
+  }
 
   @Test
   public void testRegisterExistingTable() throws TException {
     org.apache.hadoop.hive.metastore.api.Table originalTable = metastoreClient.getTable(DB_NAME, TABLE_NAME);

From ad577ec5077c2d11b8b5f03732cbc033c4908273 Mon Sep 17 00:00:00 2001
From: ajantha-bhat
Date: Tue, 7 Jun 2022 08:12:12 +0530
Subject: [PATCH 3/3] review comments round 2

---
 .../iceberg/BaseMetastoreTableOperations.java  | 17 +++++++++++++----
 .../org/apache/iceberg/hive/HiveTableTest.java | 13 +++++++++----
 2 files changed, 22 insertions(+), 8 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java
index aa00862a361f..bfc91416ed41 100644
--- a/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java
+++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java
@@ -331,15 +331,24 @@ private String newTableMetadataFilePath(TableMetadata meta, int newVersion) {
     return metadataFileLocation(meta, String.format("%05d-%s%s", newVersion,
         UUID.randomUUID(), fileExtension));
   }
 
+  /**
+   * Parses the version from a table metadata file name.
+   *
+   * <p>For example (hypothetical names), "00003-<uuid>.metadata.json" parses to version 3, while a
+   * filesystem table name like "v3.metadata.json" yields -1.
+   *
+   * @param metadataLocation table metadata file location
+   * @return the version of the table metadata file on success, or -1 if the version is not
+   *     parsable (a sign that the metadata is not part of this catalog)
+   */
   private static int parseVersion(String metadataLocation) {
     int versionStart = metadataLocation.lastIndexOf('/') + 1; // if '/' isn't found, this will be 0
     int versionEnd = metadataLocation.indexOf('-', versionStart);
+    if (versionEnd < 0) {
+      // found a filesystem table's metadata file name, which has no '-' after the version
+      return -1;
+    }
+
     try {
       return Integer.valueOf(metadataLocation.substring(versionStart, versionEnd));
-    } catch (NumberFormatException | StringIndexOutOfBoundsException e) {
-      // Per the spec, metastore tables and filesystem tables
-      // use different naming schemes for metadata file names.
-      // So, a StringIndexOutOfBoundsException can occur when parsing a filesystem table's metadata file name.
-      // For illustration (hypothetical name): "v3.metadata.json" contains no '-', so
-      // indexOf('-', versionStart) returns -1 and substring(versionStart, -1) throws.
+    } catch (NumberFormatException e) {
       LOG.warn("Unable to parse version from metadata location: {}", metadataLocation, e);
       return -1;
     }
diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/HiveTableTest.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/HiveTableTest.java
index 45811d10d12d..605621309092 100644
--- a/hive-metastore/src/test/java/org/apache/iceberg/hive/HiveTableTest.java
+++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/HiveTableTest.java
@@ -25,6 +25,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecordBuilder;
@@ -37,6 +38,7 @@
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.FileScanTask;
@@ -398,9 +400,10 @@ public void testRegisterHadoopTableToHiveCatalog() throws IOException, TExceptio
     TableIdentifier identifier = TableIdentifier.of(DB_NAME, "table1");
     Table table = hadoopCatalog.createTable(identifier, schema, PartitionSpec.unpartitioned(), Maps.newHashMap());
     // insert some data
-    appendData(table, "file1");
+    String file1Location = appendData(table, "file1");
     List<FileScanTask> tasks = Lists.newArrayList(table.newScan().planFiles());
     Assert.assertEquals("Should scan 1 file", 1, tasks.size());
+    Assert.assertEquals(file1Location, tasks.get(0).file().path().toString());
 
     // collect metadata file
     List<String> metadataFiles =
@@ -420,8 +423,8 @@ public void testRegisterHadoopTableToHiveCatalog() throws IOException, TExceptio
         () -> catalog.loadTable(identifier));
 
     // register the table to hive catalog using the latest metadata file
-    metadataFiles.sort(Collections.reverseOrder());
-    catalog.registerTable(identifier, "file:" + metadataFiles.get(0));
+    String latestMetadataFile = ((BaseTable) table).operations().current().metadataFileLocation();
+    catalog.registerTable(identifier, "file:" + latestMetadataFile);
     Assert.assertNotNull(metastoreClient.getTable(DB_NAME, "table1"));
 
     // load the table in hive catalog
@@ -429,9 +432,11 @@ public void testRegisterHadoopTableToHiveCatalog() throws IOException, TExceptio
     Assert.assertNotNull(table);
 
     // insert some data
-    appendData(table, "file2");
+    String file2Location = appendData(table, "file2");
     tasks = Lists.newArrayList(table.newScan().planFiles());
     Assert.assertEquals("Should scan 2 files", 2, tasks.size());
+    Set<String> files = tasks.stream().map(task -> task.file().path().toString()).collect(Collectors.toSet());
+    Assert.assertTrue(files.contains(file1Location) && files.contains(file2Location));
   }
 
   private String appendData(Table table, String fileName) throws IOException {
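
---

Usage sketch: a minimal end-to-end example of the flow this series enables, assuming an
already-configured HiveCatalog ("hiveCatalog"), a warehouse path ("warehousePath"), and a
table schema ("schema"); these names are illustrative and not part of the patches.

    // Create a table with a filesystem (Hadoop) catalog; its metadata file follows the
    // filesystem naming convention (e.g. "v1.metadata.json").
    HadoopCatalog hadoopCatalog = new HadoopCatalog(new Configuration(), warehousePath);
    TableIdentifier identifier = TableIdentifier.of("db", "events");
    Table fsTable = hadoopCatalog.createTable(identifier, schema, PartitionSpec.unpartitioned());

    // Register the table's current metadata file with the Hive catalog, as the test above does.
    // After this series, parseVersion() returns -1 for filesystem-style names instead of throwing.
    // The "file:" prefix mirrors the test, whose local warehouse path carries no scheme.
    String metadataLocation = ((BaseTable) fsTable).operations().current().metadataFileLocation();
    Table hiveTable = hiveCatalog.registerTable(identifier, "file:" + metadataLocation);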