diff --git a/api/src/main/java/org/apache/iceberg/exceptions/CommitStateUnknownException.java b/api/src/main/java/org/apache/iceberg/exceptions/CommitStateUnknownException.java new file mode 100644 index 0000000000..d6e36f5031 --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/exceptions/CommitStateUnknownException.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.exceptions; + +/** + * Exception for a failure to confirm either affirmatively or negatively that a commit was applied. The client + * cannot take any further action without possibly corrupting the table. + */ +public class CommitStateUnknownException extends RuntimeException { + + private static final String COMMON_INFO = + "Cannot determine whether the commit was successful or not, the underlying data files may or " + + "may not be needed. Manual intervention via the Remove Orphan Files Action can remove these " + + "files when a connection to the Catalog can be re-established if the commit was actually unsuccessful.\n" + + "Please check to see whether or not your commit was successful before retrying this commit. 
Retrying " + + "an already successful operation will result in duplicate records or unintentional modifications.\n" + + "At this time no files will be deleted including possibly unused manifest lists."; + + public CommitStateUnknownException(Throwable cause) { + super(cause.getMessage() + "\n" + COMMON_INFO, cause); + } +} diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java index a7e11e1a7b..3a179f99fb 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java @@ -32,6 +32,7 @@ import java.util.function.Consumer; import org.apache.iceberg.events.Listeners; import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.exceptions.CommitStateUnknownException; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -218,12 +219,15 @@ private Map summary(TableMetadata previous) { } } else { // if there was no previous snapshot, default the summary to start totals at 0 - previousSummary = ImmutableMap.of( - SnapshotSummary.TOTAL_RECORDS_PROP, "0", - SnapshotSummary.TOTAL_DATA_FILES_PROP, "0", - SnapshotSummary.TOTAL_DELETE_FILES_PROP, "0", - SnapshotSummary.TOTAL_POS_DELETES_PROP, "0", - SnapshotSummary.TOTAL_EQ_DELETES_PROP, "0"); + ImmutableMap.Builder summaryBuilder = ImmutableMap.builder(); + summaryBuilder + .put(SnapshotSummary.TOTAL_RECORDS_PROP, "0") + .put(SnapshotSummary.TOTAL_FILE_SIZE_PROP, "0") + .put(SnapshotSummary.TOTAL_DATA_FILES_PROP, "0") + .put(SnapshotSummary.TOTAL_DELETE_FILES_PROP, "0") + .put(SnapshotSummary.TOTAL_POS_DELETES_PROP, "0") + .put(SnapshotSummary.TOTAL_EQ_DELETES_PROP, "0"); + previousSummary = summaryBuilder.build(); } ImmutableMap.Builder builder = ImmutableMap.builder(); @@ -234,6 +238,9 @@ private Map summary(TableMetadata previous) { updateTotal( builder, previousSummary, SnapshotSummary.TOTAL_RECORDS_PROP, summary, SnapshotSummary.ADDED_RECORDS_PROP, SnapshotSummary.DELETED_RECORDS_PROP); + updateTotal( + builder, previousSummary, SnapshotSummary.TOTAL_FILE_SIZE_PROP, + summary, SnapshotSummary.ADDED_FILE_SIZE_PROP, SnapshotSummary.REMOVED_FILE_SIZE_PROP); updateTotal( builder, previousSummary, SnapshotSummary.TOTAL_DATA_FILES_PROP, summary, SnapshotSummary.ADDED_FILES_PROP, SnapshotSummary.DELETED_FILES_PROP); @@ -293,6 +300,8 @@ public void commit() { taskOps.commit(base, updated.withUUID()); }); + } catch (CommitStateUnknownException commitStateUnknownException) { + throw commitStateUnknownException; } catch (RuntimeException e) { Exceptions.suppressAndThrow(e, this::cleanAll); } diff --git a/core/src/main/java/org/apache/iceberg/SnapshotSummary.java b/core/src/main/java/org/apache/iceberg/SnapshotSummary.java index 834fa0b954..2f1a6e25e5 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotSummary.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotSummary.java @@ -38,6 +38,7 @@ public class SnapshotSummary { public static final String TOTAL_RECORDS_PROP = "total-records"; public static final String ADDED_FILE_SIZE_PROP = "added-files-size"; public static final String REMOVED_FILE_SIZE_PROP = "removed-files-size"; + public static final String TOTAL_FILE_SIZE_PROP = "total-files-size"; public static final String ADDED_POS_DELETES_PROP = "added-position-deletes"; public static final String REMOVED_POS_DELETES_PROP = 
"removed-position-deletes"; public static final String TOTAL_POS_DELETES_PROP = "total-position-deletes"; diff --git a/core/src/main/java/org/apache/iceberg/TableOperations.java b/core/src/main/java/org/apache/iceberg/TableOperations.java index 98184231f6..3648b1211b 100644 --- a/core/src/main/java/org/apache/iceberg/TableOperations.java +++ b/core/src/main/java/org/apache/iceberg/TableOperations.java @@ -52,6 +52,13 @@ public interface TableOperations { * Implementations must check that the base metadata is current to avoid overwriting updates. * Once the atomic commit operation succeeds, implementations must not perform any operations that * may fail because failure in this method cannot be distinguished from commit failure. + *

+ * Implementations must throw a {@link org.apache.iceberg.exceptions.CommitStateUnknownException} + * in cases where it cannot be determined if the commit succeeded or failed. + * For example, if a network partition causes the confirmation of the commit to be lost, + * the implementation should throw a CommitStateUnknownException. This is important because downstream users of + * this API need to know whether they can clean up the commit or not; if the state is unknown, it is not safe + * to remove any files. All other exceptions will be treated as if the commit has failed. * * @param base table metadata on which changes were based * @param metadata new table metadata with updates diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java index a303c10c4f..3298d89e3c 100644 --- a/core/src/main/java/org/apache/iceberg/TableProperties.java +++ b/core/src/main/java/org/apache/iceberg/TableProperties.java @@ -36,6 +36,9 @@ private TableProperties() { public static final String COMMIT_TOTAL_RETRY_TIME_MS = "commit.retry.total-timeout-ms"; public static final int COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT = 1800000; // 30 minutes + public static final String COMMIT_NUM_STATUS_CHECKS = "commit.num-status-checks"; + public static final int COMMIT_NUM_STATUS_CHECKS_DEFAULT = 3; + public static final String MANIFEST_TARGET_SIZE_BYTES = "commit.manifest.target-size-bytes"; public static final long MANIFEST_TARGET_SIZE_BYTES_DEFAULT = 8388608; // 8 MB diff --git a/core/src/test/java/org/apache/iceberg/TestSnapshotSummary.java b/core/src/test/java/org/apache/iceberg/TestSnapshotSummary.java new file mode 100644 index 0000000000..f1b0fdb35b --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/TestSnapshotSummary.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.iceberg; + +import java.util.Map; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class TestSnapshotSummary extends TableTestBase { + public TestSnapshotSummary(int formatVersion) { + super(formatVersion); + } + + @Parameterized.Parameters(name = "formatVersion = {0}") + public static Object[] parameters() { + return new Object[] { 1, 2 }; + } + + @Test + public void testFileSizeSummary() { + Assert.assertEquals("Table should start empty", 0, listManifestFiles().size()); + + // fast append + table.newFastAppend() + .appendFile(FILE_A) + .commit(); + Map summary = table.currentSnapshot().summary(); + Assert.assertEquals("10", summary.get(SnapshotSummary.ADDED_FILE_SIZE_PROP)); + Assert.assertNull(summary.get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)); + Assert.assertEquals("10", summary.get(SnapshotSummary.TOTAL_FILE_SIZE_PROP)); + + // merge append + table.newAppend() + .appendFile(FILE_B) + .commit(); + summary = table.currentSnapshot().summary(); + Assert.assertEquals("10", summary.get(SnapshotSummary.ADDED_FILE_SIZE_PROP)); + Assert.assertNull(summary.get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)); + Assert.assertEquals("20", summary.get(SnapshotSummary.TOTAL_FILE_SIZE_PROP)); + + table.newOverwrite() + .deleteFile(FILE_A) + .deleteFile(FILE_B) + .addFile(FILE_C) + .addFile(FILE_D) + .addFile(FILE_D) + .commit(); + summary = table.currentSnapshot().summary(); + Assert.assertEquals("30", summary.get(SnapshotSummary.ADDED_FILE_SIZE_PROP)); + Assert.assertEquals("20", summary.get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)); + Assert.assertEquals("30", summary.get(SnapshotSummary.TOTAL_FILE_SIZE_PROP)); + + table.newDelete() + .deleteFile(FILE_C) + .deleteFile(FILE_D) + .commit(); + summary = table.currentSnapshot().summary(); + Assert.assertNull(summary.get(SnapshotSummary.ADDED_FILE_SIZE_PROP)); + Assert.assertEquals("20", summary.get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)); + Assert.assertEquals("10", summary.get(SnapshotSummary.TOTAL_FILE_SIZE_PROP)); + } +} diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveMetadataPreservingTableOperations.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveMetadataPreservingTableOperations.java index c5d45696b8..3c6a4f7677 100644 --- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveMetadataPreservingTableOperations.java +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveMetadataPreservingTableOperations.java @@ -36,9 +36,11 @@ import org.apache.iceberg.common.DynMethods; import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.exceptions.CommitStateUnknownException; import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,6 +48,7 @@ /** * A {@link HiveTableOperations} that does not override any existing Hive metadata. + * TODO: This extension should be removed once dual-publish of iceberg+hive is stopped. * * The behaviour of this class differs from {@link HiveTableOperations} in the following ways: * 1. 
Does not modify serde information of existing Hive table, this means that if Iceberg schema is updated @@ -139,7 +142,7 @@ protected void doRefresh() { protected void doCommit(TableMetadata base, TableMetadata metadata) { String newMetadataLocation = writeNewMetadata(metadata, currentVersion() + 1); - boolean threw = true; + CommitStatus commitStatus = CommitStatus.FAILURE; Optional lockId = Optional.empty(); try { lockId = Optional.of(acquireLock()); @@ -180,26 +183,27 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) { baseMetadataLocation, metadataLocation, database, tableName); } - setParameters(newMetadataLocation, tbl, false); - - if (tableExists) { - metaClients.run(client -> { - EnvironmentContext envContext = new EnvironmentContext( - ImmutableMap.of(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE) - ); - LOG.debug("Updating the metadata location of the following table:"); - logTable(tbl); - LOG.debug("Metadata Location: {}", tbl.getParameters().get(METADATA_LOCATION_PROP)); - ALTER_TABLE.invoke(client, database, tableName, tbl, envContext); - return null; - }); - } else { - metaClients.run(client -> { - client.createTable(tbl); - return null; - }); + // [LINKEDIN] comply to the new signature of setting Hive table's properties by + // setting newly added parameters as empty container. + setHmsTableParameters(newMetadataLocation, tbl, ImmutableMap.of(), + ImmutableSet.of(), false, ImmutableMap.of()); + + try { + persistTableVerbal(tbl, tableExists); + commitStatus = CommitStatus.SUCCESS; + } catch (Throwable persistFailure) { + LOG.error("Cannot tell if commit to {}.{} succeeded, attempting to reconnect and check.", + database, tableName, persistFailure); + commitStatus = checkCommitStatus(newMetadataLocation, metadata); + switch (commitStatus) { + case SUCCESS: + break; + case FAILURE: + throw persistFailure; + case UNKNOWN: + throw new CommitStateUnknownException(persistFailure); + } } - threw = false; } catch (org.apache.hadoop.hive.metastore.api.AlreadyExistsException e) { throw new AlreadyExistsException("Table already exists: %s.%s", database, tableName); @@ -216,30 +220,31 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) { throw new RuntimeException("Interrupted during commit", e); } finally { - if (threw && !metadataUpdatedSuccessfully(newMetadataLocation)) { - // if anything went wrong and meta store hasn't been updated, clean up the uncommitted metadata file - io().deleteFile(newMetadataLocation); - } - unlock(lockId); + cleanupMetadataAndUnlock(commitStatus, newMetadataLocation, lockId); } } - private boolean metadataUpdatedSuccessfully(String newMetadataLocation) { - boolean metadataUpdatedSuccessfully = false; - try { - Table tbl; - boolean tableExists = metaClients.run(client -> client.tableExists(database, tableName)); - if (tableExists) { - tbl = metaClients.run(client -> client.getTable(database, tableName)); - if (tbl.getParameters().get(METADATA_LOCATION_PROP).equals(newMetadataLocation)) { - metadataUpdatedSuccessfully = true; - } - } - } catch (Exception e) { - // This indicate the client might be closed and we cannout make sure whether the table has been updated, so - // assume it succeeds to avoid table pointing to non-exist location - metadataUpdatedSuccessfully = true; + /** + * [LINKEDIN] a log-enhanced persistTable as a refactoring inspired by + * org.apache.iceberg.hive.HiveTableOperations#persistTable + */ + void persistTableVerbal(Table tbl, boolean tableExists) throws TException, 
InterruptedException { + if (tableExists) { + metaClients.run(client -> { + EnvironmentContext envContext = new EnvironmentContext( + ImmutableMap.of(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE) + ); + LOG.debug("Updating the metadata location of the following table:"); + logTable(tbl); + LOG.debug("Metadata Location: {}", tbl.getParameters().get(METADATA_LOCATION_PROP)); + ALTER_TABLE.invoke(client, database, tableName, tbl, envContext); + return null; + }); + } else { + metaClients.run(client -> { + client.createTable(tbl); + return null; + }); } - return metadataUpdatedSuccessfully; } } diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java index 4baec5641c..9805c44eab 100644 --- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java @@ -27,7 +27,9 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; @@ -45,22 +47,30 @@ import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.iceberg.BaseMetastoreTableOperations; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.SnapshotSummary; import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableProperties; import org.apache.iceberg.common.DynMethods; import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.exceptions.CommitStateUnknownException; import org.apache.iceberg.exceptions.NoSuchIcebergTableException; import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.hadoop.ConfigProperties; import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.Tasks; import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.iceberg.TableProperties.COMMIT_NUM_STATUS_CHECKS; +import static org.apache.iceberg.TableProperties.COMMIT_NUM_STATUS_CHECKS_DEFAULT; + /** * TODO we should be able to extract some more commonalities to BaseMetastoreTableOperations to * avoid code duplication between this class and Metacat Tables. 
@@ -68,6 +78,8 @@ public class HiveTableOperations extends BaseMetastoreTableOperations { private static final Logger LOG = LoggerFactory.getLogger(HiveTableOperations.class); + private static final int COMMIT_STATUS_CHECK_WAIT_MS = 1000; + private static final String HIVE_ACQUIRE_LOCK_TIMEOUT_MS = "iceberg.hive.lock-timeout-ms"; private static final String HIVE_LOCK_CHECK_MIN_WAIT_MS = "iceberg.hive.lock-check-min-wait-ms"; private static final String HIVE_LOCK_CHECK_MAX_WAIT_MS = "iceberg.hive.lock-check-max-wait-ms"; @@ -87,6 +99,12 @@ private static class WaitingForLockException extends RuntimeException { } } + protected enum CommitStatus { + FAILURE, + SUCCESS, + UNKNOWN + } + private final HiveClientPool metaClients; private final String fullName; private final String database; @@ -154,12 +172,13 @@ protected void doRefresh() { refreshFromMetadataLocation(metadataLocation); } + @SuppressWarnings("checkstyle:CyclomaticComplexity") @Override protected void doCommit(TableMetadata base, TableMetadata metadata) { String newMetadataLocation = writeNewMetadata(metadata, currentVersion() + 1); boolean hiveEngineEnabled = hiveEngineEnabled(metadata, conf); - boolean threw = true; + CommitStatus commitStatus = CommitStatus.FAILURE; boolean updateHiveTable = false; Optional lockId = Optional.empty(); try { @@ -191,10 +210,36 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) { baseMetadataLocation, metadataLocation, database, tableName); } - setParameters(newMetadataLocation, tbl, hiveEngineEnabled); + // get Iceberg props that have been removed + Set removedProps = Collections.emptySet(); + if (base != null) { + removedProps = base.properties().keySet().stream() + .filter(key -> !metadata.properties().containsKey(key)) + .collect(Collectors.toSet()); + } + + Map summary = Optional.ofNullable(metadata.currentSnapshot()) + .map(Snapshot::summary) + .orElseGet(ImmutableMap::of); + setHmsTableParameters(newMetadataLocation, tbl, metadata.properties(), removedProps, hiveEngineEnabled, summary); + + try { + persistTable(tbl, updateHiveTable); + commitStatus = CommitStatus.SUCCESS; + } catch (Throwable persistFailure) { + LOG.error("Cannot tell if commit to {}.{} succeeded, attempting to reconnect and check.", + database, tableName, persistFailure); + commitStatus = checkCommitStatus(newMetadataLocation, metadata); + switch (commitStatus) { + case SUCCESS: + break; + case FAILURE: + throw persistFailure; + case UNKNOWN: + throw new CommitStateUnknownException(persistFailure); + } + } - persistTable(tbl, updateHiveTable); - threw = false; } catch (org.apache.hadoop.hive.metastore.api.AlreadyExistsException e) { throw new AlreadyExistsException("Table already exists: %s.%s", database, tableName); @@ -212,11 +257,56 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) { throw new RuntimeException("Interrupted during commit", e); } finally { - cleanupMetadataAndUnlock(threw, newMetadataLocation, lockId); + cleanupMetadataAndUnlock(commitStatus, newMetadataLocation, lockId); } } - private void persistTable(Table hmsTable, boolean updateHiveTable) throws TException, InterruptedException { + /** + * Attempt to load the table and see if any current or past metadata location matches the one we were attempting + * to set. This is used as a last resort when we are dealing with exceptions that may indicate the commit has + * failed but are not proof that this is the case. 
Past locations must also be searched on the chance that a second + * committer was able to successfully commit on top of our commit. + * + * @param newMetadataLocation the path of the new commit file + * @param config metadata to use for configuration + * @return Commit Status of Success, Failure or Unknown + */ + CommitStatus checkCommitStatus(String newMetadataLocation, TableMetadata config) { + int maxAttempts = PropertyUtil.propertyAsInt(config.properties(), COMMIT_NUM_STATUS_CHECKS, + COMMIT_NUM_STATUS_CHECKS_DEFAULT); + + AtomicReference status = new AtomicReference<>(CommitStatus.UNKNOWN); + + Tasks.foreach(newMetadataLocation) + .retry(maxAttempts) + .suppressFailureWhenFinished() + .exponentialBackoff(COMMIT_STATUS_CHECK_WAIT_MS, COMMIT_STATUS_CHECK_WAIT_MS, Long.MAX_VALUE, 2.0) + .onFailure((location, checkException) -> + LOG.error("Cannot check if commit to {}.{} exists.", database, tableName, checkException)) + .run(location -> { + TableMetadata metadata = refresh(); + String currentMetadataLocation = metadata.metadataFileLocation(); + boolean commitSuccess = currentMetadataLocation.equals(newMetadataLocation) || + metadata.previousFiles().stream().anyMatch(log -> log.file().equals(newMetadataLocation)); + if (commitSuccess) { + LOG.info("Commit status check: Commit to {}.{} of {} succeeded", database, tableName, newMetadataLocation); + status.set(CommitStatus.SUCCESS); + } else { + LOG.info("Commit status check: Commit to {}.{} of {} failed", database, tableName, newMetadataLocation); + status.set(CommitStatus.FAILURE); + } + }); + + if (status.get() == CommitStatus.UNKNOWN) { + LOG.error("Cannot determine commit state to {}.{}. Failed during checking {} times. " + + "Treating commit state as unknown.", + database, tableName, maxAttempts); + } + return status.get(); + } + + @VisibleForTesting + void persistTable(Table hmsTable, boolean updateHiveTable) throws TException, InterruptedException { if (updateHiveTable) { metaClients.run(client -> { EnvironmentContext envContext = new EnvironmentContext( @@ -262,13 +352,21 @@ private Table newHmsTable() { return newTable; } - void setParameters(String newMetadataLocation, Table tbl, boolean hiveEngineEnabled) { + void setHmsTableParameters(String newMetadataLocation, Table tbl, Map icebergTableProps, + Set obsoleteProps, boolean hiveEngineEnabled, + Map summary) { Map parameters = tbl.getParameters(); if (parameters == null) { parameters = new HashMap<>(); } + // push all Iceberg table properties into HMS + icebergTableProps.forEach(parameters::put); + + // remove any props from HMS that are no longer present in Iceberg table props + obsoleteProps.forEach(parameters::remove); + parameters.put(TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE.toUpperCase(Locale.ENGLISH)); parameters.put(METADATA_LOCATION_PROP, newMetadataLocation); @@ -284,6 +382,17 @@ void setParameters(String newMetadataLocation, Table tbl, boolean hiveEngineEnab parameters.remove(hive_metastoreConstants.META_TABLE_STORAGE); } + // Set the basic statistics + if (summary.get(SnapshotSummary.TOTAL_DATA_FILES_PROP) != null) { + parameters.put(StatsSetupConst.NUM_FILES, summary.get(SnapshotSummary.TOTAL_DATA_FILES_PROP)); + } + if (summary.get(SnapshotSummary.TOTAL_RECORDS_PROP) != null) { + parameters.put(StatsSetupConst.ROW_COUNT, summary.get(SnapshotSummary.TOTAL_RECORDS_PROP)); + } + if (summary.get(SnapshotSummary.TOTAL_FILE_SIZE_PROP) != null) { + parameters.put(StatsSetupConst.TOTAL_SIZE, summary.get(SnapshotSummary.TOTAL_FILE_SIZE_PROP)); + } + 
tbl.setParameters(parameters); } @@ -306,6 +415,7 @@ StorageDescriptor storageDescriptor(TableMetadata metadata, boolean hiveEngineEn return storageDescriptor; } + @VisibleForTesting long acquireLock() throws UnknownHostException, TException, InterruptedException { final LockComponent lockComponent = new LockComponent(LockType.EXCLUSIVE, LockLevel.TABLE, database); lockComponent.setTablename(tableName); @@ -320,8 +430,8 @@ long acquireLock() throws UnknownHostException, TException, InterruptedException long duration = 0; boolean timeout = false; - if (state.get().equals(LockState.WAITING)) { - try { + try { + if (state.get().equals(LockState.WAITING)) { // Retry count is the typical "upper bound of retries" for Tasks.run() function. In fact, the maximum number of // attempts the Tasks.run() would try is `retries + 1`. Here, for checking locks, we use timeout as the // upper bound of retries. So it is just reasonable to set a large retry count. However, if we set @@ -349,9 +459,13 @@ long acquireLock() throws UnknownHostException, TException, InterruptedException LOG.warn("Interrupted while waiting for lock.", e); } }, TException.class); - } catch (WaitingForLockException waitingForLockException) { - timeout = true; - duration = System.currentTimeMillis() - start; + } + } catch (WaitingForLockException waitingForLockException) { + timeout = true; + duration = System.currentTimeMillis() - start; + } finally { + if (!state.get().equals(LockState.ACQUIRED)) { + unlock(Optional.of(lockId)); } } @@ -368,10 +482,10 @@ long acquireLock() throws UnknownHostException, TException, InterruptedException return lockId; } - private void cleanupMetadataAndUnlock(boolean errorThrown, String metadataLocation, Optional lockId) { + void cleanupMetadataAndUnlock(CommitStatus commitStatus, String metadataLocation, Optional lockId) { try { - if (errorThrown) { - // if anything went wrong, clean up the uncommitted metadata file + if (commitStatus == CommitStatus.FAILURE) { + // If we are sure the commit failed, clean up the uncommitted metadata file io().deleteFile(metadataLocation); } } catch (RuntimeException e) { @@ -392,8 +506,8 @@ void unlock(Optional lockId) { } } - // visible for testing - protected void doUnlock(long lockId) throws TException, InterruptedException { + @VisibleForTesting + void doUnlock(long lockId) throws TException, InterruptedException { metaClients.run(client -> { client.unlock(lockId); return null; diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommits.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommits.java index 81ac5043ae..bb1b0332e1 100644 --- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommits.java +++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommits.java @@ -19,17 +19,26 @@ package org.apache.iceberg.hive; +import java.io.File; +import java.net.UnknownHostException; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.iceberg.AssertHelpers; import org.apache.iceberg.HasTableOperations; import org.apache.iceberg.Table; import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.exceptions.CommitStateUnknownException; import org.apache.iceberg.types.Types; import org.apache.thrift.TException; import org.junit.Assert; import org.junit.Test; import org.mockito.ArgumentCaptor; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static 
org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; public class TestHiveCommits extends HiveTableBaseTest { @@ -66,4 +75,248 @@ public void testSuppressUnlockExceptions() throws TException, InterruptedExcepti // the commit must succeed Assert.assertEquals(1, ops.current().schema().columns().size()); } + + /** + * Pretends we throw an error while persisting that actually fails to commit serverside + */ + @Test + public void testThriftExceptionFailureOnCommit() throws TException, InterruptedException { + Table table = catalog.loadTable(TABLE_IDENTIFIER); + HiveTableOperations ops = (HiveTableOperations) ((HasTableOperations) table).operations(); + + TableMetadata metadataV1 = ops.current(); + + table.updateSchema() + .addColumn("n", Types.IntegerType.get()) + .commit(); + + ops.refresh(); + + TableMetadata metadataV2 = ops.current(); + + Assert.assertEquals(2, ops.current().schema().columns().size()); + + HiveTableOperations spyOps = spy(ops); + + failCommitAndThrowException(spyOps); + + AssertHelpers.assertThrows("We should rethrow generic runtime errors if the " + + "commit actually doesn't succeed", RuntimeException.class, "Metastore operation failed", + () -> spyOps.commit(metadataV2, metadataV1)); + + ops.refresh(); + Assert.assertEquals("Current metadata should not have changed", metadataV2, ops.current()); + Assert.assertTrue("Current metadata should still exist", metadataFileExists(metadataV2)); + Assert.assertEquals("No new metadata files should exist", 2, metadataFileCount(ops.current())); + } + + /** + * Pretends we throw an error while persisting that actually does commit serverside + */ + @Test + public void testThriftExceptionSuccessOnCommit() throws TException, InterruptedException { + Table table = catalog.loadTable(TABLE_IDENTIFIER); + HiveTableOperations ops = (HiveTableOperations) ((HasTableOperations) table).operations(); + + TableMetadata metadataV1 = ops.current(); + + table.updateSchema() + .addColumn("n", Types.IntegerType.get()) + .commit(); + + ops.refresh(); + + TableMetadata metadataV2 = ops.current(); + + Assert.assertEquals(2, ops.current().schema().columns().size()); + + HiveTableOperations spyOps = spy(ops); + + // Simulate a communication error after a successful commit + commitAndThrowException(ops, spyOps); + + // Shouldn't throw because the commit actually succeeds even though persistTable throws an exception + spyOps.commit(metadataV2, metadataV1); + + ops.refresh(); + Assert.assertNotEquals("Current metadata should have changed", metadataV2, ops.current()); + Assert.assertTrue("Current metadata file should still exist", metadataFileExists(ops.current())); + Assert.assertEquals("Commit should have been successful and new metadata file should be made", + 3, metadataFileCount(ops.current())); + } + + /** + * Pretends we throw an exception while persisting and don't know what happened, can't check to find out, + * but in reality the commit failed + */ + @Test + public void testThriftExceptionUnknownFailedCommit() throws TException, InterruptedException { + Table table = catalog.loadTable(TABLE_IDENTIFIER); + HiveTableOperations ops = (HiveTableOperations) ((HasTableOperations) table).operations(); + + TableMetadata metadataV1 = ops.current(); + + table.updateSchema() + .addColumn("n", Types.IntegerType.get()) + .commit(); + + ops.refresh(); + + TableMetadata metadataV2 = ops.current(); + + Assert.assertEquals(2, ops.current().schema().columns().size()); + + HiveTableOperations spyOps = spy(ops); + + failCommitAndThrowException(spyOps); + 
breakFallbackCatalogCommitCheck(spyOps); + + AssertHelpers.assertThrows("Should throw CommitStateUnknownException since the catalog check was blocked", + CommitStateUnknownException.class, "Datacenter on fire", + () -> spyOps.commit(metadataV2, metadataV1)); + + ops.refresh(); + + Assert.assertEquals("Current metadata should not have changed", metadataV2, ops.current()); + Assert.assertTrue("Current metadata file should still exist", metadataFileExists(ops.current())); + Assert.assertEquals("Client could not determine outcome so new metadata file should also exist", + 3, metadataFileCount(ops.current())); + } + + /** + * Pretends we throw an exception while persisting and don't know what happened, can't check to find out, + * but in reality the commit succeeded + */ + @Test + public void testThriftExceptionsUnknownSuccessCommit() throws TException, InterruptedException { + Table table = catalog.loadTable(TABLE_IDENTIFIER); + HiveTableOperations ops = (HiveTableOperations) ((HasTableOperations) table).operations(); + + TableMetadata metadataV1 = ops.current(); + + table.updateSchema() + .addColumn("n", Types.IntegerType.get()) + .commit(); + + ops.refresh(); + + TableMetadata metadataV2 = ops.current(); + + Assert.assertEquals(2, ops.current().schema().columns().size()); + + HiveTableOperations spyOps = spy(ops); + + commitAndThrowException(ops, spyOps); + breakFallbackCatalogCommitCheck(spyOps); + + AssertHelpers.assertThrows("Should throw CommitStateUnknownException since the catalog check was blocked", + CommitStateUnknownException.class, "Datacenter on fire", + () -> spyOps.commit(metadataV2, metadataV1)); + + ops.refresh(); + + Assert.assertFalse("Current metadata should have changed", ops.current().equals(metadataV2)); + Assert.assertTrue("Current metadata file should still exist", metadataFileExists(ops.current())); + } + + /** + * Pretends we threw an exception while persisting, the commit succeeded, the lock expired, + * and a second committer placed a commit on top of ours before the first committer was able to check + * if their commit succeeded or not + * + * Timeline: + * Client 1 commits which throws an exception but succeeded + * Client 1's lock expires while waiting to do the recheck for commit success + * Client 2 acquires a lock, commits successfully on top of client 1's commit and releases the lock + * Client 1 checks to see if their commit was successful + * + * This test makes sure a disconnected client 1 doesn't think their commit failed just because it isn't the + * current one during the recheck phase.
+ */ + @Test + public void testThriftExceptionConcurrentCommit() throws TException, InterruptedException, UnknownHostException { + Table table = catalog.loadTable(TABLE_IDENTIFIER); + HiveTableOperations ops = (HiveTableOperations) ((HasTableOperations) table).operations(); + + TableMetadata metadataV1 = ops.current(); + + table.updateSchema() + .addColumn("n", Types.IntegerType.get()) + .commit(); + + ops.refresh(); + + TableMetadata metadataV2 = ops.current(); + + Assert.assertEquals(2, ops.current().schema().columns().size()); + + HiveTableOperations spyOps = spy(ops); + + AtomicLong lockId = new AtomicLong(); + doAnswer(i -> { + lockId.set(ops.acquireLock()); + return lockId.get(); + }).when(spyOps).acquireLock(); + + concurrentCommitAndThrowException(ops, spyOps, table, lockId); + + /* + This commit and our concurrent commit should succeed even though this commit throws an exception + after the persist operation succeeds + */ + spyOps.commit(metadataV2, metadataV1); + + ops.refresh(); + Assert.assertNotEquals("Current metadata should have changed", metadataV2, ops.current()); + Assert.assertTrue("Current metadata file should still exist", metadataFileExists(ops.current())); + Assert.assertEquals("The column addition from the concurrent commit should have been successful", + 2, ops.current().schema().columns().size()); + } + + private void commitAndThrowException(HiveTableOperations realOperations, HiveTableOperations spyOperations) + throws TException, InterruptedException { + // Simulate a communication error after a successful commit + doAnswer(i -> { + org.apache.hadoop.hive.metastore.api.Table tbl = + i.getArgumentAt(0, org.apache.hadoop.hive.metastore.api.Table.class); + realOperations.persistTable(tbl, true); + throw new TException("Datacenter on fire"); + }).when(spyOperations).persistTable(any(), anyBoolean()); + } + + private void concurrentCommitAndThrowException(HiveTableOperations realOperations, HiveTableOperations spyOperations, + Table table, AtomicLong lockId) + throws TException, InterruptedException { + // Simulate a communication error after a successful commit + doAnswer(i -> { + org.apache.hadoop.hive.metastore.api.Table tbl = + i.getArgumentAt(0, org.apache.hadoop.hive.metastore.api.Table.class); + realOperations.persistTable(tbl, true); + // Simulate lock expiration or removal + realOperations.doUnlock(lockId.get()); + table.refresh(); + table.updateSchema().addColumn("newCol", Types.IntegerType.get()).commit(); + throw new TException("Datacenter on fire"); + }).when(spyOperations).persistTable(any(), anyBoolean()); + } + + private void failCommitAndThrowException(HiveTableOperations spyOperations) throws TException, InterruptedException { + doThrow(new TException("Datacenter on fire")) + .when(spyOperations) + .persistTable(any(), anyBoolean()); + } + + private void breakFallbackCatalogCommitCheck(HiveTableOperations spyOperations) { + when(spyOperations.refresh()) + .thenThrow(new RuntimeException("Still on fire")); // Failure on commit check + } + + private boolean metadataFileExists(TableMetadata metadata) { + return new File(metadata.metadataFileLocation().replace("file:", "")).exists(); + } + + private int metadataFileCount(TableMetadata metadata) { + return new File(metadata.metadataFileLocation().replace("file:", "")).getParentFile() + .listFiles(file -> file.getName().endsWith("metadata.json")).length; + } } diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java 
b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java index f9e6e243c5..ed22db32cb 100644 --- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java +++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java @@ -38,9 +38,11 @@ import org.apache.hadoop.hive.metastore.IHMSHandler; import org.apache.hadoop.hive.metastore.RetryingHMSHandler; import org.apache.hadoop.hive.metastore.TSetIpAddressProcessor; +import org.apache.hadoop.hive.metastore.api.Table; import org.apache.iceberg.common.DynConstructors; import org.apache.iceberg.common.DynMethods; import org.apache.iceberg.hadoop.Util; +import org.apache.thrift.TException; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.server.TServer; import org.apache.thrift.server.TThreadPoolServer; @@ -154,10 +156,6 @@ public HiveConf hiveConf() { return hiveConf; } - public HiveClientPool clientPool() { - return clientPool; - } - public String getDatabasePath(String dbName) { File dbDir = new File(hiveLocalDir, dbName + ".db"); return dbDir.getPath(); @@ -191,6 +189,10 @@ public void reset() throws Exception { } } + public Table getTable(String dbName, String tableName) throws TException, InterruptedException { + return clientPool.run(client -> client.getTable(dbName, tableName)); + } + private TServer newThriftServer(TServerSocket socket, int poolSize, HiveConf conf) throws Exception { HiveConf serverConf = new HiveConf(conf); serverConf.set(HiveConf.ConfVars.METASTORECONNECTURLKEY.varname, "jdbc:derby:" + getDerbyPath() + ";create=true"); diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java index 586f6fc942..54c2247fc0 100644 --- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java +++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java @@ -25,7 +25,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.HiveMetaHook; import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.iceberg.BaseMetastoreTableOperations; import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.PartitionSpec; @@ -51,10 +50,15 @@ public class HiveIcebergMetaHook implements HiveMetaHook { private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergMetaHook.class); private static final Set PARAMETERS_TO_REMOVE = ImmutableSet - .of(InputFormatConfig.TABLE_SCHEMA, InputFormatConfig.PARTITION_SPEC, Catalogs.LOCATION, Catalogs.NAME); + .of(InputFormatConfig.TABLE_SCHEMA, Catalogs.LOCATION, Catalogs.NAME); private static final Set PROPERTIES_TO_REMOVE = ImmutableSet - .of(InputFormatConfig.EXTERNAL_TABLE_PURGE, hive_metastoreConstants.META_TABLE_STORAGE, "EXTERNAL", - "bucketing_version"); + // We don't want to push down the metadata location props to Iceberg from HMS, + // since the snapshot pointer in HMS would always be one step ahead + .of(BaseMetastoreTableOperations.METADATA_LOCATION_PROP, + BaseMetastoreTableOperations.PREVIOUS_METADATA_LOCATION_PROP, + // Initially we'd like to cache the partition spec in HMS, but not push it down later to Iceberg during alter + // table commands since by then the HMS info can be stale + Iceberg does not store its partition spec in the props + InputFormatConfig.PARTITION_SPEC); private final Configuration conf; private Table icebergTable = null; @@ -179,10 +183,10 @@ public void 
commitDropTable(org.apache.hadoop.hive.metastore.api.Table hmsTable, /** * Calculates the properties we would like to send to the catalog. * <ul>
- * <li>The base of the properties is the properties store at the Hive Metastore for the given table
+ * <li>The base of the properties is the properties stored at the Hive Metastore for the given table
 * <li>We add the {@link Catalogs#LOCATION} as the table location
 * <li>We add the {@link Catalogs#NAME} as TableIdentifier defined by the database name and table name
- * <li>We remove the Hive Metastore specific parameters
+ * <li>We remove some parameters that we don't want to push down to the Iceberg table props
 * </ul>
* @param hmsTable Table for which we are calculating the properties * @return The properties we can provide for Iceberg functions, like {@link Catalogs} @@ -200,7 +204,7 @@ private static Properties getCatalogProperties(org.apache.hadoop.hive.metastore. properties.put(Catalogs.NAME, TableIdentifier.of(hmsTable.getDbName(), hmsTable.getTableName()).toString()); } - // Remove creation related properties + // Remove HMS table parameters we don't want to propagate to Iceberg PROPERTIES_TO_REMOVE.forEach(properties::remove); return properties; @@ -224,11 +228,11 @@ private Schema schema(Properties properties, org.apache.hadoop.hive.metastore.ap private static PartitionSpec spec(Schema schema, Properties properties, org.apache.hadoop.hive.metastore.api.Table hmsTable) { - if (properties.getProperty(InputFormatConfig.PARTITION_SPEC) != null) { + if (hmsTable.getParameters().get(InputFormatConfig.PARTITION_SPEC) != null) { Preconditions.checkArgument(!hmsTable.isSetPartitionKeys() || hmsTable.getPartitionKeys().isEmpty(), "Provide only one of the following: Hive partition specification, or the " + InputFormatConfig.PARTITION_SPEC + " property"); - return PartitionSpecParser.fromJson(schema, properties.getProperty(InputFormatConfig.PARTITION_SPEC)); + return PartitionSpecParser.fromJson(schema, hmsTable.getParameters().get(InputFormatConfig.PARTITION_SPEC)); } else if (hmsTable.isSetPartitionKeys() && !hmsTable.getPartitionKeys().isEmpty()) { // If the table is partitioned then generate the identity partition definitions for the Iceberg table return HiveSchemaUtil.spec(schema, hmsTable.getPartitionKeys()); diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java index 78655dcc6a..0b6db1fe27 100644 --- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java +++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java @@ -27,22 +27,27 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.iceberg.AssertHelpers; import org.apache.iceberg.BaseMetastoreTableOperations; +import org.apache.iceberg.BaseTable; import org.apache.iceberg.FileFormat; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.PartitionSpecParser; import org.apache.iceberg.Schema; import org.apache.iceberg.SchemaParser; +import org.apache.iceberg.SnapshotSummary; import org.apache.iceberg.TableProperties; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.Record; import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.hadoop.Util; import org.apache.iceberg.hive.HiveSchemaUtil; +import org.apache.iceberg.hive.MetastoreUtil; import org.apache.iceberg.mr.Catalogs; import org.apache.iceberg.mr.InputFormatConfig; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; @@ -100,9 +105,7 @@ public class TestHiveIcebergStorageHandlerNoScan { )) ); - private static final Set IGNORED_PARAMS = - ImmutableSet.of("bucketing_version", StatsSetupConst.ROW_COUNT, - StatsSetupConst.RAW_DATA_SIZE, StatsSetupConst.TOTAL_SIZE, StatsSetupConst.NUM_FILES, "numFilesErasureCoded"); + private static final Set 
IGNORED_PARAMS = ImmutableSet.of("bucketing_version", "numFilesErasureCoded"); @Parameters(name = "catalog={0}") public static Collection parameters() { @@ -165,39 +168,7 @@ public void testCreateDropTable() throws TException, IOException, InterruptedExc icebergTable.schema().asStruct()); Assert.assertEquals(PartitionSpec.unpartitioned(), icebergTable.spec()); - // Check the HMS table parameters - org.apache.hadoop.hive.metastore.api.Table hmsTable = - shell.metastore().clientPool().run(client -> client.getTable("default", "customers")); - - Map hmsParams = hmsTable.getParameters(); - IGNORED_PARAMS.forEach(hmsParams::remove); - - // This is only set for HiveCatalog based tables. Check the value, then remove it so the other checks can be general - if (Catalogs.hiveCatalog(shell.getHiveConf())) { - Assert.assertTrue(hmsParams.get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP) - .startsWith(icebergTable.location())); - hmsParams.remove(BaseMetastoreTableOperations.METADATA_LOCATION_PROP); - } - - // General metadata checks - Assert.assertEquals(6, hmsParams.size()); - Assert.assertEquals("test", hmsParams.get("dummy")); - Assert.assertEquals("TRUE", hmsParams.get(InputFormatConfig.EXTERNAL_TABLE_PURGE)); - Assert.assertEquals("TRUE", hmsParams.get("EXTERNAL")); - Assert.assertNotNull(hmsParams.get(hive_metastoreConstants.DDL_TIME)); - Assert.assertEquals(HiveIcebergStorageHandler.class.getName(), - hmsTable.getParameters().get(hive_metastoreConstants.META_TABLE_STORAGE)); - Assert.assertEquals(BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE.toUpperCase(), - hmsTable.getParameters().get(BaseMetastoreTableOperations.TABLE_TYPE_PROP)); - - // verify that storage descriptor is filled out with inputformat/outputformat/serde - Assert.assertEquals(HiveIcebergInputFormat.class.getName(), hmsTable.getSd().getInputFormat()); - Assert.assertEquals(HiveIcebergOutputFormat.class.getName(), hmsTable.getSd().getOutputFormat()); - Assert.assertEquals(HiveIcebergSerDe.class.getName(), hmsTable.getSd().getSerdeInfo().getSerializationLib()); - if (!Catalogs.hiveCatalog(shell.getHiveConf())) { - Assert.assertEquals(Collections.singletonMap("dummy", "test"), icebergTable.properties()); - shell.executeStatement("DROP TABLE customers"); // Check if the table was really dropped even from the Catalog @@ -207,13 +178,7 @@ public void testCreateDropTable() throws TException, IOException, InterruptedExc } ); } else { - Map expectedIcebergProperties = new HashMap<>(2); - expectedIcebergProperties.put("dummy", "test"); - expectedIcebergProperties.put(TableProperties.ENGINE_HIVE_ENABLED, "true"); - Assert.assertEquals(expectedIcebergProperties, icebergTable.properties()); - - // Check the HMS table parameters - hmsTable = shell.metastore().clientPool().run(client -> client.getTable("default", "customers")); + org.apache.hadoop.hive.metastore.api.Table hmsTable = shell.metastore().getTable("default", "customers"); Path hmsTableLocation = new Path(hmsTable.getSd().getLocation()); // Drop the table @@ -238,7 +203,7 @@ public void testCreateDropTable() throws TException, IOException, InterruptedExc } @Test - public void testCreateTableWithoutSpec() throws TException, InterruptedException { + public void testCreateTableWithoutSpec() { TableIdentifier identifier = TableIdentifier.of("default", "customers"); shell.executeStatement("CREATE EXTERNAL TABLE customers " + @@ -250,26 +215,10 @@ public void testCreateTableWithoutSpec() throws TException, InterruptedException // Check the Iceberg table partition data 
org.apache.iceberg.Table icebergTable = testTables.loadTable(identifier); Assert.assertEquals(PartitionSpec.unpartitioned(), icebergTable.spec()); - - // Check the HMS table parameters - org.apache.hadoop.hive.metastore.api.Table hmsTable = - shell.metastore().clientPool().run(client -> client.getTable("default", "customers")); - - Map hmsParams = hmsTable.getParameters(); - IGNORED_PARAMS.forEach(hmsParams::remove); - - // Just check that the PartitionSpec is not set in the metadata - Assert.assertNull(hmsParams.get(InputFormatConfig.PARTITION_SPEC)); - - if (Catalogs.hiveCatalog(shell.getHiveConf())) { - Assert.assertEquals(6, hmsParams.size()); - } else { - Assert.assertEquals(5, hmsParams.size()); - } } @Test - public void testCreateTableWithUnpartitionedSpec() throws TException, InterruptedException { + public void testCreateTableWithUnpartitionedSpec() { TableIdentifier identifier = TableIdentifier.of("default", "customers"); // We need the location for HadoopTable based tests only @@ -284,21 +233,6 @@ public void testCreateTableWithUnpartitionedSpec() throws TException, Interrupte // Check the Iceberg table partition data org.apache.iceberg.Table icebergTable = testTables.loadTable(identifier); Assert.assertEquals(SPEC, icebergTable.spec()); - - // Check the HMS table parameters - org.apache.hadoop.hive.metastore.api.Table hmsTable = - shell.metastore().clientPool().run(client -> client.getTable("default", "customers")); - - Map hmsParams = hmsTable.getParameters(); - IGNORED_PARAMS.forEach(hmsParams::remove); - - // Just check that the PartitionSpec is not set in the metadata - Assert.assertNull(hmsParams.get(InputFormatConfig.PARTITION_SPEC)); - if (Catalogs.hiveCatalog(shell.getHiveConf())) { - Assert.assertEquals(6, hmsParams.size()); - } else { - Assert.assertEquals(5, hmsParams.size()); - } } @Test @@ -319,8 +253,7 @@ public void testDeleteBackingTable() throws TException, IOException, Interrupted testTables.loadTable(identifier); } else { // Check the HMS table parameters - org.apache.hadoop.hive.metastore.api.Table hmsTable = - shell.metastore().clientPool().run(client -> client.getTable("default", "customers")); + org.apache.hadoop.hive.metastore.api.Table hmsTable = shell.metastore().getTable("default", "customers"); Path hmsTableLocation = new Path(hmsTable.getSd().getLocation()); // Drop the table @@ -377,13 +310,12 @@ public void testCreateTableError() { } @Test - public void testCreateTableAboveExistingTable() throws TException, IOException, InterruptedException { + public void testCreateTableAboveExistingTable() throws IOException { // Create the Iceberg table testTables.createIcebergTable(shell.getHiveConf(), "customers", COMPLEX_SCHEMA, FileFormat.PARQUET, Collections.emptyList()); if (Catalogs.hiveCatalog(shell.getHiveConf())) { - // In HiveCatalog we just expect an exception since the table is already exists AssertHelpers.assertThrows("should throw exception", IllegalArgumentException.class, "customers already exists", () -> { @@ -394,24 +326,10 @@ public void testCreateTableAboveExistingTable() throws TException, IOException, } ); } else { + // With other catalogs, table creation should succeed shell.executeStatement("CREATE EXTERNAL TABLE customers " + "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " + testTables.locationForCreateTableSQL(TableIdentifier.of("default", "customers"))); - - // Check the HMS table parameters - org.apache.hadoop.hive.metastore.api.Table hmsTable = - shell.metastore().clientPool().run(client -> 
client.getTable("default", "customers")); - - Map hmsParams = hmsTable.getParameters(); - IGNORED_PARAMS.forEach(hmsParams::remove); - - Assert.assertEquals(4, hmsParams.size()); - Assert.assertEquals("TRUE", hmsParams.get("EXTERNAL")); - Assert.assertNotNull(hmsParams.get(hive_metastoreConstants.DDL_TIME)); - Assert.assertEquals(HiveIcebergStorageHandler.class.getName(), - hmsTable.getParameters().get(hive_metastoreConstants.META_TABLE_STORAGE)); - Assert.assertEquals(BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE.toUpperCase(), - hmsTable.getParameters().get(BaseMetastoreTableOperations.TABLE_TYPE_PROP)); } } @@ -562,4 +480,118 @@ public void testCreateTableWithoutColumnComments() { "from deserializer"}, rows.get(i)); } } + + @Test + public void testIcebergAndHmsTableProperties() throws Exception { + TableIdentifier identifier = TableIdentifier.of("default", "customers"); + + shell.executeStatement(String.format("CREATE EXTERNAL TABLE default.customers " + + "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' %s" + + "TBLPROPERTIES ('%s'='%s', '%s'='%s', '%s'='%s')", + testTables.locationForCreateTableSQL(identifier), // we need the location for HadoopTable based tests only + InputFormatConfig.TABLE_SCHEMA, SchemaParser.toJson(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA), + InputFormatConfig.PARTITION_SPEC, PartitionSpecParser.toJson(SPEC), + "custom_property", "initial_val")); + + + // Check the Iceberg table parameters + org.apache.iceberg.Table icebergTable = testTables.loadTable(identifier); + + Map expectedIcebergProperties = new HashMap<>(); + expectedIcebergProperties.put("custom_property", "initial_val"); + expectedIcebergProperties.put("EXTERNAL", "TRUE"); + expectedIcebergProperties.put("storage_handler", HiveIcebergStorageHandler.class.getName()); + if (Catalogs.hiveCatalog(shell.getHiveConf())) { + expectedIcebergProperties.put(TableProperties.ENGINE_HIVE_ENABLED, "true"); + } + if (MetastoreUtil.hive3PresentOnClasspath()) { + expectedIcebergProperties.put("bucketing_version", "2"); + } + Assert.assertEquals(expectedIcebergProperties, icebergTable.properties()); + + // Check the HMS table parameters + org.apache.hadoop.hive.metastore.api.Table hmsTable = shell.metastore().getTable("default", "customers"); + Map hmsParams = hmsTable.getParameters() + .entrySet().stream() + .filter(e -> !IGNORED_PARAMS.contains(e.getKey())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + if (Catalogs.hiveCatalog(shell.getHiveConf())) { + Assert.assertEquals(9, hmsParams.size()); + Assert.assertEquals("initial_val", hmsParams.get("custom_property")); + Assert.assertEquals("TRUE", hmsParams.get(InputFormatConfig.EXTERNAL_TABLE_PURGE)); + Assert.assertEquals("TRUE", hmsParams.get("EXTERNAL")); + Assert.assertEquals("true", hmsParams.get(TableProperties.ENGINE_HIVE_ENABLED)); + Assert.assertEquals(HiveIcebergStorageHandler.class.getName(), + hmsParams.get(hive_metastoreConstants.META_TABLE_STORAGE)); + Assert.assertEquals(BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE.toUpperCase(), + hmsParams.get(BaseMetastoreTableOperations.TABLE_TYPE_PROP)); + Assert.assertEquals(hmsParams.get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP), + getCurrentSnapshotForHiveCatalogTable(icebergTable)); + Assert.assertNull(hmsParams.get(BaseMetastoreTableOperations.PREVIOUS_METADATA_LOCATION_PROP)); + Assert.assertNotNull(hmsParams.get(hive_metastoreConstants.DDL_TIME)); + } else { + Assert.assertEquals(7, hmsParams.size()); + 
Assert.assertNull(hmsParams.get(TableProperties.ENGINE_HIVE_ENABLED)); + } + + // Check HMS inputformat/outputformat/serde + Assert.assertEquals(HiveIcebergInputFormat.class.getName(), hmsTable.getSd().getInputFormat()); + Assert.assertEquals(HiveIcebergOutputFormat.class.getName(), hmsTable.getSd().getOutputFormat()); + Assert.assertEquals(HiveIcebergSerDe.class.getName(), hmsTable.getSd().getSerdeInfo().getSerializationLib()); + + // Add two new properties to the Iceberg table and update an existing one + icebergTable.updateProperties() + .set("new_prop_1", "true") + .set("new_prop_2", "false") + .set("custom_property", "new_val") + .commit(); + + // Refresh the HMS table to see if new Iceberg properties got synced into HMS + hmsParams = shell.metastore().getTable("default", "customers").getParameters() + .entrySet().stream() + .filter(e -> !IGNORED_PARAMS.contains(e.getKey())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + if (Catalogs.hiveCatalog(shell.getHiveConf())) { + Assert.assertEquals(12, hmsParams.size()); // 2 newly-added properties + previous_metadata_location prop + Assert.assertEquals("true", hmsParams.get("new_prop_1")); + Assert.assertEquals("false", hmsParams.get("new_prop_2")); + Assert.assertEquals("new_val", hmsParams.get("custom_property")); + String prevSnapshot = getCurrentSnapshotForHiveCatalogTable(icebergTable); + icebergTable.refresh(); + String newSnapshot = getCurrentSnapshotForHiveCatalogTable(icebergTable); + Assert.assertEquals(hmsParams.get(BaseMetastoreTableOperations.PREVIOUS_METADATA_LOCATION_PROP), prevSnapshot); + Assert.assertEquals(hmsParams.get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP), newSnapshot); + } else { + Assert.assertEquals(7, hmsParams.size()); + } + + // Remove some Iceberg props and see if they're removed from HMS table props as well + if (Catalogs.hiveCatalog(shell.getHiveConf())) { + icebergTable.updateProperties() + .remove("custom_property") + .remove("new_prop_1") + .commit(); + hmsParams = shell.metastore().getTable("default", "customers").getParameters(); + Assert.assertFalse(hmsParams.containsKey("custom_property")); + Assert.assertFalse(hmsParams.containsKey("new_prop_1")); + Assert.assertTrue(hmsParams.containsKey("new_prop_2")); + } + + // append some data and check whether HMS stats are aligned with snapshot summary + if (Catalogs.hiveCatalog(shell.getHiveConf())) { + List records = HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS; + testTables.appendIcebergTable(shell.getHiveConf(), icebergTable, FileFormat.PARQUET, null, records); + hmsParams = shell.metastore().getTable("default", "customers").getParameters(); + Map summary = icebergTable.currentSnapshot().summary(); + Assert.assertEquals(summary.get(SnapshotSummary.TOTAL_DATA_FILES_PROP), hmsParams.get(StatsSetupConst.NUM_FILES)); + Assert.assertEquals(summary.get(SnapshotSummary.TOTAL_RECORDS_PROP), hmsParams.get(StatsSetupConst.ROW_COUNT)); + Assert.assertEquals(summary.get(SnapshotSummary.TOTAL_FILE_SIZE_PROP), hmsParams.get(StatsSetupConst.TOTAL_SIZE)); + } + } + + private String getCurrentSnapshotForHiveCatalogTable(org.apache.iceberg.Table icebergTable) { + return ((BaseMetastoreTableOperations) ((BaseTable) icebergTable).operations()).currentMetadataLocation(); + } }
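
The contract added to TableOperations above distinguishes three commit outcomes: success, provable failure, and an unknown state that must not trigger cleanup. Below is a minimal sketch of how a custom metastore-backed catalog could follow it; FlakyStoreTableOperations, swapPointer and currentPointer are hypothetical stand-ins for a catalog client, while writeNewMetadata, currentVersion and currentMetadataLocation are the existing BaseMetastoreTableOperations hooks that HiveTableOperations also uses in this change.

import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.CommitStateUnknownException;

public abstract class FlakyStoreTableOperations extends BaseMetastoreTableOperations {

  // Stand-ins for a catalog's own client API; these are illustrative assumptions, not Iceberg or Hive calls.
  protected abstract boolean swapPointer(String expectedLocation, String newLocation);

  protected abstract String currentPointer();

  @Override
  protected void doCommit(TableMetadata base, TableMetadata metadata) {
    String newLocation = writeNewMetadata(metadata, currentVersion() + 1);
    String expectedLocation = base == null ? null : currentMetadataLocation();

    try {
      // atomic compare-and-swap of the metadata pointer in the backing store
      if (!swapPointer(expectedLocation, newLocation)) {
        // a definitive "no": another committer won, so the new metadata file can be cleaned up
        throw new CommitFailedException("Cannot commit: metadata location changed concurrently");
      }
    } catch (CommitFailedException e) {
      throw e;
    } catch (RuntimeException e) {
      // the swap may or may not have been applied (e.g. a timeout after the request was sent),
      // so re-check the pointer before deciding what to surface
      boolean committed;
      try {
        committed = newLocation.equals(currentPointer());
      } catch (RuntimeException checkFailure) {
        // cannot prove success or failure: callers must not delete any files
        throw new CommitStateUnknownException(e);
      }
      if (!committed) {
        throw e; // provably not committed; SnapshotProducer will clean up as usual
      }
      // otherwise the commit actually went through despite the exception
    }
  }
}

The design point mirrored from HiveTableOperations.doCommit is that only a provable failure re-throws the original exception and lets SnapshotProducer delete the uncommitted metadata; an ambiguous outcome is wrapped in CommitStateUnknownException so nothing is removed.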
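
On the calling side, CommitStateUnknownException now propagates out of SnapshotProducer.commit() without any cleanup, so a job should neither delete files nor retry blindly. A minimal sketch of that handling, assuming an already-loaded Table and a prepared DataFile (SafeAppend, table and dataFile are placeholder names); it also shows the new commit.num-status-checks property and the new total-files-size summary entry.

import java.util.Map;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.exceptions.CommitStateUnknownException;

public class SafeAppend {
  public static void appendOnce(Table table, DataFile dataFile) {
    // optional: allow more re-checks after an ambiguous metastore failure (the default is 3)
    table.updateProperties()
        .set(TableProperties.COMMIT_NUM_STATUS_CHECKS, "5")
        .commit();

    try {
      table.newFastAppend()
          .appendFile(dataFile)
          .commit();
    } catch (CommitStateUnknownException e) {
      // The commit may or may not have been applied. Do not delete the new metadata or data
      // files and do not blindly retry; inspect the table state first, or rely on a later
      // Remove Orphan Files run if the commit turns out to have failed.
      table.refresh();
      if (table.currentSnapshot() != null) {
        Map<String, String> summary = table.currentSnapshot().summary();
        // "total-files-size" is the new running total maintained by SnapshotProducer
        String totalSize = summary.get(SnapshotSummary.TOTAL_FILE_SIZE_PROP);
        System.out.println("Total file size after unknown-state commit: " + totalSize);
      }
      throw e;
    }
  }
}

Raising commit.num-status-checks only changes how many times checkCommitStatus re-reads the metadata location before giving up and declaring the state unknown.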
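
Because setHmsTableParameters now copies the snapshot totals into the HMS basic statistics, a plain metastore client can observe them without loading Iceberg metadata, which is what the new assertions in TestHiveIcebergStorageHandlerNoScan verify. A small illustrative sketch follows; the class and parameter names are placeholders.

import java.util.Map;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.thrift.TException;

public class HmsStatsCheck {
  public static void printSyncedStats(HiveMetaStoreClient client, String db, String table) throws TException {
    Map<String, String> params = client.getTable(db, table).getParameters();
    // written by setHmsTableParameters from the current snapshot summary
    System.out.println("numFiles  = " + params.get(StatsSetupConst.NUM_FILES));  // total-data-files
    System.out.println("numRows   = " + params.get(StatsSetupConst.ROW_COUNT));  // total-records
    System.out.println("totalSize = " + params.get(StatsSetupConst.TOTAL_SIZE)); // total-files-size
  }
}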