diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java index fd7b18bb6810..2c1276c43e73 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java @@ -19,7 +19,6 @@ package org.apache.hadoop.ozone.om.ratis; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -53,6 +52,7 @@ import org.apache.hadoop.util.Time; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.util.ExitUtils; +import org.apache.ratis.util.Preconditions; import org.apache.ratis.util.function.CheckedRunnable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -94,32 +94,11 @@ OMClientResponse getResponse() { } } - // Taken unbounded queue, if sync thread is taking too long time, we - // might end up taking huge memory to add entries to the buffer. - // TODO: We can avoid this using unbounded queue and use queue with - // capacity, if queue is full we can wait for sync to be completed to - // add entries. But in this also we might block rpc handlers, as we - // clear entries after sync. Or we can come up with a good approach to - // solve this. - private Queue currentBuffer; - private Queue readyBuffer; - - private final Daemon daemon; - private final OMMetadataManager omMetadataManager; - private final AtomicBoolean isRunning = new AtomicBoolean(false); - - private final Consumer updateLastAppliedIndex; - private final boolean isRatisEnabled; - private final boolean isTracingEnabled; - private final Semaphore unFlushedTransactions; - private final FlushNotifier flushNotifier; - private final S3SecretManager s3SecretManager; - /** * Builder for creating OzoneManagerDoubleBuffer. */ - public static class Builder { - private OMMetadataManager mm; + public static final class Builder { + private OMMetadataManager omMetadataManager; private Consumer updateLastAppliedIndex = termIndex -> { }; private boolean isRatisEnabled = false; private boolean isTracingEnabled = false; @@ -128,9 +107,10 @@ public static class Builder { private S3SecretManager s3SecretManager; private String threadPrefix = ""; + private Builder() { } - public Builder setOmMetadataManager(OMMetadataManager omm) { - this.mm = omm; + public Builder setOmMetadataManager(OMMetadataManager omMetadataManager) { + this.omMetadataManager = omMetadataManager; return this; } @@ -149,8 +129,8 @@ public Builder enableTracing(boolean enableTracing) { return this; } - public Builder setmaxUnFlushedTransactionCount(int size) { - this.maxUnFlushedTransactionCount = size; + public Builder setMaxUnFlushedTransactionCount(int maxUnFlushedTransactionCount) { + this.maxUnFlushedTransactionCount = maxUnFlushedTransactionCount; return this; } @@ -170,21 +150,48 @@ public Builder setS3SecretManager(S3SecretManager s3SecretManager) { } public OzoneManagerDoubleBuffer build() { - if (isRatisEnabled) { - Preconditions.checkState(maxUnFlushedTransactionCount > 0L, - "when ratis is enable, maxUnFlushedTransactions " + - "should be bigger than 0"); - } + Preconditions.assertTrue(isRatisEnabled == maxUnFlushedTransactionCount > 0L, + () -> "Ratis is " + (isRatisEnabled ? "enabled" : "disabled") + + " but maxUnFlushedTransactionCount = " + maxUnFlushedTransactionCount); if (flushNotifier == null) { flushNotifier = new FlushNotifier(); } - return new OzoneManagerDoubleBuffer(mm, updateLastAppliedIndex, isRatisEnabled, - isTracingEnabled, maxUnFlushedTransactionCount, - flushNotifier, s3SecretManager, threadPrefix); + return new OzoneManagerDoubleBuffer(this); } } + public static Builder newBuilder() { + return new Builder(); + } + + static Semaphore newSemaphore(int permits) { + return permits > 0 ? new Semaphore(permits) : null; + } + + private Queue currentBuffer; + private Queue readyBuffer; + /** + * Limit the number of un-flushed transactions for {@link OzoneManagerStateMachine}. + * It is set to null if ratis is disabled; see {@link #isRatisEnabled()}. + */ + private final Semaphore unFlushedTransactions; + + /** To flush the buffers. */ + private final Daemon daemon; + /** Is the {@link #daemon} running? */ + private final AtomicBoolean isRunning = new AtomicBoolean(false); + /** Notify flush operations are completed by the {@link #daemon}. */ + private final FlushNotifier flushNotifier; + + private final OMMetadataManager omMetadataManager; + + private final Consumer updateLastAppliedIndex; + + private final S3SecretManager s3SecretManager; + + private final boolean isTracingEnabled; + private final OzoneManagerDoubleBufferMetrics metrics = OzoneManagerDoubleBufferMetrics.create(); /** Accumulative count (for testing and debug only). */ @@ -192,27 +199,27 @@ public OzoneManagerDoubleBuffer build() { /** The number of flush iterations (for testing and debug only). */ private final AtomicLong flushIterations = new AtomicLong(); - @SuppressWarnings("checkstyle:parameternumber") - private OzoneManagerDoubleBuffer(OMMetadataManager omMetadataManager, - Consumer updateLastAppliedIndex, - boolean isRatisEnabled, boolean isTracingEnabled, - int maxUnFlushedTransactions, - FlushNotifier flushNotifier, S3SecretManager s3SecretManager, - String threadPrefix) { + private OzoneManagerDoubleBuffer(Builder b) { this.currentBuffer = new ConcurrentLinkedQueue<>(); this.readyBuffer = new ConcurrentLinkedQueue<>(); - this.isRatisEnabled = isRatisEnabled; - this.isTracingEnabled = isTracingEnabled; - this.unFlushedTransactions = new Semaphore(maxUnFlushedTransactions); - this.omMetadataManager = omMetadataManager; - this.updateLastAppliedIndex = updateLastAppliedIndex; - this.flushNotifier = flushNotifier; + + this.omMetadataManager = b.omMetadataManager; + this.s3SecretManager = b.s3SecretManager; + this.updateLastAppliedIndex = b.updateLastAppliedIndex; + this.flushNotifier = b.flushNotifier; + this.unFlushedTransactions = newSemaphore(b.maxUnFlushedTransactionCount); + + this.isTracingEnabled = b.isTracingEnabled; + isRunning.set(true); // Daemon thread which runs in background and flushes transactions to DB. daemon = new Daemon(this::flushTransactions); - daemon.setName(threadPrefix + "OMDoubleBufferFlushThread"); + daemon.setName(b.threadPrefix + "OMDoubleBufferFlushThread"); daemon.start(); - this.s3SecretManager = s3SecretManager; + } + + private boolean isRatisEnabled() { + return unFlushedTransactions != null; } /** @@ -220,6 +227,7 @@ private OzoneManagerDoubleBuffer(OMMetadataManager omMetadataManager, * blocking until all are available, or the thread is interrupted. */ public void acquireUnFlushedTransactions(int n) throws InterruptedException { + Preconditions.assertTrue(isRatisEnabled(), "Ratis is not enabled"); unFlushedTransactions.acquire(n); } @@ -227,7 +235,7 @@ public void acquireUnFlushedTransactions(int n) throws InterruptedException { * Releases the given number of permits, * returning them to the unFlushedTransactions. */ - public void releaseUnFlushedTransactions(int n) { + void releaseUnFlushedTransactions(int n) { unFlushedTransactions.release(n); } @@ -360,7 +368,7 @@ private void flushBatch(Queue buffer) throws IOException { // Complete futures first and then do other things. // So that handler threads will be released. - if (!isRatisEnabled) { + if (!isRatisEnabled()) { buffer.stream() .map(Entry::getResponse) .map(OMClientResponse::getFlushFuture) @@ -375,7 +383,7 @@ private void flushBatch(Queue buffer) throws IOException { // Clean up committed transactions. cleanupCache(cleanupEpochs); - if (isRatisEnabled) { + if (isRatisEnabled()) { releaseUnFlushedTransactions(flushedTransactionsSize); } // update the last updated index in OzoneManagerStateMachine. @@ -537,7 +545,7 @@ public synchronized void add(OMClientResponse response, TermIndex termIndex) { currentBuffer.add(new Entry(termIndex, response)); notify(); - if (!isRatisEnabled) { + if (!isRatisEnabled()) { response.setFlushFuture(new CompletableFuture<>()); } } @@ -639,7 +647,7 @@ private CompletableFuture await() { } private int complete() { - Preconditions.checkState(future.complete(count)); + Preconditions.assertTrue(future.complete(count)); return future.join(); } } @@ -654,7 +662,7 @@ synchronized CompletableFuture await() { final int flush = flushCount + 2; LOG.debug("await flush {}", flush); final Entry entry = flushFutures.computeIfAbsent(flush, key -> new Entry()); - Preconditions.checkState(flushFutures.size() <= 2); + Preconditions.assertTrue(flushFutures.size() <= 2); return entry.await(); } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java index 62e320e1e069..90fcba40f5d0 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java @@ -392,11 +392,11 @@ public CompletableFuture query(Message request) { public synchronized void pause() { LOG.info("OzoneManagerStateMachine is pausing"); statePausedCount.incrementAndGet(); - if (getLifeCycleState() == LifeCycle.State.PAUSED) { + final LifeCycle.State state = getLifeCycleState(); + if (state == LifeCycle.State.PAUSED) { return; } - final LifeCycle lc = getLifeCycle(); - if (lc.getCurrentState() != LifeCycle.State.NEW) { + if (state != LifeCycle.State.NEW) { getLifeCycle().transition(LifeCycle.State.PAUSING); getLifeCycle().transition(LifeCycle.State.PAUSED); } @@ -423,13 +423,13 @@ public synchronized void unpause(long newLastAppliedSnaphsotIndex, } public OzoneManagerDoubleBuffer buildDoubleBufferForRatis() { - int maxUnflushedTransactionSize = ozoneManager.getConfiguration() + final int maxUnFlushedTransactionCount = ozoneManager.getConfiguration() .getInt(OMConfigKeys.OZONE_OM_UNFLUSHED_TRANSACTION_MAX_COUNT, OMConfigKeys.OZONE_OM_UNFLUSHED_TRANSACTION_MAX_COUNT_DEFAULT); - return new OzoneManagerDoubleBuffer.Builder() + return OzoneManagerDoubleBuffer.newBuilder() .setOmMetadataManager(ozoneManager.getMetadataManager()) .setUpdateLastAppliedIndex(this::updateLastAppliedTermIndex) - .setmaxUnFlushedTransactionCount(maxUnflushedTransactionSize) + .setMaxUnFlushedTransactionCount(maxUnFlushedTransactionCount) .setThreadPrefix(threadPrefix) .setS3SecretManager(ozoneManager.getS3SecretManager()) .enableRatis(true) diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java index a4ec2c2f200a..cf9bb4f0bbce 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java @@ -62,21 +62,20 @@ import org.slf4j.LoggerFactory; /** - * This class is the server-side translator that forwards requests received on - * {@link OzoneManagerProtocolPB} - * to the OzoneManagerService server implementation. + * This is the server-side translator that forwards requests received + * from {@link OzoneManagerProtocolPB} to {@link OzoneManager}. */ -public class OzoneManagerProtocolServerSideTranslatorPB implements - OzoneManagerProtocolPB { - private static final Logger LOG = LoggerFactory - .getLogger(OzoneManagerProtocolServerSideTranslatorPB.class); - private static final String OM_REQUESTS_PACKAGE = - "org.apache.hadoop.ozone"; +public class OzoneManagerProtocolServerSideTranslatorPB implements OzoneManagerProtocolPB { + private static final Logger LOG = LoggerFactory .getLogger(OzoneManagerProtocolServerSideTranslatorPB.class); + private static final String OM_REQUESTS_PACKAGE = "org.apache.hadoop.ozone"; private final OzoneManagerRatisServer omRatisServer; private final RequestHandler handler; - private final boolean isRatisEnabled; private final OzoneManager ozoneManager; + /** + * Only used to handle write requests when ratis is disabled. + * When ratis is enabled, write requests are handled by the state machine. + */ private final OzoneManagerDoubleBuffer ozoneManagerDoubleBuffer; private final AtomicLong transactionIndex; private final OzoneProtocolMessageDispatcher("OzoneProtocol", metrics, LOG, OMPBHelper::processForDebug, OMPBHelper::processForDebug); + // TODO: make this injectable for testing... - requestValidations = - new RequestValidations() - .fromPackage(OM_REQUESTS_PACKAGE) - .withinContext( - ValidationContext.of(ozoneManager.getVersionManager(), - ozoneManager.getMetadataManager())) - .load(); + this.requestValidations = new RequestValidations() + .fromPackage(OM_REQUESTS_PACKAGE) + .withinContext(ValidationContext.of(ozoneManager.getVersionManager(), ozoneManager.getMetadataManager())) + .load(); + } + private boolean isRatisEnabled() { + return ozoneManagerDoubleBuffer == null; } /** @@ -197,7 +188,7 @@ private OMResponse internalProcessRequest(OMRequest request) throws } } - if (!isRatisEnabled) { + if (!isRatisEnabled()) { return submitRequestDirectlyToOM(request); } @@ -320,13 +311,7 @@ private OMResponse submitRequestDirectlyToOM(OMRequest request) { return omClientResponse.getOMResponse(); } - /** - * Create OMResponse from the specified OMRequest and exception. - * - * @param omRequest - * @param exception - * @return OMResponse - */ + /** @return an {@link OMResponse} from the given {@link OMRequest} and the given exception. */ private OMResponse createErrorResponse( OMRequest omRequest, IOException exception) { // Added all write command types here, because in future if any of the @@ -344,7 +329,7 @@ private OMResponse createErrorResponse( } public void stop() { - if (!isRatisEnabled) { + if (ozoneManagerDoubleBuffer != null) { ozoneManagerDoubleBuffer.stop(); } } diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBuffer.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBuffer.java index 21205c4dc334..c7d15bfe5f23 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBuffer.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBuffer.java @@ -131,10 +131,10 @@ public void setup() throws IOException { flushNotifier = new OzoneManagerDoubleBuffer.FlushNotifier(); spyFlushNotifier = spy(flushNotifier); - doubleBuffer = new OzoneManagerDoubleBuffer.Builder() + doubleBuffer = OzoneManagerDoubleBuffer.newBuilder() .setOmMetadataManager(omMetadataManager) .setS3SecretManager(secretManager) - .setmaxUnFlushedTransactionCount(1000) + .setMaxUnFlushedTransactionCount(1000) .enableRatis(true) .setFlushNotifier(spyFlushNotifier) .build(); @@ -289,7 +289,7 @@ public void testAwaitFlush() throws Exception { doubleBuffer.getCurrentBufferSize()); // Start double buffer and wait for flush. - final Future await = awaitFlush(); + final Future await = doubleBuffer.awaitFlushAsync(); Future flusher = flushTransactions(executorService); await.get(); @@ -302,7 +302,7 @@ public void testAwaitFlush() throws Exception { assertEquals(0, doubleBuffer.getReadyBufferSize()); // Run again to make sure it works when double buffer is empty - awaitFlush().get(); + doubleBuffer.awaitFlushAsync().get(); // Clean up. flusher.cancel(false); @@ -323,8 +323,7 @@ public void testS3SecretCacheSizePostDoubleBufferFlush() throws IOException { "RULE:[2:$1@$0](.*@EXAMPLE.COM)s/@.*//\n" + "RULE:[1:$1@$0](.*@EXAMPLE.COM)s/@.*//\n" + "DEFAULT"); - UserGroupInformation ugiAlice; - ugiAlice = UserGroupInformation.createRemoteUser(userPrincipalId1); + final UserGroupInformation ugiAlice = UserGroupInformation.createRemoteUser(userPrincipalId1); UserGroupInformation.createRemoteUser(userPrincipalId2); UserGroupInformation.createRemoteUser(userPrincipalId3); assertEquals("alice", ugiAlice.getShortUserName()); @@ -393,11 +392,6 @@ private OzoneManagerProtocolProtos.OMRequest s3GetSecretRequest( ).build(); } - // Return a future that waits for the flush. - private Future awaitFlush() { - return doubleBuffer.awaitFlushAsync(); - } - private Future flushTransactions(ExecutorService executorService) { return executorService.submit(() -> { doubleBuffer.resume(); diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithDummyResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithDummyResponse.java index ee2e9043a362..dd8e642721e6 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithDummyResponse.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithDummyResponse.java @@ -75,9 +75,9 @@ public void setup() throws IOException { folder.toAbsolutePath().toString()); omMetadataManager = new OmMetadataManagerImpl(configuration, null); - doubleBuffer = new OzoneManagerDoubleBuffer.Builder() + doubleBuffer = OzoneManagerDoubleBuffer.newBuilder() .setOmMetadataManager(omMetadataManager) - .setmaxUnFlushedTransactionCount(10000) + .setMaxUnFlushedTransactionCount(10000) .enableRatis(true) .build(); } diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithOMResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithOMResponse.java index 006777141a6c..e8674b616377 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithOMResponse.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithOMResponse.java @@ -107,9 +107,9 @@ public void setup() throws IOException { auditLogger = mock(AuditLogger.class); when(ozoneManager.getAuditLogger()).thenReturn(auditLogger); doNothing().when(auditLogger).logWrite(any(AuditMessage.class)); - doubleBuffer = new OzoneManagerDoubleBuffer.Builder() + doubleBuffer = OzoneManagerDoubleBuffer.newBuilder() .setOmMetadataManager(omMetadataManager) - .setmaxUnFlushedTransactionCount(100000) + .setMaxUnFlushedTransactionCount(100000) .enableRatis(true) .build(); }