Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.aws.glue;

import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.aws.AwsClientFactories;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;

/**
 * Tests for {@link DynamoLockManager} that exercise it through the DynamoDB client returned by
 * {@code AwsClientFactories.defaultFactory()}.
 *
 * <p>All tests share one randomly named lock table, which is deleted after the class finishes. Every test
 * gets a fresh lock manager plus a random entity ID and owner ID, so tests do not contend with each other.
 */
public class DynamoLockManagerTest {

  /** Number of concurrent lock contenders used by the multi-process tests. */
  private static final int PARALLELISM = 16;

  private static final ForkJoinPool POOL = new ForkJoinPool(PARALLELISM);

  private static String lockTableName;
  private static DynamoDbClient dynamo;

  private DynamoLockManager lockManager;
  private String entityId;
  private String ownerId;

  @BeforeClass
  public static void beforeClass() {
    lockTableName = genTableName();
    dynamo = AwsClientFactories.defaultFactory().dynamo();
  }

  @Before
  public void before() {
    lockManager = new DynamoLockManager(dynamo, lockTableName);
    entityId = UUID.randomUUID().toString();
    ownerId = UUID.randomUUID().toString();
  }

  @AfterClass
  public static void afterClass() {
    dynamo.deleteTable(DeleteTableRequest.builder().tableName(lockTableName).build());
  }

  /** The lock table must exist once a lock manager has been constructed for it. */
  @Test
  public void testTableCreation() {
    Assert.assertTrue(lockManager.tableExists(lockTableName));
  }

  /** A single acquisition must leave an item with the expected attributes in the lock table. */
  @Test
  public void testAcquireOnce_singleProcess() {
    lockManager.acquireOnce(entityId, ownerId);

    Map<String, AttributeValue> key = Maps.newHashMap();
    key.put("entityId", AttributeValue.builder().s(entityId).build());
    GetItemRequest request = GetItemRequest.builder()
        .tableName(lockTableName)
        .key(key)
        .build();
    GetItemResponse response = dynamo.getItem(request);

    Assert.assertTrue("should have item in dynamo after acquire", response.hasItem());
    Map<String, AttributeValue> item = response.item();
    Assert.assertEquals(entityId, item.get("entityId").s());
    Assert.assertEquals(ownerId, item.get("ownerId").s());
    Assert.assertNotNull(item.get("version"));
    Assert.assertNotNull(item.get("leaseDurationMs"));
  }

  /** When many owners race on {@code acquireOnce} for the same entity, exactly one may win. */
  @Test
  public void testAcquireOnce_multiProcess() throws Exception {
    List<Boolean> results = POOL.submit(() -> IntStream.range(0, PARALLELISM).parallel()
        .mapToObj(i -> {
          try {
            DynamoLockManager threadLocalLockManager = new DynamoLockManager(dynamo, lockTableName);
            threadLocalLockManager.acquireOnce(entityId, UUID.randomUUID().toString());
            return true;
          } catch (ConditionalCheckFailedException e) {
            // losing the race is the expected outcome for all but one contender
            return false;
          }
        })
        .collect(Collectors.toList())).get();

    Assert.assertEquals("should have only 1 process succeeded in acquisition",
        1, successCount(results));
  }

  @Test
  public void testReleaseAndAcquire() {
    Assert.assertTrue(lockManager.acquire(entityId, ownerId));
    Assert.assertTrue(lockManager.release(entityId, ownerId));
    Assert.assertTrue(lockManager.acquire(entityId, ownerId));
  }

  @Test
  public void testReleaseWithWrongOwner() {
    Assert.assertTrue(lockManager.acquire(entityId, ownerId));
    Assert.assertFalse(lockManager.release(entityId, UUID.randomUUID().toString()));
  }

  /**
   * A second owner must block in {@code acquire} until the first owner releases the lock.
   *
   * <p>The first owner's release is scheduled asynchronously after a 5 second delay. The start timestamp is
   * taken before that task is scheduled so the elapsed-time assertion cannot be undercut by the delay having
   * already begun, and the task's future is awaited at the end so a failed release assertion fails the test
   * instead of being dropped.
   */
  @Test
  @SuppressWarnings("DangerousCompletableFutureUsage")
  public void testAcquire_singleProcess() throws Exception {
    Assert.assertTrue(lockManager.acquire(entityId, ownerId));
    String oldOwner = ownerId;

    long start = System.currentTimeMillis();
    CompletableFuture<Void> delayedRelease = CompletableFuture.runAsync(() -> {
      sleepMillis(5000);
      Assert.assertTrue(lockManager.release(entityId, oldOwner));
    });

    ownerId = UUID.randomUUID().toString();
    Assert.assertTrue(lockManager.acquire(entityId, ownerId));
    Assert.assertTrue("should succeed after 5 seconds",
        System.currentTimeMillis() - start >= 5000);

    // surfaces any assertion failure raised inside the asynchronous release task
    delayedRelease.get();
  }

  /**
   * Every contender acquires the lock, holds it for one second and releases it, so all of them
   * must eventually succeed and the whole run must take at least one second per contender.
   */
  @Test
  public void testAcquire_multiProcess_allSucceed() throws Exception {
    lockManager.initialize(ImmutableMap.of(
        CatalogProperties.LOCK_ACQUIRE_INTERVAL_MS, "500",
        CatalogProperties.LOCK_ACQUIRE_TIMEOUT_MS, "100000000",
        CatalogProperties.LOCK_TABLE, lockTableName
    ));

    long start = System.currentTimeMillis();
    List<Boolean> results = POOL.submit(() -> IntStream.range(0, PARALLELISM).parallel()
        .mapToObj(i -> {
          DynamoLockManager threadLocalLockManager = new DynamoLockManager(dynamo, lockTableName);
          String owner = UUID.randomUUID().toString();
          boolean succeeded = threadLocalLockManager.acquire(entityId, owner);
          if (succeeded) {
            sleepMillis(1000);
            Assert.assertTrue(threadLocalLockManager.release(entityId, owner));
          }
          return succeeded;
        })
        .collect(Collectors.toList())).get();

    Assert.assertEquals("all lock acquire should succeed sequentially",
        16, successCount(results));
    Assert.assertTrue("must take more than 16 seconds", System.currentTimeMillis() - start >= 16000);
  }

  /** Contenders that never release the lock: only one of them may report a successful acquisition. */
  @Test
  public void testAcquire_multiProcess_onlyOneSucceed() throws Exception {
    lockManager.initialize(ImmutableMap.of(
        CatalogProperties.LOCK_ACQUIRE_TIMEOUT_MS, "10000",
        CatalogProperties.LOCK_TABLE, lockTableName
    ));

    List<Boolean> results = POOL.submit(() -> IntStream.range(0, PARALLELISM).parallel()
        .mapToObj(i -> {
          DynamoLockManager threadLocalLockManager = new DynamoLockManager(dynamo, lockTableName);
          return threadLocalLockManager.acquire(entityId, ownerId);
        })
        .collect(Collectors.toList())).get();

    Assert.assertEquals("only 1 thread should have acquired the lock",
        1, successCount(results));
  }

  /** Constructing a lock manager must fail when describing the table reports it as missing. */
  @Test
  public void testTableCreationFailure() {
    DynamoDbClient dynamo2 = Mockito.mock(DynamoDbClient.class);
    Mockito.doThrow(ResourceNotFoundException.class).when(dynamo2)
        .describeTable(Mockito.any(DescribeTableRequest.class));
    AssertHelpers.assertThrows("should fail to initialize the lock manager",
        IllegalStateException.class,
        "Cannot find Dynamo table",
        () -> new DynamoLockManager(dynamo2, lockTableName));
  }

  /** Returns a random table name: a UUID with its dashes removed. */
  private static String genTableName() {
    return UUID.randomUUID().toString().replace("-", "");
  }

  /** Counts the {@code true} entries in a list of per-contender acquisition results. */
  private static long successCount(List<Boolean> results) {
    return results.stream().filter(s -> s).count();
  }

  /**
   * Sleeps for the given number of milliseconds.
   *
   * @param millis how long to sleep
   * @throws RuntimeException wrapping the {@link InterruptedException} if the thread is interrupted; the
   *     interrupt flag is restored first so callers further up the stack can still observe it
   */
  private static void sleepMillis(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.aws.glue;

import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;
import org.apache.iceberg.aws.AwsProperties;
import org.apache.iceberg.aws.s3.S3FileIO;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
import org.apache.iceberg.util.Tasks;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest;

/**
 * Tests concurrent commits through a {@link GlueCatalog} that is initialized with a
 * {@link DynamoLockManager}.
 *
 * <p>The catalog and a randomly named lock table are set up once for the class; the lock table is deleted
 * again in {@link #afterClass()}.
 */
public class GlueCatalogLockTest extends GlueTestBase {

  private static String lockTableName;
  private static DynamoDbClient dynamo;

  @BeforeClass
  public static void beforeClass() {
    GlueTestBase.beforeClass();
    String testBucketPath = "s3://" + testBucketName + "/" + testPathPrefix;
    lockTableName = getRandomName();
    S3FileIO fileIO = new S3FileIO(clientFactory::s3);
    glueCatalog = new GlueCatalog();
    AwsProperties awsProperties = new AwsProperties();
    dynamo = clientFactory.dynamo();
    glueCatalog.initialize(catalogName, testBucketPath, awsProperties, glue,
        new DynamoLockManager(dynamo, lockTableName), fileIO);
  }

  @AfterClass
  public static void afterClass() {
    try {
      GlueTestBase.afterClass();
    } finally {
      // always drop the lock table, even when the base-class cleanup throws
      dynamo.deleteTable(DeleteTableRequest.builder().tableName(lockTableName).build());
    }
  }

  /**
   * Prepares one pending append per thread up front, then commits them all concurrently with retries;
   * every commit must end up in the table history.
   */
  @Test
  public void testParallelCommit_multiThreadSingleCommit() {
    int nThreads = 20;
    String namespace = createNamespace();
    String tableName = getRandomName();
    createTable(namespace, tableName);
    Table table = glueCatalog.loadTable(TableIdentifier.of(namespace, tableName));
    DataFile dataFile = DataFiles.builder(partitionSpec)
        .withPath("/path/to/data-a.parquet")
        .withFileSizeInBytes(1)
        .withRecordCount(1)
        .build();

    List<AppendFiles> pendingCommits = IntStream.range(0, nThreads)
        .mapToObj(i -> table.newAppend().appendFile(dataFile))
        .collect(Collectors.toList());

    ExecutorService executorService = MoreExecutors.getExitingExecutorService(
        (ThreadPoolExecutor) Executors.newFixedThreadPool(nThreads));

    Tasks.range(nThreads)
        .retry(10000)
        .throwFailureWhenFinished()
        .executeWith(executorService)
        .run(i -> pendingCommits.get(i).commit());

    table.refresh();
    Assert.assertEquals("Commits should all succeed sequentially", nThreads, table.history().size());
    Assert.assertEquals("Should have all manifests", nThreads, table.currentSnapshot().allManifests().size());
  }

  /**
   * Two threads each perform ten fast appends. A shared counter keeps them in lock-step: a thread only
   * starts its k-th commit (0-based) once {@code 2 * k} commits have completed in total.
   */
  @Test
  public void testParallelCommit_multiThreadMultiCommit() {
    String namespace = createNamespace();
    String tableName = getRandomName();
    createTable(namespace, tableName);
    Table table = glueCatalog.loadTable(TableIdentifier.of(namespace, tableName));
    String fileName = UUID.randomUUID().toString();
    DataFile file = DataFiles.builder(table.spec())
        .withPath(FileFormat.PARQUET.addExtension(fileName))
        .withRecordCount(2)
        .withFileSizeInBytes(0)
        .build();

    ExecutorService executorService = MoreExecutors.getExitingExecutorService(
        (ThreadPoolExecutor) Executors.newFixedThreadPool(2));

    AtomicInteger barrier = new AtomicInteger(0);
    Tasks.range(2)
        .stopOnFailure()
        .throwFailureWhenFinished()
        .executeWith(executorService)
        .run(index -> {
          for (int numCommittedFiles = 0; numCommittedFiles < 10; numCommittedFiles++) {
            awaitAtLeast(barrier, numCommittedFiles * 2);
            table.newFastAppend().appendFile(file).commit();
            barrier.incrementAndGet();
          }
        });

    table.refresh();
    Assert.assertEquals("Commits should all succeed sequentially", 20, table.history().size());
    Assert.assertEquals("should have 20 manifests", 20, table.currentSnapshot().allManifests().size());
  }

  /**
   * Polls every 10 ms until {@code counter} has reached at least {@code target}.
   *
   * @param counter the shared commit counter
   * @param target the minimum value to wait for
   * @throws RuntimeException wrapping the {@link InterruptedException} if the waiting thread is interrupted;
   *     the interrupt flag is restored before throwing
   */
  private static void awaitAtLeast(AtomicInteger counter, int target) {
    while (counter.get() < target) {
      try {
        Thread.sleep(10);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
      }
    }
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.glue.GlueClient;
import software.amazon.awssdk.services.kms.KmsClient;
import software.amazon.awssdk.services.s3.S3Client;
Expand Down Expand Up @@ -59,6 +60,11 @@ public KmsClient kms() {
return KmsClient.builder().applyMutation(this::configure).build();
}

/**
 * Creates a DynamoDB client whose builder is passed through this factory's {@code configure}
 * method before the client is built.
 */
@Override
public DynamoDbClient dynamo() {
  return DynamoDbClient.builder()
      .applyMutation(this::configure)
      .build();
}

@Override
public void initialize(Map<String, String> properties) {
roleArn = properties.get(AwsProperties.CLIENT_ASSUME_ROLE_ARN);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.iceberg.common.DynConstructors;
import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.glue.GlueClient;
import software.amazon.awssdk.services.kms.KmsClient;
import software.amazon.awssdk.services.s3.S3Client;
Expand Down Expand Up @@ -88,6 +89,11 @@ public KmsClient kms() {
return KmsClient.builder().httpClient(HTTP_CLIENT_DEFAULT).build();
}

/**
 * Creates a DynamoDB client that uses {@code HTTP_CLIENT_DEFAULT} as its HTTP client.
 */
@Override
public DynamoDbClient dynamo() {
  return DynamoDbClient.builder()
      .httpClient(HTTP_CLIENT_DEFAULT)
      .build();
}

/**
 * Intentionally does nothing: this implementation reads no values from {@code properties}.
 *
 * @param properties ignored
 */
@Override
public void initialize(Map<String, String> properties) {
}
Expand Down
Loading