@@ -22,6 +22,7 @@
import org.apache.hudi.common.bloom.BloomFilterFactory;
import org.apache.hudi.common.bloom.BloomFilterTypeCode;
import org.apache.hudi.common.data.HoodieListPairData;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
@@ -92,6 +93,7 @@ public void tearDown() throws Exception {

private HoodieWriteConfig makeConfig(boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking) {
return HoodieWriteConfig.newBuilder().withPath(basePath)
.withEngineType(EngineType.FLINK)
.withIndexConfig(HoodieIndexConfig.newBuilder().bloomIndexPruneByRanges(rangePruning)
.bloomIndexTreebasedFilter(treeFiltering).bloomIndexBucketizedChecking(bucketizedChecking)
.bloomIndexKeysPerBucket(2).build())
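The change to this first test file is a single line: the write config is pinned to the Flink engine. A minimal sketch of the resulting builder chain, with a placeholder path and index flags mirroring the makeConfig helper above:

import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;

class FlinkConfigSketch {
  // Sketch only: basePath is a placeholder argument, not from the PR.
  static HoodieWriteConfig makeFlinkConfig(String basePath) {
    return HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        .withEngineType(EngineType.FLINK) // the line this hunk adds
        .withIndexConfig(HoodieIndexConfig.newBuilder()
            .bloomIndexPruneByRanges(true)
            .bloomIndexTreebasedFilter(true)
            .bloomIndexBucketizedChecking(true)
            .bloomIndexKeysPerBucket(2)
            .build())
        .build();
  }
}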
@@ -30,79 +30,47 @@
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.minicluster.HdfsTestService;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.bloom.TestFlinkHoodieBloomIndex;
import org.apache.hudi.table.HoodieTable;

import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestInfo;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;

import static org.apache.hudi.common.util.ValidationUtils.checkState;

/**
* The test harness for resource initialization and cleanup.
*/
public class HoodieFlinkClientTestHarness extends HoodieCommonTestHarness implements Serializable {
public class HoodieFlinkClientTestHarness extends HoodieCommonTestHarness {

protected static final Logger LOG = LogManager.getLogger(HoodieFlinkClientTestHarness.class);
private String testMethodName;
protected transient Configuration hadoopConf = null;
protected transient FileSystem fs;
protected transient MiniClusterWithClientResource flinkCluster = null;
protected transient HoodieFlinkEngineContext context = null;
protected transient ExecutorService executorService;
protected transient HoodieFlinkWriteClient writeClient;
protected transient HoodieTableFileSystemView tableView;
protected Configuration hadoopConf;
protected FileSystem fs;
protected HoodieFlinkEngineContext context;
protected ExecutorService executorService;
protected HoodieFlinkWriteClient writeClient;
protected HoodieTableFileSystemView tableView;

protected final FlinkTaskContextSupplier supplier = new FlinkTaskContextSupplier(null);

// dfs
protected transient HdfsTestService hdfsTestService;
protected transient MiniDFSCluster dfsCluster;
protected transient DistributedFileSystem dfs;

@BeforeEach
public void setTestMethodName(TestInfo testInfo) {
if (testInfo.getTestMethod().isPresent()) {
testMethodName = testInfo.getTestMethod().get().getName();
} else {
testMethodName = "Unknown";
}
}

protected void initFlinkMiniCluster() {
flinkCluster = new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setNumberSlotsPerTaskManager(2)
.setNumberTaskManagers(1)
.build());
}

protected void initFileSystem() {
hadoopConf = new Configuration();
initFileSystemWithConfiguration(hadoopConf);
context = new HoodieFlinkEngineContext(supplier);
}

private void initFileSystemWithConfiguration(Configuration configuration) {
if (basePath == null) {
throw new IllegalStateException("The base path has not been initialized.");
}
checkState(basePath != null);
fs = FSUtils.getFs(basePath, configuration);
if (fs instanceof LocalFileSystem) {
LocalFileSystem lfs = (LocalFileSystem) fs;
@@ -124,9 +92,7 @@ protected void initMetaClient() throws IOException {
}

protected void initMetaClient(HoodieTableType tableType) throws IOException {
if (basePath == null) {
throw new IllegalStateException("The base path has not been initialized.");
}
checkState(basePath != null);
metaClient = HoodieTestUtils.init(hadoopConf, basePath, tableType);
}
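
The explicit null guards in initFileSystemWithConfiguration and initMetaClient collapse to ValidationUtils.checkState, imported statically in this patch; checkState throws when its condition is false. A sketch of the before/after idiom, with a hypothetical field standing in for the harness state:

import static org.apache.hudi.common.util.ValidationUtils.checkState;

class GuardSketch {
  private String basePath; // hypothetical stand-in for the harness field

  void before() {
    if (basePath == null) {
      throw new IllegalStateException("The base path has not been initialized.");
    }
  }

  void after() {
    // checkState throws if the condition is false, replacing the guard above.
    checkState(basePath != null);
  }
}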

@@ -156,18 +122,10 @@ public void cleanupResources() throws java.io.IOException {
cleanupFlinkContexts();
cleanupTestDataGenerator();
cleanupFileSystem();
cleanupDFS();
cleanupExecutorService();
System.gc();
}

protected void cleanupFlinkMiniCluster() {
if (flinkCluster != null) {
flinkCluster.after();
flinkCluster = null;
}
}

/**
* Simple test sink function.
*/
@@ -185,7 +143,7 @@ public synchronized void invoke(HoodieRecord value, Context context) throws Exce
/**
* Cleanups hoodie clients.
*/
protected void cleanupClients() throws java.io.IOException {
protected void cleanupClients() {
if (metaClient != null) {
metaClient = null;
}
@@ -199,24 +157,6 @@ protected void cleanupClients() throws java.io.IOException {
}
}

/**
* Cleanups the distributed file system.
*
* @throws IOException
*/
protected void cleanupDFS() throws java.io.IOException {
if (hdfsTestService != null) {
hdfsTestService.stop();
dfsCluster.shutdown(true, true);
hdfsTestService = null;
dfsCluster = null;
dfs = null;
}
// Need to closeAll to clear FileSystem.Cache, required because DFS and LocalFS used in the
// same JVM
FileSystem.closeAll();
}

/**
* Cleanups the executor service.
*/
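With the MiniDFSCluster and HdfsTestService gone from the harness, the filesystem is resolved straight from the base path, so a local path yields a LocalFileSystem. A sketch of that lookup, assuming the usual org.apache.hudi.common.fs.FSUtils import:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.common.fs.FSUtils;

class FsSketch {
  // Sketch: a local basePath resolves to a LocalFileSystem wrapper,
  // no mini-cluster needed.
  static FileSystem openFs(String basePath) {
    return FSUtils.getFs(basePath, new Configuration());
  }
}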
@@ -60,7 +60,7 @@ private HoodieFlinkWriteableTestTable(String basePath, org.apache.hadoop.fs.File
}

public static HoodieFlinkWriteableTestTable of(HoodieTableMetaClient metaClient, Schema schema, BloomFilter filter) {
return new HoodieFlinkWriteableTestTable(metaClient.getBasePath(), metaClient.getRawFs(), metaClient, schema, filter);
return new HoodieFlinkWriteableTestTable(metaClient.getBasePathV2().toString(), metaClient.getRawFs(), metaClient, schema, filter);
}

public static HoodieFlinkWriteableTestTable of(HoodieTableMetaClient metaClient, Schema schema) {
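Here the String-returning getBasePath() gives way to getBasePathV2(), which returns a Hadoop Path; the test table constructor still needs the String form, hence the toString(). A sketch, assuming an initialized meta client:

import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.table.HoodieTableMetaClient;

class BasePathSketch {
  // Sketch, assuming metaClient has already been built.
  static String basePathOf(HoodieTableMetaClient metaClient) {
    Path base = metaClient.getBasePathV2(); // a Hadoop Path, not a String
    return base.toString();                 // the String form the test table takes
  }
}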
@@ -19,22 +19,17 @@

package org.apache.hudi.client;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider;
import org.apache.hudi.common.config.LockConfiguration;
import org.apache.hudi.common.testutils.minicluster.HdfsTestService;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieLockException;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;

import org.apache.hadoop.conf.Configuration;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

@@ -43,93 +38,75 @@
import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY;
import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY;
import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class TestFileBasedLockProvider {
private static HdfsTestService hdfsTestService;
private static MiniDFSCluster dfsCluster;
private static LockConfiguration lockConfiguration;
private static Configuration hadoopConf;

@BeforeAll
public static void setup() throws IOException {
hdfsTestService = new HdfsTestService();
dfsCluster = hdfsTestService.start(true);
hadoopConf = dfsCluster.getFileSystem().getConf();
@TempDir
Path tempDir;
String basePath;
LockConfiguration lockConfiguration;
Configuration hadoopConf;

@BeforeEach
public void setUp() throws IOException {
basePath = tempDir.toUri().getPath();
Properties properties = new Properties();
properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, "/tmp/");
properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath);
properties.setProperty(FILESYSTEM_LOCK_EXPIRE_PROP_KEY, "1");
properties.setProperty(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "1000");
properties.setProperty(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "1000");
properties.setProperty(LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY, "3");
lockConfiguration = new LockConfiguration(properties);
}
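
The rewritten lock test drops the HDFS mini-cluster entirely: JUnit 5's @TempDir injects a fresh directory per test, and the lock path points there instead of a shared /tmp. A minimal sketch of the injection pattern used by the new setUp:

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.io.TempDir;
import java.nio.file.Path;

class TempDirSketch {
  @TempDir
  Path tempDir;   // created fresh for each test, cleaned up by JUnit

  String basePath;

  @BeforeEach
  void setUp() {
    // toUri().getPath() yields a plain filesystem path string for Hadoop APIs.
    basePath = tempDir.toUri().getPath();
  }
}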

@AfterAll
public static void cleanUpAfterAll() throws IOException {
Path workDir = dfsCluster.getFileSystem().getWorkingDirectory();
FileSystem fs = workDir.getFileSystem(hdfsTestService.getHadoopConf());
fs.delete(new Path("/tmp"), true);
if (hdfsTestService != null) {
hdfsTestService.stop();
hdfsTestService = null;
}
}

@AfterEach
public void cleanUpAfterEach() throws IOException {
Path workDir = dfsCluster.getFileSystem().getWorkingDirectory();
FileSystem fs = workDir.getFileSystem(hdfsTestService.getHadoopConf());
fs.delete(new Path("/tmp/lock"), true);
hadoopConf = new Configuration();
}

@Test
public void testAcquireLock() {
FileSystemBasedLockProvider fileBasedLockProvider = new FileSystemBasedLockProvider(lockConfiguration, hadoopConf);
Assertions.assertTrue(fileBasedLockProvider.tryLock(lockConfiguration.getConfig()
.getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS));
assertTrue(fileBasedLockProvider.tryLock(lockConfiguration.getConfig()
.getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS));
fileBasedLockProvider.unlock();
}
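
For context, the provider follows a plain tryLock/unlock lifecycle, with the wait timeout read back out of the LockConfiguration. A sketch of that flow, using only calls these tests exercise; the withLock helper is hypothetical:

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider;
import org.apache.hudi.common.config.LockConfiguration;
import java.util.concurrent.TimeUnit;

import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY;

class LockSketch {
  // Sketch: lockConfiguration built as in setUp() above.
  static void withLock(LockConfiguration lockConfiguration, Runnable criticalSection) {
    FileSystemBasedLockProvider provider =
        new FileSystemBasedLockProvider(lockConfiguration, new Configuration());
    long timeoutMs = lockConfiguration.getConfig().getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY);
    if (provider.tryLock(timeoutMs, TimeUnit.MILLISECONDS)) {
      try {
        criticalSection.run();
      } finally {
        provider.unlock(); // always release, mirroring the tests above
      }
    }
  }
}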

@Test
public void testAcquireLockWithDefaultPath() {
lockConfiguration.getConfig().remove(FILESYSTEM_LOCK_PATH_PROP_KEY);
lockConfiguration.getConfig().setProperty(HoodieWriteConfig.BASE_PATH.key(), "/tmp/");
lockConfiguration.getConfig().setProperty(HoodieWriteConfig.BASE_PATH.key(), basePath);
FileSystemBasedLockProvider fileBasedLockProvider = new FileSystemBasedLockProvider(lockConfiguration, hadoopConf);
Assertions.assertTrue(fileBasedLockProvider.tryLock(lockConfiguration.getConfig()
.getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS));
assertTrue(fileBasedLockProvider.tryLock(lockConfiguration.getConfig()
.getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS));
fileBasedLockProvider.unlock();
lockConfiguration.getConfig().setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, "/tmp/");
lockConfiguration.getConfig().setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath);
}

@Test
public void testUnLock() {
FileSystemBasedLockProvider fileBasedLockProvider = new FileSystemBasedLockProvider(lockConfiguration, hadoopConf);
Assertions.assertTrue(fileBasedLockProvider.tryLock(lockConfiguration.getConfig()
.getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS));
assertTrue(fileBasedLockProvider.tryLock(lockConfiguration.getConfig()
.getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS));
fileBasedLockProvider.unlock();
Assertions.assertTrue(fileBasedLockProvider.tryLock(lockConfiguration.getConfig()
.getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS));
assertTrue(fileBasedLockProvider.tryLock(lockConfiguration.getConfig()
.getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS));
}

@Test
public void testReentrantLock() {
FileSystemBasedLockProvider fileBasedLockProvider = new FileSystemBasedLockProvider(lockConfiguration, hadoopConf);
Assertions.assertTrue(fileBasedLockProvider.tryLock(lockConfiguration.getConfig()
.getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS));
Assertions.assertFalse(fileBasedLockProvider.tryLock(lockConfiguration.getConfig()
.getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS));
assertTrue(fileBasedLockProvider.tryLock(lockConfiguration.getConfig()
.getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS));
assertFalse(fileBasedLockProvider.tryLock(lockConfiguration.getConfig()
.getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS));
fileBasedLockProvider.unlock();
}

@Test
public void testUnlockWithoutLock() {
try {
assertDoesNotThrow(() -> {
FileSystemBasedLockProvider fileBasedLockProvider = new FileSystemBasedLockProvider(lockConfiguration, hadoopConf);
fileBasedLockProvider.unlock();
} catch (HoodieLockException e) {
Assertions.fail();
}
});
}

}
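
The last test also trades try/catch-plus-fail for JUnit 5's assertDoesNotThrow. A sketch of the two shapes side by side; action() is a placeholder for the provider's unlock():

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.fail;

class DoesNotThrowSketch {
  void action() { /* placeholder for fileBasedLockProvider.unlock() */ }

  void before() {
    try {
      action();
    } catch (RuntimeException e) {
      fail();
    }
  }

  void after() {
    // assertDoesNotThrow fails the test if the executable throws anything.
    assertDoesNotThrow(this::action);
  }
}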