diff --git a/aws/src/integration/java/org/apache/iceberg/aws/glue/DynamoLockManagerTest.java b/aws/src/integration/java/org/apache/iceberg/aws/glue/DynamoLockManagerTest.java new file mode 100644 index 000000000000..4ca988b78978 --- /dev/null +++ b/aws/src/integration/java/org/apache/iceberg/aws/glue/DynamoLockManagerTest.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.aws.glue; + +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ForkJoinPool; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.aws.AwsClientFactories; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.Mockito; +import software.amazon.awssdk.services.dynamodb.DynamoDbClient; +import software.amazon.awssdk.services.dynamodb.model.AttributeValue; +import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException; +import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest; +import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest; +import software.amazon.awssdk.services.dynamodb.model.GetItemRequest; +import software.amazon.awssdk.services.dynamodb.model.GetItemResponse; +import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException; + +public class DynamoLockManagerTest { + + private static final ForkJoinPool POOL = new ForkJoinPool(16); + + private static String lockTableName; + private static DynamoDbClient dynamo; + + private DynamoLockManager lockManager; + private String entityId; + private String ownerId; + + @BeforeClass + public static void beforeClass() { + lockTableName = genTableName(); + dynamo = AwsClientFactories.defaultFactory().dynamo(); + } + + @Before + public void before() { + lockManager = new DynamoLockManager(dynamo, lockTableName); + entityId = UUID.randomUUID().toString(); + ownerId = UUID.randomUUID().toString(); + } + + @AfterClass + public static void 
afterClass() { + dynamo.deleteTable(DeleteTableRequest.builder().tableName(lockTableName).build()); + } + + @Test + public void testTableCreation() { + Assert.assertTrue(lockManager.tableExists(lockTableName)); + } + + @Test + public void testAcquireOnce_singleProcess() { + lockManager.acquireOnce(entityId, ownerId); + Map key = Maps.newHashMap(); + key.put("entityId", AttributeValue.builder().s(entityId).build()); + GetItemResponse response = dynamo.getItem(GetItemRequest.builder() + .tableName(lockTableName) + .key(key) + .build()); + Assert.assertTrue("should have item in dynamo after acquire", response.hasItem()); + Assert.assertEquals(entityId, response.item().get("entityId").s()); + Assert.assertEquals(ownerId, response.item().get("ownerId").s()); + Assert.assertNotNull(response.item().get("version")); + Assert.assertNotNull(response.item().get("leaseDurationMs")); + } + + @Test + public void testAcquireOnce_multiProcess() throws Exception { + List results = POOL.submit(() -> IntStream.range(0, 16).parallel() + .mapToObj(i -> { + try { + DynamoLockManager threadLocalLockManager = new DynamoLockManager(dynamo, lockTableName); + threadLocalLockManager.acquireOnce(entityId, UUID.randomUUID().toString()); + return true; + } catch (ConditionalCheckFailedException e) { + return false; + } + }) + .collect(Collectors.toList())).get(); + Assert.assertEquals("should have only 1 process succeeded in acquisition", + 1, results.stream().filter(s -> s).count()); + } + + @Test + public void testReleaseAndAcquire() { + Assert.assertTrue(lockManager.acquire(entityId, ownerId)); + Assert.assertTrue(lockManager.release(entityId, ownerId)); + Assert.assertTrue(lockManager.acquire(entityId, ownerId)); + } + + @Test + public void testReleaseWithWrongOwner() { + Assert.assertTrue(lockManager.acquire(entityId, ownerId)); + Assert.assertFalse(lockManager.release(entityId, UUID.randomUUID().toString())); + } + + @Test + @SuppressWarnings({"DangerousCompletableFutureUsage", 
"FutureReturnValueIgnored"}) + public void testAcquire_singleProcess() throws Exception { + Assert.assertTrue(lockManager.acquire(entityId, ownerId)); + String oldOwner = ownerId; + + CompletableFuture.supplyAsync(() -> { + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + Assert.assertTrue(lockManager.release(entityId, oldOwner)); + return null; + }); + + ownerId = UUID.randomUUID().toString(); + long start = System.currentTimeMillis(); + Assert.assertTrue(lockManager.acquire(entityId, ownerId)); + Assert.assertTrue("should succeed after 5 seconds", + System.currentTimeMillis() - start >= 5000); + } + + + @Test + public void testAcquire_multiProcess_allSucceed() throws Exception { + lockManager.initialize(ImmutableMap.of( + CatalogProperties.LOCK_ACQUIRE_INTERVAL_MS, "500", + CatalogProperties.LOCK_ACQUIRE_TIMEOUT_MS, "100000000", + CatalogProperties.LOCK_TABLE, lockTableName + )); + long start = System.currentTimeMillis(); + List results = POOL.submit(() -> IntStream.range(0, 16).parallel() + .mapToObj(i -> { + DynamoLockManager threadLocalLockManager = new DynamoLockManager(dynamo, lockTableName); + String owner = UUID.randomUUID().toString(); + boolean succeeded = threadLocalLockManager.acquire(entityId, owner); + if (succeeded) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + Assert.assertTrue(threadLocalLockManager.release(entityId, owner)); + } + return succeeded; + }) + .collect(Collectors.toList())).get(); + Assert.assertEquals("all lock acquire should succeed sequentially", + 16, results.stream().filter(s -> s).count()); + Assert.assertTrue("must take more than 16 seconds", System.currentTimeMillis() - start >= 16000); + } + + @Test + public void testAcquire_multiProcess_onlyOneSucceed() throws Exception { + lockManager.initialize(ImmutableMap.of( + CatalogProperties.LOCK_ACQUIRE_TIMEOUT_MS, "10000", + CatalogProperties.LOCK_TABLE, 
lockTableName + )); + + List results = POOL.submit(() -> IntStream.range(0, 16).parallel() + .mapToObj(i -> { + DynamoLockManager threadLocalLockManager = new DynamoLockManager(dynamo, lockTableName); + return threadLocalLockManager.acquire(entityId, ownerId); + }) + .collect(Collectors.toList())).get(); + Assert.assertEquals("only 1 thread should have acquired the lock", + 1, results.stream().filter(s -> s).count()); + } + + @Test + public void testTableCreationFailure() { + DynamoDbClient dynamo2 = Mockito.mock(DynamoDbClient.class); + Mockito.doThrow(ResourceNotFoundException.class).when(dynamo2) + .describeTable(Mockito.any(DescribeTableRequest.class)); + AssertHelpers.assertThrows("should fail to initialize the lock manager", + IllegalStateException.class, + "Cannot find Dynamo table", + () -> new DynamoLockManager(dynamo2, lockTableName)); + } + + private static String genTableName() { + return UUID.randomUUID().toString().replace("-", ""); + } +} diff --git a/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueCatalogLockTest.java b/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueCatalogLockTest.java new file mode 100644 index 000000000000..ba065db7e2f1 --- /dev/null +++ b/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueCatalogLockTest.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.aws.glue; + +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Table; +import org.apache.iceberg.aws.AwsProperties; +import org.apache.iceberg.aws.s3.S3FileIO; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors; +import org.apache.iceberg.util.Tasks; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import software.amazon.awssdk.services.dynamodb.DynamoDbClient; +import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest; + +public class GlueCatalogLockTest extends GlueTestBase { + + private static String lockTableName; + private static DynamoDbClient dynamo; + + @BeforeClass + public static void beforeClass() { + GlueTestBase.beforeClass(); + String testBucketPath = "s3://" + testBucketName + "/" + testPathPrefix; + lockTableName = getRandomName(); + S3FileIO fileIO = new S3FileIO(clientFactory::s3); + glueCatalog = new GlueCatalog(); + AwsProperties awsProperties = new AwsProperties(); + dynamo = clientFactory.dynamo(); + 
glueCatalog.initialize(catalogName, testBucketPath, awsProperties, glue, + new DynamoLockManager(dynamo, lockTableName), fileIO); + } + + @AfterClass + public static void afterClass() { + GlueTestBase.afterClass(); + dynamo.deleteTable(DeleteTableRequest.builder().tableName(lockTableName).build()); + } + + @Test + public void testParallelCommit_multiThreadSingleCommit() { + int nThreads = 20; + String namespace = createNamespace(); + String tableName = getRandomName(); + createTable(namespace, tableName); + Table table = glueCatalog.loadTable(TableIdentifier.of(namespace, tableName)); + DataFile dataFile = DataFiles.builder(partitionSpec) + .withPath("/path/to/data-a.parquet") + .withFileSizeInBytes(1) + .withRecordCount(1) + .build(); + + List pendingCommits = IntStream.range(0, nThreads) + .mapToObj(i -> table.newAppend().appendFile(dataFile)) + .collect(Collectors.toList()); + + ExecutorService executorService = MoreExecutors.getExitingExecutorService( + (ThreadPoolExecutor) Executors.newFixedThreadPool(nThreads)); + + Tasks.range(nThreads) + .retry(10000) + .throwFailureWhenFinished() + .executeWith(executorService) + .run(i -> pendingCommits.get(i).commit()); + + table.refresh(); + Assert.assertEquals("Commits should all succeed sequentially", nThreads, table.history().size()); + Assert.assertEquals("Should have all manifests", nThreads, table.currentSnapshot().allManifests().size()); + } + + @Test + public void testParallelCommit_multiThreadMultiCommit() { + String namespace = createNamespace(); + String tableName = getRandomName(); + createTable(namespace, tableName); + Table table = glueCatalog.loadTable(TableIdentifier.of(namespace, tableName)); + String fileName = UUID.randomUUID().toString(); + DataFile file = DataFiles.builder(table.spec()) + .withPath(FileFormat.PARQUET.addExtension(fileName)) + .withRecordCount(2) + .withFileSizeInBytes(0) + .build(); + + ExecutorService executorService = MoreExecutors.getExitingExecutorService( + (ThreadPoolExecutor) 
Executors.newFixedThreadPool(2)); + + AtomicInteger barrier = new AtomicInteger(0); + Tasks.range(2) + .stopOnFailure() + .throwFailureWhenFinished() + .executeWith(executorService) + .run(index -> { + for (int numCommittedFiles = 0; numCommittedFiles < 10; numCommittedFiles++) { + while (barrier.get() < numCommittedFiles * 2) { + try { + Thread.sleep(10); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + table.newFastAppend().appendFile(file).commit(); + barrier.incrementAndGet(); + } + }); + + table.refresh(); + Assert.assertEquals("Commits should all succeed sequentially", 20, table.history().size()); + Assert.assertEquals("should have 20 manifests", 20, table.currentSnapshot().allManifests().size()); + } + +} diff --git a/aws/src/main/java/org/apache/iceberg/aws/AssumeRoleAwsClientFactory.java b/aws/src/main/java/org/apache/iceberg/aws/AssumeRoleAwsClientFactory.java index 0275c633bb4f..05c48aaa1008 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/AssumeRoleAwsClientFactory.java +++ b/aws/src/main/java/org/apache/iceberg/aws/AssumeRoleAwsClientFactory.java @@ -28,6 +28,7 @@ import software.amazon.awssdk.http.SdkHttpClient; import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient; import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.dynamodb.DynamoDbClient; import software.amazon.awssdk.services.glue.GlueClient; import software.amazon.awssdk.services.kms.KmsClient; import software.amazon.awssdk.services.s3.S3Client; @@ -59,6 +60,11 @@ public KmsClient kms() { return KmsClient.builder().applyMutation(this::configure).build(); } + @Override + public DynamoDbClient dynamo() { + return DynamoDbClient.builder().applyMutation(this::configure).build(); + } + @Override public void initialize(Map properties) { roleArn = properties.get(AwsProperties.CLIENT_ASSUME_ROLE_ARN); diff --git a/aws/src/main/java/org/apache/iceberg/aws/AwsClientFactories.java 
b/aws/src/main/java/org/apache/iceberg/aws/AwsClientFactories.java index 7549eeed860d..89bb003680cc 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/AwsClientFactories.java +++ b/aws/src/main/java/org/apache/iceberg/aws/AwsClientFactories.java @@ -23,6 +23,7 @@ import org.apache.iceberg.common.DynConstructors; import software.amazon.awssdk.http.SdkHttpClient; import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient; +import software.amazon.awssdk.services.dynamodb.DynamoDbClient; import software.amazon.awssdk.services.glue.GlueClient; import software.amazon.awssdk.services.kms.KmsClient; import software.amazon.awssdk.services.s3.S3Client; @@ -88,6 +89,11 @@ public KmsClient kms() { return KmsClient.builder().httpClient(HTTP_CLIENT_DEFAULT).build(); } + @Override + public DynamoDbClient dynamo() { + return DynamoDbClient.builder().httpClient(HTTP_CLIENT_DEFAULT).build(); + } + @Override public void initialize(Map properties) { } diff --git a/aws/src/main/java/org/apache/iceberg/aws/AwsClientFactory.java b/aws/src/main/java/org/apache/iceberg/aws/AwsClientFactory.java index 801559e81fa4..1ed5e6f5a9c0 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/AwsClientFactory.java +++ b/aws/src/main/java/org/apache/iceberg/aws/AwsClientFactory.java @@ -21,6 +21,7 @@ import java.io.Serializable; import java.util.Map; +import software.amazon.awssdk.services.dynamodb.DynamoDbClient; import software.amazon.awssdk.services.glue.GlueClient; import software.amazon.awssdk.services.kms.KmsClient; import software.amazon.awssdk.services.s3.S3Client; @@ -49,6 +50,12 @@ public interface AwsClientFactory extends Serializable { */ KmsClient kms(); + /** + * Create an Amazon DynamoDB client + * @return dynamoDB client + */ + DynamoDbClient dynamo(); + /** * Initialize AWS client factory from catalog properties.
* @param properties catalog properties diff --git a/aws/src/main/java/org/apache/iceberg/aws/glue/DynamoLockManager.java b/aws/src/main/java/org/apache/iceberg/aws/glue/DynamoLockManager.java new file mode 100644 index 000000000000..1212fd7d885c --- /dev/null +++ b/aws/src/main/java/org/apache/iceberg/aws/glue/DynamoLockManager.java @@ -0,0 +1,373 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.aws.glue; + +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.aws.AwsClientFactories; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.util.Tasks; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.dynamodb.DynamoDbClient; +import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition; +import software.amazon.awssdk.services.dynamodb.model.AttributeValue; +import software.amazon.awssdk.services.dynamodb.model.BillingMode; +import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException; +import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest; +import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest; +import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest; +import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse; +import software.amazon.awssdk.services.dynamodb.model.DynamoDbException; +import software.amazon.awssdk.services.dynamodb.model.GetItemRequest; +import software.amazon.awssdk.services.dynamodb.model.GetItemResponse; +import software.amazon.awssdk.services.dynamodb.model.InternalServerErrorException; +import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement; +import software.amazon.awssdk.services.dynamodb.model.KeyType; +import 
software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughputExceededException; +import software.amazon.awssdk.services.dynamodb.model.PutItemRequest; +import software.amazon.awssdk.services.dynamodb.model.RequestLimitExceededException; +import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException; +import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType; +import software.amazon.awssdk.services.dynamodb.model.TableStatus; +import software.amazon.awssdk.services.dynamodb.model.TransactionConflictException; + +/** + * DynamoDB implementation for the lock manager. + */ +class DynamoLockManager extends LockManagers.BaseLockManager { + + private static final Logger LOG = LoggerFactory.getLogger(DynamoLockManager.class); + + private static final String COL_LOCK_ENTITY_ID = "entityId"; + private static final String COL_LEASE_DURATION_MS = "leaseDurationMs"; + private static final String COL_VERSION = "version"; + private static final String COL_LOCK_OWNER_ID = "ownerId"; + + private static final String CONDITION_LOCK_ID_MATCH = String.format( + "%s = :eid AND %s = :oid", + COL_LOCK_ENTITY_ID, COL_LOCK_OWNER_ID); + private static final String CONDITION_LOCK_ENTITY_NOT_EXIST = String.format( + "attribute_not_exists(%s)", + COL_LOCK_ENTITY_ID); + private static final String CONDITION_LOCK_ENTITY_NOT_EXIST_OR_VERSION_MATCH = String.format( + "attribute_not_exists(%s) OR (%s = :eid AND %s = :vid)", + COL_LOCK_ENTITY_ID, COL_LOCK_ENTITY_ID, COL_VERSION); + + private static final int LOCK_TABLE_CREATION_WAIT_ATTEMPTS_MAX = 5; + private static final int RELEASE_RETRY_ATTEMPTS_MAX = 5; + + private static final List LOCK_TABLE_SCHEMA = Lists.newArrayList( + KeySchemaElement.builder() + .attributeName(COL_LOCK_ENTITY_ID) + .keyType(KeyType.HASH) + .build()); + + private static final List LOCK_TABLE_COL_DEFINITIONS = Lists.newArrayList( + AttributeDefinition.builder() + .attributeName(COL_LOCK_ENTITY_ID) + 
.attributeType(ScalarAttributeType.S) + .build()); + + private final Map heartbeats = Maps.newHashMap(); + + private DynamoDbClient dynamo; + private String lockTableName; + + /** + * constructor for dynamic initialization, {@link #initialize(Map)} must be called later. + */ + DynamoLockManager() { + } + + /** + * constructor used for testing purpose + * @param dynamo dynamo client + * @param lockTableName lock table name + */ + DynamoLockManager(DynamoDbClient dynamo, String lockTableName) { + super.initialize(Maps.newHashMap()); + this.dynamo = dynamo; + this.lockTableName = lockTableName; + ensureLockTableExistsOrCreate(); + } + + private void ensureLockTableExistsOrCreate() { + + if (tableExists(lockTableName)) { + return; + } + + LOG.info("Dynamo lock table {} not found, trying to create", lockTableName); + dynamo.createTable(CreateTableRequest.builder() + .tableName(lockTableName) + .keySchema(lockTableSchema()) + .attributeDefinitions(lockTableColDefinitions()) + .billingMode(BillingMode.PAY_PER_REQUEST) + .build()); + + Tasks.foreach(lockTableName) + .retry(LOCK_TABLE_CREATION_WAIT_ATTEMPTS_MAX) + .throwFailureWhenFinished() + .onlyRetryOn(IllegalStateException.class) + .run(this::checkTableActive); + } + + @VisibleForTesting + boolean tableExists(String tableName) { + try { + dynamo.describeTable(DescribeTableRequest.builder() + .tableName(tableName) + .build()); + return true; + } catch (ResourceNotFoundException e) { + return false; + } + } + + private void checkTableActive(String tableName) { + try { + DescribeTableResponse response = dynamo.describeTable(DescribeTableRequest.builder() + .tableName(tableName) + .build()); + TableStatus currentStatus = response.table().tableStatus(); + if (!currentStatus.equals(TableStatus.ACTIVE)) { + throw new IllegalStateException(String.format("Dynamo table %s is not active, current status: %s", + tableName, currentStatus)); + } + } catch (ResourceNotFoundException e) { + throw new 
IllegalStateException(String.format("Cannot find Dynamo table %s", tableName)); + } + } + + @Override + public void initialize(Map properties) { + super.initialize(properties); + this.dynamo = AwsClientFactories.from(properties).dynamo(); + this.lockTableName = properties.get(CatalogProperties.LOCK_TABLE); + Preconditions.checkNotNull(lockTableName, "DynamoDB lock table name must not be null"); + ensureLockTableExistsOrCreate(); + } + + @Override + public boolean acquire(String entityId, String ownerId) { + try { + Tasks.foreach(entityId) + .throwFailureWhenFinished() + .retry(Integer.MAX_VALUE - 1) + .exponentialBackoff(acquireIntervalMs(), acquireIntervalMs(), acquireTimeoutMs(), 1) + .onlyRetryOn( + ConditionalCheckFailedException.class, + ProvisionedThroughputExceededException.class, + TransactionConflictException.class, + RequestLimitExceededException.class, + InternalServerErrorException.class) + .run(id -> acquireOnce(id, ownerId)); + return true; + } catch (DynamoDbException e) { + return false; + } + } + + @VisibleForTesting + void acquireOnce(String entityId, String ownerId) { + GetItemResponse response = dynamo.getItem(GetItemRequest.builder() + .tableName(lockTableName) + .key(toKey(entityId)) + .build()); + + if (!response.hasItem()) { + dynamo.putItem(PutItemRequest.builder() + .tableName(lockTableName) + .item(toNewItem(entityId, ownerId, heartbeatTimeoutMs())) + .conditionExpression(CONDITION_LOCK_ENTITY_NOT_EXIST) + .build()); + } else { + Map currentItem = response.item(); + + try { + Thread.sleep(Long.parseLong(currentItem.get(COL_LEASE_DURATION_MS).n())); + } catch (InterruptedException e) { + throw new IllegalStateException( + String.format("Fail to acquire lock %s by %s, interrupted during sleep", entityId, ownerId), e); + } + + dynamo.putItem(PutItemRequest.builder() + .tableName(lockTableName) + .item(toNewItem(entityId, ownerId, heartbeatTimeoutMs())) + .conditionExpression(CONDITION_LOCK_ENTITY_NOT_EXIST_OR_VERSION_MATCH) + 
.expressionAttributeValues(ImmutableMap.of( + ":eid", AttributeValue.builder().s(entityId).build(), + ":vid", AttributeValue.builder().s(currentItem.get(COL_VERSION).s()).build())) + .build()); + } + + startNewHeartbeat(entityId, ownerId); + } + + private void startNewHeartbeat(String entityId, String ownerId) { + if (heartbeats.containsKey(entityId)) { + heartbeats.remove(entityId).cancel(); + } + + DynamoHeartbeat heartbeat = new DynamoHeartbeat(dynamo, lockTableName, + heartbeatIntervalMs(), heartbeatTimeoutMs(), entityId, ownerId); + heartbeat.schedule(scheduler()); + heartbeats.put(entityId, heartbeat); + } + + @Override + public boolean release(String entityId, String ownerId) { + boolean succeeded = false; + DynamoHeartbeat heartbeat = heartbeats.get(entityId); + try { + Tasks.foreach(entityId) + .retry(RELEASE_RETRY_ATTEMPTS_MAX) + .throwFailureWhenFinished() + .onlyRetryOn( + ProvisionedThroughputExceededException.class, + TransactionConflictException.class, + RequestLimitExceededException.class, + InternalServerErrorException.class) + .run(id -> dynamo.deleteItem(DeleteItemRequest.builder() + .tableName(lockTableName) + .key(toKey(id)) + .conditionExpression(CONDITION_LOCK_ID_MATCH) + .expressionAttributeValues(toLockIdValues(id, ownerId)) + .build())); + succeeded = true; + } catch (ConditionalCheckFailedException e) { + LOG.error("Failed to release lock for entity: {}, owner: {}, lock entity does not exist or owner not match", + entityId, ownerId, e); + } catch (DynamoDbException e) { + LOG.error("Failed to release lock {} by for entity: {}, owner: {}, encountered unexpected DynamoDB exception", + entityId, ownerId, e); + } finally { + if (heartbeat != null && heartbeat.ownerId().equals(ownerId)) { + heartbeat.cancel(); + } + } + + return succeeded; + } + + private static Map toKey(String entityId) { + return ImmutableMap.of(COL_LOCK_ENTITY_ID, AttributeValue.builder().s(entityId).build()); + } + + private static Map toNewItem(String entityId, String 
ownerId, long heartbeatTimeoutMs) { + return ImmutableMap.of( + COL_LOCK_ENTITY_ID, AttributeValue.builder().s(entityId).build(), + COL_LOCK_OWNER_ID, AttributeValue.builder().s(ownerId).build(), + COL_VERSION, AttributeValue.builder().s(UUID.randomUUID().toString()).build(), + COL_LEASE_DURATION_MS, AttributeValue.builder().n(Long.toString(heartbeatTimeoutMs)).build()); + } + + private static Map toLockIdValues(String entityId, String ownerId) { + return ImmutableMap.of( + ":eid", AttributeValue.builder().s(entityId).build(), + ":oid", AttributeValue.builder().s(ownerId).build()); + } + + @Override + public void close() { + dynamo.close(); + heartbeats.values().forEach(DynamoHeartbeat::cancel); + heartbeats.clear(); + } + + /** + * The lock table schema, for users who would like to create the table separately + * @return lock table schema + */ + public static List lockTableSchema() { + return LOCK_TABLE_SCHEMA; + } + + /** + * The lock table column definition, for users who would like to create the table separately + * @return lock table column definition + */ + public static List lockTableColDefinitions() { + return LOCK_TABLE_COL_DEFINITIONS; + } + + private static class DynamoHeartbeat implements Runnable { + + private final DynamoDbClient dynamo; + private final String lockTableName; + private final long intervalMs; + private final long timeoutMs; + private final String entityId; + private final String ownerId; + private ScheduledFuture future; + + DynamoHeartbeat(DynamoDbClient dynamo, String lockTableName, long intervalMs, long timeoutMs, + String entityId, String ownerId) { + this.dynamo = dynamo; + this.lockTableName = lockTableName; + this.intervalMs = intervalMs; + this.timeoutMs = timeoutMs; + this.entityId = entityId; + this.ownerId = ownerId; + this.future = null; + } + + @Override + public void run() { + try { + dynamo.putItem(PutItemRequest.builder() + .tableName(lockTableName) + .item(toNewItem(entityId, ownerId, timeoutMs)) +
.conditionExpression(CONDITION_LOCK_ID_MATCH) + .expressionAttributeValues(toLockIdValues(entityId, ownerId)) + .build()); + } catch (ConditionalCheckFailedException e) { + LOG.error("Fail to heartbeat for entity: {}, owner: {} due to conditional check failure, " + + "unsafe concurrent commits might be going on", entityId, ownerId, e); + } catch (RuntimeException e) { + LOG.error("Failed to heartbeat for entity: {}, owner: {}", entityId, ownerId, e); + } + } + + public String ownerId() { + return ownerId; + } + + public void schedule(ScheduledExecutorService scheduler) { + future = scheduler.scheduleAtFixedRate(this, 0, intervalMs, TimeUnit.MILLISECONDS); + } + + public void cancel() { + if (future != null) { + future.cancel(false); + } + } + } +} diff --git a/aws/src/main/java/org/apache/iceberg/aws/glue/LockManager.java b/aws/src/main/java/org/apache/iceberg/aws/glue/LockManager.java index 16c7e25a5359..e0249c97c780 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/glue/LockManager.java +++ b/aws/src/main/java/org/apache/iceberg/aws/glue/LockManager.java @@ -36,9 +36,12 @@ interface LockManager extends AutoCloseable { /** * Release a lock + * + * @apiNote exception must not be thrown for this method. + * * @param entityId ID of the entity to lock * @param ownerId ID of the owner if the lock - * @return if the lock for the entity of the owner is released + * @return if the owner held the lock and successfully released it. 
*/ boolean release(String entityId, String ownerId); diff --git a/aws/src/test/java/org/apache/iceberg/aws/AwsClientFactoriesTest.java b/aws/src/test/java/org/apache/iceberg/aws/AwsClientFactoriesTest.java index 56abe1d5c130..7431928d2c80 100644 --- a/aws/src/test/java/org/apache/iceberg/aws/AwsClientFactoriesTest.java +++ b/aws/src/test/java/org/apache/iceberg/aws/AwsClientFactoriesTest.java @@ -23,6 +23,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.junit.Assert; import org.junit.Test; +import software.amazon.awssdk.services.dynamodb.DynamoDbClient; import software.amazon.awssdk.services.glue.GlueClient; import software.amazon.awssdk.services.kms.KmsClient; import software.amazon.awssdk.services.s3.S3Client; @@ -67,6 +68,11 @@ public KmsClient kms() { return null; } + @Override + public DynamoDbClient dynamo() { + return null; + } + @Override public void initialize(Map properties) { diff --git a/build.gradle b/build.gradle index 62154aab60b4..8fe3b3215c0c 100644 --- a/build.gradle +++ b/build.gradle @@ -270,6 +270,7 @@ project(':iceberg-aws') { compileOnly 'software.amazon.awssdk:kms' compileOnly 'software.amazon.awssdk:glue' compileOnly 'software.amazon.awssdk:sts' + compileOnly 'software.amazon.awssdk:dynamodb' compileOnly("org.apache.hadoop:hadoop-common") { exclude group: 'org.apache.avro', module: 'avro'