Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,10 @@ public final class ScmConfigKeys {
"ozone.chunk.read.mapped.buffer.threshold";
public static final String OZONE_CHUNK_READ_MAPPED_BUFFER_THRESHOLD_DEFAULT =
"32KB";
public static final String OZONE_CHUNK_READ_MAPPED_BUFFER_MAX_COUNT_KEY =
"ozone.chunk.read.mapped.buffer.max.count";
// This max count must not exceed the Linux vm.max_map_count limit, which defaults to 65530.
public static final int OZONE_CHUNK_READ_MAPPED_BUFFER_MAX_COUNT_DEFAULT = 0;

public static final String OZONE_SCM_CONTAINER_LAYOUT_KEY =
"ozone.scm.container.layout";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ static ChunkBuffer wrap(ByteBuffer buffer) {
return new ChunkBufferImplWithByteBuffer(buffer);
}

/** Wrap the given list of {@link ByteBuffer}s as a {@link ChunkBuffer}. */
/** Wrap the given list of {@link ByteBuffer}s as a {@link ChunkBuffer},
* with a function called when buffers are released.*/
static ChunkBuffer wrap(List<ByteBuffer> buffers) {
Objects.requireNonNull(buffers, "buffers == null");
if (buffers.size() == 1) {
Expand Down
9 changes: 9 additions & 0 deletions hadoop-hdds/common/src/main/resources/ozone-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -860,6 +860,15 @@
The default read threshold to use memory mapped buffers.
</description>
</property>
<property>
<name>ozone.chunk.read.mapped.buffer.max.count</name>
<value>0</value>
<tag>OZONE, SCM, CONTAINER, PERFORMANCE</tag>
<description>
The maximum number of memory mapped buffers allowed for a Datanode (DN).
The default value 0 means no mapped buffers are allowed for data reads.
</description>
</property>
<property>
<name>ozone.scm.container.layout</name>
<value>FILE_PER_BLOCK</value>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.function.ToLongFunction;

Expand All @@ -50,6 +51,7 @@
import org.apache.hadoop.ozone.common.utils.BufferUtils;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.keyvalue.impl.MappedBufferManager;
import org.apache.hadoop.util.Time;

import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -200,11 +202,12 @@ private static long writeDataToChannel(FileChannel channel, ChunkBuffer data,
}
}

@SuppressWarnings("checkstyle:parameternumber")
public static ChunkBuffer readData(long len, int bufferCapacity,
File file, long off, HddsVolume volume, int readMappedBufferThreshold)
throws StorageContainerException {
if (len > readMappedBufferThreshold) {
return readData(file, bufferCapacity, off, len, volume);
File file, long off, HddsVolume volume, int readMappedBufferThreshold, boolean mmapEnabled,
MappedBufferManager mappedBufferManager) throws StorageContainerException {
if (mmapEnabled && len > readMappedBufferThreshold && bufferCapacity > readMappedBufferThreshold) {
return readData(file, bufferCapacity, off, len, volume, mappedBufferManager);
} else if (len == 0) {
return ChunkBuffer.wrap(Collections.emptyList());
}
Expand Down Expand Up @@ -256,25 +259,52 @@ private static void readData(File file, long offset, long len,
* @return a list of {@link MappedByteBuffer} containing the data.
*/
private static ChunkBuffer readData(File file, int chunkSize,
long offset, long length, HddsVolume volume)
long offset, long length, HddsVolume volume, MappedBufferManager mappedBufferManager)
throws StorageContainerException {

final List<ByteBuffer> buffers = new ArrayList<>(
Math.toIntExact((length - 1) / chunkSize) + 1);
readData(file, offset, length, channel -> {
long readLen = 0;
while (readLen < length) {
final int n = Math.toIntExact(Math.min(length - readLen, chunkSize));
final ByteBuffer mapped = channel.map(
FileChannel.MapMode.READ_ONLY, offset + readLen, n);
LOG.debug("mapped: offset={}, readLen={}, n={}, {}",
offset, readLen, n, mapped.getClass());
readLen += mapped.remaining();
buffers.add(mapped);
final int bufferNum = Math.toIntExact((length - 1) / chunkSize) + 1;
if (!mappedBufferManager.getQuota(bufferNum)) {
// proceed with normal buffer
final ByteBuffer[] buffers = BufferUtils.assignByteBuffers(length,
chunkSize);
readData(file, offset, length, c -> c.position(offset).read(buffers), volume);
Arrays.stream(buffers).forEach(ByteBuffer::flip);
return ChunkBuffer.wrap(Arrays.asList(buffers));
} else {
try {
// proceed with mapped buffer
final List<ByteBuffer> buffers = new ArrayList<>(bufferNum);
readData(file, offset, length, channel -> {
long readLen = 0;
while (readLen < length) {
final int n = Math.toIntExact(Math.min(length - readLen, chunkSize));
final long finalOffset = offset + readLen;
final AtomicReference<IOException> exception = new AtomicReference<>();
ByteBuffer mapped = mappedBufferManager.computeIfAbsent(file.getAbsolutePath(), finalOffset, n,
() -> {
try {
return channel.map(FileChannel.MapMode.READ_ONLY, finalOffset, n);
} catch (IOException e) {
LOG.error("Failed to map file {} with offset {} and length {}", file, finalOffset, n);
exception.set(e);
return null;
}
});
if (mapped == null) {
throw exception.get();
}
LOG.debug("mapped: offset={}, readLen={}, n={}, {}", finalOffset, readLen, n, mapped.getClass());
readLen += mapped.remaining();
buffers.add(mapped);
}
return readLen;
}, volume);
return ChunkBuffer.wrap(buffers);
} catch (Throwable e) {
mappedBufferManager.releaseQuota(bufferNum);
throw e;
}
return readLen;
}, volume);
return ChunkBuffer.wrap(buffers);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public class BlockManagerImpl implements BlockManager {
// Default Read Buffer capacity when Checksum is not present
private final int defaultReadBufferCapacity;
private final int readMappedBufferThreshold;
private final int readMappedBufferMaxCount;

/**
* Constructs a Block Manager.
Expand All @@ -79,6 +80,9 @@ public BlockManagerImpl(ConfigurationSource conf) {
this.readMappedBufferThreshold = config.getBufferSize(
ScmConfigKeys.OZONE_CHUNK_READ_MAPPED_BUFFER_THRESHOLD_KEY,
ScmConfigKeys.OZONE_CHUNK_READ_MAPPED_BUFFER_THRESHOLD_DEFAULT);
this.readMappedBufferMaxCount = config.getInt(
ScmConfigKeys.OZONE_CHUNK_READ_MAPPED_BUFFER_MAX_COUNT_KEY,
ScmConfigKeys.OZONE_CHUNK_READ_MAPPED_BUFFER_MAX_COUNT_DEFAULT);
}

@Override
Expand Down Expand Up @@ -304,6 +308,11 @@ public int getReadMappedBufferThreshold() {
return readMappedBufferThreshold;
}

/**
 * Returns the maximum number of memory mapped buffers allowed for reads, as loaded
 * from {@code ScmConfigKeys.OZONE_CHUNK_READ_MAPPED_BUFFER_MAX_COUNT_KEY} in the constructor.
 *
 * @return the max count of memory mapped buffers for read.
 */
public int getReadMappedBufferMaxCount() {
return readMappedBufferMaxCount;
}

/**
* Deletes an existing block.
* As Deletion is handled by BlockDeletingService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ public class FilePerBlockStrategy implements ChunkManager {
private final OpenFiles files = new OpenFiles();
private final int defaultReadBufferCapacity;
private final int readMappedBufferThreshold;
private final int readMappedBufferMaxCount;
private final MappedBufferManager mappedBufferManager;
private final VolumeSet volumeSet;

public FilePerBlockStrategy(boolean sync, BlockManager manager,
Expand All @@ -84,7 +86,15 @@ public FilePerBlockStrategy(boolean sync, BlockManager manager,
manager.getDefaultReadBufferCapacity();
this.readMappedBufferThreshold = manager == null ? 0
: manager.getReadMappedBufferThreshold();
this.readMappedBufferMaxCount = manager == null ? 0
: manager.getReadMappedBufferMaxCount();
LOG.info("ozone.chunk.read.mapped.buffer.max.count is load with {}", readMappedBufferMaxCount);
this.volumeSet = volSet;
if (this.readMappedBufferMaxCount > 0) {
mappedBufferManager = new MappedBufferManager(this.readMappedBufferMaxCount);
} else {
mappedBufferManager = null;
}
}

private static void checkLayoutVersion(Container container) {
Expand Down Expand Up @@ -192,10 +202,10 @@ public ChunkBuffer readChunk(Container container, BlockID blockID,

final long len = info.getLen();
long offset = info.getOffset();
int bufferCapacity = ChunkManager.getBufferCapacityForChunkRead(info,
int bufferCapacity = ChunkManager.getBufferCapacityForChunkRead(info,
defaultReadBufferCapacity);
return ChunkUtils.readData(len, bufferCapacity, chunkFile, offset, volume,
readMappedBufferThreshold);
readMappedBufferThreshold, readMappedBufferMaxCount > 0, mappedBufferManager);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ public class FilePerChunkStrategy implements ChunkManager {
private final BlockManager blockManager;
private final int defaultReadBufferCapacity;
private final int readMappedBufferThreshold;
private final int readMappedBufferMaxCount;
private final MappedBufferManager mappedBufferManager;
private final VolumeSet volumeSet;

public FilePerChunkStrategy(boolean sync, BlockManager manager,
Expand All @@ -77,7 +79,15 @@ public FilePerChunkStrategy(boolean sync, BlockManager manager,
manager.getDefaultReadBufferCapacity();
this.readMappedBufferThreshold = manager == null ? 0
: manager.getReadMappedBufferThreshold();
this.readMappedBufferMaxCount = manager == null ? 0
: manager.getReadMappedBufferMaxCount();
LOG.info("ozone.chunk.read.mapped.buffer.max.count is load with {}", readMappedBufferMaxCount);
this.volumeSet = volSet;
if (this.readMappedBufferMaxCount > 0) {
mappedBufferManager = new MappedBufferManager(this.readMappedBufferMaxCount);
} else {
mappedBufferManager = null;
}
}

private static void checkLayoutVersion(Container container) {
Expand Down Expand Up @@ -265,7 +275,7 @@ public ChunkBuffer readChunk(Container container, BlockID blockID,
long offset = info.getOffset() - chunkFileOffset;
Preconditions.checkState(offset >= 0);
return ChunkUtils.readData(len, bufferCapacity, file, offset, volume,
readMappedBufferThreshold);
readMappedBufferThreshold, readMappedBufferMaxCount > 0, mappedBufferManager);
}
} catch (StorageContainerException ex) {
//UNABLE TO FIND chunk is not a problem as we will try with the
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.ozone.container.keyvalue.impl;

import com.google.common.util.concurrent.Striped;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.function.Supplier;

/**
 * Manages memory mapped buffers so that their total count stays under a predefined limit, and
 * supports reusing buffers that are already mapped.
 *
 * <p>Quota is tracked with a {@link Semaphore} holding {@code capacity} permits. Mapped buffers
 * are cached through {@link WeakReference}s keyed by {@code file-position-size}; once a buffer is
 * no longer strongly reachable, its cache entry is dropped by an asynchronous cleanup that returns
 * the corresponding permit.
 */
public class MappedBufferManager {

  // NOTE: this cache is static and therefore shared by every MappedBufferManager instance, while
  // the quota semaphore below is per instance.
  private static final ConcurrentHashMap<String, WeakReference<ByteBuffer>> mappedBuffers =
      new ConcurrentHashMap<String, WeakReference<ByteBuffer>>();
  private static final Logger LOG = LoggerFactory.getLogger(MappedBufferManager.class);
  private final Semaphore semaphore;
  private final int capacity;
  // Guards against scheduling more than one cleanup task at a time for this instance.
  private final AtomicBoolean cleanupInProgress = new AtomicBoolean(false);
  private final Striped<Lock> lock;

  /**
   * Creates a manager that allows at most {@code capacity} mapped buffers.
   *
   * @param capacity the total number of permits (mapped buffers) available.
   */
  public MappedBufferManager(int capacity) {
    this.capacity = capacity;
    this.semaphore = new Semaphore(capacity);
    this.lock = Striped.lazyWeakLock(1024);
  }

  /**
   * Tries to reserve {@code permits} mapped buffers without blocking.
   *
   * <p>When the reservation fails, an asynchronous cleanup of cleared cache entries is scheduled
   * (at most one at a time) so that a later call may succeed.
   *
   * @param permits the number of buffers to reserve.
   * @return {@code true} if the quota was acquired, {@code false} otherwise.
   */
  public boolean getQuota(int permits) {
    boolean ret = semaphore.tryAcquire(permits);
    if (ret) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("quota is decreased by {} to total {}", permits, semaphore.availablePermits());
      }
    } else {
      if (cleanupInProgress.compareAndSet(false, true)) {
        CompletableFuture.runAsync(this::reclaimClearedBuffers);
      }
    }
    return ret;
  }

  /** Removes cache entries whose buffer has been garbage collected and returns their permits. */
  private void reclaimClearedBuffers() {
    int p = 0;
    try {
      for (String key : mappedBuffers.keySet()) {
        WeakReference<ByteBuffer> refer = mappedBuffers.get(key);
        // The entry may have been removed concurrently (the cache is shared across instances),
        // so guard against null. The two-argument remove only drops the entry if it is still the
        // cleared reference, so a fresh mapping stored under the same key is never evicted.
        if (refer != null && refer.get() == null && mappedBuffers.remove(key, refer)) {
          p++;
        }
      }
      if (p > 0) {
        releaseQuota(p);
      }
    } finally {
      cleanupInProgress.set(false);
    }
  }

  /**
   * Returns {@code permits} buffers to the quota.
   *
   * @param permits the number of permits to release.
   */
  public void releaseQuota(int permits) {
    semaphore.release(permits);
    if (LOG.isDebugEnabled()) {
      LOG.debug("quota is increased by {} to total {}", permits, semaphore.availablePermits());
    }
  }

  /** @return the number of permits currently available. */
  public int availableQuota() {
    return semaphore.availablePermits();
  }

  /**
   * Returns the cached mapped buffer for the given file region, or creates one with
   * {@code supplier} and caches it.
   *
   * <p>When a cached buffer is reused, one permit is released because no new mapping is created.
   *
   * @param file the file path used as part of the cache key.
   * @param position the start offset of the mapped region.
   * @param size the length of the mapped region.
   * @param supplier creates the mapped buffer when none is cached; may return {@code null}.
   * @return the cached or newly created buffer, or {@code null} if {@code supplier} returned
   *     {@code null}.
   */
  public ByteBuffer computeIfAbsent(String file, long position, long size,
      Supplier<ByteBuffer> supplier) {
    String key = file + "-" + position + "-" + size;
    Lock fileLock = lock.get(key);
    fileLock.lock();
    try {
      WeakReference<ByteBuffer> refer = mappedBuffers.get(key);
      // Take a strong reference once: dereferencing the WeakReference a second time could
      // observe a buffer collected in between and return null on the reuse path.
      ByteBuffer cached = refer == null ? null : refer.get();
      if (cached != null) {
        // reuse the mapped buffer
        if (LOG.isDebugEnabled()) {
          LOG.debug("find buffer for key {}", key);
        }
        releaseQuota(1);
        return cached;
      }

      ByteBuffer buffer = supplier.get();
      if (buffer != null) {
        mappedBuffers.put(key, new WeakReference<>(buffer));
        if (LOG.isDebugEnabled()) {
          LOG.debug("add buffer for key {}", key);
        }
      }
      return buffer;
    } finally {
      fileLock.unlock();
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ void finalizeBlock(Container container, BlockID blockId)
/** @return the threshold to read using memory mapped buffers. */
int getReadMappedBufferThreshold();

/** @return the max count of memory mapped buffers to read. */
int getReadMappedBufferMaxCount();

/**
* Shutdown ContainerManager.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.common.ChunkBuffer;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.apache.hadoop.ozone.container.keyvalue.impl.MappedBufferManager;
import org.apache.ozone.test.GenericTestUtils;

import org.apache.commons.io.FileUtils;
Expand Down Expand Up @@ -71,6 +72,7 @@ class TestChunkUtils {
private static final int BUFFER_CAPACITY = 1 << 20;
private static final int MAPPED_BUFFER_THRESHOLD = 32 << 10;
private static final Random RANDOM = new Random();
private static final MappedBufferManager MAPPED_BUFFER_MANAGER = new MappedBufferManager(100);

@TempDir
private File tempDir;
Expand All @@ -79,7 +81,7 @@ static ChunkBuffer readData(File file, long off, long len)
throws StorageContainerException {
LOG.info("off={}, len={}", off, len);
return ChunkUtils.readData(len, BUFFER_CAPACITY, file, off, null,
MAPPED_BUFFER_THRESHOLD);
MAPPED_BUFFER_THRESHOLD, true, MAPPED_BUFFER_MANAGER);
}

@Test
Expand Down
Loading