diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java
index ee25d9097bec..0005fa8e04aa 100644
--- a/core/src/main/java/org/apache/iceberg/TableProperties.java
+++ b/core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -47,6 +47,34 @@ private TableProperties() {
    */
   public static final String UUID = "uuid";
 
+  /**
+   * Reserved table property for the total number of snapshots.
+   * <p>
+   * This reserved property is used to store the total number of snapshots.
+   */
+  public static final String SNAPSHOT_COUNT = "snapshot-count";
+
+  /**
+   * Reserved table property for current snapshot summary.
+   * <p>
+   * This reserved property is used to store the current snapshot summary.
+   */
+  public static final String CURRENT_SNAPSHOT_SUMMARY = "current-snapshot-summary";
+
+  /**
+   * Reserved table property for current snapshot id.
+   * <p>
+   * This reserved property is used to store the current snapshot id.
+   */
+  public static final String CURRENT_SNAPSHOT_ID = "current-snapshot-id";
+
+  /**
+   * Reserved table property for current snapshot timestamp.
+   * <p>
+   * This reserved property is used to store the current snapshot timestamp.
+   */
+  public static final String CURRENT_SNAPSHOT_TIMESTAMP = "current-snapshot-timestamp-ms";
+
   /**
    * Reserved Iceberg table properties list.
    * <p>
@@ -55,7 +83,11 @@ private TableProperties() {
    */
   public static final Set<String> RESERVED_PROPERTIES = ImmutableSet.of(
       FORMAT_VERSION,
-      UUID
+      UUID,
+      SNAPSHOT_COUNT,
+      CURRENT_SNAPSHOT_ID,
+      CURRENT_SNAPSHOT_SUMMARY,
+      CURRENT_SNAPSHOT_TIMESTAMP
   );
 
   public static final String COMMIT_NUM_RETRIES = "commit.retry.num-retries";
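The four new keys join FORMAT_VERSION and UUID in RESERVED_PROPERTIES, meaning they are computed outputs of the catalog rather than user-settable metadata. A minimal sketch (an illustration, not code from this patch; the class and method names are made up) of screening reserved keys out of a user-supplied property map:

import java.util.Map;
import java.util.stream.Collectors;
import org.apache.iceberg.TableProperties;

public class ReservedPropertyFilter {
  // Drops any key that Iceberg reserves (format-version, uuid, and the four
  // snapshot stats added above) from a user-supplied property map.
  static Map<String, String> withoutReserved(Map<String, String> userProps) {
    return userProps.entrySet().stream()
        .filter(e -> !TableProperties.RESERVED_PROPERTIES.contains(e.getKey()))
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
  }
}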
diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
index c6d081f731bc..88374ea0df65 100644
--- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
+++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
@@ -19,6 +19,7 @@
 
 package org.apache.iceberg.hive;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import java.net.InetAddress;
@@ -71,6 +72,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.JsonUtil;
 import org.apache.iceberg.util.Tasks;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
@@ -90,6 +92,11 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
   private static final String HIVE_LOCK_CHECK_MAX_WAIT_MS = "iceberg.hive.lock-check-max-wait-ms";
   private static final String HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES = "iceberg.hive.metadata-refresh-max-retries";
   private static final String HIVE_TABLE_LEVEL_LOCK_EVICT_MS = "iceberg.hive.table-level-lock-evict-ms";
+
+  // the max size is based on HMS backend database. For Hive versions below 2.3, the max table parameter size is
+  // 4000 characters, see https://issues.apache.org/jira/browse/HIVE-12274
+  private static final String HIVE_TABLE_PROPERTY_MAX_SIZE = "iceberg.hive.table-property-max-size";
+  private static final long HIVE_TABLE_PROPERTY_MAX_SIZE_DEFAULT = 32672;
   private static final long HIVE_ACQUIRE_LOCK_TIMEOUT_MS_DEFAULT = 3 * 60 * 1000; // 3 minutes
   private static final long HIVE_LOCK_CHECK_MIN_WAIT_MS_DEFAULT = 50; // 50 milliseconds
   private static final long HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT = 5 * 1000; // 5 seconds
@@ -148,6 +155,7 @@ private static class WaitingForLockException extends RuntimeException {
   private final long lockAcquireTimeout;
   private final long lockCheckMinWaitTime;
   private final long lockCheckMaxWaitTime;
+  private final long maxHiveTablePropertySize;
   private final int metadataRefreshMaxRetries;
   private final FileIO fileIO;
   private final ClientPool metaClients;
@@ -168,6 +176,7 @@ protected HiveTableOperations(Configuration conf, ClientPool metaClients, FileIO fileIO,
         conf.getLong(HIVE_LOCK_CHECK_MAX_WAIT_MS, HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT);
     this.metadataRefreshMaxRetries =
         conf.getInt(HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES, HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES_DEFAULT);
+    this.maxHiveTablePropertySize = conf.getLong(HIVE_TABLE_PROPERTY_MAX_SIZE, HIVE_TABLE_PROPERTY_MAX_SIZE_DEFAULT);
     long tableLevelLockCacheEvictionTimeout =
         conf.getLong(HIVE_TABLE_LEVEL_LOCK_EVICT_MS, HIVE_TABLE_LEVEL_LOCK_EVICT_MS_DEFAULT);
     initTableLevelLockCache(tableLevelLockCacheEvictionTimeout);
@@ -402,9 +411,41 @@ private void setHmsTableParameters(String newMetadataLocation, Table tbl, TableMetadata metadata,
       parameters.put(StatsSetupConst.TOTAL_SIZE, summary.get(SnapshotSummary.TOTAL_FILE_SIZE_PROP));
     }
 
+    setSnapshotStats(metadata, parameters);
+
     tbl.setParameters(parameters);
   }
 
+  private void setSnapshotStats(TableMetadata metadata, Map<String, String> parameters) {
+    parameters.remove(TableProperties.CURRENT_SNAPSHOT_ID);
+    parameters.remove(TableProperties.CURRENT_SNAPSHOT_TIMESTAMP);
+    parameters.remove(TableProperties.CURRENT_SNAPSHOT_SUMMARY);
+
+    Snapshot currentSnapshot = metadata.currentSnapshot();
+    if (currentSnapshot != null) {
+      parameters.put(TableProperties.CURRENT_SNAPSHOT_ID, String.valueOf(currentSnapshot.snapshotId()));
+      parameters.put(TableProperties.CURRENT_SNAPSHOT_TIMESTAMP, String.valueOf(currentSnapshot.timestampMillis()));
+      setSnapshotSummary(parameters, currentSnapshot);
+    }
+
+    parameters.put(TableProperties.SNAPSHOT_COUNT, String.valueOf(metadata.snapshots().size()));
+  }
+
+  @VisibleForTesting
+  void setSnapshotSummary(Map<String, String> parameters, Snapshot currentSnapshot) {
+    try {
+      String summary = JsonUtil.mapper().writeValueAsString(currentSnapshot.summary());
+      if (summary.length() <= maxHiveTablePropertySize) {
+        parameters.put(TableProperties.CURRENT_SNAPSHOT_SUMMARY, summary);
+      } else {
+        LOG.warn("Not exposing the current snapshot({}) summary in HMS since it exceeds {} characters",
+            currentSnapshot.snapshotId(), maxHiveTablePropertySize);
+      }
+    } catch (JsonProcessingException e) {
+      LOG.warn("Failed to convert current snapshot({}) summary to a json string", currentSnapshot.snapshotId(), e);
+    }
+  }
+
   private StorageDescriptor storageDescriptor(TableMetadata metadata, boolean hiveEngineEnabled) {
     final StorageDescriptor storageDescriptor = new StorageDescriptor();
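The core of the change is the size guard in setSnapshotSummary: serialize the summary to JSON first, compare the string length against the configured limit, and skip the property (with a warning) rather than fail the commit. A standalone sketch of that pattern, using a plain Jackson ObjectMapper in place of Iceberg's JsonUtil.mapper(); the class and method names here are illustrative, not from the patch:

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;
import java.util.Optional;

public class SummaryJson {
  private static final ObjectMapper MAPPER = new ObjectMapper();

  // Serializes the summary and returns it only when the JSON fits within
  // maxSize characters; serialization failures yield empty, mirroring the
  // patch's choice to skip the property rather than fail the commit.
  static Optional<String> toBoundedJson(Map<String, String> summary, long maxSize) {
    try {
      String json = MAPPER.writeValueAsString(summary);
      return json.length() <= maxSize ? Optional.of(json) : Optional.empty();
    } catch (JsonProcessingException e) {
      return Optional.empty();
    }
  }
}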
diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java
index 74716b771cb0..40011625cdc5 100644
--- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java
+++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java
@@ -21,11 +21,17 @@
 
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.CachingCatalog;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
@@ -42,6 +48,7 @@
 import org.apache.iceberg.transforms.Transform;
 import org.apache.iceberg.transforms.Transforms;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.JsonUtil;
 import org.apache.thrift.TException;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -52,6 +59,9 @@
 import static org.apache.iceberg.NullOrder.NULLS_FIRST;
 import static org.apache.iceberg.SortDirection.ASC;
 import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
 
 public class TestHiveCatalog extends HiveMetastoreTest {
   private static ImmutableMap<String, String> meta = ImmutableMap.of(
@@ -468,4 +478,85 @@ public void testUUIDinTableProperties() throws Exception {
       catalog.dropTable(tableIdentifier);
     }
   }
+
+  @Test
+  public void testSnapshotStatsTableProperties() throws Exception {
+    Schema schema = new Schema(
+        required(1, "id", Types.IntegerType.get(), "unique ID"),
+        required(2, "data", Types.StringType.get())
+    );
+    TableIdentifier tableIdentifier = TableIdentifier.of(DB_NAME, "tbl");
+    String location = temp.newFolder("tbl").toString();
+
+    try {
+      catalog.buildTable(tableIdentifier, schema)
+          .withLocation(location)
+          .create();
+
+      String tableName = tableIdentifier.name();
+      org.apache.hadoop.hive.metastore.api.Table hmsTable =
+          metastoreClient.getTable(tableIdentifier.namespace().level(0), tableName);
+
+      // check whether parameters are in expected state
+      Map<String, String> parameters = hmsTable.getParameters();
+      Assert.assertEquals("0", parameters.get(TableProperties.SNAPSHOT_COUNT));
+      Assert.assertNull(parameters.get(TableProperties.CURRENT_SNAPSHOT_SUMMARY));
+      Assert.assertNull(parameters.get(TableProperties.CURRENT_SNAPSHOT_ID));
+      Assert.assertNull(parameters.get(TableProperties.CURRENT_SNAPSHOT_TIMESTAMP));
+
+      // create a snapshot
+      Table icebergTable = catalog.loadTable(tableIdentifier);
+      String fileName = UUID.randomUUID().toString();
+      DataFile file = DataFiles.builder(icebergTable.spec())
+          .withPath(FileFormat.PARQUET.addExtension(fileName))
+          .withRecordCount(2)
+          .withFileSizeInBytes(0)
+          .build();
+      icebergTable.newFastAppend().appendFile(file).commit();
+
+      // check whether parameters are in expected state
+      hmsTable = metastoreClient.getTable(tableIdentifier.namespace().level(0), tableName);
+      parameters = hmsTable.getParameters();
+      Assert.assertEquals("1", parameters.get(TableProperties.SNAPSHOT_COUNT));
+      String summary = JsonUtil.mapper().writeValueAsString(icebergTable.currentSnapshot().summary());
+      Assert.assertEquals(summary, parameters.get(TableProperties.CURRENT_SNAPSHOT_SUMMARY));
+      long snapshotId = icebergTable.currentSnapshot().snapshotId();
+      Assert.assertEquals(String.valueOf(snapshotId), parameters.get(TableProperties.CURRENT_SNAPSHOT_ID));
+      Assert.assertEquals(String.valueOf(icebergTable.currentSnapshot().timestampMillis()),
+          parameters.get(TableProperties.CURRENT_SNAPSHOT_TIMESTAMP));
+
+    } finally {
+      catalog.dropTable(tableIdentifier);
+    }
+  }
+
+  @Test
+  public void testSetSnapshotSummary() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set("iceberg.hive.table-property-max-size", "4000");
+    HiveTableOperations spyOps = spy(new HiveTableOperations(conf, null, null, catalog.name(), DB_NAME, "tbl"));
+    Snapshot snapshot = mock(Snapshot.class);
+    Map<String, String> summary = Maps.newHashMap();
+    when(snapshot.summary()).thenReturn(summary);
+
+    // create a snapshot summary whose json string size is less than the limit
+    for (int i = 0; i < 100; i++) {
+      summary.put(String.valueOf(i), "value");
+    }
+    Assert.assertTrue(JsonUtil.mapper().writeValueAsString(summary).length() < 4000);
+    Map<String, String> parameters = Maps.newHashMap();
+    spyOps.setSnapshotSummary(parameters, snapshot);
+    Assert.assertEquals("The snapshot summary must be in parameters", 1, parameters.size());
+
+    // create a snapshot summary whose json string size exceeds the limit
+    for (int i = 0; i < 1000; i++) {
+      summary.put(String.valueOf(i), "value");
+    }
+    long summarySize = JsonUtil.mapper().writeValueAsString(summary).length();
+    // the limit has been updated to 4000 instead of the default value (32672)
+    Assert.assertTrue(summarySize > 4000 && summarySize < 32672);
+    parameters.remove(TableProperties.CURRENT_SNAPSHOT_SUMMARY);
+    spyOps.setSnapshotSummary(parameters, snapshot);
+    Assert.assertEquals("The snapshot summary must not be in parameters due to the size limit", 0, parameters.size());
+  }
 }
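testSetSnapshotSummary relies on iceberg.hive.table-property-max-size being read through the standard Hadoop Configuration lookup, so the 4000-character override takes effect while an unset key falls back to 32672. A hedged sketch of that lookup on its own (the demo class is made up for illustration):

import org.apache.hadoop.conf.Configuration;

public class PropertyLimitDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Unset key: falls back to the 32672-character default.
    System.out.println(conf.getLong("iceberg.hive.table-property-max-size", 32672L));
    // Explicit override, as in testSetSnapshotSummary above.
    conf.set("iceberg.hive.table-property-max-size", "4000");
    System.out.println(conf.getLong("iceberg.hive.table-property-max-size", 32672L));
  }
}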
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
index 938ba1997b28..e588c0b3ac79 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
@@ -624,7 +624,7 @@ public void testIcebergAndHmsTableProperties() throws Exception {
     Assert.assertEquals(expectedIcebergProperties, icebergTable.properties());
 
     if (Catalogs.hiveCatalog(shell.getHiveConf(), tableProperties)) {
-      Assert.assertEquals(11, hmsParams.size());
+      Assert.assertEquals(12, hmsParams.size());
       Assert.assertEquals("initial_val", hmsParams.get("custom_property"));
       Assert.assertEquals("TRUE", hmsParams.get(InputFormatConfig.EXTERNAL_TABLE_PURGE));
       Assert.assertEquals("TRUE", hmsParams.get("EXTERNAL"));
@@ -662,7 +662,7 @@ public void testIcebergAndHmsTableProperties() throws Exception {
         .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
 
     if (Catalogs.hiveCatalog(shell.getHiveConf(), tableProperties)) {
-      Assert.assertEquals(14, hmsParams.size()); // 2 newly-added properties + previous_metadata_location prop
+      Assert.assertEquals(15, hmsParams.size()); // 2 newly-added properties + previous_metadata_location prop
       Assert.assertEquals("true", hmsParams.get("new_prop_1"));
       Assert.assertEquals("false", hmsParams.get("new_prop_2"));
       Assert.assertEquals("new_val", hmsParams.get("custom_property"));
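With the patch applied, the stats are ordinary HMS table parameters, readable by any metastore client without opening the Iceberg metadata file. A sketch of such a read; the thrift URI and the database/table names are placeholders, not from this patch:

import java.util.Map;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;

public class ReadSnapshotStats {
  public static void main(String[] args) throws Exception {
    HiveConf conf = new HiveConf();
    conf.set("hive.metastore.uris", "thrift://localhost:9083"); // placeholder URI
    HiveMetaStoreClient client = new HiveMetaStoreClient(conf);
    try {
      // All four properties are plain strings; the summary is a JSON object.
      Map<String, String> params = client.getTable("default", "tbl").getParameters();
      System.out.println("snapshot-count: " + params.get("snapshot-count"));
      System.out.println("current-snapshot-id: " + params.get("current-snapshot-id"));
      System.out.println("current-snapshot-timestamp-ms: " + params.get("current-snapshot-timestamp-ms"));
      System.out.println("current-snapshot-summary: " + params.get("current-snapshot-summary"));
    } finally {
      client.close();
    }
  }
}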