diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java index 7c45882853d5..647c6260a4e7 100644 --- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java @@ -19,6 +19,8 @@ package org.apache.iceberg.hive; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.Collections; @@ -28,7 +30,9 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.StatsSetupConst; @@ -82,9 +86,11 @@ public class HiveTableOperations extends BaseMetastoreTableOperations { private static final String HIVE_ACQUIRE_LOCK_TIMEOUT_MS = "iceberg.hive.lock-timeout-ms"; private static final String HIVE_LOCK_CHECK_MIN_WAIT_MS = "iceberg.hive.lock-check-min-wait-ms"; private static final String HIVE_LOCK_CHECK_MAX_WAIT_MS = "iceberg.hive.lock-check-max-wait-ms"; + private static final String HIVE_TABLE_LEVEL_LOCK_EVICT_MS = "iceberg.hive.table-level-lock-evict-ms"; private static final long HIVE_ACQUIRE_LOCK_TIMEOUT_MS_DEFAULT = 3 * 60 * 1000; // 3 minutes private static final long HIVE_LOCK_CHECK_MIN_WAIT_MS_DEFAULT = 50; // 50 milliseconds private static final long HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT = 5 * 1000; // 5 seconds + private static final long HIVE_TABLE_LEVEL_LOCK_EVICT_MS_DEFAULT = TimeUnit.MINUTES.toMillis(10); private static final DynMethods.UnboundMethod ALTER_TABLE = DynMethods.builder("alter_table") .impl(HiveMetaStoreClient.class, "alter_table_with_environmentContext", String.class, String.class, Table.class, EnvironmentContext.class) @@ -96,6 +102,15 @@ public class HiveTableOperations extends BaseMetastoreTableOperations { GC_ENABLED, "external.table.purge" ); + private static Cache commitLockCache; + + private static synchronized void initTableLevelLockCache(long evictionTimeout) { + if (commitLockCache == null) { + commitLockCache = Caffeine.newBuilder() + .expireAfterAccess(evictionTimeout, TimeUnit.MILLISECONDS) + .build(); + } + } /** * Provides key translation where necessary between Iceberg and HMS props. This translation is needed because some @@ -144,6 +159,9 @@ protected HiveTableOperations(Configuration conf, ClientPool metaClients, FileIO conf.getLong(HIVE_LOCK_CHECK_MIN_WAIT_MS, HIVE_LOCK_CHECK_MIN_WAIT_MS_DEFAULT); this.lockCheckMaxWaitTime = conf.getLong(HIVE_LOCK_CHECK_MAX_WAIT_MS, HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT); + long tableLevelLockCacheEvictionTimeout = + conf.getLong(HIVE_TABLE_LEVEL_LOCK_EVICT_MS, HIVE_TABLE_LEVEL_LOCK_EVICT_MS_DEFAULT); + initTableLevelLockCache(tableLevelLockCacheEvictionTimeout); } @Override @@ -191,6 +209,10 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) { CommitStatus commitStatus = CommitStatus.FAILURE; boolean updateHiveTable = false; Optional lockId = Optional.empty(); + // getting a process-level lock per table to avoid concurrent commit attempts to the same table from the same + // JVM process, which would result in unnecessary and costly HMS lock acquisition requests + ReentrantLock tableLevelMutex = commitLockCache.get(fullName, t -> new ReentrantLock(true)); + tableLevelMutex.lock(); try { lockId = Optional.of(acquireLock()); // TODO add lock heart beating for cases where default lock timeout is too low. @@ -267,6 +289,7 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) { } finally { cleanupMetadataAndUnlock(commitStatus, newMetadataLocation, lockId); + tableLevelMutex.unlock(); } } diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java index 531e511ac54a..ada7fa6a2e31 100644 --- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java +++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java @@ -20,9 +20,14 @@ package org.apache.iceberg.hive; import java.util.Collections; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.IntStream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.LockRequest; import org.apache.hadoop.hive.metastore.api.LockResponse; import org.apache.hadoop.hive.metastore.api.LockState; import org.apache.iceberg.AssertHelpers; @@ -42,7 +47,11 @@ import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.reset; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class TestHiveCommitLocks extends HiveTableBaseTest { @@ -210,4 +219,31 @@ public void testPassThroughInterruptions() throws TException { "Could not acquire the lock on", () -> spyOps.doCommit(metadataV2, metadataV1)); } + + @Test + public void testTableLevelProcessLockBlocksConcurrentHMSRequestsForSameTable() throws Exception { + int numConcurrentCommits = 10; + // resetting the spy client to forget about prior call history + reset(spyClient); + + // simulate several concurrent commit operations on the same table + ExecutorService executor = Executors.newFixedThreadPool(numConcurrentCommits); + IntStream.range(0, numConcurrentCommits).forEach(i -> + executor.submit(() -> { + try { + spyOps.doCommit(metadataV2, metadataV1); + } catch (CommitFailedException e) { + // failures are expected here when checking the base version + // it's no problem, we're not testing the actual commit success here, only the HMS lock acquisition attempts + } + })); + executor.shutdown(); + executor.awaitTermination(30, TimeUnit.SECONDS); + + // intra-process commits to the same table should be serialized now + // i.e. no thread should receive WAITING state from HMS and have to call checkLock periodically + verify(spyClient, never()).checkLock(any(Long.class)); + // all threads eventually got their turn + verify(spyClient, times(numConcurrentCommits)).lock(any(LockRequest.class)); + } }