Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.hadoop.ozone.om.ratis;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -53,6 +52,7 @@
import org.apache.hadoop.util.Time;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.util.ExitUtils;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.function.CheckedRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -94,32 +94,11 @@ OMClientResponse getResponse() {
}
}

// Taken unbounded queue, if sync thread is taking too long time, we
// might end up taking huge memory to add entries to the buffer.
// TODO: We can avoid this using unbounded queue and use queue with
// capacity, if queue is full we can wait for sync to be completed to
// add entries. But in this also we might block rpc handlers, as we
// clear entries after sync. Or we can come up with a good approach to
// solve this.
private Queue<Entry> currentBuffer;
private Queue<Entry> readyBuffer;

private final Daemon daemon;
private final OMMetadataManager omMetadataManager;
private final AtomicBoolean isRunning = new AtomicBoolean(false);

private final Consumer<TermIndex> updateLastAppliedIndex;
private final boolean isRatisEnabled;
private final boolean isTracingEnabled;
private final Semaphore unFlushedTransactions;
private final FlushNotifier flushNotifier;
private final S3SecretManager s3SecretManager;

/**
* Builder for creating OzoneManagerDoubleBuffer.
*/
public static class Builder {
private OMMetadataManager mm;
public static final class Builder {
private OMMetadataManager omMetadataManager;
private Consumer<TermIndex> updateLastAppliedIndex = termIndex -> { };
private boolean isRatisEnabled = false;
private boolean isTracingEnabled = false;
Expand All @@ -128,9 +107,10 @@ public static class Builder {
private S3SecretManager s3SecretManager;
private String threadPrefix = "";

private Builder() { }

public Builder setOmMetadataManager(OMMetadataManager omm) {
this.mm = omm;
public Builder setOmMetadataManager(OMMetadataManager omMetadataManager) {
this.omMetadataManager = omMetadataManager;
return this;
}

Expand All @@ -149,8 +129,8 @@ public Builder enableTracing(boolean enableTracing) {
return this;
}

public Builder setmaxUnFlushedTransactionCount(int size) {
this.maxUnFlushedTransactionCount = size;
public Builder setMaxUnFlushedTransactionCount(int maxUnFlushedTransactionCount) {
this.maxUnFlushedTransactionCount = maxUnFlushedTransactionCount;
return this;
}

Expand All @@ -170,64 +150,92 @@ public Builder setS3SecretManager(S3SecretManager s3SecretManager) {
}

public OzoneManagerDoubleBuffer build() {
if (isRatisEnabled) {
Preconditions.checkState(maxUnFlushedTransactionCount > 0L,
"when ratis is enable, maxUnFlushedTransactions " +
"should be bigger than 0");
}
Preconditions.assertTrue(isRatisEnabled == maxUnFlushedTransactionCount > 0L,
() -> "Ratis is " + (isRatisEnabled ? "enabled" : "disabled")
+ " but maxUnFlushedTransactionCount = " + maxUnFlushedTransactionCount);
if (flushNotifier == null) {
flushNotifier = new FlushNotifier();
}

return new OzoneManagerDoubleBuffer(mm, updateLastAppliedIndex, isRatisEnabled,
isTracingEnabled, maxUnFlushedTransactionCount,
flushNotifier, s3SecretManager, threadPrefix);
return new OzoneManagerDoubleBuffer(this);
}
}

public static Builder newBuilder() {
return new Builder();
}

static Semaphore newSemaphore(int permits) {
return permits > 0 ? new Semaphore(permits) : null;
}

private Queue<Entry> currentBuffer;
private Queue<Entry> readyBuffer;
/**
* Limit the number of un-flushed transactions for {@link OzoneManagerStateMachine}.
* It is set to null if ratis is disabled; see {@link #isRatisEnabled()}.
*/
private final Semaphore unFlushedTransactions;

/** To flush the buffers. */
private final Daemon daemon;
/** Is the {@link #daemon} running? */
private final AtomicBoolean isRunning = new AtomicBoolean(false);
/** Notify flush operations are completed by the {@link #daemon}. */
private final FlushNotifier flushNotifier;

private final OMMetadataManager omMetadataManager;

private final Consumer<TermIndex> updateLastAppliedIndex;

private final S3SecretManager s3SecretManager;

private final boolean isTracingEnabled;

private final OzoneManagerDoubleBufferMetrics metrics = OzoneManagerDoubleBufferMetrics.create();

/** Accumulative count (for testing and debug only). */
private final AtomicLong flushedTransactionCount = new AtomicLong();
/** The number of flush iterations (for testing and debug only). */
private final AtomicLong flushIterations = new AtomicLong();

@SuppressWarnings("checkstyle:parameternumber")
private OzoneManagerDoubleBuffer(OMMetadataManager omMetadataManager,
Consumer<TermIndex> updateLastAppliedIndex,
boolean isRatisEnabled, boolean isTracingEnabled,
int maxUnFlushedTransactions,
FlushNotifier flushNotifier, S3SecretManager s3SecretManager,
String threadPrefix) {
private OzoneManagerDoubleBuffer(Builder b) {
this.currentBuffer = new ConcurrentLinkedQueue<>();
this.readyBuffer = new ConcurrentLinkedQueue<>();
this.isRatisEnabled = isRatisEnabled;
this.isTracingEnabled = isTracingEnabled;
this.unFlushedTransactions = new Semaphore(maxUnFlushedTransactions);
this.omMetadataManager = omMetadataManager;
this.updateLastAppliedIndex = updateLastAppliedIndex;
this.flushNotifier = flushNotifier;

this.omMetadataManager = b.omMetadataManager;
this.s3SecretManager = b.s3SecretManager;
this.updateLastAppliedIndex = b.updateLastAppliedIndex;
this.flushNotifier = b.flushNotifier;
this.unFlushedTransactions = newSemaphore(b.maxUnFlushedTransactionCount);

this.isTracingEnabled = b.isTracingEnabled;

isRunning.set(true);
// Daemon thread which runs in background and flushes transactions to DB.
daemon = new Daemon(this::flushTransactions);
daemon.setName(threadPrefix + "OMDoubleBufferFlushThread");
daemon.setName(b.threadPrefix + "OMDoubleBufferFlushThread");
daemon.start();
this.s3SecretManager = s3SecretManager;
}

private boolean isRatisEnabled() {
return unFlushedTransactions != null;
}

/**
* Acquires the given number of permits from unFlushedTransactions,
* blocking until all are available, or the thread is interrupted.
*/
public void acquireUnFlushedTransactions(int n) throws InterruptedException {
Preconditions.assertTrue(isRatisEnabled(), "Ratis is not enabled");
unFlushedTransactions.acquire(n);
}

/**
* Releases the given number of permits,
* returning them to the unFlushedTransactions.
*/
public void releaseUnFlushedTransactions(int n) {
void releaseUnFlushedTransactions(int n) {
unFlushedTransactions.release(n);
}

Expand Down Expand Up @@ -360,7 +368,7 @@ private void flushBatch(Queue<Entry> buffer) throws IOException {

// Complete futures first and then do other things.
// So that handler threads will be released.
if (!isRatisEnabled) {
if (!isRatisEnabled()) {
buffer.stream()
.map(Entry::getResponse)
.map(OMClientResponse::getFlushFuture)
Expand All @@ -375,7 +383,7 @@ private void flushBatch(Queue<Entry> buffer) throws IOException {
// Clean up committed transactions.
cleanupCache(cleanupEpochs);

if (isRatisEnabled) {
if (isRatisEnabled()) {
releaseUnFlushedTransactions(flushedTransactionsSize);
}
// update the last updated index in OzoneManagerStateMachine.
Expand Down Expand Up @@ -537,7 +545,7 @@ public synchronized void add(OMClientResponse response, TermIndex termIndex) {
currentBuffer.add(new Entry(termIndex, response));
notify();

if (!isRatisEnabled) {
if (!isRatisEnabled()) {
response.setFlushFuture(new CompletableFuture<>());
}
}
Expand Down Expand Up @@ -639,7 +647,7 @@ private CompletableFuture<Integer> await() {
}

private int complete() {
Preconditions.checkState(future.complete(count));
Preconditions.assertTrue(future.complete(count));
return future.join();
}
}
Expand All @@ -654,7 +662,7 @@ synchronized CompletableFuture<Integer> await() {
final int flush = flushCount + 2;
LOG.debug("await flush {}", flush);
final Entry entry = flushFutures.computeIfAbsent(flush, key -> new Entry());
Preconditions.checkState(flushFutures.size() <= 2);
Preconditions.assertTrue(flushFutures.size() <= 2);
return entry.await();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,11 +392,11 @@ public CompletableFuture<Message> query(Message request) {
public synchronized void pause() {
LOG.info("OzoneManagerStateMachine is pausing");
statePausedCount.incrementAndGet();
if (getLifeCycleState() == LifeCycle.State.PAUSED) {
final LifeCycle.State state = getLifeCycleState();
if (state == LifeCycle.State.PAUSED) {
return;
}
final LifeCycle lc = getLifeCycle();
if (lc.getCurrentState() != LifeCycle.State.NEW) {
if (state != LifeCycle.State.NEW) {
getLifeCycle().transition(LifeCycle.State.PAUSING);
getLifeCycle().transition(LifeCycle.State.PAUSED);
}
Expand All @@ -423,13 +423,13 @@ public synchronized void unpause(long newLastAppliedSnaphsotIndex,
}

public OzoneManagerDoubleBuffer buildDoubleBufferForRatis() {
int maxUnflushedTransactionSize = ozoneManager.getConfiguration()
final int maxUnFlushedTransactionCount = ozoneManager.getConfiguration()
.getInt(OMConfigKeys.OZONE_OM_UNFLUSHED_TRANSACTION_MAX_COUNT,
OMConfigKeys.OZONE_OM_UNFLUSHED_TRANSACTION_MAX_COUNT_DEFAULT);
return new OzoneManagerDoubleBuffer.Builder()
return OzoneManagerDoubleBuffer.newBuilder()
.setOmMetadataManager(ozoneManager.getMetadataManager())
.setUpdateLastAppliedIndex(this::updateLastAppliedTermIndex)
.setmaxUnFlushedTransactionCount(maxUnflushedTransactionSize)
.setMaxUnFlushedTransactionCount(maxUnFlushedTransactionCount)
.setThreadPrefix(threadPrefix)
.setS3SecretManager(ozoneManager.getS3SecretManager())
.enableRatis(true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,21 +62,20 @@
import org.slf4j.LoggerFactory;

/**
* This class is the server-side translator that forwards requests received on
* {@link OzoneManagerProtocolPB}
* to the OzoneManagerService server implementation.
* This is the server-side translator that forwards requests received
* from {@link OzoneManagerProtocolPB} to {@link OzoneManager}.
*/
public class OzoneManagerProtocolServerSideTranslatorPB implements
OzoneManagerProtocolPB {
private static final Logger LOG = LoggerFactory
.getLogger(OzoneManagerProtocolServerSideTranslatorPB.class);
private static final String OM_REQUESTS_PACKAGE =
"org.apache.hadoop.ozone";
public class OzoneManagerProtocolServerSideTranslatorPB implements OzoneManagerProtocolPB {
private static final Logger LOG = LoggerFactory .getLogger(OzoneManagerProtocolServerSideTranslatorPB.class);
private static final String OM_REQUESTS_PACKAGE = "org.apache.hadoop.ozone";

private final OzoneManagerRatisServer omRatisServer;
private final RequestHandler handler;
private final boolean isRatisEnabled;
private final OzoneManager ozoneManager;
/**
* Only used to handle write requests when ratis is disabled.
* When ratis is enabled, write requests are handled by the state machine.
*/
private final OzoneManagerDoubleBuffer ozoneManagerDoubleBuffer;
private final AtomicLong transactionIndex;
private final OzoneProtocolMessageDispatcher<OMRequest, OMResponse,
Expand All @@ -100,39 +99,31 @@ public OzoneManagerProtocolServerSideTranslatorPB(
long lastTransactionIndexForNonRatis) {
this.ozoneManager = impl;
this.perfMetrics = impl.getPerfMetrics();
this.isRatisEnabled = enableRatis;
// Update the transactionIndex with the last TransactionIndex read from DB.
// New requests should have transactionIndex incremented from this index
// onwards to ensure unique objectIDs.
this.transactionIndex = new AtomicLong(lastTransactionIndexForNonRatis);

if (isRatisEnabled) {
// In case of ratis is enabled, handler in ServerSideTransaltorPB is used
// only for read requests and read requests does not require
// double-buffer to be initialized.
this.ozoneManagerDoubleBuffer = null;
handler = new OzoneManagerRequestHandler(impl, null);
} else {
this.ozoneManagerDoubleBuffer = new OzoneManagerDoubleBuffer.Builder()
// When ratis is enabled, the handler does not require a double-buffer since it only handle read requests.
this.ozoneManagerDoubleBuffer = enableRatis ? null
: OzoneManagerDoubleBuffer.newBuilder()
.setOmMetadataManager(ozoneManager.getMetadataManager())
.enableRatis(isRatisEnabled)
.enableTracing(TracingUtil.isTracingEnabled(
ozoneManager.getConfiguration()))
.enableTracing(TracingUtil.isTracingEnabled(ozoneManager.getConfiguration()))
.build();
handler = new OzoneManagerRequestHandler(impl, ozoneManagerDoubleBuffer);
}
this.handler = new OzoneManagerRequestHandler(impl, ozoneManagerDoubleBuffer);
this.omRatisServer = ratisServer;
dispatcher = new OzoneProtocolMessageDispatcher<>("OzoneProtocol",
metrics, LOG, OMPBHelper::processForDebug, OMPBHelper::processForDebug);

// TODO: make this injectable for testing...
requestValidations =
new RequestValidations()
.fromPackage(OM_REQUESTS_PACKAGE)
.withinContext(
ValidationContext.of(ozoneManager.getVersionManager(),
ozoneManager.getMetadataManager()))
.load();
this.requestValidations = new RequestValidations()
.fromPackage(OM_REQUESTS_PACKAGE)
.withinContext(ValidationContext.of(ozoneManager.getVersionManager(), ozoneManager.getMetadataManager()))
.load();
}

private boolean isRatisEnabled() {
return ozoneManagerDoubleBuffer == null;
}

/**
Expand Down Expand Up @@ -197,7 +188,7 @@ private OMResponse internalProcessRequest(OMRequest request) throws
}
}

if (!isRatisEnabled) {
if (!isRatisEnabled()) {
return submitRequestDirectlyToOM(request);
}

Expand Down Expand Up @@ -320,13 +311,7 @@ private OMResponse submitRequestDirectlyToOM(OMRequest request) {
return omClientResponse.getOMResponse();
}

/**
* Create OMResponse from the specified OMRequest and exception.
*
* @param omRequest
* @param exception
* @return OMResponse
*/
/** @return an {@link OMResponse} from the given {@link OMRequest} and the given exception. */
private OMResponse createErrorResponse(
OMRequest omRequest, IOException exception) {
// Added all write command types here, because in future if any of the
Expand All @@ -344,7 +329,7 @@ private OMResponse createErrorResponse(
}

public void stop() {
if (!isRatisEnabled) {
if (ozoneManagerDoubleBuffer != null) {
ozoneManagerDoubleBuffer.stop();
}
}
Expand Down
Loading