@@ -73,7 +73,7 @@ SortedMap<Long, List<BUFFER>> getCommitIndexMap() {
     return commitIndexMap;
   }
 
-  void updateCommitInfoMap(long index, List<BUFFER> buffers) {
+  synchronized void updateCommitInfoMap(long index, List<BUFFER> buffers) {
     commitIndexMap.computeIfAbsent(index, k -> new LinkedList<>())
         .addAll(buffers);
   }
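The method is now synchronized so the computeIfAbsent/addAll pair stays atomic when more than one thread can reach the watcher. A minimal sketch of the same guard, with hypothetical names and String standing in for the BUFFER type parameter:

import java.util.LinkedList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

class CommitIndexMapSketch {
  // TreeMap is not thread-safe, so compound updates need a common lock.
  private final SortedMap<Long, List<String>> commitIndexMap = new TreeMap<>();

  synchronized void updateCommitInfoMap(long index, List<String> buffers) {
    // Without the lock, two callers could interleave between computeIfAbsent
    // and addAll and corrupt or drop an entry.
    commitIndexMap.computeIfAbsent(index, k -> new LinkedList<>())
        .addAll(buffers);
  }

  synchronized List<String> remove(long index) {
    // Removal takes the same monitor, so it never sees a half-built entry.
    return commitIndexMap.remove(index);
  }
}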
@@ -25,7 +25,6 @@
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
@@ -182,8 +181,7 @@ public BlockOutputStream(
         (long) flushPeriod * streamBufferArgs.getStreamBufferSize() == streamBufferArgs
             .getStreamBufferFlushSize());
 
-    // A single thread executor handle the responses of async requests
-    responseExecutor = Executors.newSingleThreadExecutor();
+    this.responseExecutor = blockOutputStreamResourceProvider.get();
     bufferList = null;
     totalDataFlushedLength = 0;
     writtenDataLength = 0;
@@ -657,7 +655,6 @@ public void cleanup(boolean invalidateClient) {
       bufferList.clear();
     }
     bufferList = null;
-    responseExecutor.shutdown();
   }
 
   /**
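These two hunks together move the response executor's lifecycle out of the stream: the constructor now borrows it from blockOutputStreamResourceProvider instead of creating a single-thread executor, and cleanup() no longer shuts it down, leaving the lifetime to whoever owns the pool. A minimal sketch of that ownership pattern, with hypothetical names rather than the actual Ozone wiring:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

class ResponseStreamSketch implements AutoCloseable {
  private final ExecutorService responseExecutor;

  ResponseStreamSketch(Supplier<ExecutorService> executorProvider) {
    // Borrowed, not created: many streams can share one pool.
    this.responseExecutor = executorProvider.get();
  }

  void handleResponseAsync(Runnable work) {
    responseExecutor.execute(work);
  }

  @Override
  public void close() {
    // Intentionally no shutdown() here: the code that created the Supplier
    // owns the pool and decides when it stops.
  }
}

class SharedPoolOwner {
  public static void main(String[] args) {
    ExecutorService shared = Executors.newFixedThreadPool(4);
    try (ResponseStreamSketch stream = new ResponseStreamSketch(() -> shared)) {
      stream.handleResponseAsync(() -> System.out.println("response handled"));
    }
    shared.shutdown(); // lifecycle managed by the owner, not by each stream
  }
}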
@@ -24,6 +24,7 @@
  */
 package org.apache.hadoop.hdds.scm.storage;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
 import org.apache.hadoop.ozone.common.ChunkBuffer;
@@ -32,6 +33,7 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
 
 /**
  * This class executes watchForCommit on ratis pipeline and releases
@@ -42,8 +44,8 @@ class CommitWatcher extends AbstractCommitWatcher<ChunkBuffer> {
   private final BufferPool bufferPool;
 
   // future Map to hold up all putBlock futures
-  private final ConcurrentMap<Long, CompletableFuture<
-      ContainerCommandResponseProto>> futureMap = new ConcurrentHashMap<>();
+  private final ConcurrentMap<Long, CompletableFuture<ContainerCommandResponseProto>>
+      futureMap = new ConcurrentHashMap<>();
 
   CommitWatcher(BufferPool bufferPool, XceiverClientSpi xceiverClient) {
     super(xceiverClient);
@@ -67,11 +69,24 @@ void releaseBuffers(long index) {
           + totalLength + ": existing = " + futureMap.keySet());
   }
 
-  ConcurrentMap<Long, CompletableFuture<
-      ContainerCommandResponseProto>> getFutureMap() {
+  @VisibleForTesting
+  ConcurrentMap<Long, CompletableFuture<ContainerCommandResponseProto>> getFutureMap() {
     return futureMap;
   }
 
+  public void putFlushFuture(long flushPos, CompletableFuture<ContainerCommandResponseProto> flushFuture) {
+    futureMap.compute(flushPos,
+        (key, previous) -> previous == null ? flushFuture :
+            previous.thenCombine(flushFuture, (prev, curr) -> curr));
+  }
+
+
+  public void waitOnFlushFutures() throws InterruptedException, ExecutionException {
+    // wait for all the transactions to complete
+    CompletableFuture.allOf(futureMap.values().toArray(
+        new CompletableFuture[0])).get();
+  }
+
   @Override
   public void cleanup() {
     super.cleanup();
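putFlushFuture merges futures that arrive for the same flush position: compute stores the first future and chains any later one behind it with thenCombine, so waitOnFlushFutures only has to block on allOf over one future per position. A self-contained sketch of the pattern, with a hypothetical String payload in place of ContainerCommandResponseProto:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;

class FlushFutureSketch {
  private final ConcurrentMap<Long, CompletableFuture<String>> futureMap =
      new ConcurrentHashMap<>();

  void putFlushFuture(long flushPos, CompletableFuture<String> flushFuture) {
    // Either register the first future for this position, or chain the new
    // one behind the existing one; the combined future completes when both
    // are done and carries the latest response.
    futureMap.compute(flushPos,
        (key, previous) -> previous == null ? flushFuture
            : previous.thenCombine(flushFuture, (prev, curr) -> curr));
  }

  void waitOnFlushFutures() throws InterruptedException, ExecutionException {
    // Block until every registered flush position has completed.
    CompletableFuture.allOf(
        futureMap.values().toArray(new CompletableFuture[0])).get();
  }

  public static void main(String[] args) throws Exception {
    FlushFutureSketch sketch = new FlushFutureSketch();
    sketch.putFlushFuture(100L, CompletableFuture.completedFuture("first"));
    sketch.putFlushFuture(100L, CompletableFuture.completedFuture("second"));
    sketch.waitOnFlushFutures();
    System.out.println("all flush positions completed");
  }
}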
@@ -113,16 +113,13 @@ void updateCommitInfo(XceiverClientReply reply, List<ChunkBuffer> buffers) {
   }
 
   @Override
-  void putFlushFuture(long flushPos,
-      CompletableFuture<ContainerCommandResponseProto> flushFuture) {
-    commitWatcher.getFutureMap().put(flushPos, flushFuture);
+  void putFlushFuture(long flushPos, CompletableFuture<ContainerCommandResponseProto> flushFuture) {
+    commitWatcher.putFlushFuture(flushPos, flushFuture);
   }
 
   @Override
   void waitOnFlushFutures() throws InterruptedException, ExecutionException {
-    // wait for all the transactions to complete
-    CompletableFuture.allOf(commitWatcher.getFutureMap().values().toArray(
-        new CompletableFuture[0])).get();
+    commitWatcher.waitOnFlushFutures();
   }
 
   @Override
@@ -101,8 +101,7 @@ public class ECReconstructionCoordinator implements Closeable {
 
   private static final int EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE = 3;
 
-  // TODO: Adjusts to the appropriate value when the ec-reconstruct-writer thread pool is used.
-  private static final int EC_RECONSTRUCT_STRIPE_WRITE_POOL_MIN_SIZE = 0;
+  private static final int EC_RECONSTRUCT_STRIPE_WRITE_POOL_MIN_SIZE = 5;
 
   private final ECContainerOperationClient containerOperationClient;
 
@@ -43,8 +43,6 @@
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -66,7 +64,6 @@ public final class ECKeyOutputStream extends KeyOutputStream
   private final int numParityBlks;
   private final ByteBufferPool bufferPool;
   private final RawErasureEncoder encoder;
-  private final ExecutorService flushExecutor;
   private final Future<Boolean> flushFuture;
   private final AtomicLong flushCheckpoint;
 
@@ -119,12 +116,13 @@ private ECKeyOutputStream(Builder builder) {
     this.writeOffset = 0;
     this.encoder = CodecUtil.createRawEncoderWithFallback(
         builder.getReplicationConfig());
-    this.flushExecutor = Executors.newSingleThreadExecutor();
     S3Auth s3Auth = builder.getS3CredentialsProvider().get();
     ThreadLocal<S3Auth> s3CredentialsProvider =
         builder.getS3CredentialsProvider();
-    flushExecutor.submit(() -> s3CredentialsProvider.set(s3Auth));
-    this.flushFuture = this.flushExecutor.submit(this::flushStripeFromQueue);
+    this.flushFuture = builder.getExecutorServiceSupplier().get().submit(() -> {
+      s3CredentialsProvider.set(s3Auth);
+      return flushStripeFromQueue();
+    });
     this.flushCheckpoint = new AtomicLong(0);
     this.atomicKeyCreation = builder.getAtomicKeyCreation();
   }
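With the per-stream single-thread flushExecutor gone, setting the S3 credential ThreadLocal and starting flushStripeFromQueue are folded into one task on the supplied executor, presumably so the credential is guaranteed to be set on the same pooled thread that runs the flush loop. A minimal sketch of that pattern, with hypothetical names and the real builder/supplier wiring elided:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Supplier;

class FlushTaskSketch {
  private static final ThreadLocal<String> S3_AUTH = new ThreadLocal<>();

  static Future<Boolean> start(Supplier<ExecutorService> executorSupplier, String s3Auth) {
    // One submitted task: the ThreadLocal is set on exactly the thread that
    // will run the flush loop, which matters once the executor is shared.
    return executorSupplier.get().submit(() -> {
      S3_AUTH.set(s3Auth);
      return flushStripeFromQueue();
    });
  }

  private static Boolean flushStripeFromQueue() {
    // Stand-in for the real stripe-flushing loop.
    System.out.println("flushing with auth " + S3_AUTH.get());
    return Boolean.TRUE;
  }

  public static void main(String[] args) throws Exception {
    ExecutorService shared = Executors.newFixedThreadPool(2);
    Future<Boolean> flushFuture = start(() -> shared, "demo-credentials");
    System.out.println("flush finished: " + flushFuture.get());
    shared.shutdown();
  }
}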
@@ -495,7 +493,6 @@ public void close() throws IOException {
     } catch (InterruptedException e) {
       throw new IOException("Flushing thread was interrupted", e);
     } finally {
-      flushExecutor.shutdownNow();
       closeCurrentStreamEntry();
       blockOutputStreamEntryPool.cleanup();
     }
@@ -196,8 +196,7 @@ public class RpcClient implements ClientProtocol {
   // for reconstruction.
   private static final int EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE = 3;
 
-  // TODO: Adjusts to the appropriate value when the writeThreadPool is used.
-  private static final int WRITE_POOL_MIN_SIZE = 0;
+  private static final int WRITE_POOL_MIN_SIZE = 1;
 
   private final ConfigurationSource conf;
   private final OzoneManagerClientProtocol ozoneManagerClient;
@@ -209,7 +209,7 @@ public void testReleaseBuffers() throws Exception {
         return v;
       });
       futures.add(future);
-      watcher.getFutureMap().put(length, future);
+      watcher.putFlushFuture(length, future);
       replies.add(reply);
     }

@@ -282,7 +282,7 @@ public void testReleaseBuffersOnException() throws Exception {
         return v;
       });
       futures.add(future);
-      watcher.getFutureMap().put(length, future);
+      watcher.putFlushFuture(length, future);
       replies.add(reply);
     }

@@ -213,6 +213,14 @@ static void shutdown() throws IOException {
     }
   }
 
+  static void reInitClient() throws IOException {
+    ozClient = OzoneClientFactory.getRpcClient(conf);
+    store = ozClient.getObjectStore();
+    TestOzoneRpcClient.setOzClient(ozClient);
+    TestOzoneRpcClient.setStore(store);
+  }
+
+
   @ParameterizedTest
   @EnumSource
   void testPutKeyWithEncryption(BucketLayout bucketLayout) throws Exception {
@@ -770,9 +778,7 @@ void testGetKeyProvider() throws Exception {
 
     KeyProvider kp3 = ozClient.getObjectStore().getKeyProvider();
     assertNotEquals(kp3, kpSpy);
-    // Restore ozClient and store
-    TestOzoneRpcClient.setOzClient(OzoneClientFactory.getRpcClient(conf));
-    TestOzoneRpcClient.setStore(ozClient.getObjectStore());
+    reInitClient();
   }
 
   private static RepeatedOmKeyInfo getMatchedKeyInfo(
@@ -1651,6 +1651,7 @@ public void testPutKeyRatisThreeNodesParallel() throws IOException,
         }
         latch.countDown();
       } catch (IOException ex) {
+        LOG.error("Execution failed: ", ex);
         latch.countDown();
         failCount.incrementAndGet();
       }