diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BoundedElasticByteBufferPool.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BoundedElasticByteBufferPool.java
new file mode 100644
index 000000000000..17311ddb5da9
--- /dev/null
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BoundedElasticByteBufferPool.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.client.io;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ComparisonChain;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.hadoop.io.ByteBufferPool;
+
+/**
+ * A bounded version of ElasticByteBufferPool that limits the total size
+ * of buffers that can be cached in the pool. This prevents unbounded memory
+ * growth in long-lived RPC clients such as the S3 Gateway.
+ *
+ * When the pool reaches its maximum size, newly returned buffers are not
+ * added back to the pool and will be garbage collected instead.
+ */
+public class BoundedElasticByteBufferPool implements ByteBufferPool {
+  private final TreeMap<Key, ByteBuffer> buffers = new TreeMap<>();
+  private final TreeMap<Key, ByteBuffer> directBuffers = new TreeMap<>();
+  private final long maxPoolSize;
+  private final AtomicLong currentPoolSize = new AtomicLong(0);
+
+  /**
+   * A logical timestamp counter used for creating unique Keys in the TreeMap.
+   * It is used as the insertionTime of the Key instead of System.nanoTime()
+   * to guarantee uniqueness and avoid a potential spin-wait in putBuffer
+   * if two buffers of the same capacity are added within the same nanosecond.
+   */
+  private long logicalTimestamp = 0;
+
+  public BoundedElasticByteBufferPool(long maxPoolSize) {
+    super();
+    this.maxPoolSize = maxPoolSize;
+  }
+
+  private TreeMap<Key, ByteBuffer> getBufferTree(boolean direct) {
+    return direct ? this.directBuffers : this.buffers;
+  }
+
+  @Override
+  public synchronized ByteBuffer getBuffer(boolean direct, int length) {
+    TreeMap<Key, ByteBuffer> tree = this.getBufferTree(direct);
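+    // ceilingEntry performs a best-fit lookup: Key sorts by capacity first
+    // and insertion time second, so this returns the oldest pooled buffer
+    // whose capacity is >= length. The returned buffer may therefore be
+    // larger than requested.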
+    Map.Entry<Key, ByteBuffer> entry = tree.ceilingEntry(new Key(length, 0L));
+    if (entry == null) {
+      // Pool is empty or has no suitable buffer. Allocate a new one.
+      return direct ? ByteBuffer.allocateDirect(length)
+          : ByteBuffer.allocate(length);
+    }
+    tree.remove(entry.getKey());
+    ByteBuffer buffer = entry.getValue();
+    // Decrement the size because we are taking a buffer OUT of the pool.
+    currentPoolSize.addAndGet(-buffer.capacity());
+    buffer.clear();
+    return buffer;
+  }
+
+  @Override
+  public synchronized void putBuffer(ByteBuffer buffer) {
+    if (buffer == null) {
+      return;
+    }
+
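+    // putBuffer is synchronized, so this check-then-act on currentPoolSize
+    // cannot interleave with a concurrent getBuffer or putBuffer; the
+    // AtomicLong alone would not make the size check and the insert atomic.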
+    if (currentPoolSize.get() + buffer.capacity() > maxPoolSize) {
+      // Pool is full, do not add the buffer back.
+      // It will be garbage collected by the JVM.
+      return;
+    }
+
+    buffer.clear();
+    TreeMap<Key, ByteBuffer> tree = getBufferTree(buffer.isDirect());
+    Key key = new Key(buffer.capacity(), logicalTimestamp++);
+    tree.put(key, buffer);
+    // Increment the size because we have added the buffer back to the pool.
+    currentPoolSize.addAndGet(buffer.capacity());
+  }
+
+  /**
+   * Get the current total size of buffers in the pool.
+   *
+   * @return current pool size in bytes
+   */
+  @VisibleForTesting
+  public synchronized long getCurrentPoolSize() {
+    return currentPoolSize.get();
+  }
+
+  /**
+   * The Key for the buffer TreeMaps.
+   * This is copied directly from the original ElasticByteBufferPool.
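+   * Keys sort by capacity first and insertion time second, so ceilingEntry
+   * lookups are best-fit and ties between equal capacities resolve FIFO.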
+   */
+  protected static final class Key implements Comparable<Key> {
+    private final int capacity;
+    private final long insertionTime;
+
+    Key(int capacity, long insertionTime) {
+      this.capacity = capacity;
+      this.insertionTime = insertionTime;
+    }
+
+    @Override
+    public int compareTo(Key other) {
+      return ComparisonChain.start()
+          .compare(this.capacity, other.capacity)
+          .compare(this.insertionTime, other.insertionTime)
+          .result();
+    }
+
+    @Override
+    public boolean equals(Object rhs) {
+      if (rhs == null) {
+        return false;
+      }
+      try {
+        Key o = (Key) rhs;
+        return compareTo(o) == 0;
+      } catch (ClassCastException e) {
+        return false;
+      }
+    }
+
+    @Override
+    public int hashCode() {
+      return new HashCodeBuilder().append(capacity).append(insertionTime)
+          .toHashCode();
+    }
+  }
+}
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestBoundedElasticByteBufferPool.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestBoundedElasticByteBufferPool.java
new file mode 100644
index 000000000000..f32b81bfe8cb
--- /dev/null
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestBoundedElasticByteBufferPool.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.client.io;
+
+import java.nio.ByteBuffer;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Unit tests for BoundedElasticByteBufferPool.
+ */
+public class TestBoundedElasticByteBufferPool {
+
+  private static final int MB = 1024 * 1024;
+  private static final long MAX_POOL_SIZE = 3L * MB; // 3 MB
+
+  @Test
+  public void testLogicalTimestampOrdering() {
+    // Pool with plenty of capacity.
+    BoundedElasticByteBufferPool pool =
+        new BoundedElasticByteBufferPool(MAX_POOL_SIZE);
+    int bufferSize = 5 * 1024; // 5 KB
+
+    // Create and add three distinct buffers of the same size.
+    ByteBuffer buffer1 = ByteBuffer.allocate(bufferSize);
+    ByteBuffer buffer2 = ByteBuffer.allocate(bufferSize);
+    ByteBuffer buffer3 = ByteBuffer.allocate(bufferSize);
+
+    // Store their identity hash codes to check instance identity later.
+    int hash1 = System.identityHashCode(buffer1);
+    int hash2 = System.identityHashCode(buffer2);
+    int hash3 = System.identityHashCode(buffer3);
+
+    pool.putBuffer(buffer1);
+    pool.putBuffer(buffer2);
+    pool.putBuffer(buffer3);
+
+    // The pool should now hold 15 KB of buffers.
+    Assertions.assertEquals(bufferSize * 3L, pool.getCurrentPoolSize());
+
+    // Get the buffers back. They should come back in the same
+    // order they were put in (FIFO).
+    ByteBuffer retrieved1 = pool.getBuffer(false, bufferSize);
+    ByteBuffer retrieved2 = pool.getBuffer(false, bufferSize);
+    ByteBuffer retrieved3 = pool.getBuffer(false, bufferSize);
+
+    // Verify we got the exact same buffer instances back in FIFO order.
+    Assertions.assertEquals(hash1, System.identityHashCode(retrieved1));
+    Assertions.assertEquals(hash2, System.identityHashCode(retrieved2));
+    Assertions.assertEquals(hash3, System.identityHashCode(retrieved3));
+
+    // The pool should now be empty.
+    Assertions.assertEquals(0, pool.getCurrentPoolSize());
+  }
+
+  /**
+   * Verifies the core feature: the pool stops caching buffers
+   * once its maximum size is reached.
+   */
+  @Test
+  public void testPoolBoundingLogic() {
+    BoundedElasticByteBufferPool pool =
+        new BoundedElasticByteBufferPool(MAX_POOL_SIZE);
+
+    ByteBuffer buffer1 = ByteBuffer.allocate(2 * MB);
+    ByteBuffer buffer2 = ByteBuffer.allocate(1 * MB);
+    ByteBuffer buffer3 = ByteBuffer.allocate(3 * MB);
+
+    int hash1 = System.identityHashCode(buffer1);
+    int hash2 = System.identityHashCode(buffer2);
+    int hash3 = System.identityHashCode(buffer3);
+
+    // 1. Put buffer 1 (pool size: 2MB, remaining: 1MB).
+    pool.putBuffer(buffer1);
+    Assertions.assertEquals(2 * MB, pool.getCurrentPoolSize());
+
+    // 2. Put buffer 2 (pool size: 2MB + 1MB = 3MB, remaining: 0).
+    // The check is (current(2MB) + new(1MB)) > max(3MB), which is false.
+    // So the buffer IS added.
+    pool.putBuffer(buffer2);
+    Assertions.assertEquals(3 * MB, pool.getCurrentPoolSize());
+
+    // 3. Put buffer 3 (capacity 3MB).
+    // The check is (current(3MB) + new(3MB)) > max(3MB), which is true.
+    // This buffer should be REJECTED.
+    pool.putBuffer(buffer3);
+    // The pool size should NOT change.
+    Assertions.assertEquals(3 * MB, pool.getCurrentPoolSize());
+
+    // 4. Get buffers back.
+    ByteBuffer retrieved1 = pool.getBuffer(false, 2 * MB);
+    ByteBuffer retrieved2 = pool.getBuffer(false, 1 * MB);
+
+    // The pool should now be empty.
+    Assertions.assertEquals(0, pool.getCurrentPoolSize());
+
+    // 5. Ask for a third buffer.
+    // Since buffer3 was rejected, this should be a NEWLY allocated buffer.
+    ByteBuffer retrieved3 = pool.getBuffer(false, 3 * MB);
+
+    // Verify that we got the first two buffers from the pool.
+    Assertions.assertEquals(hash1, System.identityHashCode(retrieved1));
+    Assertions.assertEquals(hash2, System.identityHashCode(retrieved2));
+
+    // Verify that the third buffer is a NEW instance, not buffer3.
+    Assertions.assertNotEquals(hash3, System.identityHashCode(retrieved3));
+  }
+}
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index db66fed22fe9..ceca7d0c8824 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -690,6 +690,10 @@ public final class OzoneConfigKeys {
       "ozone.security.crypto.compliance.mode";
   public static final String OZONE_SECURITY_CRYPTO_COMPLIANCE_MODE_UNRESTRICTED = "unrestricted";
 
+  public static final String OZONE_CLIENT_ELASTIC_BYTE_BUFFER_POOL_MAX_SIZE =
+      "ozone.client.elastic.byte.buffer.pool.max.size";
+  public static final String OZONE_CLIENT_ELASTIC_BYTE_BUFFER_POOL_MAX_SIZE_DEFAULT = "16GB";
+
   /**
    * There is no need to instantiate this class.
    */
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index a6c8d61fff9e..0bfa98f991b9 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -465,6 +465,18 @@
       Socket timeout for Ozone client.
       Unit could be defined with postfix (ns,ms,s,m,h,d)
     </description>
   </property>
+  <property>
+    <name>ozone.client.elastic.byte.buffer.pool.max.size</name>
+    <value>16GB</value>
+    <tag>OZONE, CLIENT</tag>
+    <description>
+      The maximum total size of buffers that can be cached in the client-side
+      ByteBufferPool. This pool is used heavily during EC read and write
+      operations. Setting a limit prevents unbounded memory growth in
+      long-lived RPC clients such as the S3 Gateway. Once this limit is
+      reached, returned buffers are not put back and will be garbage collected.
+    </description>
+  </property>
   <property>
     <name>ozone.key.deleting.limit.per.task</name>
     <value>50000</value>
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index ee4070c9eba4..d4ebf0be1b38 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.ozone.client.rpc;
 
 import static org.apache.hadoop.ozone.OzoneAcl.LINK_BUCKET_DEFAULT_ACL;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_ELASTIC_BYTE_BUFFER_POOL_MAX_SIZE;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_ELASTIC_BYTE_BUFFER_POOL_MAX_SIZE_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_KEY_PROVIDER_CACHE_EXPIRY;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_REQUIRED_OM_VERSION_MIN_KEY;
@@ -71,6 +73,7 @@
 import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.conf.StorageUnit;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.StorageType;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -88,7 +91,6 @@
 import org.apache.hadoop.hdds.tracing.TracingUtil;
 import org.apache.hadoop.hdds.utils.IOUtils;
 import org.apache.hadoop.io.ByteBufferPool;
-import org.apache.hadoop.io.ElasticByteBufferPool;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
@@ -110,6 +112,7 @@
 import org.apache.hadoop.ozone.client.VolumeArgs;
 import org.apache.hadoop.ozone.client.io.BlockInputStreamFactory;
 import org.apache.hadoop.ozone.client.io.BlockInputStreamFactoryImpl;
+import org.apache.hadoop.ozone.client.io.BoundedElasticByteBufferPool;
 import org.apache.hadoop.ozone.client.io.CipherOutputStreamOzone;
 import org.apache.hadoop.ozone.client.io.ECBlockInputStream;
 import org.apache.hadoop.ozone.client.io.ECKeyOutputStream;
@@ -318,7 +321,11 @@ public void onRemoval(
         }
       }
     }).build();
-    this.byteBufferPool = new ElasticByteBufferPool();
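+    // Resolve the configured cap in bytes (the value accepts size suffixes
+    // such as "16GB"); the pool compares it against buffer byte capacities.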
+    long maxPoolSize = (long) conf.getStorageSize(
+        OZONE_CLIENT_ELASTIC_BYTE_BUFFER_POOL_MAX_SIZE,
+        OZONE_CLIENT_ELASTIC_BYTE_BUFFER_POOL_MAX_SIZE_DEFAULT,
+        StorageUnit.BYTES);
+    this.byteBufferPool = new BoundedElasticByteBufferPool(maxPoolSize);
     this.blockInputStreamFactory = BlockInputStreamFactoryImpl
         .getInstance(byteBufferPool, ecReconstructExecutor);
     this.clientMetrics = ContainerClientMetrics.acquire();