diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java
index e955b7f77c96..50e434093add 100644
--- a/core/src/main/java/org/apache/iceberg/TableProperties.java
+++ b/core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -41,40 +41,45 @@ private TableProperties() {
   public static final String FORMAT_VERSION = "format-version";
 
   /**
-   * Reserved table property for UUID.
-   * <p>
-   * This reserved property is used to store the UUID of the table.
+   * Reserved table property for table UUID.
    */
   public static final String UUID = "uuid";
 
   /**
    * Reserved table property for the total number of snapshots.
-   * <p>
-   * This reserved property is used to store the total number of snapshots.
    */
   public static final String SNAPSHOT_COUNT = "snapshot-count";
 
   /**
    * Reserved table property for current snapshot summary.
-   * <p>
-   * This reserved property is used to store the current snapshot summary.
    */
   public static final String CURRENT_SNAPSHOT_SUMMARY = "current-snapshot-summary";
 
   /**
    * Reserved table property for current snapshot id.
-   * <p>
-   * This reserved property is used to store the current snapshot id.
    */
   public static final String CURRENT_SNAPSHOT_ID = "current-snapshot-id";
 
   /**
    * Reserved table property for current snapshot timestamp.
-   * <p>
-   * This reserved property is used to store the current snapshot timestamp.
    */
   public static final String CURRENT_SNAPSHOT_TIMESTAMP = "current-snapshot-timestamp-ms";
 
+  /**
+   * Reserved table property for the JSON representation of the current schema.
+   */
+  public static final String CURRENT_SCHEMA = "current-schema";
+
+  /**
+   * Reserved table property for the JSON representation of the current (default) partition spec.
+   */
+  public static final String DEFAULT_PARTITION_SPEC = "default-partition-spec";
+
+  /**
+   * Reserved table property for the JSON representation of the current (default) sort order.
+   */
+  public static final String DEFAULT_SORT_ORDER = "default-sort-order";
+
   /**
    * Reserved Iceberg table properties list.
    * <p>
@@ -87,7 +92,10 @@ private TableProperties() {
       SNAPSHOT_COUNT,
       CURRENT_SNAPSHOT_ID,
       CURRENT_SNAPSHOT_SUMMARY,
-      CURRENT_SNAPSHOT_TIMESTAMP
+      CURRENT_SNAPSHOT_TIMESTAMP,
+      CURRENT_SCHEMA,
+      DEFAULT_PARTITION_SPEC,
+      DEFAULT_SORT_ORDER
   );
 
   public static final String COMMIT_NUM_RETRIES = "commit.retry.num-retries";
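The three new reserved properties mirror table metadata into HMS as plain JSON strings, so tools that only speak the metastore API can inspect an Iceberg table's shape without opening its metadata files. A minimal sketch of such a reader, assuming placeholder database/table names and the standard Iceberg `fromJson` parser overloads (the `HiveMetaStoreClient` construction is illustrative, not part of this patch):

```java
import java.util.Map;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;

public class ReadReservedProperties {
  public static void main(String[] args) throws Exception {
    HiveMetaStoreClient client = new HiveMetaStoreClient(new HiveConf());
    try {
      // Reserved properties live in the ordinary HMS table parameter map.
      Map<String, String> params = client.getTable("db", "tbl").getParameters();

      String schemaJson = params.get("current-schema");
      if (schemaJson != null) { // absent when the JSON exceeded the size limit
        Schema schema = SchemaParser.fromJson(schemaJson);
        System.out.println("schema: " + schema.asStruct());

        String specJson = params.get("default-partition-spec");
        if (specJson != null) { // only written for partitioned tables
          System.out.println("spec: " + PartitionSpecParser.fromJson(schema, specJson));
        }
      }
    } finally {
      client.close();
    }
  }
}
```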
diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
index 9b2223a51670..1303fd15d89b 100644
--- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
+++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
@@ -52,8 +52,11 @@
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.iceberg.BaseMetastoreTableOperations;
 import org.apache.iceberg.ClientPool;
+import org.apache.iceberg.PartitionSpecParser;
+import org.apache.iceberg.SchemaParser;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.SortOrderParser;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
@@ -93,6 +96,7 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
 
   // the max size is based on HMS backend database. For Hive versions below 2.3, the max table parameter size is 4000
   // characters, see https://issues.apache.org/jira/browse/HIVE-12274
+  // set to 0 to not expose Iceberg metadata in HMS table properties
   private static final String HIVE_TABLE_PROPERTY_MAX_SIZE = "iceberg.hive.table-property-max-size";
   private static final long HIVE_TABLE_PROPERTY_MAX_SIZE_DEFAULT = 32672;
   private static final long HIVE_ACQUIRE_LOCK_TIMEOUT_MS_DEFAULT = 3 * 60 * 1000; // 3 minutes
@@ -399,17 +403,21 @@ private void setHmsTableParameters(String newMetadataLocation, Table tbl, TableMetadata metadata,
     }
 
     setSnapshotStats(metadata, parameters);
+    setSchema(metadata, parameters);
+    setPartitionSpec(metadata, parameters);
+    setSortOrder(metadata, parameters);
 
     tbl.setParameters(parameters);
   }
 
-  private void setSnapshotStats(TableMetadata metadata, Map<String, String> parameters) {
+  @VisibleForTesting
+  void setSnapshotStats(TableMetadata metadata, Map<String, String> parameters) {
     parameters.remove(TableProperties.CURRENT_SNAPSHOT_ID);
     parameters.remove(TableProperties.CURRENT_SNAPSHOT_TIMESTAMP);
     parameters.remove(TableProperties.CURRENT_SNAPSHOT_SUMMARY);
 
     Snapshot currentSnapshot = metadata.currentSnapshot();
-    if (currentSnapshot != null) {
+    if (exposeInHmsProperties() && currentSnapshot != null) {
       parameters.put(TableProperties.CURRENT_SNAPSHOT_ID, String.valueOf(currentSnapshot.snapshotId()));
       parameters.put(TableProperties.CURRENT_SNAPSHOT_TIMESTAMP, String.valueOf(currentSnapshot.timestampMillis()));
       setSnapshotSummary(parameters, currentSnapshot);
@@ -433,6 +441,45 @@ void setSnapshotSummary(Map<String, String> parameters, Snapshot currentSnapshot) {
     }
   }
 
+  @VisibleForTesting
+  void setSchema(TableMetadata metadata, Map<String, String> parameters) {
+    parameters.remove(TableProperties.CURRENT_SCHEMA);
+    if (exposeInHmsProperties() && metadata.schema() != null) {
+      String schema = SchemaParser.toJson(metadata.schema());
+      setField(parameters, TableProperties.CURRENT_SCHEMA, schema);
+    }
+  }
+
+  @VisibleForTesting
+  void setPartitionSpec(TableMetadata metadata, Map<String, String> parameters) {
+    parameters.remove(TableProperties.DEFAULT_PARTITION_SPEC);
+    if (exposeInHmsProperties() && metadata.spec() != null && metadata.spec().isPartitioned()) {
+      String spec = PartitionSpecParser.toJson(metadata.spec());
+      setField(parameters, TableProperties.DEFAULT_PARTITION_SPEC, spec);
+    }
+  }
+
+  @VisibleForTesting
+  void setSortOrder(TableMetadata metadata, Map<String, String> parameters) {
+    parameters.remove(TableProperties.DEFAULT_SORT_ORDER);
+    if (exposeInHmsProperties() && metadata.sortOrder() != null && metadata.sortOrder().isSorted()) {
+      String sortOrder = SortOrderParser.toJson(metadata.sortOrder());
+      setField(parameters, TableProperties.DEFAULT_SORT_ORDER, sortOrder);
+    }
+  }
+
+  private void setField(Map<String, String> parameters, String key, String value) {
+    if (value.length() <= maxHiveTablePropertySize) {
+      parameters.put(key, value);
+    } else {
+      LOG.warn("Not exposing {} in HMS since it exceeds {} characters", key, maxHiveTablePropertySize);
+    }
+  }
+
+  private boolean exposeInHmsProperties() {
+    return maxHiveTablePropertySize > 0;
+  }
+
   private StorageDescriptor storageDescriptor(TableMetadata metadata, boolean hiveEngineEnabled) {
     final StorageDescriptor storageDescriptor = new StorageDescriptor();
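Two details of the new setters are worth calling out: each one removes the stale parameter before deciding whether to re-add it, and both `setField` and `exposeInHmsProperties` key off `maxHiveTablePropertySize`, so a limit of 0 disables the mirroring entirely while still clearing old values. A self-contained sketch of that remove-then-conditionally-put pattern (class and method names here are illustrative, not part of the patch):

```java
import java.util.HashMap;
import java.util.Map;

public class SizeGuardedProperties {
  private final long maxSize; // plays the role of maxHiveTablePropertySize

  public SizeGuardedProperties(long maxSize) {
    this.maxSize = maxSize;
  }

  // Always drop the stale value first, so a property that has grown past the
  // limit (or a limit of 0) never leaves an outdated copy behind.
  void set(Map<String, String> parameters, String key, String value) {
    parameters.remove(key);
    if (maxSize > 0 && value != null && value.length() <= maxSize) {
      parameters.put(key, value);
    }
  }

  public static void main(String[] args) {
    Map<String, String> params = new HashMap<>();
    SizeGuardedProperties guarded = new SizeGuardedProperties(10);
    guarded.set(params, "current-schema", "short");                // stored: fits the limit
    guarded.set(params, "current-schema", "much-longer-than-ten"); // stale value removed, new one skipped
    System.out.println(params.containsKey("current-schema"));      // prints false
  }
}
```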
diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java
index 855ba4f8119b..69fa23f391fd 100644
--- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java
+++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java
@@ -32,12 +32,17 @@
 import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PartitionSpecParser;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.SortOrderParser;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.Transaction;
+import org.apache.iceberg.UpdateSchema;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -60,9 +65,15 @@
 
 import static org.apache.iceberg.NullOrder.NULLS_FIRST;
 import static org.apache.iceberg.SortDirection.ASC;
+import static org.apache.iceberg.TableProperties.CURRENT_SCHEMA;
+import static org.apache.iceberg.TableProperties.CURRENT_SNAPSHOT_ID;
+import static org.apache.iceberg.TableProperties.CURRENT_SNAPSHOT_SUMMARY;
+import static org.apache.iceberg.TableProperties.CURRENT_SNAPSHOT_TIMESTAMP;
+import static org.apache.iceberg.TableProperties.DEFAULT_PARTITION_SPEC;
+import static org.apache.iceberg.TableProperties.DEFAULT_SORT_ORDER;
+import static org.apache.iceberg.expressions.Expressions.bucket;
 import static org.apache.iceberg.types.Types.NestedField.required;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
 public class TestHiveCatalog extends HiveMetastoreTest {
@@ -232,7 +243,7 @@ public void testReplaceTxnBuilder() throws Exception {
   }
 
   @Test
-  public void testCreateTableDefaultSortOrder() {
+  public void testCreateTableDefaultSortOrder() throws Exception {
     Schema schema = new Schema(
         required(1, "id", Types.IntegerType.get(), "unique ID"),
         required(2, "data", Types.StringType.get())
@@ -246,13 +257,16 @@ public void testCreateTableDefaultSortOrder() {
       Table table = catalog.createTable(tableIdent, schema, spec);
       Assert.assertEquals("Order ID must match", 0, table.sortOrder().orderId());
       Assert.assertTrue("Order must unsorted", table.sortOrder().isUnsorted());
+
+      Assert.assertFalse("Must not have default sort order in catalog",
+          hmsTableParameters().containsKey(DEFAULT_SORT_ORDER));
     } finally {
       catalog.dropTable(tableIdent);
    }
  }
 
   @Test
-  public void testCreateTableCustomSortOrder() {
+  public void testCreateTableCustomSortOrder() throws Exception {
     Schema schema = new Schema(
         required(1, "id", Types.IntegerType.get(), "unique ID"),
         required(2, "data", Types.StringType.get())
@@ -277,6 +291,8 @@ public void testCreateTableCustomSortOrder() {
       Assert.assertEquals("Null order must match ", NULLS_FIRST, sortOrder.fields().get(0).nullOrder());
       Transform transform = Transforms.identity(Types.IntegerType.get());
       Assert.assertEquals("Transform must match", transform, sortOrder.fields().get(0).transform());
+
+      Assert.assertEquals(SortOrderParser.toJson(table.sortOrder()), hmsTableParameters().get(DEFAULT_SORT_ORDER));
     } finally {
       catalog.dropTable(tableIdent);
     }
@@ -469,13 +485,7 @@ public void testUUIDinTableProperties() throws Exception {
           .withLocation(location)
           .create();
 
-      String tableName = tableIdentifier.name();
-      org.apache.hadoop.hive.metastore.api.Table hmsTable =
-          metastoreClient.getTable(tableIdentifier.namespace().level(0), tableName);
-
-      // check parameters are in expected state
-      Map<String, String> parameters = hmsTable.getParameters();
-      Assert.assertNotNull(parameters.get(TableProperties.UUID));
+      Assert.assertNotNull(hmsTableParameters().get(TableProperties.UUID));
     } finally {
       catalog.dropTable(tableIdentifier);
     }
@@ -495,16 +505,12 @@ public void testSnapshotStatsTableProperties() throws Exception {
           .withLocation(location)
          .create();
 
-      String tableName = tableIdentifier.name();
-      org.apache.hadoop.hive.metastore.api.Table hmsTable =
-          metastoreClient.getTable(tableIdentifier.namespace().level(0), tableName);
-
       // check whether parameters are in expected state
-      Map<String, String> parameters = hmsTable.getParameters();
+      Map<String, String> parameters = hmsTableParameters();
       Assert.assertEquals("0", parameters.get(TableProperties.SNAPSHOT_COUNT));
-      Assert.assertNull(parameters.get(TableProperties.CURRENT_SNAPSHOT_SUMMARY));
-      Assert.assertNull(parameters.get(TableProperties.CURRENT_SNAPSHOT_ID));
-      Assert.assertNull(parameters.get(TableProperties.CURRENT_SNAPSHOT_TIMESTAMP));
+      Assert.assertNull(parameters.get(CURRENT_SNAPSHOT_SUMMARY));
+      Assert.assertNull(parameters.get(CURRENT_SNAPSHOT_ID));
+      Assert.assertNull(parameters.get(CURRENT_SNAPSHOT_TIMESTAMP));
 
       // create a snapshot
       Table icebergTable = catalog.loadTable(tableIdentifier);
@@ -517,15 +523,14 @@ public void testSnapshotStatsTableProperties() throws Exception {
       icebergTable.newFastAppend().appendFile(file).commit();
 
       // check whether parameters are in expected state
-      hmsTable = metastoreClient.getTable(tableIdentifier.namespace().level(0), tableName);
-      parameters = hmsTable.getParameters();
+      parameters = hmsTableParameters();
       Assert.assertEquals("1", parameters.get(TableProperties.SNAPSHOT_COUNT));
       String summary = JsonUtil.mapper().writeValueAsString(icebergTable.currentSnapshot().summary());
-      Assert.assertEquals(summary, parameters.get(TableProperties.CURRENT_SNAPSHOT_SUMMARY));
+      Assert.assertEquals(summary, parameters.get(CURRENT_SNAPSHOT_SUMMARY));
       long snapshotId = icebergTable.currentSnapshot().snapshotId();
-      Assert.assertEquals(String.valueOf(snapshotId), parameters.get(TableProperties.CURRENT_SNAPSHOT_ID));
+      Assert.assertEquals(String.valueOf(snapshotId), parameters.get(CURRENT_SNAPSHOT_ID));
       Assert.assertEquals(String.valueOf(icebergTable.currentSnapshot().timestampMillis()),
-          parameters.get(TableProperties.CURRENT_SNAPSHOT_TIMESTAMP));
+          parameters.get(CURRENT_SNAPSHOT_TIMESTAMP));
 
     } finally {
       catalog.dropTable(tableIdentifier);
@@ -536,7 +541,7 @@ public void testSnapshotStatsTableProperties() throws Exception {
   public void testSetSnapshotSummary() throws Exception {
     Configuration conf = new Configuration();
     conf.set("iceberg.hive.table-property-max-size", "4000");
-    HiveTableOperations spyOps = spy(new HiveTableOperations(conf, null, null, catalog.name(), DB_NAME, "tbl"));
+    HiveTableOperations ops = new HiveTableOperations(conf, null, null, catalog.name(), DB_NAME, "tbl");
     Snapshot snapshot = mock(Snapshot.class);
     Map<String, String> summary = Maps.newHashMap();
     when(snapshot.summary()).thenReturn(summary);
@@ -547,7 +552,7 @@ public void testSetSnapshotSummary() throws Exception {
     }
     Assert.assertTrue(JsonUtil.mapper().writeValueAsString(summary).length() < 4000);
     Map<String, String> parameters = Maps.newHashMap();
-    spyOps.setSnapshotSummary(parameters, snapshot);
+    ops.setSnapshotSummary(parameters, snapshot);
     Assert.assertEquals("The snapshot summary must be in parameters", 1, parameters.size());
 
     // create a snapshot summary whose json string size exceeds the limit
@@ -557,11 +562,93 @@ public void testSetSnapshotSummary() throws Exception {
     long summarySize = JsonUtil.mapper().writeValueAsString(summary).length();
     // the limit has been updated to 4000 instead of the default value(32672)
     Assert.assertTrue(summarySize > 4000 && summarySize < 32672);
-    parameters.remove(TableProperties.CURRENT_SNAPSHOT_SUMMARY);
-    spyOps.setSnapshotSummary(parameters, snapshot);
+    parameters.remove(CURRENT_SNAPSHOT_SUMMARY);
+    ops.setSnapshotSummary(parameters, snapshot);
     Assert.assertEquals("The snapshot summary must not be in parameters due to the size limit", 0, parameters.size());
   }
 
+  @Test
+  public void testNotExposeTableProperties() {
+    Configuration conf = new Configuration();
+    conf.set("iceberg.hive.table-property-max-size", "0");
+    HiveTableOperations ops = new HiveTableOperations(conf, null, null, catalog.name(), DB_NAME, "tbl");
+    TableMetadata metadata = mock(TableMetadata.class);
+    Map<String, String> parameters = Maps.newHashMap();
+    parameters.put(CURRENT_SNAPSHOT_SUMMARY, "summary");
+    parameters.put(CURRENT_SNAPSHOT_ID, "snapshotId");
+    parameters.put(CURRENT_SNAPSHOT_TIMESTAMP, "timestamp");
+    parameters.put(CURRENT_SCHEMA, "schema");
+    parameters.put(DEFAULT_PARTITION_SPEC, "partitionSpec");
+    parameters.put(DEFAULT_SORT_ORDER, "sortOrder");
+
+    ops.setSnapshotStats(metadata, parameters);
+    Assert.assertNull(parameters.get(CURRENT_SNAPSHOT_SUMMARY));
+    Assert.assertNull(parameters.get(CURRENT_SNAPSHOT_ID));
+    Assert.assertNull(parameters.get(CURRENT_SNAPSHOT_TIMESTAMP));
+
+    ops.setSchema(metadata, parameters);
+    Assert.assertNull(parameters.get(CURRENT_SCHEMA));
+
+    ops.setPartitionSpec(metadata, parameters);
+    Assert.assertNull(parameters.get(DEFAULT_PARTITION_SPEC));
+
+    ops.setSortOrder(metadata, parameters);
+    Assert.assertNull(parameters.get(DEFAULT_SORT_ORDER));
+  }
+
+  @Test
+  public void testSetDefaultPartitionSpec() throws Exception {
+    Schema schema = new Schema(
+        required(1, "id", Types.IntegerType.get(), "unique ID"),
+        required(2, "data", Types.StringType.get())
+    );
+    TableIdentifier tableIdent = TableIdentifier.of(DB_NAME, "tbl");
+
+    try {
+      Table table = catalog.buildTable(tableIdent, schema).create();
+      Assert.assertFalse("Must not have default partition spec",
+          hmsTableParameters().containsKey(TableProperties.DEFAULT_PARTITION_SPEC));
+
+      table.updateSpec().addField(bucket("data", 16)).commit();
+      Assert.assertEquals(PartitionSpecParser.toJson(table.spec()),
+          hmsTableParameters().get(TableProperties.DEFAULT_PARTITION_SPEC));
+    } finally {
+      catalog.dropTable(tableIdent);
+    }
+  }
+
+  @Test
+  public void testSetCurrentSchema() throws Exception {
+    Schema schema = new Schema(
+        required(1, "id", Types.IntegerType.get(), "unique ID"),
+        required(2, "data", Types.StringType.get())
+    );
+    TableIdentifier tableIdent = TableIdentifier.of(DB_NAME, "tbl");
+
+    try {
+      Table table = catalog.buildTable(tableIdent, schema).create();
+
+      Assert.assertEquals(SchemaParser.toJson(table.schema()), hmsTableParameters().get(CURRENT_SCHEMA));
+
+      // add many new fields to make the schema json string exceed the limit
+      UpdateSchema updateSchema = table.updateSchema();
+      for (int i = 0; i < 600; i++) {
+        updateSchema.addColumn("new_col_" + i, Types.StringType.get());
+      }
+      updateSchema.commit();
+
+      Assert.assertTrue(SchemaParser.toJson(table.schema()).length() > 32672);
+      Assert.assertNull(hmsTableParameters().get(CURRENT_SCHEMA));
+    } finally {
+      catalog.dropTable(tableIdent);
    }
  }
 
+  private Map<String, String> hmsTableParameters() throws TException {
+    org.apache.hadoop.hive.metastore.api.Table hmsTable = metastoreClient.getTable(DB_NAME, "tbl");
+    return hmsTable.getParameters();
+  }
+
   @Test
   public void testConstructorWarehousePathWithEndSlash() {
     HiveCatalog catalogWithSlash = new HiveCatalog();
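testSetCurrentSchema relies on 600 added string columns pushing the schema JSON past the 32672-character default limit: each field serializes to a JSON object of roughly 60-80 characters (id, name, required flag, type), so 600 of them lands well past the threshold. A quick standalone way to sanity-check that assumption (the field ids and the optional flag here are arbitrary, chosen only to build a valid schema):

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.types.Types;

public class SchemaJsonSize {
  public static void main(String[] args) {
    // Build a schema shaped like the one the test produces: 600 string columns.
    List<Types.NestedField> fields = new ArrayList<>();
    for (int i = 0; i < 600; i++) {
      fields.add(Types.NestedField.optional(i + 1, "new_col_" + i, Types.StringType.get()));
    }
    Schema schema = new Schema(fields);

    // Each field contributes ~60-80 characters of JSON, so this prints a
    // length comfortably above the 32672-character default limit.
    System.out.println(SchemaParser.toJson(schema).length());
  }
}
```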
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
index e588c0b3ac79..7a28a9536b6b 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
@@ -624,7 +624,7 @@ public void testIcebergAndHmsTableProperties() throws Exception {
     Assert.assertEquals(expectedIcebergProperties, icebergTable.properties());
 
     if (Catalogs.hiveCatalog(shell.getHiveConf(), tableProperties)) {
-      Assert.assertEquals(12, hmsParams.size());
+      Assert.assertEquals(13, hmsParams.size());
       Assert.assertEquals("initial_val", hmsParams.get("custom_property"));
       Assert.assertEquals("TRUE", hmsParams.get(InputFormatConfig.EXTERNAL_TABLE_PURGE));
       Assert.assertEquals("TRUE", hmsParams.get("EXTERNAL"));
@@ -662,7 +662,7 @@ public void testIcebergAndHmsTableProperties() throws Exception {
         .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
 
     if (Catalogs.hiveCatalog(shell.getHiveConf(), tableProperties)) {
-      Assert.assertEquals(15, hmsParams.size()); // 2 newly-added properties + previous_metadata_location prop
+      Assert.assertEquals(16, hmsParams.size()); // 2 newly-added properties + previous_metadata_location prop
       Assert.assertEquals("true", hmsParams.get("new_prop_1"));
       Assert.assertEquals("false", hmsParams.get("new_prop_2"));
       Assert.assertEquals("new_val", hmsParams.get("custom_property"));
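One note on the updated counts above: the test table has a schema but is unpartitioned and unsorted, so of the three new reserved properties only current-schema is written (the isPartitioned() and isSorted() guards in HiveTableOperations skip the other two), and each expected parameter count grows by exactly one (12 to 13, 15 to 16). A toy model of that gating, with illustrative names rather than the real classes:

```java
public class NewParamCount {
  // Mirrors which of the three new reserved properties a table contributes,
  // ignoring the size limit for simplicity.
  static int count(boolean hasSchema, boolean isPartitioned, boolean isSorted) {
    int n = 0;
    if (hasSchema) {
      n++; // current-schema
    }
    if (isPartitioned) {
      n++; // default-partition-spec
    }
    if (isSorted) {
      n++; // default-sort-order
    }
    return n;
  }

  public static void main(String[] args) {
    // Unpartitioned, unsorted table with a schema: exactly one extra parameter.
    System.out.println(count(true, false, false)); // prints 1
  }
}
```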