BoundedElasticByteBufferPool.java (new file)
@@ -0,0 +1,148 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.ozone.client.io;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ComparisonChain;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.hadoop.io.ByteBufferPool;

/**
* A bounded version of ElasticByteBufferPool that limits the total size
* of buffers that can be cached in the pool. This prevents unbounded memory
* growth in long-lived RPC clients like the S3 Gateway.
*
* When the pool reaches its maximum size, newly returned buffers are not
* added back to the pool and will be garbage collected instead.
*/
public class BoundedElasticByteBufferPool implements ByteBufferPool {
private final TreeMap<Key, ByteBuffer> buffers = new TreeMap<>();
private final TreeMap<Key, ByteBuffer> directBuffers = new TreeMap<>();
private final long maxPoolSize;
private final AtomicLong currentPoolSize = new AtomicLong(0);

/**
* A logical timestamp counter used for creating unique Keys in the TreeMap.
* This is used as the insertionTime for the Key instead of System.nanoTime()
* to guarantee uniqueness and avoid a potential spin-wait in putBuffer
* if two buffers of the same capacity are added at the same nanosecond.
*/
private long logicalTimestamp = 0;

public BoundedElasticByteBufferPool(long maxPoolSize) {
super();
this.maxPoolSize = maxPoolSize;
}

private TreeMap<Key, ByteBuffer> getBufferTree(boolean direct) {
return direct ? this.directBuffers : this.buffers;
}

@Override
public synchronized ByteBuffer getBuffer(boolean direct, int length) {
TreeMap<Key, ByteBuffer> tree = this.getBufferTree(direct);
Map.Entry<Key, ByteBuffer> entry = tree.ceilingEntry(new Key(length, 0L));
if (entry == null) {
// Pool is empty or has no suitable buffer. Allocate a new one.
return direct ? ByteBuffer.allocateDirect(length) : ByteBuffer.allocate(length);
}
tree.remove(entry.getKey());
ByteBuffer buffer = entry.getValue();

// Decrement the size because we are taking a buffer OUT of the pool.
currentPoolSize.addAndGet(-buffer.capacity());
buffer.clear();
return buffer;
}
Review comment on lines +60 to +75 (Member):
Should we also count those "allocated but not released" buffers toward the buffer size limit?

Just like BufferPool does:

private final LinkedList<ChunkBuffer> allocated = new LinkedList<>();
private final LinkedList<ChunkBuffer> released = new LinkedList<>();

while (allocated.size() == capacity) {
LOG.debug("Allocation needs to wait the pool is at capacity (allocated = capacity = {}).", capacity);
notFull.await();
}

I know that the original ElasticByteBufferPool doesn't do this.
I just want to confirm whether we need to manage the allocated buffers here, and why or why not.

Reply (Contributor Author):
I appreciate the suggestion, but if we counted "allocated but not released" buffers toward the limit, we would be forced to make getBuffer blocking (i.e., to wait() when the limit is hit).
That would be a major, high-risk change from the original ElasticByteBufferPool's behavior, which always allocates a new buffer immediately. It could introduce performance bottlenecks or even deadlocks.

Reply (@Gargi-jais11, Contributor Author, Oct 29, 2025):
The BufferPool linked above is a blocking, fixed-size pool. Its purpose is to strictly limit the total number of buffers ever created (e.g., "this system will only ever use 100 buffers, total"). If you ask for buffer 101, getBuffer waits until one is returned.

Our BoundedElasticByteBufferPool is a non-blocking, caching pool. Its purpose is to fix a memory leak from the original ElasticByteBufferPool (which grew forever) while preserving its "elastic" (non-blocking) nature.

Reply (Member):
Sounds good—let’s get this merged.
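
For contrast, here is a rough, hypothetical sketch of the rejected blocking design (the class and field names are invented for illustration and exist nowhere in this PR); this wait-at-capacity behavior is exactly what the elastic pool avoids:

import java.nio.ByteBuffer;

// Hypothetical sketch only; NOT part of this PR.
class BlockingPoolSketch {
  private final long maxPoolSize;
  private long outstandingBytes; // bytes handed out and not yet returned

  BlockingPoolSketch(long maxPoolSize) {
    this.maxPoolSize = maxPoolSize;
  }

  synchronized ByteBuffer getBuffer(boolean direct, int length) throws InterruptedException {
    while (outstandingBytes + length > maxPoolSize) {
      wait(); // caller stalls until capacity is released; this is the behavior being avoided
    }
    outstandingBytes += length;
    return direct ? ByteBuffer.allocateDirect(length) : ByteBuffer.allocate(length);
  }

  synchronized void putBuffer(ByteBuffer buffer) {
    outstandingBytes -= buffer.capacity();
    notifyAll(); // wake any waiting getBuffer() callers
  }
}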


@Override
public synchronized void putBuffer(ByteBuffer buffer) {
if (buffer == null) {
return;
}

if (currentPoolSize.get() + buffer.capacity() > maxPoolSize) {
// Pool is full, do not add the buffer back.
// It will be garbage collected by the JVM.
return;
}

buffer.clear();
TreeMap<Key, ByteBuffer> tree = getBufferTree(buffer.isDirect());
Key key = new Key(buffer.capacity(), logicalTimestamp++);

tree.put(key, buffer);
// Increment the size because we have successfully added the buffer back to the pool.
currentPoolSize.addAndGet(buffer.capacity());
}

/**
* Get the current size of buffers in the pool.
*
* @return Current pool size in bytes
*/
@VisibleForTesting
public synchronized long getCurrentPoolSize() {
return currentPoolSize.get();
}

/**
* The Key for the buffer TreeMaps.
* This is copied directly from the original ElasticByteBufferPool.
*/
protected static final class Key implements Comparable<Key> {
private final int capacity;
private final long insertionTime;

Key(int capacity, long insertionTime) {
this.capacity = capacity;
this.insertionTime = insertionTime;
}

@Override
public int compareTo(Key other) {
return ComparisonChain.start()
.compare(this.capacity, other.capacity)
.compare(this.insertionTime, other.insertionTime)
.result();
}

@Override
public boolean equals(Object rhs) {
if (rhs == null) {
return false;
}
try {
Key o = (Key) rhs;
return compareTo(o) == 0;
} catch (ClassCastException e) {
return false;
}
}

@Override
public int hashCode() {
return new HashCodeBuilder().append(capacity).append(insertionTime)
.toHashCode();
}
}
}
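
To make the intended lifecycle concrete, here is a minimal usage sketch (class and method names match this diff; the 64 MB cap and the sketch class itself are illustrative, and it assumes the pool class is on the classpath):

import java.nio.ByteBuffer;

// Usage sketch: non-blocking get, size-capped put.
public class PoolUsageSketch {
  public static void main(String[] args) {
    // Cap the cache at 64 MB (illustrative value).
    BoundedElasticByteBufferPool pool = new BoundedElasticByteBufferPool(64L * 1024 * 1024);

    // getBuffer never blocks: it reuses a cached buffer of sufficient
    // capacity if one exists, otherwise it allocates a fresh one.
    ByteBuffer buf = pool.getBuffer(false, 8192);

    // ... fill and drain the buffer ...

    // putBuffer re-caches the buffer only while the cap is not exceeded;
    // otherwise it is simply dropped and left to the garbage collector.
    pool.putBuffer(buf);
  }
}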
TestBoundedElasticByteBufferPool.java (new file)
@@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.ozone.client.io;

import java.nio.ByteBuffer;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/**
* Unit tests for BoundedElasticByteBufferPool.
*/
public class TestBoundedElasticByteBufferPool {

private static final int MB = 1024 * 1024;
private static final long MAX_POOL_SIZE = 3L * MB; // 3MB

@Test
public void testLogicalTimestampOrdering() {
// Pool with plenty of capacity
BoundedElasticByteBufferPool pool = new BoundedElasticByteBufferPool(MAX_POOL_SIZE);
int bufferSize = 5 * 1024; // 5KB

// Create and add three distinct buffers of the same size
ByteBuffer buffer1 = ByteBuffer.allocate(bufferSize);
ByteBuffer buffer2 = ByteBuffer.allocate(bufferSize);
ByteBuffer buffer3 = ByteBuffer.allocate(bufferSize);

// Store their unique identity hash codes
int hash1 = System.identityHashCode(buffer1);
int hash2 = System.identityHashCode(buffer2);
int hash3 = System.identityHashCode(buffer3);

pool.putBuffer(buffer1);
pool.putBuffer(buffer2);
pool.putBuffer(buffer3);

// The pool should now contain 15 KB of data
Assertions.assertEquals(bufferSize * 3L, pool.getCurrentPoolSize());

// Get the buffers back. They should come back in the same
// order they were put in (FIFO).
ByteBuffer retrieved1 = pool.getBuffer(false, bufferSize);
ByteBuffer retrieved2 = pool.getBuffer(false, bufferSize);
ByteBuffer retrieved3 = pool.getBuffer(false, bufferSize);

// Verify we got the exact same buffer instances back in FIFO order
Assertions.assertEquals(hash1, System.identityHashCode(retrieved1));
Assertions.assertEquals(hash2, System.identityHashCode(retrieved2));
Assertions.assertEquals(hash3, System.identityHashCode(retrieved3));

// The pool should now be empty
Assertions.assertEquals(0, pool.getCurrentPoolSize());
}

/**
* Verifies the core feature: the pool stops caching buffers
* once its maximum size is reached.
*/
@Test
public void testPoolBoundingLogic() {
BoundedElasticByteBufferPool pool = new BoundedElasticByteBufferPool(MAX_POOL_SIZE);

ByteBuffer buffer1 = ByteBuffer.allocate(2 * MB);
ByteBuffer buffer2 = ByteBuffer.allocate(1 * MB);
ByteBuffer buffer3 = ByteBuffer.allocate(3 * MB);

int hash1 = System.identityHashCode(buffer1);
int hash2 = System.identityHashCode(buffer2);
int hash3 = System.identityHashCode(buffer3);

// 1. Put buffer 1 (Pool size: 2MB, remaining: 1MB)
pool.putBuffer(buffer1);
Assertions.assertEquals(2 * MB, pool.getCurrentPoolSize());

// 2. Put buffer 2 (Pool size: 2MB + 1MB = 3MB, remaining: 0)
// The check is (current(2MB) + new(1MB)) > max(3MB), which is false.
// So, the buffer IS added.
pool.putBuffer(buffer2);
Assertions.assertEquals(3 * MB, pool.getCurrentPoolSize());

// 3. Put buffer 3 (Capacity 3MB)
// The check is (current(3MB) + new(3MB)) > max(3MB), which is true.
// This buffer should be REJECTED.
pool.putBuffer(buffer3);
// The pool size should NOT change.
Assertions.assertEquals(3 * MB, pool.getCurrentPoolSize());

// 4. Get buffers back
ByteBuffer retrieved1 = pool.getBuffer(false, 2 * MB);
ByteBuffer retrieved2 = pool.getBuffer(false, 1 * MB);

// The pool should now be empty
Assertions.assertEquals(0, pool.getCurrentPoolSize());

// 5. Ask for a third buffer.
// Since buffer3 was rejected, this should be a NEWLY allocated buffer.
ByteBuffer retrieved3 = pool.getBuffer(false, 3 * MB);

// Verify that we got the first two buffers from the pool
Assertions.assertEquals(hash1, System.identityHashCode(retrieved1));
Assertions.assertEquals(hash2, System.identityHashCode(retrieved2));

// Verify that the third buffer is a NEW instance, not buffer3
Assertions.assertNotEquals(hash3, System.identityHashCode(retrieved3));
}
}
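
One further check the suite could include (a sketch, not part of this PR): direct and heap buffers live in separate trees, so a direct request must not be served from the heap cache. This method assumes the imports and constants of the test class above.

@Test
public void testDirectAndHeapBuffersAreSeparate() {
  BoundedElasticByteBufferPool pool = new BoundedElasticByteBufferPool(MAX_POOL_SIZE);
  ByteBuffer heap = ByteBuffer.allocate(4096);
  int heapHash = System.identityHashCode(heap);
  pool.putBuffer(heap);

  // A direct request must allocate a new direct buffer,
  // not return the cached heap buffer.
  ByteBuffer direct = pool.getBuffer(true, 4096);
  Assertions.assertTrue(direct.isDirect());
  Assertions.assertNotEquals(heapHash, System.identityHashCode(direct));

  // The heap buffer is still cached and is returned for a heap request.
  ByteBuffer heapAgain = pool.getBuffer(false, 4096);
  Assertions.assertEquals(heapHash, System.identityHashCode(heapAgain));
}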
OzoneConfigKeys.java
@@ -690,6 +690,10 @@ public final class OzoneConfigKeys {
"ozone.security.crypto.compliance.mode";
public static final String OZONE_SECURITY_CRYPTO_COMPLIANCE_MODE_UNRESTRICTED = "unrestricted";

public static final String OZONE_CLIENT_ELASTIC_BYTE_BUFFER_POOL_MAX_SIZE =
"ozone.client.elastic.byte.buffer.pool.max.size";
public static final String OZONE_CLIENT_ELASTIC_BYTE_BUFFER_POOL_MAX_SIZE_DEFAULT = "16GB";

/**
* There is no need to instantiate this class.
*/
hadoop-hdds/common/src/main/resources/ozone-default.xml (12 additions)
@@ -465,6 +465,18 @@
<description>Socket timeout for Ozone client. Unit could be defined with
postfix (ns,ms,s,m,h,d)</description>
</property>
<property>
<name>ozone.client.elastic.byte.buffer.pool.max.size</name>
<value>16GB</value>
<tag>OZONE, CLIENT</tag>
<description>
The maximum total size of buffers that can be cached in the client-side
ByteBufferPool. This pool is used heavily during EC read and write operations.
Setting a limit prevents unbounded memory growth in long-lived RPC clients
like the S3 Gateway. Once this limit is reached, used buffers are not
put back to the pool and will be garbage collected.
Review comment on lines +476 to +477 (Member):
> used buffers are not put back to the pool and will be garbage collected.

Can we help deallocate those buffers immediately, so we can reduce GC pressure? I don't quite understand how GC works in Java.

Reply (@Gargi-jais11, Contributor Author, Oct 23, 2025):
In Java, we can't deallocate memory manually (like free() in C/C++). The only way to free memory is to remove all references to an object and let the garbage collector (GC) reclaim it.
When our pool is full, returning without storing the buffer does exactly that: the buffer becomes unreachable, and the GC handles its deallocation.

So while we are still relying on the GC (which is unavoidable in Java), it now handles a much smaller fraction of objects, which is exactly the fix we want to reduce overall S3 Gateway memory pressure.

Reply (Contributor Author):
We could also call System.gc() to suggest that the garbage collector run immediately, but according to the Java documentation the runtime makes the final decision.
So immediately deallocating a buffer is not something Java allows; at most we can use System.gc() to request a collection right away.

Reply (Member):
Got it, thanks for the detailed explanation. Learned a lot!
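
To illustrate the point in code (a sketch; the pool variable is hypothetical):

// Sketch: what "rejected by the pool" means for reclamation.
ByteBuffer b = pool.getBuffer(false, 4 * 1024 * 1024);
pool.putBuffer(b); // pool already full: it keeps no reference to b
b = null;          // caller drops its reference as well
// b is now unreachable; a future GC cycle reclaims its memory.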

</description>
</property>
<property>
<name>ozone.key.deleting.limit.per.task</name>
<value>50000</value>
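For the new property above, a minimal override sketch for ozone-site.xml (the 4GB value is illustrative, not a recommendation):

<property>
  <name>ozone.client.elastic.byte.buffer.pool.max.size</name>
  <value>4GB</value>
</property>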
RpcClient.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.ozone.client.rpc;

import static org.apache.hadoop.ozone.OzoneAcl.LINK_BUCKET_DEFAULT_ACL;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_ELASTIC_BYTE_BUFFER_POOL_MAX_SIZE;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_ELASTIC_BYTE_BUFFER_POOL_MAX_SIZE_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_KEY_PROVIDER_CACHE_EXPIRY;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_REQUIRED_OM_VERSION_MIN_KEY;
@@ -71,6 +73,7 @@
import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.StorageType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -88,7 +91,6 @@
import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.io.ElasticByteBufferPool;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ozone.OzoneAcl;
import org.apache.hadoop.ozone.OzoneConfigKeys;
@@ -110,6 +112,7 @@
import org.apache.hadoop.ozone.client.VolumeArgs;
import org.apache.hadoop.ozone.client.io.BlockInputStreamFactory;
import org.apache.hadoop.ozone.client.io.BlockInputStreamFactoryImpl;
import org.apache.hadoop.ozone.client.io.BoundedElasticByteBufferPool;
import org.apache.hadoop.ozone.client.io.CipherOutputStreamOzone;
import org.apache.hadoop.ozone.client.io.ECBlockInputStream;
import org.apache.hadoop.ozone.client.io.ECKeyOutputStream;
@@ -318,7 +321,11 @@ public void onRemoval(
}
}
}).build();
this.byteBufferPool = new ElasticByteBufferPool();
long maxPoolSize = (long) conf.getStorageSize(
OZONE_CLIENT_ELASTIC_BYTE_BUFFER_POOL_MAX_SIZE,
OZONE_CLIENT_ELASTIC_BYTE_BUFFER_POOL_MAX_SIZE_DEFAULT,
StorageUnit.BYTES); // putBuffer compares the cap against buffer capacities in bytes, so convert to bytes here
this.byteBufferPool = new BoundedElasticByteBufferPool(maxPoolSize);
this.blockInputStreamFactory = BlockInputStreamFactoryImpl
.getInstance(byteBufferPool, ecReconstructExecutor);
this.clientMetrics = ContainerClientMetrics.acquire();