diff --git a/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueTestBase.java b/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueTestBase.java index 48a4f860e8bb..f86890abbff1 100644 --- a/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueTestBase.java +++ b/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueTestBase.java @@ -68,11 +68,13 @@ public static void beforeClass() { String testBucketPath = "s3://" + testBucketName + "/" + testPathPrefix; S3FileIO fileIO = new S3FileIO(clientFactory::s3); glueCatalog = new GlueCatalog(); - glueCatalog.initialize(catalogName, testBucketPath, new AwsProperties(), glue, fileIO); + glueCatalog.initialize(catalogName, testBucketPath, new AwsProperties(), glue, + LockManagers.defaultLockManager(), fileIO); AwsProperties properties = new AwsProperties(); properties.setGlueCatalogSkipArchive(true); glueCatalogWithSkip = new GlueCatalog(); - glueCatalogWithSkip.initialize(catalogName, testBucketPath, properties, glue, fileIO); + glueCatalogWithSkip.initialize(catalogName, testBucketPath, properties, glue, + LockManagers.defaultLockManager(), fileIO); } @AfterClass diff --git a/aws/src/integration/java/org/apache/iceberg/aws/s3/S3FileIOTest.java b/aws/src/integration/java/org/apache/iceberg/aws/s3/S3FileIOTest.java index 06e18b8426c9..04ddabfa6939 100644 --- a/aws/src/integration/java/org/apache/iceberg/aws/s3/S3FileIOTest.java +++ b/aws/src/integration/java/org/apache/iceberg/aws/s3/S3FileIOTest.java @@ -36,7 +36,6 @@ import org.apache.iceberg.aws.AwsProperties; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; @@ -202,8 +201,7 @@ public void testACL() throws Exception { @Test public void testClientFactorySerialization() throws Exception { - S3FileIO fileIO = new S3FileIO(); - fileIO.initialize(Maps.newHashMap()); + S3FileIO fileIO = new S3FileIO(clientFactory::s3); write(fileIO); byte [] data = SerializationUtils.serialize(fileIO); S3FileIO fileIO2 = SerializationUtils.deserialize(data); diff --git a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java index 2dddda9c6878..a4d407d0b8f0 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java +++ b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java @@ -78,6 +78,7 @@ public class GlueCatalog extends BaseMetastoreCatalog implements Closeable, Supp private String warehousePath; private AwsProperties awsProperties; private FileIO fileIO; + private LockManager lockManager; /** * No-arg constructor to load the catalog dynamically. @@ -94,6 +95,7 @@ public void initialize(String name, Map properties) { properties.get(CatalogProperties.WAREHOUSE_LOCATION), new AwsProperties(properties), AwsClientFactories.from(properties).glue(), + LockManagers.from(properties), initializeFileIO(properties)); } @@ -109,11 +111,12 @@ private FileIO initializeFileIO(Map properties) { } @VisibleForTesting - void initialize(String name, String path, AwsProperties properties, GlueClient client, FileIO io) { + void initialize(String name, String path, AwsProperties properties, GlueClient client, LockManager lock, FileIO io) { this.catalogName = name; this.awsProperties = properties; this.warehousePath = cleanWarehousePath(path); this.glue = client; + this.lockManager = lock; this.fileIO = io; } @@ -130,7 +133,7 @@ private String cleanWarehousePath(String path) { @Override protected TableOperations newTableOps(TableIdentifier tableIdentifier) { - return new GlueTableOperations(glue, catalogName, awsProperties, fileIO, tableIdentifier); + return new GlueTableOperations(glue, lockManager, catalogName, awsProperties, fileIO, tableIdentifier); } /** diff --git a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java index cff437b18606..832c45d4c9c7 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java +++ b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java @@ -57,16 +57,20 @@ class GlueTableOperations extends BaseMetastoreTableOperations { private final String databaseName; private final String tableName; private final String fullTableName; + private final String commitLockEntityId; private final FileIO fileIO; + private final LockManager lockManager; - GlueTableOperations(GlueClient glue, String catalogName, AwsProperties awsProperties, + GlueTableOperations(GlueClient glue, LockManager lockManager, String catalogName, AwsProperties awsProperties, FileIO fileIO, TableIdentifier tableIdentifier) { this.glue = glue; this.awsProperties = awsProperties; this.databaseName = IcebergToGlueConverter.getDatabaseName(tableIdentifier); this.tableName = IcebergToGlueConverter.getTableName(tableIdentifier); this.fullTableName = String.format("%s.%s.%s", catalogName, databaseName, tableName); + this.commitLockEntityId = String.format("%s.%s", databaseName, tableName); this.fileIO = fileIO; + this.lockManager = lockManager; } @Override @@ -100,10 +104,11 @@ protected void doRefresh() { protected void doCommit(TableMetadata base, TableMetadata metadata) { String newMetadataLocation = writeNewMetadata(metadata, currentVersion() + 1); boolean exceptionThrown = true; - Table glueTable = getGlueTable(); - checkMetadataLocation(glueTable, base); - Map properties = prepareProperties(glueTable, newMetadataLocation); try { + lock(newMetadataLocation); + Table glueTable = getGlueTable(); + checkMetadataLocation(glueTable, base); + Map properties = prepareProperties(glueTable, newMetadataLocation); persistGlueTable(glueTable, properties); exceptionThrown = false; } catch (ConcurrentModificationException e) { @@ -114,9 +119,14 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) { } catch (SdkException e) { throw new CommitFailedException(e, "Cannot commit %s because unexpected exception contacting AWS", tableName()); } finally { - if (exceptionThrown) { - io().deleteFile(newMetadataLocation); - } + cleanupMetadataAndUnlock(exceptionThrown, newMetadataLocation); + } + } + + private void lock(String newMetadataLocation) { + if (!lockManager.acquire(commitLockEntityId, newMetadataLocation)) { + throw new IllegalStateException(String.format("Fail to acquire lock %s to commit new metadata at %s", + commitLockEntityId, newMetadataLocation)); } } @@ -180,4 +190,18 @@ private void persistGlueTable(Table glueTable, Map parameters) { .build()); } } + + private void cleanupMetadataAndUnlock(boolean exceptionThrown, String metadataLocation) { + try { + if (exceptionThrown) { + // if anything went wrong, clean up the uncommitted metadata file + io().deleteFile(metadataLocation); + } + } catch (RuntimeException e) { + LOG.error("Fail to cleanup metadata file at {}", metadataLocation, e); + throw e; + } finally { + lockManager.release(commitLockEntityId, metadataLocation); + } + } } diff --git a/aws/src/main/java/org/apache/iceberg/aws/glue/LockManager.java b/aws/src/main/java/org/apache/iceberg/aws/glue/LockManager.java new file mode 100644 index 000000000000..16c7e25a5359 --- /dev/null +++ b/aws/src/main/java/org/apache/iceberg/aws/glue/LockManager.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.aws.glue; + +import java.util.Map; + +/** + * An interface for locking, used to ensure Glue catalog commit isolation. + */ +interface LockManager extends AutoCloseable { + + /** + * Try to acquire a lock + * @param entityId ID of the entity to lock + * @param ownerId ID of the owner if the lock + * @return if the lock for the entity is acquired by the owner + */ + boolean acquire(String entityId, String ownerId); + + /** + * Release a lock + * @param entityId ID of the entity to lock + * @param ownerId ID of the owner if the lock + * @return if the lock for the entity of the owner is released + */ + boolean release(String entityId, String ownerId); + + /** + * Initialize lock manager from catalog properties. + * @param properties catalog properties + */ + void initialize(Map properties); +} diff --git a/aws/src/main/java/org/apache/iceberg/aws/glue/LockManagers.java b/aws/src/main/java/org/apache/iceberg/aws/glue/LockManagers.java new file mode 100644 index 000000000000..02ed234e1c00 --- /dev/null +++ b/aws/src/main/java/org/apache/iceberg/aws/glue/LockManagers.java @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.aws.glue; + +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.common.DynConstructors; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors; +import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.iceberg.util.PropertyUtil; +import org.apache.iceberg.util.Tasks; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class LockManagers { + + private static final LockManager LOCK_MANAGER_DEFAULT = new InMemoryLockManager(Maps.newHashMap()); + + private LockManagers() { + } + + public static LockManager defaultLockManager() { + return LOCK_MANAGER_DEFAULT; + } + + public static LockManager from(Map properties) { + if (properties.containsKey(CatalogProperties.LOCK_IMPL)) { + return loadLockManager(properties.get(CatalogProperties.LOCK_IMPL), properties); + } else { + return defaultLockManager(); + } + } + + private static LockManager loadLockManager(String impl, Map properties) { + DynConstructors.Ctor ctor; + try { + ctor = DynConstructors.builder(LockManager.class).hiddenImpl(impl).buildChecked(); + } catch (NoSuchMethodException e) { + throw new IllegalArgumentException(String.format( + "Cannot initialize LockManager, missing no-arg constructor: %s", impl), e); + } + + LockManager lockManager; + try { + lockManager = ctor.newInstance(); + } catch (ClassCastException e) { + throw new IllegalArgumentException( + String.format("Cannot initialize LockManager, %s does not implement LockManager.", impl), e); + } + + lockManager.initialize(properties); + return lockManager; + } + + abstract static class BaseLockManager implements LockManager { + + private static volatile ScheduledExecutorService scheduler; + + private long acquireTimeoutMs; + private long acquireIntervalMs; + private long heartbeatIntervalMs; + private long heartbeatTimeoutMs; + private int heartbeatThreads; + + public long heartbeatTimeoutMs() { + return heartbeatTimeoutMs; + } + + public long heartbeatIntervalMs() { + return heartbeatIntervalMs; + } + + public long acquireIntervalMs() { + return acquireIntervalMs; + } + + public long acquireTimeoutMs() { + return acquireTimeoutMs; + } + + public int heartbeatThreads() { + return heartbeatThreads; + } + + public ScheduledExecutorService scheduler() { + if (scheduler == null) { + synchronized (BaseLockManager.class) { + if (scheduler == null) { + scheduler = MoreExecutors.getExitingScheduledExecutorService( + (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool( + heartbeatThreads(), + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("iceberg-lock-manager-%d") + .build())); + } + } + } + + return scheduler; + } + + @Override + public void initialize(Map properties) { + this.acquireTimeoutMs = PropertyUtil.propertyAsLong(properties, + CatalogProperties.LOCK_ACQUIRE_TIMEOUT_MS, CatalogProperties.LOCK_ACQUIRE_TIMEOUT_MS_DEFAULT); + this.acquireIntervalMs = PropertyUtil.propertyAsLong(properties, + CatalogProperties.LOCK_ACQUIRE_INTERVAL_MS, CatalogProperties.LOCK_ACQUIRE_INTERVAL_MS_DEFAULT); + this.heartbeatIntervalMs = PropertyUtil.propertyAsLong(properties, + CatalogProperties.LOCK_HEARTBEAT_INTERVAL_MS, CatalogProperties.LOCK_HEARTBEAT_INTERVAL_MS_DEFAULT); + this.heartbeatTimeoutMs = PropertyUtil.propertyAsLong(properties, + CatalogProperties.LOCK_HEARTBEAT_TIMEOUT_MS, CatalogProperties.LOCK_HEARTBEAT_TIMEOUT_MS_DEFAULT); + this.heartbeatThreads = PropertyUtil.propertyAsInt(properties, + CatalogProperties.LOCK_HEARTBEAT_THREADS, CatalogProperties.LOCK_HEARTBEAT_THREADS_DEFAULT); + } + } + + /** + * Implementation of {@link LockManager} that uses an in-memory concurrent map for locking. + * This implementation should only be used for testing, + * or if the caller only needs locking within the same JVM during table commits. + */ + static class InMemoryLockManager extends BaseLockManager { + + private static final Logger LOG = LoggerFactory.getLogger(InMemoryLockManager.class); + + private static final Map LOCKS = Maps.newConcurrentMap(); + private static final Map> HEARTBEATS = Maps.newHashMap(); + + InMemoryLockManager(Map properties) { + initialize(properties); + } + + @VisibleForTesting + void acquireOnce(String entityId, String ownerId) { + InMemoryLockContent content = LOCKS.get(entityId); + if (content != null && content.expireMs() > System.currentTimeMillis()) { + throw new IllegalStateException(String.format("Lock for %s currently held by %s, expiration: %s", + entityId, content.ownerId(), content.expireMs())); + } + + long expiration = System.currentTimeMillis() + heartbeatTimeoutMs(); + boolean succeed; + if (content == null) { + InMemoryLockContent previous = LOCKS.putIfAbsent( + entityId, new InMemoryLockContent(ownerId, expiration)); + succeed = previous == null; + } else { + succeed = LOCKS.replace(entityId, content, new InMemoryLockContent(ownerId, expiration)); + } + + if (succeed) { + // cleanup old heartbeat + if (HEARTBEATS.containsKey(entityId)) { + HEARTBEATS.remove(entityId).cancel(false); + } + + HEARTBEATS.put(entityId, scheduler().scheduleAtFixedRate(() -> { + InMemoryLockContent lastContent = LOCKS.get(entityId); + try { + long newExpiration = System.currentTimeMillis() + heartbeatTimeoutMs(); + LOCKS.replace(entityId, lastContent, new InMemoryLockContent(ownerId, newExpiration)); + } catch (NullPointerException e) { + throw new RuntimeException("Cannot heartbeat to a deleted lock " + entityId, e); + } + + }, 0, heartbeatIntervalMs(), TimeUnit.MILLISECONDS)); + + } else { + throw new IllegalStateException("Unable to acquire lock " + entityId); + } + } + + @Override + public boolean acquire(String entityId, String ownerId) { + try { + Tasks.foreach(entityId) + .retry(Integer.MAX_VALUE - 1) + .onlyRetryOn(IllegalStateException.class) + .throwFailureWhenFinished() + .exponentialBackoff(acquireIntervalMs(), acquireIntervalMs(), acquireTimeoutMs(), 1) + .run(id -> acquireOnce(id, ownerId)); + return true; + } catch (IllegalStateException e) { + return false; + } + } + + @Override + public boolean release(String entityId, String ownerId) { + InMemoryLockContent currentContent = LOCKS.get(entityId); + if (currentContent == null) { + LOG.error("Cannot find lock for entity {}", entityId); + return false; + } + + if (!currentContent.ownerId().equals(ownerId)) { + LOG.error("Cannot unlock {} by {}, current owner: {}", entityId, ownerId, currentContent.ownerId()); + return false; + } + + HEARTBEATS.remove(entityId).cancel(false); + LOCKS.remove(entityId); + return true; + } + + @Override + public void close() { + HEARTBEATS.values().forEach(future -> future.cancel(false)); + HEARTBEATS.clear(); + LOCKS.clear(); + } + } + + private static class InMemoryLockContent { + private final String ownerId; + private final long expireMs; + + InMemoryLockContent(String ownerId, long expireMs) { + this.ownerId = ownerId; + this.expireMs = expireMs; + } + + public long expireMs() { + return expireMs; + } + + public String ownerId() { + return ownerId; + } + + } +} diff --git a/aws/src/test/java/org/apache/iceberg/aws/glue/GlueCatalogTest.java b/aws/src/test/java/org/apache/iceberg/aws/glue/GlueCatalogTest.java index 45329c1c1e77..95dfad687f06 100644 --- a/aws/src/test/java/org/apache/iceberg/aws/glue/GlueCatalogTest.java +++ b/aws/src/test/java/org/apache/iceberg/aws/glue/GlueCatalogTest.java @@ -70,7 +70,8 @@ public class GlueCatalogTest { public void before() { glue = Mockito.mock(GlueClient.class); glueCatalog = new GlueCatalog(); - glueCatalog.initialize(CATALOG_NAME, WAREHOUSE_PATH, new AwsProperties(), glue, null); + glueCatalog.initialize(CATALOG_NAME, WAREHOUSE_PATH, new AwsProperties(), glue, + LockManagers.defaultLockManager(), null); } @Test @@ -80,14 +81,16 @@ public void constructor_emptyWarehousePath() { "Cannot initialize GlueCatalog because warehousePath must not be null", () -> { GlueCatalog catalog = new GlueCatalog(); - catalog.initialize(CATALOG_NAME, null, new AwsProperties(), glue, null); + catalog.initialize(CATALOG_NAME, null, new AwsProperties(), glue, + LockManagers.defaultLockManager(), null); }); } @Test public void constructor_warehousePathWithEndSlash() { GlueCatalog catalogWithSlash = new GlueCatalog(); - catalogWithSlash.initialize(CATALOG_NAME, WAREHOUSE_PATH + "/", new AwsProperties(), glue, null); + catalogWithSlash.initialize( + CATALOG_NAME, WAREHOUSE_PATH + "/", new AwsProperties(), glue, LockManagers.defaultLockManager(), null); Mockito.doReturn(GetDatabaseResponse.builder() .database(Database.builder().name("db").build()).build()) .when(glue).getDatabase(Mockito.any(GetDatabaseRequest.class)); diff --git a/aws/src/test/java/org/apache/iceberg/aws/glue/InMemoryLockManagerTest.java b/aws/src/test/java/org/apache/iceberg/aws/glue/InMemoryLockManagerTest.java new file mode 100644 index 000000000000..636dfdef1ca7 --- /dev/null +++ b/aws/src/test/java/org/apache/iceberg/aws/glue/InMemoryLockManagerTest.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.aws.glue; + +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; + +public class InMemoryLockManagerTest { + + private LockManagers.InMemoryLockManager lockManager; + private String lockEntityId; + private String ownerId; + + @Rule + public Timeout timeout = new Timeout(5, TimeUnit.SECONDS); + + @Before + public void before() { + lockEntityId = UUID.randomUUID().toString(); + ownerId = UUID.randomUUID().toString(); + lockManager = new LockManagers.InMemoryLockManager(Maps.newHashMap()); + } + + @After + public void after() { + lockManager.close(); + } + + @Test + public void testAcquireOnce_singleProcess() { + lockManager.acquireOnce(lockEntityId, ownerId); + AssertHelpers.assertThrows("should fail when acquire again", + IllegalStateException.class, + "currently held", + () -> lockManager.acquireOnce(lockEntityId, ownerId)); + } + + @Test + public void testAcquireOnce_multiProcess() { + List results = IntStream.range(0, 10).parallel() + .mapToObj(i -> { + try { + lockManager.acquireOnce(lockEntityId, ownerId); + return true; + } catch (IllegalStateException e) { + return false; + } + }) + .collect(Collectors.toList()); + Assert.assertEquals( + "only 1 thread should have acquired the lock", + 1, results.stream().filter(s -> s).count()); + } + + @Test + public void testReleaseAndAcquire() { + Assert.assertTrue(lockManager.acquire(lockEntityId, ownerId)); + Assert.assertTrue(lockManager.release(lockEntityId, ownerId)); + Assert.assertTrue("acquire after release should succeed", lockManager.acquire(lockEntityId, ownerId)); + } + + @Test + public void testReleaseWithWrongOwner() { + Assert.assertTrue(lockManager.acquire(lockEntityId, ownerId)); + Assert.assertFalse("should return false if ownerId is wrong", + lockManager.release(lockEntityId, UUID.randomUUID().toString())); + } + + @Test + public void testAcquire_singleProcess() throws Exception { + lockManager.initialize(ImmutableMap.of( + CatalogProperties.LOCK_ACQUIRE_INTERVAL_MS, "500", + CatalogProperties.LOCK_ACQUIRE_TIMEOUT_MS, "2000" + )); + Assert.assertTrue(lockManager.acquire(lockEntityId, ownerId)); + String oldOwner = ownerId; + + CompletableFuture.supplyAsync(() -> { + try { + Thread.sleep(200); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + Assert.assertTrue(lockManager.release(lockEntityId, oldOwner)); + return null; + }); + + ownerId = UUID.randomUUID().toString(); + long start = System.currentTimeMillis(); + Assert.assertTrue(lockManager.acquire(lockEntityId, ownerId)); + Assert.assertTrue("should succeed after 200ms", + System.currentTimeMillis() - start >= 200); + } + + @Test + public void testAcquire_multiProcess_allSucceed() { + lockManager.initialize(ImmutableMap.of( + CatalogProperties.LOCK_ACQUIRE_INTERVAL_MS, "500" + )); + long start = System.currentTimeMillis(); + List results = IntStream.range(0, 3).parallel() + .mapToObj(i -> { + String owner = UUID.randomUUID().toString(); + boolean succeeded = lockManager.acquire(lockEntityId, owner); + if (succeeded) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + Assert.assertTrue(lockManager.release(lockEntityId, owner)); + } + return succeeded; + }) + .collect(Collectors.toList()); + Assert.assertEquals("all lock acquire should succeed sequentially", + 3, results.stream().filter(s -> s).count()); + Assert.assertTrue("must take more than 3 seconds", System.currentTimeMillis() - start >= 3000); + } + + @Test + public void testAcquire_multiProcess_onlyOneSucceed() { + lockManager.initialize(ImmutableMap.of( + CatalogProperties.LOCK_HEARTBEAT_INTERVAL_MS, "100", + CatalogProperties.LOCK_ACQUIRE_INTERVAL_MS, "500", + CatalogProperties.LOCK_ACQUIRE_TIMEOUT_MS, "2000" + )); + + List results = IntStream.range(0, 3).parallel() + .mapToObj(i -> lockManager.acquire(lockEntityId, ownerId)) + .collect(Collectors.toList()); + Assert.assertEquals("only 1 thread should have acquired the lock", + 1, results.stream().filter(s -> s).count()); + } +} diff --git a/aws/src/test/java/org/apache/iceberg/aws/glue/LockManagersTest.java b/aws/src/test/java/org/apache/iceberg/aws/glue/LockManagersTest.java new file mode 100644 index 000000000000..b5bf054d2122 --- /dev/null +++ b/aws/src/test/java/org/apache/iceberg/aws/glue/LockManagersTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.aws.glue; + +import java.util.Map; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.junit.Assert; +import org.junit.Test; + +public class LockManagersTest { + + @Test + public void testLoadDefaultLockManager() { + Assert.assertTrue(LockManagers.defaultLockManager() instanceof LockManagers.InMemoryLockManager); + } + + @Test + public void testLoadCustomLockManager() { + Map properties = Maps.newHashMap(); + properties.put(CatalogProperties.LOCK_IMPL, CustomLockManager.class.getName()); + Assert.assertTrue(LockManagers.from(properties) instanceof CustomLockManager); + } + + static class CustomLockManager implements LockManager { + + @Override + public boolean acquire(String entityId, String ownerId) { + return false; + } + + @Override + public boolean release(String entityId, String ownerId) { + return false; + } + + @Override + public void close() throws Exception { + + } + + @Override + public void initialize(Map properties) { + + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/CatalogProperties.java b/core/src/main/java/org/apache/iceberg/CatalogProperties.java index e27a507fef8b..5992810b261d 100644 --- a/core/src/main/java/org/apache/iceberg/CatalogProperties.java +++ b/core/src/main/java/org/apache/iceberg/CatalogProperties.java @@ -19,6 +19,8 @@ package org.apache.iceberg; +import java.util.concurrent.TimeUnit; + public class CatalogProperties { private CatalogProperties() { @@ -31,4 +33,24 @@ private CatalogProperties() { public static final String HIVE_URI = "uri"; public static final String HIVE_CLIENT_POOL_SIZE = "clients"; public static final int HIVE_CLIENT_POOL_SIZE_DEFAULT = 2; + + public static final String LOCK_IMPL = "lock.impl"; + + public static final String LOCK_HEARTBEAT_INTERVAL_MS = "lock.heartbeat-interval-ms"; + public static final long LOCK_HEARTBEAT_INTERVAL_MS_DEFAULT = TimeUnit.SECONDS.toMillis(3); + + public static final String LOCK_HEARTBEAT_TIMEOUT_MS = "lock.heartbeat-timeout-ms"; + public static final long LOCK_HEARTBEAT_TIMEOUT_MS_DEFAULT = TimeUnit.SECONDS.toMillis(15); + + public static final String LOCK_HEARTBEAT_THREADS = "lock.heartbeat-threads"; + public static final int LOCK_HEARTBEAT_THREADS_DEFAULT = 4; + + public static final String LOCK_ACQUIRE_INTERVAL_MS = "lock.acquire-interval-ms"; + public static final long LOCK_ACQUIRE_INTERVAL_MS_DEFAULT = TimeUnit.SECONDS.toMillis(5); + + public static final String LOCK_ACQUIRE_TIMEOUT_MS = "lock.acquire-timeout-ms"; + public static final long LOCK_ACQUIRE_TIMEOUT_MS_DEFAULT = TimeUnit.MINUTES.toMillis(3); + + public static final String LOCK_TABLE = "lock.table"; + }