Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi.client.transaction.lock;

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.config.LockConfiguration;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.lock.LockProvider;
import org.apache.hudi.common.lock.LockState;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.exception.HoodieLockException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.jetbrains.annotations.NotNull;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
* InProcess level lock. This {@link LockProvider} implementation is to
* guard table from concurrent operations happening in the local JVM process.
* <p>
* Note: This Lock provider implementation doesn't allow lock reentrancy.
* Attempting to reacquire the lock from the same thread will throw
* HoodieLockException. Threads other than the current lock owner, will
* block on lock() and return false on tryLock().
*/
public class InProcessLockProvider implements LockProvider<ReentrantReadWriteLock> {

private static final Logger LOG = LogManager.getLogger(ZookeeperBasedLockProvider.class);
private static final ReentrantReadWriteLock LOCK = new ReentrantReadWriteLock();
private final long maxWaitTimeMillis;

public InProcessLockProvider(final LockConfiguration lockConfiguration, final Configuration conf) {
TypedProperties typedProperties = lockConfiguration.getConfig();
maxWaitTimeMillis = typedProperties.getLong(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,
LockConfiguration.DEFAULT_ACQUIRE_LOCK_WAIT_TIMEOUT_MS);
}

@Override
public void lock() {
LOG.info(getLogMessage(LockState.ACQUIRING));
if (LOCK.isWriteLockedByCurrentThread()) {
throw new HoodieLockException(getLogMessage(LockState.ALREADY_ACQUIRED));
}
LOCK.writeLock().lock();
LOG.info(getLogMessage(LockState.ACQUIRED));
}

@Override
public boolean tryLock() {
return tryLock(maxWaitTimeMillis, TimeUnit.MILLISECONDS);
}

@Override
public boolean tryLock(long time, @NotNull TimeUnit unit) {
LOG.info(getLogMessage(LockState.ACQUIRING));
if (LOCK.isWriteLockedByCurrentThread()) {
throw new HoodieLockException(getLogMessage(LockState.ALREADY_ACQUIRED));
}

boolean isLockAcquired;
try {
isLockAcquired = LOCK.writeLock().tryLock(time, unit);
} catch (InterruptedException e) {
throw new HoodieLockException(getLogMessage(LockState.FAILED_TO_ACQUIRE));
}

LOG.info(getLogMessage(isLockAcquired ? LockState.ACQUIRED : LockState.FAILED_TO_ACQUIRE));
return isLockAcquired;
}

@Override
public void unlock() {
LOG.info(getLogMessage(LockState.RELEASING));
try {
LOCK.writeLock().unlock();
} catch (Exception e) {
throw new HoodieLockException(getLogMessage(LockState.FAILED_TO_RELEASE), e);
}
LOG.info(getLogMessage(LockState.RELEASED));
}

@Override
public ReentrantReadWriteLock getLock() {
return LOCK;
}

@Override
public void close() {
if (LOCK.isWriteLockedByCurrentThread()) {
LOCK.writeLock().unlock();
}
}

private String getLogMessage(LockState state) {
return StringUtils.join(String.valueOf(Thread.currentThread().getId()),
state.name(), " local process lock.");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,253 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.client.transaction;

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
import org.apache.hudi.common.config.LockConfiguration;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.exception.HoodieLockException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertThrows;

public class TestInProcessLockProvider {

private static final Logger LOG = LogManager.getLogger(TestInProcessLockProvider.class);
private final Configuration hadoopConfiguration = new Configuration();
private final LockConfiguration lockConfiguration = new LockConfiguration(new TypedProperties());

@Test
public void testLockAcquisition() {
InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
assertDoesNotThrow(() -> {
inProcessLockProvider.lock();
});
assertDoesNotThrow(() -> {
inProcessLockProvider.unlock();
});
}

@Test
public void testLockReAcquisitionBySameThread() {
InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
assertDoesNotThrow(() -> {
inProcessLockProvider.lock();
});
assertThrows(HoodieLockException.class, () -> {
inProcessLockProvider.lock();
});
assertDoesNotThrow(() -> {
inProcessLockProvider.unlock();
});
}

@Test
public void testLockReAcquisitionByDifferentThread() {
InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
final AtomicBoolean writer2Completed = new AtomicBoolean(false);

// Main test thread
assertDoesNotThrow(() -> {
inProcessLockProvider.lock();
});

// Another writer thread in parallel, should block
// and later acquire the lock once it is released
Thread writer2 = new Thread(new Runnable() {
@Override
public void run() {
assertDoesNotThrow(() -> {
inProcessLockProvider.lock();
});
assertDoesNotThrow(() -> {
inProcessLockProvider.unlock();
});
writer2Completed.set(true);
}
});
writer2.start();

assertDoesNotThrow(() -> {
inProcessLockProvider.unlock();
});

try {
writer2.join();
} catch (InterruptedException e) {
//
}
Assertions.assertTrue(writer2Completed.get());
}

@Test
public void testTryLockAcquisition() {
InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
Assertions.assertTrue(inProcessLockProvider.tryLock());
assertDoesNotThrow(() -> {
inProcessLockProvider.unlock();
});
}

@Test
public void testTryLockAcquisitionWithTimeout() {
InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
Assertions.assertTrue(inProcessLockProvider.tryLock(1, TimeUnit.MILLISECONDS));
assertDoesNotThrow(() -> {
inProcessLockProvider.unlock();
});
}

@Test
public void testTryLockReAcquisitionBySameThread() {
InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
Assertions.assertTrue(inProcessLockProvider.tryLock());
assertThrows(HoodieLockException.class, () -> {
inProcessLockProvider.tryLock(1, TimeUnit.MILLISECONDS);
});
assertDoesNotThrow(() -> {
inProcessLockProvider.unlock();
});
}

@Test
public void testTryLockReAcquisitionByDifferentThread() {
InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
final AtomicBoolean writer2Completed = new AtomicBoolean(false);

// Main test thread
Assertions.assertTrue(inProcessLockProvider.tryLock());

// Another writer thread
Thread writer2 = new Thread(() -> {
Assertions.assertFalse(inProcessLockProvider.tryLock(100L, TimeUnit.MILLISECONDS));
writer2Completed.set(true);
});
writer2.start();
try {
writer2.join();
} catch (InterruptedException e) {
//
}

Assertions.assertTrue(writer2Completed.get());
assertDoesNotThrow(() -> {
inProcessLockProvider.unlock();
});
}

@Test
public void testTryLockAcquisitionBeforeTimeOutFromTwoThreads() {
final InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
final int threadCount = 3;
final long awaitMaxTimeoutMs = 2000L;
final CountDownLatch latch = new CountDownLatch(threadCount);
final AtomicBoolean writer1Completed = new AtomicBoolean(false);
final AtomicBoolean writer2Completed = new AtomicBoolean(false);

// Let writer1 get the lock first, then wait for others
// to join the sync up point.
Thread writer1 = new Thread(() -> {
Assertions.assertTrue(inProcessLockProvider.tryLock());
latch.countDown();
try {
latch.await(awaitMaxTimeoutMs, TimeUnit.MILLISECONDS);
// Following sleep is to make sure writer2 attempts
// to try lock and to get bocked on the lock which
// this thread is currently holding.
Thread.sleep(50);
} catch (InterruptedException e) {
//
}
assertDoesNotThrow(() -> {
inProcessLockProvider.unlock();
});
writer1Completed.set(true);
});
writer1.start();

// Writer2 will block on trying to acquire the lock
// and will eventually get the lock before the timeout.
Thread writer2 = new Thread(() -> {
latch.countDown();
Assertions.assertTrue(inProcessLockProvider.tryLock(awaitMaxTimeoutMs, TimeUnit.MILLISECONDS));
assertDoesNotThrow(() -> {
inProcessLockProvider.unlock();
});
writer2Completed.set(true);
});
writer2.start();

// Let writer1 and writer2 wait at the sync up
// point to make sure they run in parallel and
// one get blocked by the other.
latch.countDown();
try {
writer1.join();
writer2.join();
} catch (InterruptedException e) {
//
}

// Make sure both writers actually completed good
Assertions.assertTrue(writer1Completed.get());
Assertions.assertTrue(writer2Completed.get());
}

@Test
public void testLockReleaseByClose() {
InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
assertDoesNotThrow(() -> {
inProcessLockProvider.lock();
});
assertDoesNotThrow(() -> {
inProcessLockProvider.close();
});
}

@Test
public void testRedundantUnlock() {
InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
assertDoesNotThrow(() -> {
inProcessLockProvider.lock();
});
assertDoesNotThrow(() -> {
inProcessLockProvider.unlock();
});
assertThrows(HoodieLockException.class, () -> {
inProcessLockProvider.unlock();
});
}

@Test
public void testUnlockWithoutLock() {
InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
assertThrows(HoodieLockException.class, () -> {
inProcessLockProvider.unlock();
});
}
}