34 changes: 33 additions & 1 deletion core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -47,6 +47,34 @@ private TableProperties() {
*/
public static final String UUID = "uuid";

/**
* Reserved table property for the total number of snapshots.
* <p>
* This reserved property is used to store the total number of snapshots.
*/
public static final String SNAPSHOT_COUNT = "snapshot-count";

/**
* Reserved table property for current snapshot summary.
* <p>
* This reserved property is used to store the current snapshot summary.
*/
public static final String CURRENT_SNAPSHOT_SUMMARY = "current-snapshot-summary";

/**
* Reserved table property for current snapshot id.
* <p>
* This reserved property is used to store the current snapshot id.
*/
public static final String CURRENT_SNAPSHOT_ID = "current-snapshot-id";

/**
* Reserved table property for current snapshot timestamp.
* <p>
* This reserved property is used to store the current snapshot timestamp.
*/
public static final String CURRENT_SNAPSHOT_TIMESTAMP = "current-snapshot-timestamp-ms";

/**
* Reserved Iceberg table properties list.
* <p>
@@ -55,7 +83,11 @@ private TableProperties() {
*/
public static final Set<String> RESERVED_PROPERTIES = ImmutableSet.of(
FORMAT_VERSION,
UUID
UUID,
SNAPSHOT_COUNT,
CURRENT_SNAPSHOT_ID,
CURRENT_SNAPSHOT_SUMMARY,
CURRENT_SNAPSHOT_TIMESTAMP
);

public static final String COMMIT_NUM_RETRIES = "commit.retry.num-retries";
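For illustration, a minimal sketch of reading these reserved properties back from HMS once HiveTableOperations publishes them; the helper below is hypothetical (not part of this change) and assumes an already-configured IMetaStoreClient:

    import java.util.Map;
    import org.apache.hadoop.hive.metastore.IMetaStoreClient;

    // Hypothetical helper: reads the snapshot stats that HiveTableOperations writes
    // into the HMS table parameters, using the reserved property names defined above.
    public final class SnapshotStatsReader {
      private SnapshotStatsReader() {
      }

      public static void printSnapshotStats(IMetaStoreClient client, String dbName, String tableName)
          throws Exception {
        Map<String, String> parameters = client.getTable(dbName, tableName).getParameters();
        System.out.println("snapshot-count: " + parameters.get("snapshot-count"));
        System.out.println("current-snapshot-id: " + parameters.get("current-snapshot-id"));
        System.out.println("current-snapshot-timestamp-ms: " + parameters.get("current-snapshot-timestamp-ms"));
        System.out.println("current-snapshot-summary: " + parameters.get("current-snapshot-summary"));
      }
    }
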
hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
@@ -19,6 +19,7 @@

package org.apache.iceberg.hive;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.net.InetAddress;
@@ -71,6 +72,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.JsonUtil;
import org.apache.iceberg.util.Tasks;
import org.apache.thrift.TException;
import org.slf4j.Logger;
@@ -90,6 +92,11 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
private static final String HIVE_LOCK_CHECK_MAX_WAIT_MS = "iceberg.hive.lock-check-max-wait-ms";
private static final String HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES = "iceberg.hive.metadata-refresh-max-retries";
private static final String HIVE_TABLE_LEVEL_LOCK_EVICT_MS = "iceberg.hive.table-level-lock-evict-ms";

// The max size is based on the HMS backend database. For Hive versions below 2.3, the max table parameter size is
// 4000 characters, see https://issues.apache.org/jira/browse/HIVE-12274
private static final String HIVE_TABLE_PROPERTY_MAX_SIZE = "iceberg.hive.table-property-max-size";
private static final long HIVE_TABLE_PROPERTY_MAX_SIZE_DEFAULT = 32672;
private static final long HIVE_ACQUIRE_LOCK_TIMEOUT_MS_DEFAULT = 3 * 60 * 1000; // 3 minutes
private static final long HIVE_LOCK_CHECK_MIN_WAIT_MS_DEFAULT = 50; // 50 milliseconds
private static final long HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT = 5 * 1000; // 5 seconds
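
As a note on the new iceberg.hive.table-property-max-size property above: it is read from the Hadoop Configuration, so a deployment whose HMS backend database caps table parameters at 4000 characters (pre-Hive 2.3, per HIVE-12274) could lower the limit. A minimal sketch, purely illustrative:

    // Hypothetical override; 4000 matches the pre-Hive-2.3 backend limit mentioned above.
    org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
    conf.setLong("iceberg.hive.table-property-max-size", 4000L);
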
@@ -148,6 +155,7 @@ private static class WaitingForLockException extends RuntimeException {
private final long lockAcquireTimeout;
private final long lockCheckMinWaitTime;
private final long lockCheckMaxWaitTime;
private final long maxHiveTablePropertySize;
private final int metadataRefreshMaxRetries;
private final FileIO fileIO;
private final ClientPool<IMetaStoreClient, TException> metaClients;
@@ -168,6 +176,7 @@ protected HiveTableOperations(Configuration conf, ClientPool metaClients, FileIO
conf.getLong(HIVE_LOCK_CHECK_MAX_WAIT_MS, HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT);
this.metadataRefreshMaxRetries =
conf.getInt(HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES, HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES_DEFAULT);
this.maxHiveTablePropertySize = conf.getLong(HIVE_TABLE_PROPERTY_MAX_SIZE, HIVE_TABLE_PROPERTY_MAX_SIZE_DEFAULT);
long tableLevelLockCacheEvictionTimeout =
conf.getLong(HIVE_TABLE_LEVEL_LOCK_EVICT_MS, HIVE_TABLE_LEVEL_LOCK_EVICT_MS_DEFAULT);
initTableLevelLockCache(tableLevelLockCacheEvictionTimeout);
@@ -402,9 +411,41 @@ private void setHmsTableParameters(String newMetadataLocation, Table tbl, TableM
parameters.put(StatsSetupConst.TOTAL_SIZE, summary.get(SnapshotSummary.TOTAL_FILE_SIZE_PROP));
}

setSnapshotStats(metadata, parameters);

tbl.setParameters(parameters);
}

private void setSnapshotStats(TableMetadata metadata, Map<String, String> parameters) {
parameters.remove(TableProperties.CURRENT_SNAPSHOT_ID);
parameters.remove(TableProperties.CURRENT_SNAPSHOT_TIMESTAMP);
parameters.remove(TableProperties.CURRENT_SNAPSHOT_SUMMARY);

Snapshot currentSnapshot = metadata.currentSnapshot();
Member:
I think we need a way to clear the properties if they are no longer present in the current snapshot (like the original code in setHmsProperties). For example, if the summary suddenly exceeds the length limit in a new snapshot, the new value would not override the old summary, which would be wrong.

Contributor Author:
Fixed in the new commit.

if (currentSnapshot != null) {
parameters.put(TableProperties.CURRENT_SNAPSHOT_ID, String.valueOf(currentSnapshot.snapshotId()));
parameters.put(TableProperties.CURRENT_SNAPSHOT_TIMESTAMP, String.valueOf(currentSnapshot.timestampMillis()));
setSnapshotSummary(parameters, currentSnapshot);
}

parameters.put(TableProperties.SNAPSHOT_COUNT, String.valueOf(metadata.snapshots().size()));
}

@VisibleForTesting
void setSnapshotSummary(Map<String, String> parameters, Snapshot currentSnapshot) {
try {
String summary = JsonUtil.mapper().writeValueAsString(currentSnapshot.summary());
if (summary.length() <= maxHiveTablePropertySize) {
parameters.put(TableProperties.CURRENT_SNAPSHOT_SUMMARY, summary);
} else {
LOG.warn("Not exposing the current snapshot({}) summary in HMS since it exceeds {} characters",
currentSnapshot.snapshotId(), maxHiveTablePropertySize);
}
} catch (JsonProcessingException e) {
LOG.warn("Failed to convert current snapshot({}) summary to a json string", currentSnapshot.snapshotId(), e);
}
}

private StorageDescriptor storageDescriptor(TableMetadata metadata, boolean hiveEngineEnabled) {

final StorageDescriptor storageDescriptor = new StorageDescriptor();
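To make the end result concrete, this is roughly what the HMS table parameters written by setSnapshotStats() could look like after a single append commit; all values below are illustrative, not taken from a real table, and the snippet relies on the Map, ImmutableMap, and TableProperties imports already present in this file:

    // Illustrative only: example parameter values after one append commit.
    Map<String, String> example = ImmutableMap.of(
        TableProperties.SNAPSHOT_COUNT, "1",
        TableProperties.CURRENT_SNAPSHOT_ID, "5231658854638766100",
        TableProperties.CURRENT_SNAPSHOT_TIMESTAMP, "1649289600000",
        TableProperties.CURRENT_SNAPSHOT_SUMMARY,
            "{\"added-data-files\":\"1\",\"added-records\":\"2\",\"total-records\":\"2\",\"total-data-files\":\"1\"}");
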
hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java
@@ -21,11 +21,17 @@

import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.CachingCatalog;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
@@ -42,6 +48,7 @@
import org.apache.iceberg.transforms.Transform;
import org.apache.iceberg.transforms.Transforms;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.JsonUtil;
import org.apache.thrift.TException;
import org.junit.Assert;
import org.junit.Rule;
@@ -52,6 +59,9 @@
import static org.apache.iceberg.NullOrder.NULLS_FIRST;
import static org.apache.iceberg.SortDirection.ASC;
import static org.apache.iceberg.types.Types.NestedField.required;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

public class TestHiveCatalog extends HiveMetastoreTest {
private static ImmutableMap meta = ImmutableMap.of(
Expand Down Expand Up @@ -468,4 +478,85 @@ public void testUUIDinTableProperties() throws Exception {
catalog.dropTable(tableIdentifier);
}
}

@Test
public void testSnapshotStatsTableProperties() throws Exception {
Member:
We should probably have a test with the string exceeding the property size threshold, checking to make sure the behavior is as we expect.

Contributor Author (@flyrain, Apr 6, 2022):
Added a test case for that.

Schema schema = new Schema(
required(1, "id", Types.IntegerType.get(), "unique ID"),
required(2, "data", Types.StringType.get())
);
TableIdentifier tableIdentifier = TableIdentifier.of(DB_NAME, "tbl");
String location = temp.newFolder("tbl").toString();

try {
catalog.buildTable(tableIdentifier, schema)
.withLocation(location)
.create();

String tableName = tableIdentifier.name();
org.apache.hadoop.hive.metastore.api.Table hmsTable =
metastoreClient.getTable(tableIdentifier.namespace().level(0), tableName);

// check whether parameters are in expected state
Map<String, String> parameters = hmsTable.getParameters();
Assert.assertEquals("0", parameters.get(TableProperties.SNAPSHOT_COUNT));
Member:
Can we put some error messages in these asserts, like in other tests?

Contributor Author:
I'm not sure it is necessary. The code here explains itself quite well.

Assert.assertNull(parameters.get(TableProperties.CURRENT_SNAPSHOT_SUMMARY));
Assert.assertNull(parameters.get(TableProperties.CURRENT_SNAPSHOT_ID));
Assert.assertNull(parameters.get(TableProperties.CURRENT_SNAPSHOT_TIMESTAMP));

// create a snapshot
Table icebergTable = catalog.loadTable(tableIdentifier);
String fileName = UUID.randomUUID().toString();
DataFile file = DataFiles.builder(icebergTable.spec())
.withPath(FileFormat.PARQUET.addExtension(fileName))
.withRecordCount(2)
.withFileSizeInBytes(0)
.build();
icebergTable.newFastAppend().appendFile(file).commit();

// check whether parameters are in expected state
hmsTable = metastoreClient.getTable(tableIdentifier.namespace().level(0), tableName);
parameters = hmsTable.getParameters();
Assert.assertEquals("1", parameters.get(TableProperties.SNAPSHOT_COUNT));
String summary = JsonUtil.mapper().writeValueAsString(icebergTable.currentSnapshot().summary());
Assert.assertEquals(summary, parameters.get(TableProperties.CURRENT_SNAPSHOT_SUMMARY));
long snapshotId = icebergTable.currentSnapshot().snapshotId();
Assert.assertEquals(String.valueOf(snapshotId), parameters.get(TableProperties.CURRENT_SNAPSHOT_ID));
Assert.assertEquals(String.valueOf(icebergTable.currentSnapshot().timestampMillis()),
parameters.get(TableProperties.CURRENT_SNAPSHOT_TIMESTAMP));

} finally {
catalog.dropTable(tableIdentifier);
}
}

@Test
public void testSetSnapshotSummary() throws Exception {
Configuration conf = new Configuration();
conf.set("iceberg.hive.table-property-max-size", "4000");
HiveTableOperations spyOps = spy(new HiveTableOperations(conf, null, null, catalog.name(), DB_NAME, "tbl"));
Snapshot snapshot = mock(Snapshot.class);
Map<String, String> summary = Maps.newHashMap();
when(snapshot.summary()).thenReturn(summary);

// create a snapshot summary whose json string size is less than the limit
for (int i = 0; i < 100; i++) {
summary.put(String.valueOf(i), "value");
}
Assert.assertTrue(JsonUtil.mapper().writeValueAsString(summary).length() < 4000);
Map<String, String> parameters = Maps.newHashMap();
spyOps.setSnapshotSummary(parameters, snapshot);
Assert.assertEquals("The snapshot summary must be in parameters", 1, parameters.size());

// create a snapshot summary whose json string size exceeds the limit
for (int i = 0; i < 1000; i++) {
summary.put(String.valueOf(i), "value");
}
long summarySize = JsonUtil.mapper().writeValueAsString(summary).length();
// the limit has been updated to 4000 instead of the default value(32672)
Assert.assertTrue(summarySize > 4000 && summarySize < 32672);
Member:
Nit: I think there is no need for the upper limit check, right? We are not really saving in Hive.

Contributor Author:
This is to make sure the new limit takes effect:

    conf.set("iceberg.hive.table.parameter.size.max", "4000");

Contributor Author:
Added a comment in the code.

Member (@szehon-ho, Apr 7, 2022):
Hm, I meant, can't we just do

    Assert.assertTrue("Summary size should be greater than limit", summarySize > 4000);

To me the test is checking whether you can persist something beyond 4000 chars, right? The fact that it's below or above 32672 chars should not matter now that we changed the limit to 4000, right? Then there is no need for the additional comment, if this is the case. Let me know if I am mistaken though.

Contributor Author:
The main purpose is to test whether we save the summary in the HMS parameters when the size exceeds the limit. Besides, I also want to test that the limit has changed from 32672 to 4000. That's why I check both.

    Assert.assertTrue(summarySize > 4000 && summarySize < 32672);

Member:
OK got it.

parameters.remove(TableProperties.CURRENT_SNAPSHOT_SUMMARY);
spyOps.setSnapshotSummary(parameters, snapshot);
Assert.assertEquals("The snapshot summary must not be in parameters due to the size limit", 0, parameters.size());
}
}
mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
@@ -624,7 +624,7 @@ public void testIcebergAndHmsTableProperties() throws Exception {
Assert.assertEquals(expectedIcebergProperties, icebergTable.properties());

if (Catalogs.hiveCatalog(shell.getHiveConf(), tableProperties)) {
Assert.assertEquals(11, hmsParams.size());
Assert.assertEquals(12, hmsParams.size());
Assert.assertEquals("initial_val", hmsParams.get("custom_property"));
Assert.assertEquals("TRUE", hmsParams.get(InputFormatConfig.EXTERNAL_TABLE_PURGE));
Assert.assertEquals("TRUE", hmsParams.get("EXTERNAL"));
@@ -662,7 +662,7 @@ public void testIcebergAndHmsTableProperties() throws Exception {
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

if (Catalogs.hiveCatalog(shell.getHiveConf(), tableProperties)) {
Assert.assertEquals(14, hmsParams.size()); // 2 newly-added properties + previous_metadata_location prop
Assert.assertEquals(15, hmsParams.size()); // 2 newly-added properties + previous_metadata_location prop
Assert.assertEquals("true", hmsParams.get("new_prop_1"));
Assert.assertEquals("false", hmsParams.get("new_prop_2"));
Assert.assertEquals("new_val", hmsParams.get("custom_property"));