@@ -44,9 +44,13 @@
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SchemaParser;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableMetadataParser;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.hadoop.HadoopFileIO;
 import org.apache.iceberg.hive.HiveSchemaUtil;
+import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.mr.Catalogs;
 import org.apache.iceberg.mr.InputFormatConfig;
 import org.apache.iceberg.mr.hive.serde.objectinspector.IcebergObjectInspector;
@@ -108,12 +112,26 @@ public void initialize(@Nullable Configuration configuration, Properties serDeProperties) {
       // During table creation we might not have the schema information from the Iceberg table, nor from the HMS
       // table. In this case we have to generate the schema using the serdeProperties which contains the info
       // provided in the CREATE TABLE query.
-      boolean autoConversion = configuration.getBoolean(InputFormatConfig.SCHEMA_AUTO_CONVERSION, false);
-      // If we can not load the table try the provided hive schema
-      this.tableSchema = hiveSchemaOrThrow(e, autoConversion);
-      // This is only for table creation, it is ok to have an empty partition column list
-      this.partitionColumns = ImmutableList.of();
 
+      if (serDeProperties.get("metadata_location") != null) {
+        // If a metadata location is provided, extract the schema details from it.
+        try (FileIO fileIO = new HadoopFileIO(configuration)) {
+          TableMetadata metadata = TableMetadataParser.read(fileIO, serDeProperties.getProperty("metadata_location"));
+          this.tableSchema = metadata.schema();
+          this.partitionColumns =
+              metadata.spec().fields().stream().map(PartitionField::name).collect(Collectors.toList());
+          // Validate that no schema was provided via the CREATE command
+          if (!getColumnNames().isEmpty() || !getPartitionColumnNames().isEmpty()) {
+            throw new SerDeException("Column names can not be provided along with metadata location.");
+          }
+        }
+      } else {
+        boolean autoConversion = configuration.getBoolean(InputFormatConfig.SCHEMA_AUTO_CONVERSION, false);
+        // If we cannot load the table, try the provided Hive schema
+        this.tableSchema = hiveSchemaOrThrow(e, autoConversion);
+        // This is only for table creation; it is ok to have an empty partition column list
+        this.partitionColumns = ImmutableList.of();
+      }
       if (e instanceof NoSuchTableException &&
           HiveTableUtil.isCtas(serDeProperties) &&
           !Catalogs.hiveCatalog(configuration, serDeProperties)) {
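
For context outside the diff: the added branch above can be exercised on its own. The following is a minimal, hypothetical sketch (the class name and the command-line argument are placeholders, not part of the PR) that loads a metadata file the same way the SerDe now does and prints the schema and partition columns it recovers.

import java.util.List;
import java.util.stream.Collectors;

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.Schema;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.FileIO;

// Hypothetical helper, not part of the PR.
public class MetadataLocationReader {

  public static void main(String[] args) {
    // In the SerDe this value comes from the "metadata_location" serde property;
    // here it is taken from the command line for illustration.
    String metadataLocation = args[0];

    // HadoopFileIO resolves the path through the Hadoop FileSystem API, so local,
    // HDFS and object-store locations all work if the Configuration allows them.
    try (FileIO fileIO = new HadoopFileIO(new Configuration())) {
      TableMetadata metadata = TableMetadataParser.read(fileIO, metadataLocation);

      // The schema and partition spec come straight from the metadata file, which
      // is why the CREATE TABLE statement must not declare any columns itself.
      Schema schema = metadata.schema();
      List<String> partitionColumns = metadata.spec().fields().stream()
          .map(PartitionField::name)
          .collect(Collectors.toList());

      System.out.println("Schema: " + schema);
      System.out.println("Partition columns: " + partitionColumns);
    }
  }
}

The test change below covers both the happy path and the new validation error.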
@@ -70,6 +70,7 @@
 import org.apache.iceberg.hive.MetastoreUtil;
 import org.apache.iceberg.mr.Catalogs;
 import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.mr.TestHelper;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
@@ -1751,6 +1752,43 @@ public void testConcurrentIcebergCommitsAndHiveAlterTableCalls() throws Exception {
         ((BaseTable) testTables.loadTable(identifier)).operations().current().previousFiles().size());
   }
 
+  @Test
+  public void testCreateTableWithMetadataLocationWithoutSchema() throws IOException, TException, InterruptedException {
+    Assume.assumeTrue("Create with metadata location is only supported for Hive Catalog tables",
+        testTableType.equals(TestTables.TestTableType.HIVE_CATALOG));
+    TableIdentifier sourceIdentifier = TableIdentifier.of("default", "source");
+    PartitionSpec spec =
+        PartitionSpec.builderFor(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA).identity("customer_id").build();
+    List<Record> records = TestHelper.generateRandomRecords(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, 4, 0L);
+    Table sourceTable =
+        testTables.createTable(shell, sourceIdentifier.name(), HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, spec,
+            FileFormat.PARQUET, records, 1,
+            ImmutableMap.<String, String>builder().put(InputFormatConfig.EXTERNAL_TABLE_PURGE, "FALSE").build());
+    String metadataLocation = ((BaseTable) sourceTable).operations().current().metadataFileLocation();
+    shell.executeStatement("DROP TABLE " + sourceIdentifier.name());
+    TableIdentifier targetIdentifier = TableIdentifier.of("default", "target");
+
+    String tblProps =
+        testTables.propertiesForCreateTableSQL(Collections.singletonMap("metadata_location", metadataLocation));
+
+    // Try the query with columns also specified; it should throw an exception.
+    AssertHelpers.assertThrows("should throw exception", IllegalArgumentException.class,
+        "Column names can not be provided along with metadata location.", () -> {
+          shell.executeStatement("CREATE EXTERNAL TABLE target (id int) STORED BY ICEBERG " +
+              testTables.locationForCreateTableSQL(targetIdentifier) + tblProps);
+        });
+    shell.executeStatement(
+        "CREATE EXTERNAL TABLE target STORED BY ICEBERG " + testTables.locationForCreateTableSQL(targetIdentifier) +
+            tblProps);
+
+    // Check that the partition spec and the schema are preserved.
+    Table targetIcebergTable =
+        IcebergTableUtil.getTable(shell.getHiveConf(), shell.metastore().getTable(targetIdentifier));
+    Assert.assertEquals(1, targetIcebergTable.spec().fields().size());
+    Assert.assertEquals(sourceTable.spec().fields(), targetIcebergTable.spec().fields());
+    Assert.assertEquals(sourceTable.schema().toString(), targetIcebergTable.schema().toString());
+  }
+
 
   /**
    * Checks that the new schema has newintcol and newstring col columns on both HMS and Iceberg sides
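To round out the test above, here is a similarly hypothetical sketch of how the metadata_location value could be obtained for an existing table before recreating it elsewhere. HadoopTables and the warehouse path are illustrative assumptions (the test goes through its Hive catalog fixture instead); the metadataFileLocation() call chain matches the one in the test.

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;

// Hypothetical snippet, not part of the PR.
public class MetadataLocationLookup {

  public static void main(String[] args) {
    // Illustrative location; any path-based Iceberg table works with HadoopTables.
    String tableLocation = "/tmp/warehouse/default/source";
    Table table = new HadoopTables(new Configuration()).load(tableLocation);

    // Same call chain as the test: the table's current metadata file.
    String metadataLocation = ((BaseTable) table).operations().current().metadataFileLocation();

    // This is the value that goes into the DDL, e.g.:
    //   CREATE EXTERNAL TABLE target STORED BY ICEBERG
    //   TBLPROPERTIES ('metadata_location'='<metadata file path>');
    System.out.println(metadataLocation);
  }
}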