-
Notifications
You must be signed in to change notification settings - Fork 9.2k
HADOOP-18740. S3A prefetch cache blocks should be accessed by RW locks #5675
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
912403c
33d64a4
710441f
d2b9210
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -37,6 +37,8 @@ | |
| import java.util.Map; | ||
| import java.util.Set; | ||
| import java.util.concurrent.ConcurrentHashMap; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.concurrent.locks.ReentrantReadWriteLock; | ||
|
|
||
| import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet; | ||
| import org.slf4j.Logger; | ||
|
|
@@ -85,12 +87,18 @@ private static final class Entry { | |
| private final Path path; | ||
| private final int size; | ||
| private final long checksum; | ||
| private final ReentrantReadWriteLock lock; | ||
| private enum LockType { | ||
| READ, | ||
| WRITE | ||
| } | ||
|
|
||
| Entry(int blockNumber, Path path, int size, long checksum) { | ||
| this.blockNumber = blockNumber; | ||
| this.path = path; | ||
| this.size = size; | ||
| this.checksum = checksum; | ||
| this.lock = new ReentrantReadWriteLock(); | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -99,6 +107,54 @@ public String toString() { | |
| "([%03d] %s: size = %d, checksum = %d)", | ||
| blockNumber, path, size, checksum); | ||
| } | ||
|
|
||
| /** | ||
| * Take the read or write lock. | ||
| * | ||
| * @param lockType type of the lock. | ||
| */ | ||
| void takeLock(LockType lockType) { | ||
| if (LockType.READ == lockType) { | ||
| lock.readLock().lock(); | ||
| } else if (LockType.WRITE == lockType) { | ||
| lock.writeLock().lock(); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Release the read or write lock. | ||
| * | ||
| * @param lockType type of the lock. | ||
| */ | ||
| void releaseLock(LockType lockType) { | ||
| if (LockType.READ == lockType) { | ||
| lock.readLock().unlock(); | ||
| } else if (LockType.WRITE == lockType) { | ||
| lock.writeLock().unlock(); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Try to take the read or write lock within the given timeout. | ||
| * | ||
| * @param lockType type of the lock. | ||
| * @param timeout the time to wait for the given lock. | ||
| * @param unit the time unit of the timeout argument. | ||
| * @return true if the lock of the given lock type was acquired. | ||
| */ | ||
| boolean takeLock(LockType lockType, long timeout, TimeUnit unit) { | ||
| try { | ||
| if (LockType.READ == lockType) { | ||
| return lock.readLock().tryLock(timeout, unit); | ||
| } else if (LockType.WRITE == lockType) { | ||
| return lock.writeLock().tryLock(timeout, unit); | ||
| } | ||
| } catch (InterruptedException e) { | ||
| LOG.warn("Thread interrupted while trying to acquire {} lock", lockType, e); | ||
| Thread.currentThread().interrupt(); | ||
| } | ||
| return false; | ||
| } | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -148,11 +204,15 @@ public void get(int blockNumber, ByteBuffer buffer) throws IOException { | |
| checkNotNull(buffer, "buffer"); | ||
|
|
||
| Entry entry = getEntry(blockNumber); | ||
| buffer.clear(); | ||
| readFile(entry.path, buffer); | ||
| buffer.rewind(); | ||
|
|
||
| validateEntry(entry, buffer); | ||
| entry.takeLock(Entry.LockType.READ); | ||
| try { | ||
| buffer.clear(); | ||
| readFile(entry.path, buffer); | ||
| buffer.rewind(); | ||
| validateEntry(entry, buffer); | ||
| } finally { | ||
| entry.releaseLock(Entry.LockType.READ); | ||
| } | ||
| } | ||
|
|
||
| protected int readFile(Path path, ByteBuffer buffer) throws IOException { | ||
|
|
@@ -200,7 +260,12 @@ public void put(int blockNumber, ByteBuffer buffer, Configuration conf, | |
|
|
||
| if (blocks.containsKey(blockNumber)) { | ||
| Entry entry = blocks.get(blockNumber); | ||
| validateEntry(entry, buffer); | ||
| entry.takeLock(Entry.LockType.READ); | ||
| try { | ||
| validateEntry(entry, buffer); | ||
| } finally { | ||
| entry.releaseLock(Entry.LockType.READ); | ||
| } | ||
| return; | ||
| } | ||
|
|
||
|
|
@@ -268,12 +333,20 @@ public void close() throws IOException { | |
| int numFilesDeleted = 0; | ||
|
|
||
| for (Entry entry : blocks.values()) { | ||
| boolean lockAcquired = entry.takeLock(Entry.LockType.WRITE, 5, TimeUnit.SECONDS); | ||
|
||
| if (!lockAcquired) { | ||
| LOG.error("Cache file {} deletion would not be attempted as write lock could not" | ||
| + " be acquired within 5 sec", entry.path); | ||
| continue; | ||
| } | ||
| try { | ||
| Files.deleteIfExists(entry.path); | ||
| prefetchingStatistics.blockRemovedFromFileCache(); | ||
| numFilesDeleted++; | ||
| } catch (IOException e) { | ||
| LOG.debug("Failed to delete cache file {}", entry.path, e); | ||
| } finally { | ||
| entry.releaseLock(Entry.LockType.WRITE); | ||
| } | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should this and the others be private? you don't want other classes playing with your lock code...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sounds good, done