diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java index 36d4dbd45a2..db789783c7c 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java @@ -144,6 +144,10 @@ public final class ScmConfigKeys { "ozone.chunk.read.mapped.buffer.threshold"; public static final String OZONE_CHUNK_READ_MAPPED_BUFFER_THRESHOLD_DEFAULT = "32KB"; + public static final String OZONE_CHUNK_READ_MAPPED_BUFFER_MAX_COUNT_KEY = + "ozone.chunk.read.mapped.buffer.max.count"; + // This max count must not be greater than the Linux platform max_map_count, which is 65530 by default. + public static final int OZONE_CHUNK_READ_MAPPED_BUFFER_MAX_COUNT_DEFAULT = 0; public static final String OZONE_SCM_CONTAINER_LAYOUT_KEY = "ozone.scm.container.layout"; diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBuffer.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBuffer.java index 058934c2f27..d3a558ca430 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBuffer.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBuffer.java @@ -60,7 +60,8 @@ static ChunkBuffer wrap(ByteBuffer buffer) { return new ChunkBufferImplWithByteBuffer(buffer); } - /** Wrap the given list of {@link ByteBuffer}s as a {@link ChunkBuffer}.
*/ + /** Wrap the given list of {@link ByteBuffer}s as a {@link ChunkBuffer}, + * with a function called when buffers are released.*/ static ChunkBuffer wrap(List buffers) { Objects.requireNonNull(buffers, "buffers == null"); if (buffers.size() == 1) { diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index ad06f2f6e33..e8289d159fe 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -860,6 +860,15 @@ The default read threshold to use memory mapped buffers. + + ozone.chunk.read.mapped.buffer.max.count + 0 + OZONE, SCM, CONTAINER, PERFORMANCE + + The default max count of memory mapped buffers allowed for a DN. + Default 0 means no mapped buffers allowed for data read. + + ozone.scm.container.layout FILE_PER_BLOCK diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java index 0fac45571c7..dc048ac16aa 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java @@ -39,6 +39,7 @@ import java.util.EnumSet; import java.util.List; import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReadWriteLock; import java.util.function.ToLongFunction; @@ -50,6 +51,7 @@ import org.apache.hadoop.ozone.common.utils.BufferUtils; import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; import org.apache.hadoop.ozone.container.common.volume.HddsVolume; +import org.apache.hadoop.ozone.container.keyvalue.impl.MappedBufferManager; import org.apache.hadoop.util.Time; import com.google.common.annotations.VisibleForTesting; 
@@ -200,11 +202,12 @@ private static long writeDataToChannel(FileChannel channel, ChunkBuffer data, } } + @SuppressWarnings("checkstyle:parameternumber") public static ChunkBuffer readData(long len, int bufferCapacity, - File file, long off, HddsVolume volume, int readMappedBufferThreshold) - throws StorageContainerException { - if (len > readMappedBufferThreshold) { - return readData(file, bufferCapacity, off, len, volume); + File file, long off, HddsVolume volume, int readMappedBufferThreshold, boolean mmapEnabled, + MappedBufferManager mappedBufferManager) throws StorageContainerException { + if (mmapEnabled && len > readMappedBufferThreshold && bufferCapacity > readMappedBufferThreshold) { + return readData(file, bufferCapacity, off, len, volume, mappedBufferManager); } else if (len == 0) { return ChunkBuffer.wrap(Collections.emptyList()); } @@ -256,25 +259,52 @@ private static void readData(File file, long offset, long len, * @return a list of {@link MappedByteBuffer} containing the data. 
*/ private static ChunkBuffer readData(File file, int chunkSize, - long offset, long length, HddsVolume volume) + long offset, long length, HddsVolume volume, MappedBufferManager mappedBufferManager) throws StorageContainerException { - final List buffers = new ArrayList<>( - Math.toIntExact((length - 1) / chunkSize) + 1); - readData(file, offset, length, channel -> { - long readLen = 0; - while (readLen < length) { - final int n = Math.toIntExact(Math.min(length - readLen, chunkSize)); - final ByteBuffer mapped = channel.map( - FileChannel.MapMode.READ_ONLY, offset + readLen, n); - LOG.debug("mapped: offset={}, readLen={}, n={}, {}", - offset, readLen, n, mapped.getClass()); - readLen += mapped.remaining(); - buffers.add(mapped); + final int bufferNum = Math.toIntExact((length - 1) / chunkSize) + 1; + if (!mappedBufferManager.getQuota(bufferNum)) { + // proceed with normal buffer + final ByteBuffer[] buffers = BufferUtils.assignByteBuffers(length, + chunkSize); + readData(file, offset, length, c -> c.position(offset).read(buffers), volume); + Arrays.stream(buffers).forEach(ByteBuffer::flip); + return ChunkBuffer.wrap(Arrays.asList(buffers)); + } else { + try { + // proceed with mapped buffer + final List buffers = new ArrayList<>(bufferNum); + readData(file, offset, length, channel -> { + long readLen = 0; + while (readLen < length) { + final int n = Math.toIntExact(Math.min(length - readLen, chunkSize)); + final long finalOffset = offset + readLen; + final AtomicReference exception = new AtomicReference<>(); + ByteBuffer mapped = mappedBufferManager.computeIfAbsent(file.getAbsolutePath(), finalOffset, n, + () -> { + try { + return channel.map(FileChannel.MapMode.READ_ONLY, finalOffset, n); + } catch (IOException e) { + LOG.error("Failed to map file {} with offset {} and length {}", file, finalOffset, n); + exception.set(e); + return null; + } + }); + if (mapped == null) { + throw exception.get(); + } + LOG.debug("mapped: offset={}, readLen={}, n={}, {}", 
finalOffset, readLen, n, mapped.getClass()); + readLen += mapped.remaining(); + buffers.add(mapped); + } + return readLen; + }, volume); + return ChunkBuffer.wrap(buffers); + } catch (Throwable e) { + mappedBufferManager.releaseQuota(bufferNum); + throw e; } - return readLen; - }, volume); - return ChunkBuffer.wrap(buffers); + } } /** diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java index dd18636ec00..67a98944b1e 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java @@ -64,6 +64,7 @@ public class BlockManagerImpl implements BlockManager { // Default Read Buffer capacity when Checksum is not present private final int defaultReadBufferCapacity; private final int readMappedBufferThreshold; + private final int readMappedBufferMaxCount; /** * Constructs a Block Manager. @@ -79,6 +80,9 @@ public BlockManagerImpl(ConfigurationSource conf) { this.readMappedBufferThreshold = config.getBufferSize( ScmConfigKeys.OZONE_CHUNK_READ_MAPPED_BUFFER_THRESHOLD_KEY, ScmConfigKeys.OZONE_CHUNK_READ_MAPPED_BUFFER_THRESHOLD_DEFAULT); + this.readMappedBufferMaxCount = config.getInt( + ScmConfigKeys.OZONE_CHUNK_READ_MAPPED_BUFFER_MAX_COUNT_KEY, + ScmConfigKeys.OZONE_CHUNK_READ_MAPPED_BUFFER_MAX_COUNT_DEFAULT); } @Override @@ -304,6 +308,11 @@ public int getReadMappedBufferThreshold() { return readMappedBufferThreshold; } + /** @return the max count of memory mapped buffers for read. */ + public int getReadMappedBufferMaxCount() { + return readMappedBufferMaxCount; + } + /** * Deletes an existing block. 
* As Deletion is handled by BlockDeletingService, diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java index a87b184ccec..4ca578d7717 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java @@ -75,6 +75,8 @@ public class FilePerBlockStrategy implements ChunkManager { private final OpenFiles files = new OpenFiles(); private final int defaultReadBufferCapacity; private final int readMappedBufferThreshold; + private final int readMappedBufferMaxCount; + private final MappedBufferManager mappedBufferManager; private final VolumeSet volumeSet; public FilePerBlockStrategy(boolean sync, BlockManager manager, @@ -84,7 +86,15 @@ public FilePerBlockStrategy(boolean sync, BlockManager manager, manager.getDefaultReadBufferCapacity(); this.readMappedBufferThreshold = manager == null ? 0 : manager.getReadMappedBufferThreshold(); + this.readMappedBufferMaxCount = manager == null ? 
0 + : manager.getReadMappedBufferMaxCount(); + LOG.info("ozone.chunk.read.mapped.buffer.max.count is load with {}", readMappedBufferMaxCount); this.volumeSet = volSet; + if (this.readMappedBufferMaxCount > 0) { + mappedBufferManager = new MappedBufferManager(this.readMappedBufferMaxCount); + } else { + mappedBufferManager = null; + } } private static void checkLayoutVersion(Container container) { @@ -192,10 +202,10 @@ public ChunkBuffer readChunk(Container container, BlockID blockID, final long len = info.getLen(); long offset = info.getOffset(); - int bufferCapacity = ChunkManager.getBufferCapacityForChunkRead(info, + int bufferCapacity = ChunkManager.getBufferCapacityForChunkRead(info, defaultReadBufferCapacity); return ChunkUtils.readData(len, bufferCapacity, chunkFile, offset, volume, - readMappedBufferThreshold); + readMappedBufferThreshold, readMappedBufferMaxCount > 0, mappedBufferManager); } @Override diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerChunkStrategy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerChunkStrategy.java index a649f573bf0..6ac88cad7f5 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerChunkStrategy.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerChunkStrategy.java @@ -67,6 +67,8 @@ public class FilePerChunkStrategy implements ChunkManager { private final BlockManager blockManager; private final int defaultReadBufferCapacity; private final int readMappedBufferThreshold; + private final int readMappedBufferMaxCount; + private final MappedBufferManager mappedBufferManager; private final VolumeSet volumeSet; public FilePerChunkStrategy(boolean sync, BlockManager manager, @@ -77,7 +79,15 @@ public FilePerChunkStrategy(boolean sync, BlockManager manager, manager.getDefaultReadBufferCapacity(); 
this.readMappedBufferThreshold = manager == null ? 0 : manager.getReadMappedBufferThreshold(); + this.readMappedBufferMaxCount = manager == null ? 0 + : manager.getReadMappedBufferMaxCount(); + LOG.info("ozone.chunk.read.mapped.buffer.max.count is load with {}", readMappedBufferMaxCount); this.volumeSet = volSet; + if (this.readMappedBufferMaxCount > 0) { + mappedBufferManager = new MappedBufferManager(this.readMappedBufferMaxCount); + } else { + mappedBufferManager = null; + } } private static void checkLayoutVersion(Container container) { @@ -265,7 +275,7 @@ public ChunkBuffer readChunk(Container container, BlockID blockID, long offset = info.getOffset() - chunkFileOffset; Preconditions.checkState(offset >= 0); return ChunkUtils.readData(len, bufferCapacity, file, offset, volume, - readMappedBufferThreshold); + readMappedBufferThreshold, readMappedBufferMaxCount > 0, mappedBufferManager); } } catch (StorageContainerException ex) { //UNABLE TO FIND chunk is not a problem as we will try with the diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/MappedBufferManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/MappedBufferManager.java new file mode 100644 index 00000000000..be2751925c7 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/MappedBufferManager.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.keyvalue.impl; + +import com.google.common.util.concurrent.Striped; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.ref.WeakReference; +import java.nio.ByteBuffer; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.function.Supplier; + +/** + * A manager that keeps the number of mapped buffers under a predefined total count and also supports reusing mapped buffers.
+ */ +public class MappedBufferManager { + + private static ConcurrentHashMap> mappedBuffers = + new ConcurrentHashMap>(); + private static final Logger LOG = LoggerFactory.getLogger(MappedBufferManager.class); + private final Semaphore semaphore; + private final int capacity; + private final AtomicBoolean cleanupInProgress = new AtomicBoolean(false); + private final Striped lock; + + public MappedBufferManager(int capacity) { + this.capacity = capacity; + this.semaphore = new Semaphore(capacity); + this.lock = Striped.lazyWeakLock(1024); + } + + public boolean getQuota(int permits) { + boolean ret = semaphore.tryAcquire(permits); + if (ret) { + if (LOG.isDebugEnabled()) { + LOG.debug("quota is decreased by {} to total {}", permits, semaphore.availablePermits()); + } + } else { + if (cleanupInProgress.compareAndSet(false, true)) { + CompletableFuture.runAsync(() -> { + int p = 0; + try { + for (String key : mappedBuffers.keySet()) { + ByteBuffer buf = mappedBuffers.get(key).get(); + if (buf == null) { + mappedBuffers.remove(key); + p++; + } + } + if (p > 0) { + releaseQuota(p); + } + } finally { + cleanupInProgress.set(false); + } + }); + } + } + return ret; + } + + public void releaseQuota(int permits) { + semaphore.release(permits); + if (LOG.isDebugEnabled()) { + LOG.debug("quota is increased by {} to total {}", permits, semaphore.availablePermits()); + } + } + + public int availableQuota() { + return semaphore.availablePermits(); + } + + public ByteBuffer computeIfAbsent(String file, long position, long size, + Supplier supplier) { + String key = file + "-" + position + "-" + size; + Lock fileLock = lock.get(key); + fileLock.lock(); + try { + WeakReference refer = mappedBuffers.get(key); + if (refer != null && refer.get() != null) { + // reuse the mapped buffer + if (LOG.isDebugEnabled()) { + LOG.debug("find buffer for key {}", key); + } + releaseQuota(1); + return refer.get(); + } + + ByteBuffer buffer = supplier.get(); + if (buffer != null) { + 
mappedBuffers.put(key, new WeakReference<>(buffer)); + if (LOG.isDebugEnabled()) { + LOG.debug("add buffer for key {}", key); + } + } + return buffer; + } finally { + fileLock.unlock(); + } + } +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java index 6dd8590bdf3..256d357a31d 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java @@ -99,6 +99,9 @@ void finalizeBlock(Container container, BlockID blockId) /** @return the threshold to read using memory mapped buffers. */ int getReadMappedBufferThreshold(); + /** @return the max count of memory mapped buffers to read. */ + int getReadMappedBufferMaxCount(); + /** * Shutdown ContainerManager. 
*/ diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/helpers/TestChunkUtils.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/helpers/TestChunkUtils.java index 5cd264af998..de78d340002 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/helpers/TestChunkUtils.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/helpers/TestChunkUtils.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.ozone.common.ChunkBuffer; import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; +import org.apache.hadoop.ozone.container.keyvalue.impl.MappedBufferManager; import org.apache.ozone.test.GenericTestUtils; import org.apache.commons.io.FileUtils; @@ -71,6 +72,7 @@ class TestChunkUtils { private static final int BUFFER_CAPACITY = 1 << 20; private static final int MAPPED_BUFFER_THRESHOLD = 32 << 10; private static final Random RANDOM = new Random(); + private static final MappedBufferManager MAPPED_BUFFER_MANAGER = new MappedBufferManager(100); @TempDir private File tempDir; @@ -79,7 +81,7 @@ static ChunkBuffer readData(File file, long off, long len) throws StorageContainerException { LOG.info("off={}, len={}", off, len); return ChunkUtils.readData(len, BUFFER_CAPACITY, file, off, null, - MAPPED_BUFFER_THRESHOLD); + MAPPED_BUFFER_THRESHOLD, true, MAPPED_BUFFER_MANAGER); } @Test diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestMappedBufferManager.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestMappedBufferManager.java new file mode 100644 index 00000000000..22406975986 --- /dev/null +++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestMappedBufferManager.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.keyvalue.impl; + +import org.junit.jupiter.api.Test; + +import java.nio.ByteBuffer; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * Test for MappedBufferManager. + */ +public class TestMappedBufferManager { + + @Test + public void testComputeIfAbsent() { + MappedBufferManager manager = new MappedBufferManager(100); + String file = "/CID-fd49f4a7-670d-43c5-a177-8ac03aafceb2/current/containerDir0/2/chunks/113750153625600065.block"; + long position = 0; + int size = 1024; + ByteBuffer buffer1 = ByteBuffer.allocate(size); + ByteBuffer buffer2 = ByteBuffer.allocate(size + 1); + ByteBuffer byteBuffer1 = manager.computeIfAbsent(file, position, size, () -> buffer1); + assertEquals(buffer1, byteBuffer1); + // buffer should be reused + String file2 = "/CID-fd49f4a7-670d-43c5-a177-8ac03aafceb2/current/containerDir0/2/chunks/113750153625600065.block"; + ByteBuffer byteBuffer2 = manager.computeIfAbsent(file2, position, size, () -> buffer2); + assertEquals(buffer1, byteBuffer2); + } +}