@@ -331,9 +331,21 @@ private String newTableMetadataFilePath(TableMetadata meta, int newVersion) {
return metadataFileLocation(meta, String.format("%05d-%s%s", newVersion, UUID.randomUUID(), fileExtension));
}

/**
* Parse the version from a table metadata file name.
*
* @param metadataLocation table metadata file location
* @return the version of the table metadata file on success, or -1 if the version is not
* parsable (taken as a sign that the metadata file is not managed by this catalog)
*/
private static int parseVersion(String metadataLocation) {
int versionStart = metadataLocation.lastIndexOf('/') + 1; // if '/' isn't found, this will be 0
int versionEnd = metadataLocation.indexOf('-', versionStart);
if (versionEnd < 0) {
// no '-' separator in the file name: this is a filesystem (Hadoop) table's metadata
return -1;
}

try {
return Integer.valueOf(metadataLocation.substring(versionStart, versionEnd));
} catch (NumberFormatException e) {
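
For reference, this catalog names metadata files like 00003-<uuid>.metadata.json (see newTableMetadataFilePath above), so the version is the digits before the first '-' in the file name; filesystem (Hadoop) tables use names like v3.metadata.json, which contain no '-' and therefore map to -1. A minimal standalone sketch of that contract (class name, method body, and sample paths are illustrative only, not part of this change):

// Hypothetical standalone sketch mirroring parseVersion's contract (not part of this change).
public class ParseVersionDemo {
  static int parseVersion(String metadataLocation) {
    int versionStart = metadataLocation.lastIndexOf('/') + 1; // 0 if '/' is absent
    int versionEnd = metadataLocation.indexOf('-', versionStart);
    if (versionEnd < 0) {
      return -1; // filesystem-table naming such as "v3.metadata.json" has no '-'
    }
    try {
      return Integer.parseInt(metadataLocation.substring(versionStart, versionEnd));
    } catch (NumberFormatException e) {
      return -1; // file name did not start with a numeric version
    }
  }

  public static void main(String[] args) {
    // Catalog-managed name parses to its version:
    System.out.println(parseVersion("/warehouse/db/tbl/metadata/00003-1a2b3c4d.metadata.json")); // 3
    // Filesystem (Hadoop) table name is flagged as not parsable:
    System.out.println(parseVersion("/warehouse/db/tbl/metadata/v3.metadata.json")); // -1
  }
}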
@@ -21,36 +21,45 @@

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Files;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.hadoop.ConfigProperties;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -68,6 +77,7 @@
import static org.apache.iceberg.BaseMetastoreTableOperations.METADATA_LOCATION_PROP;
import static org.apache.iceberg.BaseMetastoreTableOperations.PREVIOUS_METADATA_LOCATION_PROP;
import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP;
import static org.apache.iceberg.TableMetadataParser.getFileExtension;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;

@@ -137,30 +147,7 @@ public void testDrop() {
public void testDropWithoutPurgeLeavesTableData() throws IOException {
Table table = catalog.loadTable(TABLE_IDENTIFIER);

GenericRecordBuilder recordBuilder = new GenericRecordBuilder(AvroSchemaUtil.convert(schema, "test"));
List<GenericData.Record> records = Lists.newArrayList(
recordBuilder.set("id", 1L).build(),
recordBuilder.set("id", 2L).build(),
recordBuilder.set("id", 3L).build()
);

String fileLocation = table.location().replace("file:", "") + "/data/file.avro";
try (FileAppender<GenericData.Record> writer = Avro.write(Files.localOutput(fileLocation))
.schema(schema)
.named("test")
.build()) {
for (GenericData.Record rec : records) {
writer.add(rec);
}
}

DataFile file = DataFiles.builder(table.spec())
.withRecordCount(3)
.withPath(fileLocation)
.withFileSizeInBytes(Files.localInput(fileLocation).getLength())
.build();

table.newAppend().appendFile(file).commit();
String fileLocation = appendData(table, "file");

String manifestListLocation = table.currentSnapshot().manifestListLocation().replace("file:", "");

@@ -404,6 +391,83 @@ public void testRegisterTable() throws TException {
Assert.assertEquals(originalTable.getSd(), newTable.getSd());
}

@Test
public void testRegisterHadoopTableToHiveCatalog() throws IOException, TException {
// create a hadoop catalog
String tableLocation = tempFolder.newFolder().toString();
HadoopCatalog hadoopCatalog = new HadoopCatalog(new Configuration(), tableLocation);
// create table using hadoop catalog
TableIdentifier identifier = TableIdentifier.of(DB_NAME, "table1");
Table table = hadoopCatalog.createTable(identifier, schema, PartitionSpec.unpartitioned(), Maps.newHashMap());
// insert some data
String file1Location = appendData(table, "file1");
List<FileScanTask> tasks = Lists.newArrayList(table.newScan().planFiles());
Assert.assertEquals("Should scan 1 file", 1, tasks.size());
Assert.assertEquals("Should scan the appended file", file1Location, tasks.get(0).file().path().toString());

// collect metadata file
List<String> metadataFiles =
Arrays.stream(new File(table.location() + "/metadata").listFiles())
.map(File::getAbsolutePath)
.filter(f -> f.endsWith(getFileExtension(TableMetadataParser.Codec.NONE)))
.collect(Collectors.toList());
Assert.assertEquals("Should have two metadata files: initial create plus one append", 2, metadataFiles.size());

AssertHelpers.assertThrows(
"Hive metastore should not have this table", NoSuchObjectException.class,
"table not found",
() -> metastoreClient.getTable(DB_NAME, "table1"));
AssertHelpers.assertThrows(
"Hive catalog should fail to load the table", NoSuchTableException.class,
"Table does not exist:",
() -> catalog.loadTable(identifier));

// register the table to hive catalog using the latest metadata file
String latestMetadataFile = ((BaseTable) table).operations().current().metadataFileLocation();
catalog.registerTable(identifier, "file:" + latestMetadataFile);
Assert.assertNotNull(metastoreClient.getTable(DB_NAME, "table1"));

// load the table in hive catalog
table = catalog.loadTable(identifier);
Assert.assertNotNull(table);

// insert some data
String file2Location = appendData(table, "file2");
tasks = Lists.newArrayList(table.newScan().planFiles());
Assert.assertEquals("Should scan 2 files", 2, tasks.size());
Set<String> files = tasks.stream().map(task -> task.file().path().toString()).collect(Collectors.toSet());
Assert.assertTrue("Scan should include both appended files", files.contains(file1Location) && files.contains(file2Location));
}
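
Outside of the test, the same migration can be sketched roughly as follows; hiveCatalog is assumed to be a HiveCatalog already configured for the target metastore (construction varies by Iceberg version), and the identifiers and paths are illustrative only:

// Hypothetical sketch: register an existing HadoopCatalog table with a Hive catalog.
HadoopCatalog hadoopCatalog = new HadoopCatalog(new Configuration(), "/warehouse/path");
TableIdentifier identifier = TableIdentifier.of("db", "tbl");
Table table = hadoopCatalog.loadTable(identifier);

// Point the Hive catalog at the table's current metadata file ...
String metadataFile = ((BaseTable) table).operations().current().metadataFileLocation();
hiveCatalog.registerTable(identifier, metadataFile);

// ... after which the table can be loaded (and committed to) through Hive.
Table hiveTable = hiveCatalog.loadTable(identifier);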

private String appendData(Table table, String fileName) throws IOException {
GenericRecordBuilder recordBuilder = new GenericRecordBuilder(AvroSchemaUtil.convert(schema, "test"));
List<GenericData.Record> records = Lists.newArrayList(
recordBuilder.set("id", 1L).build(),
recordBuilder.set("id", 2L).build(),
recordBuilder.set("id", 3L).build()
);

String fileLocation = table.location().replace("file:", "") + "/data/" + fileName + ".avro";
try (FileAppender<GenericData.Record> writer = Avro.write(Files.localOutput(fileLocation))
.schema(schema)
.named("test")
.build()) {
for (GenericData.Record rec : records) {
writer.add(rec);
}
}

DataFile file = DataFiles.builder(table.spec())
.withRecordCount(3)
.withPath(fileLocation)
.withFileSizeInBytes(Files.localInput(fileLocation).getLength())
.build();

table.newAppend().appendFile(file).commit();

return fileLocation;
}
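
Note that appendData supplies the record count and file size to DataFiles.builder explicitly: Iceberg stores these metrics in the manifest entry as given rather than re-reading the Avro file, so they must match what was actually written.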

@Test
public void testRegisterExistingTable() throws TException {
org.apache.hadoop.hive.metastore.api.Table originalTable = metastoreClient.getTable(DB_NAME, TABLE_NAME);