Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
344 changes: 344 additions & 0 deletions java/src/test/java/org/lance/NamespaceIntegrationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,32 @@
import org.lance.namespace.LanceNamespaceStorageOptionsProvider;
import org.lance.namespace.model.CreateEmptyTableRequest;
import org.lance.namespace.model.CreateEmptyTableResponse;
import org.lance.namespace.model.CreateTableRequest;
import org.lance.namespace.model.CreateTableResponse;
import org.lance.namespace.model.DeclareTableRequest;
import org.lance.namespace.model.DeclareTableResponse;
import org.lance.namespace.model.DescribeTableRequest;
import org.lance.namespace.model.DescribeTableResponse;
import org.lance.namespace.model.DropTableRequest;
import org.lance.namespace.model.DropTableResponse;
import org.lance.namespace.model.TableExistsRequest;
import org.lance.operation.Append;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
Expand All @@ -47,6 +56,7 @@
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.S3Object;

import java.io.ByteArrayOutputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -55,9 +65,16 @@
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
* Integration tests for Lance with S3 and credential refresh using StorageOptionsProvider.
Expand All @@ -79,6 +96,21 @@ public class NamespaceIntegrationTest {
private static final String BUCKET_NAME = "lance-namespace-integtest-java";

private static S3Client s3Client;
private BufferAllocator testAllocator;
private String testPrefix;

@BeforeEach
void setUpTest() {
testAllocator = new RootAllocator(Long.MAX_VALUE);
testPrefix = "test-" + UUID.randomUUID().toString().substring(0, 8);
}

@AfterEach
void tearDownTest() {
if (testAllocator != null) {
testAllocator.close();
}
}

@BeforeAll
static void setup() {
Expand Down Expand Up @@ -1438,4 +1470,316 @@ void testTransactionCommitWithNamespace() throws Exception {
}
}
}

private Map<String, String> createDirectoryNamespaceS3Config() {
Map<String, String> config = new HashMap<>();
config.put("root", "s3://" + BUCKET_NAME + "/" + testPrefix);
config.put("storage.access_key_id", ACCESS_KEY);
config.put("storage.secret_access_key", SECRET_KEY);
config.put("storage.endpoint", ENDPOINT_URL);
config.put("storage.region", REGION);
config.put("storage.allow_http", "true");
config.put("storage.virtual_hosted_style_request", "false");
config.put("inline_optimization_enabled", "false");
// Very high retry count to guarantee all concurrent operations succeed
config.put("commit_retries", "2147483647");
return config;
}

private byte[] createTestTableData() throws Exception {
Schema schema =
new Schema(
Arrays.asList(
new Field("id", FieldType.nullable(new ArrowType.Int(32, true)), null),
new Field("name", FieldType.nullable(new ArrowType.Utf8()), null),
new Field("age", FieldType.nullable(new ArrowType.Int(32, true)), null)));

try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, testAllocator)) {
IntVector idVector = (IntVector) root.getVector("id");
VarCharVector nameVector = (VarCharVector) root.getVector("name");
IntVector ageVector = (IntVector) root.getVector("age");

idVector.allocateNew(3);
nameVector.allocateNew(3);
ageVector.allocateNew(3);

idVector.set(0, 1);
nameVector.set(0, "Alice".getBytes());
ageVector.set(0, 30);

idVector.set(1, 2);
nameVector.set(1, "Bob".getBytes());
ageVector.set(1, 25);

idVector.set(2, 3);
nameVector.set(2, "Charlie".getBytes());
ageVector.set(2, 35);

idVector.setValueCount(3);
nameVector.setValueCount(3);
ageVector.setValueCount(3);
root.setRowCount(3);

ByteArrayOutputStream out = new ByteArrayOutputStream();
try (ArrowStreamWriter writer = new ArrowStreamWriter(root, null, out)) {
writer.writeBatch();
}
return out.toByteArray();
}
}

@Test
void testBasicCreateAndDropOnS3() throws Exception {
DirectoryNamespace namespace = new DirectoryNamespace();
namespace.initialize(createDirectoryNamespaceS3Config(), testAllocator);

try {
String tableName = "basic_test_table";
List<String> tableId = Arrays.asList("test_ns", tableName);
byte[] tableData = createTestTableData();

CreateTableRequest createReq = new CreateTableRequest().id(tableId);
CreateTableResponse createResp = namespace.createTable(createReq, tableData);
assertNotNull(createResp);
assertNotNull(createResp.getLocation());

DropTableRequest dropReq = new DropTableRequest().id(tableId);
DropTableResponse dropResp = namespace.dropTable(dropReq);
assertNotNull(dropResp);

TableExistsRequest existsReq = new TableExistsRequest().id(tableId);
assertThrows(RuntimeException.class, () -> namespace.tableExists(existsReq));
} finally {
namespace.close();
}
}

@Test
void testConcurrentCreateAndDropWithSingleInstanceOnS3() throws Exception {
DirectoryNamespace namespace = new DirectoryNamespace();
namespace.initialize(createDirectoryNamespaceS3Config(), testAllocator);

try {
int numTables = 10;
ExecutorService executor = Executors.newFixedThreadPool(numTables);
CountDownLatch startLatch = new CountDownLatch(1);
CountDownLatch doneLatch = new CountDownLatch(numTables);
AtomicInteger successCount = new AtomicInteger(0);
AtomicInteger failCount = new AtomicInteger(0);

for (int i = 0; i < numTables; i++) {
final int tableIndex = i;
executor.submit(
() -> {
try {
startLatch.await();

String tableName = "s3_concurrent_table_" + tableIndex;
List<String> tableId = Arrays.asList("test_ns", tableName);
byte[] tableData = createTestTableData();

CreateTableRequest createReq = new CreateTableRequest().id(tableId);
namespace.createTable(createReq, tableData);

DropTableRequest dropReq = new DropTableRequest().id(tableId);
namespace.dropTable(dropReq);

successCount.incrementAndGet();
} catch (Exception e) {
failCount.incrementAndGet();
} finally {
doneLatch.countDown();
}
});
}

startLatch.countDown();
assertTrue(doneLatch.await(120, TimeUnit.SECONDS), "Timed out waiting for tasks to complete");

executor.shutdown();
assertTrue(executor.awaitTermination(30, TimeUnit.SECONDS));

assertEquals(numTables, successCount.get(), "All tasks should succeed");
assertEquals(0, failCount.get(), "No tasks should fail");
} finally {
namespace.close();
}
}

@Test
void testConcurrentCreateAndDropWithMultipleInstancesOnS3() throws Exception {
int numTables = 10;
ExecutorService executor = Executors.newFixedThreadPool(numTables);
CountDownLatch startLatch = new CountDownLatch(1);
CountDownLatch doneLatch = new CountDownLatch(numTables);
AtomicInteger successCount = new AtomicInteger(0);
AtomicInteger failCount = new AtomicInteger(0);
List<DirectoryNamespace> namespaces = new ArrayList<>();

Map<String, String> baseConfig = createDirectoryNamespaceS3Config();

for (int i = 0; i < numTables; i++) {
final int tableIndex = i;
executor.submit(
() -> {
DirectoryNamespace localNs = null;
try {
startLatch.await();

localNs = new DirectoryNamespace();
localNs.initialize(new HashMap<>(baseConfig), testAllocator);

synchronized (namespaces) {
namespaces.add(localNs);
}

String tableName = "s3_multi_ns_table_" + tableIndex;
List<String> tableId = Arrays.asList("test_ns", tableName);
byte[] tableData = createTestTableData();

CreateTableRequest createReq = new CreateTableRequest().id(tableId);
localNs.createTable(createReq, tableData);

DropTableRequest dropReq = new DropTableRequest().id(tableId);
localNs.dropTable(dropReq);

successCount.incrementAndGet();
} catch (Exception e) {
failCount.incrementAndGet();
} finally {
doneLatch.countDown();
}
});
}

startLatch.countDown();
assertTrue(doneLatch.await(120, TimeUnit.SECONDS), "Timed out waiting for tasks to complete");

executor.shutdown();
assertTrue(executor.awaitTermination(30, TimeUnit.SECONDS));

for (DirectoryNamespace ns : namespaces) {
try {
ns.close();
} catch (Exception e) {
// Ignore
}
}

assertEquals(numTables, successCount.get(), "All tasks should succeed");
assertEquals(0, failCount.get(), "No tasks should fail");
}

@Test
void testConcurrentCreateThenDropFromDifferentInstanceOnS3() throws Exception {
int numTables = 10;
Map<String, String> baseConfig = createDirectoryNamespaceS3Config();

// First, create all tables using separate namespace instances
ExecutorService createExecutor = Executors.newFixedThreadPool(numTables);
CountDownLatch createStartLatch = new CountDownLatch(1);
CountDownLatch createDoneLatch = new CountDownLatch(numTables);
AtomicInteger createSuccessCount = new AtomicInteger(0);
List<DirectoryNamespace> createNamespaces = new ArrayList<>();

for (int i = 0; i < numTables; i++) {
final int tableIndex = i;
createExecutor.submit(
() -> {
DirectoryNamespace localNs = null;
try {
createStartLatch.await();

localNs = new DirectoryNamespace();
localNs.initialize(new HashMap<>(baseConfig), testAllocator);

synchronized (createNamespaces) {
createNamespaces.add(localNs);
}

String tableName = "s3_cross_instance_table_" + tableIndex;
List<String> tableId = Arrays.asList("test_ns", tableName);
byte[] tableData = createTestTableData();

CreateTableRequest createReq = new CreateTableRequest().id(tableId);
localNs.createTable(createReq, tableData);

createSuccessCount.incrementAndGet();
} catch (Exception e) {
// Ignore
} finally {
createDoneLatch.countDown();
}
});
}

createStartLatch.countDown();
assertTrue(createDoneLatch.await(120, TimeUnit.SECONDS), "Timed out waiting for creates");
createExecutor.shutdown();

assertEquals(numTables, createSuccessCount.get(), "All creates should succeed");

// Close create namespaces
for (DirectoryNamespace ns : createNamespaces) {
try {
ns.close();
} catch (Exception e) {
// Ignore
}
}

// Now drop all tables using NEW namespace instances
ExecutorService dropExecutor = Executors.newFixedThreadPool(numTables);
CountDownLatch dropStartLatch = new CountDownLatch(1);
CountDownLatch dropDoneLatch = new CountDownLatch(numTables);
AtomicInteger dropSuccessCount = new AtomicInteger(0);
AtomicInteger dropFailCount = new AtomicInteger(0);
List<DirectoryNamespace> dropNamespaces = new ArrayList<>();

for (int i = 0; i < numTables; i++) {
final int tableIndex = i;
dropExecutor.submit(
() -> {
DirectoryNamespace localNs = null;
try {
dropStartLatch.await();

localNs = new DirectoryNamespace();
localNs.initialize(new HashMap<>(baseConfig), testAllocator);

synchronized (dropNamespaces) {
dropNamespaces.add(localNs);
}

String tableName = "s3_cross_instance_table_" + tableIndex;
List<String> tableId = Arrays.asList("test_ns", tableName);

DropTableRequest dropReq = new DropTableRequest().id(tableId);
localNs.dropTable(dropReq);

dropSuccessCount.incrementAndGet();
} catch (Exception e) {
dropFailCount.incrementAndGet();
} finally {
dropDoneLatch.countDown();
}
});
}

dropStartLatch.countDown();
assertTrue(dropDoneLatch.await(120, TimeUnit.SECONDS), "Timed out waiting for drops");
dropExecutor.shutdown();

// Close drop namespaces
for (DirectoryNamespace ns : dropNamespaces) {
try {
ns.close();
} catch (Exception e) {
// Ignore
}
}

assertEquals(numTables, dropSuccessCount.get(), "All drops should succeed");
assertEquals(0, dropFailCount.get(), "No drops should fail");
}
}
Loading
Loading