diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/Buffers.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/Buffers.java new file mode 100644 index 000000000000..5411d1ce233e --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/Buffers.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.keyvalue.impl; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.hdds.ratis.RatisHelper; +import org.apache.hadoop.ozone.container.keyvalue.impl.KeyValueStreamDataChannel.WriteMethod; +import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf; +import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled; +import org.apache.ratis.util.ReferenceCountedObject; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Deque; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Objects; +import java.util.Optional; + +/** + * Keep the last {@link org.apache.hadoop.ozone.container.keyvalue.impl.Buffers#max} bytes in the buffer + * in order to create putBlockRequest + * at {@link #closeBuffers(org.apache.hadoop.ozone.container.keyvalue.impl.Buffers, WriteMethod)}}. + */ +class Buffers { + private final Deque> deque = new LinkedList<>(); + private final int max; + private int length; + + Buffers(int max) { + this.max = max; + } + + private boolean isExtra(int n) { + return length - n >= max; + } + + private boolean hasExtraBuffer() { + return Optional + .ofNullable(deque.peek()) + .map(ReferenceCountedObject::get) + .filter(b -> isExtra(b.remaining())).isPresent(); + } + + /** + * @return extra buffers which are safe to be written. + */ + Iterable> offer(ReferenceCountedObject ref) { + final ByteBuffer buffer = ref.retain(); + KeyValueStreamDataChannel.LOG.debug("offer {}", buffer); + final boolean offered = deque.offer(ref); + Preconditions.checkState(offered, "Failed to offer"); + length += buffer.remaining(); + + return () -> new Iterator>() { + @Override + public boolean hasNext() { + return hasExtraBuffer(); + } + + @Override + public ReferenceCountedObject next() { + final ReferenceCountedObject polled = poll(); + length -= polled.get().remaining(); + Preconditions.checkState(length >= max); + return polled; + } + }; + } + + ReferenceCountedObject poll() { + final ReferenceCountedObject polled = Objects.requireNonNull(deque.poll()); + RatisHelper.debug(polled.get(), "polled", KeyValueStreamDataChannel.LOG); + return polled; + } + + ReferenceCountedObject pollAll() { + Preconditions.checkState(!deque.isEmpty(), "The deque is empty"); + final ByteBuffer[] array = new ByteBuffer[deque.size()]; + final List> refs = new ArrayList<>(deque.size()); + for (int i = 0; i < array.length; i++) { + final ReferenceCountedObject ref = poll(); + refs.add(ref); + array[i] = ref.get(); + } + final ByteBuf buf = Unpooled.wrappedBuffer(array).asReadOnly(); + return ReferenceCountedObject.wrap(buf, () -> { }, () -> { + buf.release(); + refs.forEach(ReferenceCountedObject::release); + }); + } + + void cleanUpAll() { + while (!deque.isEmpty()) { + poll().release(); + } + } +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java index 7a08c7ef4e84..7500860229d9 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java @@ -29,7 +29,6 @@ import org.apache.hadoop.ozone.container.common.impl.ContainerData; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf; -import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled; import org.apache.ratis.util.ReferenceCountedObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,13 +36,7 @@ import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Deque; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; import java.util.Objects; -import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -54,91 +47,6 @@ public class KeyValueStreamDataChannel extends StreamDataChannelBase { public static final Logger LOG = LoggerFactory.getLogger(KeyValueStreamDataChannel.class); - /** - * Keep the last {@link Buffers#max} bytes in the buffer - * in order to create putBlockRequest - * at {@link #closeBuffers(Buffers, WriteMethod)}}. - */ - static class Buffers { - private final Deque> deque - = new LinkedList<>(); - private final int max; - private int length; - - Buffers(int max) { - this.max = max; - } - - private boolean isExtra(int n) { - return length - n >= max; - } - - private boolean hasExtraBuffer() { - return Optional.ofNullable(deque.peek()) - .map(ReferenceCountedObject::get) - .filter(b -> isExtra(b.remaining())) - .isPresent(); - } - - /** - * @return extra buffers which are safe to be written. - */ - Iterable> offer( - ReferenceCountedObject ref) { - final ByteBuffer buffer = ref.retain(); - LOG.debug("offer {}", buffer); - final boolean offered = deque.offer(ref); - Preconditions.checkState(offered, "Failed to offer"); - length += buffer.remaining(); - - return () -> new Iterator>() { - @Override - public boolean hasNext() { - return hasExtraBuffer(); - } - - @Override - public ReferenceCountedObject next() { - final ReferenceCountedObject polled = poll(); - length -= polled.get().remaining(); - Preconditions.checkState(length >= max); - return polled; - } - }; - } - - ReferenceCountedObject poll() { - final ReferenceCountedObject polled - = Objects.requireNonNull(deque.poll()); - RatisHelper.debug(polled.get(), "polled", LOG); - return polled; - } - - ReferenceCountedObject pollAll() { - Preconditions.checkState(!deque.isEmpty(), "The deque is empty"); - final ByteBuffer[] array = new ByteBuffer[deque.size()]; - final List> refs - = new ArrayList<>(deque.size()); - for (int i = 0; i < array.length; i++) { - final ReferenceCountedObject ref = poll(); - refs.add(ref); - array[i] = ref.get(); - } - final ByteBuf buf = Unpooled.wrappedBuffer(array).asReadOnly(); - return ReferenceCountedObject.wrap(buf, () -> { - }, () -> { - buf.release(); - refs.forEach(ReferenceCountedObject::release); - }); - } - - void cleanUpAll() { - while (!deque.isEmpty()) { - poll().release(); - } - } - } - interface WriteMethod { int applyAsInt(ByteBuffer src) throws IOException; } @@ -184,7 +92,7 @@ static int writeBuffers(ReferenceCountedObject src, private static void writeFully(ByteBuffer b, WriteMethod writeMethod) throws IOException { - for (; b.remaining() > 0;) { + while (b.remaining() > 0) { final int written = writeMethod.applyAsInt(b); if (written <= 0) { throw new IOException("Unable to write"); diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestKeyValueStreamDataChannel.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestKeyValueStreamDataChannel.java index 63045f76136b..e6067e5c5609 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestKeyValueStreamDataChannel.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestKeyValueStreamDataChannel.java @@ -25,7 +25,6 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type; import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage; import org.apache.hadoop.ozone.ClientVersion; -import org.apache.hadoop.ozone.container.keyvalue.impl.KeyValueStreamDataChannel.Buffers; import org.apache.hadoop.ozone.container.keyvalue.impl.KeyValueStreamDataChannel.WriteMethod; import org.apache.ratis.client.api.DataStreamOutput; import org.apache.ratis.io.FilePositionCount;