Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.apache.iceberg.hive;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collections;
Expand All @@ -28,7 +30,9 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.StatsSetupConst;
Expand Down Expand Up @@ -82,9 +86,11 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
private static final String HIVE_ACQUIRE_LOCK_TIMEOUT_MS = "iceberg.hive.lock-timeout-ms";
private static final String HIVE_LOCK_CHECK_MIN_WAIT_MS = "iceberg.hive.lock-check-min-wait-ms";
private static final String HIVE_LOCK_CHECK_MAX_WAIT_MS = "iceberg.hive.lock-check-max-wait-ms";
private static final String HIVE_TABLE_LEVEL_LOCK_EVICT_MS = "iceberg.hive.table-level-lock-evict-ms";
private static final long HIVE_ACQUIRE_LOCK_TIMEOUT_MS_DEFAULT = 3 * 60 * 1000; // 3 minutes
private static final long HIVE_LOCK_CHECK_MIN_WAIT_MS_DEFAULT = 50; // 50 milliseconds
private static final long HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT = 5 * 1000; // 5 seconds
private static final long HIVE_TABLE_LEVEL_LOCK_EVICT_MS_DEFAULT = TimeUnit.MINUTES.toMillis(10);
private static final DynMethods.UnboundMethod ALTER_TABLE = DynMethods.builder("alter_table")
.impl(HiveMetaStoreClient.class, "alter_table_with_environmentContext",
String.class, String.class, Table.class, EnvironmentContext.class)
Expand All @@ -96,6 +102,15 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
GC_ENABLED, "external.table.purge"
);

private static Cache<String, ReentrantLock> commitLockCache;

private static synchronized void initTableLevelLockCache(long evictionTimeout) {
if (commitLockCache == null) {
commitLockCache = Caffeine.newBuilder()
.expireAfterAccess(evictionTimeout, TimeUnit.MILLISECONDS)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does expireAfterAccess work?
I am worried about this sequence:

  • C1 commit comes, gets the JVM lock, and HMS lock
  • eviction timeout
  • C2 commit comes, gets a new JVM lock, and tries to lock
  • C1 finishes and unlocks the old JVM lock

Copy link
Collaborator Author

@marton-bod marton-bod Apr 30, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

expireAfterAccess is based on the amount of time that elapsed after the last read/write operation on the cache entry. So the scenario that you mentioned could only happen if the C1 commit takes more than the entire eviction timeout period (10 minutes by default) which is very unlikely. Even if it does happen (e.g. due to some extreme lock starvation), and C2 starts the commit operation, it shouldn't have too much of an impact because the HMS locking mechanism will still enforce that there won't be write conflicts between threads (i.e. we would be basically back to where we are in the status quo, without the table-level locks).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I like how the worst case scenario in the lock's not working is that we just fall back to the old behavior.

.build();
}
}

/**
* Provides key translation where necessary between Iceberg and HMS props. This translation is needed because some
Expand Down Expand Up @@ -144,6 +159,9 @@ protected HiveTableOperations(Configuration conf, ClientPool metaClients, FileIO
conf.getLong(HIVE_LOCK_CHECK_MIN_WAIT_MS, HIVE_LOCK_CHECK_MIN_WAIT_MS_DEFAULT);
this.lockCheckMaxWaitTime =
conf.getLong(HIVE_LOCK_CHECK_MAX_WAIT_MS, HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT);
long tableLevelLockCacheEvictionTimeout =
conf.getLong(HIVE_TABLE_LEVEL_LOCK_EVICT_MS, HIVE_TABLE_LEVEL_LOCK_EVICT_MS_DEFAULT);
initTableLevelLockCache(tableLevelLockCacheEvictionTimeout);
}

@Override
Expand Down Expand Up @@ -191,6 +209,10 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
CommitStatus commitStatus = CommitStatus.FAILURE;
boolean updateHiveTable = false;
Optional<Long> lockId = Optional.empty();
// getting a process-level lock per table to avoid concurrent commit attempts to the same table from the same
// JVM process, which would result in unnecessary and costly HMS lock acquisition requests
ReentrantLock tableLevelMutex = commitLockCache.get(fullName, t -> new ReentrantLock(true));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems safe even though one application can interact with multiple metastores as full name includes the catalog name.

tableLevelMutex.lock();
try {
lockId = Optional.of(acquireLock());
// TODO add lock heart beating for cases where default lock timeout is too low.
Expand Down Expand Up @@ -267,6 +289,7 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {

} finally {
cleanupMetadataAndUnlock(commitStatus, newMetadataLocation, lockId);
tableLevelMutex.unlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,14 @@
package org.apache.iceberg.hive;

import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.LockRequest;
import org.apache.hadoop.hive.metastore.api.LockResponse;
import org.apache.hadoop.hive.metastore.api.LockState;
import org.apache.iceberg.AssertHelpers;
Expand All @@ -42,7 +47,11 @@
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class TestHiveCommitLocks extends HiveTableBaseTest {
Expand Down Expand Up @@ -210,4 +219,31 @@ public void testPassThroughInterruptions() throws TException {
"Could not acquire the lock on",
() -> spyOps.doCommit(metadataV2, metadataV1));
}

@Test
public void testTableLevelProcessLockBlocksConcurrentHMSRequestsForSameTable() throws Exception {
  int numConcurrentCommits = 10;
  // resetting the spy client to forget about prior call history
  reset(spyClient);

  // simulate several concurrent commit operations on the same table
  ExecutorService executor = Executors.newFixedThreadPool(numConcurrentCommits);
  IntStream.range(0, numConcurrentCommits).forEach(i ->
      executor.submit(() -> {
        try {
          spyOps.doCommit(metadataV2, metadataV1);
        } catch (CommitFailedException e) {
          // failures are expected here when checking the base version
          // it's no problem, we're not testing the actual commit success here, only the HMS lock acquisition attempts
        }
      }));
  executor.shutdown();
  // fail fast on timeout instead of verifying against a half-finished run, which would
  // yield a misleading Mockito failure (or a false pass for the never() check below)
  if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
    throw new AssertionError("Timed out waiting for the concurrent commit threads to finish");
  }

  // intra-process commits to the same table should be serialized now
  // i.e. no thread should receive WAITING state from HMS and have to call checkLock periodically
  verify(spyClient, never()).checkLock(any(Long.class));
  // all threads eventually got their turn
  verify(spyClient, times(numConcurrentCommits)).lock(any(LockRequest.class));
}
}