Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions hadoop-hdds/common/src/main/resources/ozone-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3358,7 +3358,7 @@
<value>5s</value>
<tag>OZONE, RECON, OM</tag>
<description>
Interval in MINUTES by Recon to request OM DB Snapshot.
Interval in SECONDS by Recon to request OM DB Snapshot.
</description>
</property>
<property>
Expand Down Expand Up @@ -3391,12 +3391,13 @@
</description>
</property>
<property>
<name>recon.om.delta.update.loop.limit</name>
<value>50</value>
<name>recon.om.delta.update.lag.threshold</name>
<value>0</value>
<tag>OZONE, RECON</tag>
<description>
The sync between Recon and OM consists of several small
fetch loops.
At every Recon OM sync, Recon starts fetching OM DB updates and continues to
fetch from OM until the lag between the OM DB WAL sequence number and the Recon OM DB
snapshot WAL sequence number is less than or equal to this lag threshold value.
</description>
</property>
<property>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ Validate All Tasks Updated After Sync

Validate Sequence number is updated after sync
Log To Console Triggering OM DB sync for updates
Log To Console Wait for few seconds to let previous OM DB Sync thread to finish
Sleep 2s # Waits for 2 seconds
Sync OM Data
${tasks} = Fetch Task Status
Should Not Be Empty ${tasks}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SOCKET_TIMEOUT;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SOCKET_TIMEOUT_DEFAULT;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_DELTA_UPDATE_LIMIT;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_DELTA_UPDATE_LOOP_LIMIT;
import static org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl.OmSnapshotTaskName.OmDeltaRequest;
import static org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl.OmSnapshotTaskName.OmSnapshotRequest;
import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -107,8 +106,7 @@ public static void init() throws Exception {
OZONE_RECON_OM_CONNECTION_REQUEST_TIMEOUT_DEFAULT),
TimeUnit.MILLISECONDS
);
conf.setLong(RECON_OM_DELTA_UPDATE_LIMIT, 2);
conf.setLong(RECON_OM_DELTA_UPDATE_LOOP_LIMIT, 10);
conf.setLong(RECON_OM_DELTA_UPDATE_LIMIT, 10);

RequestConfig config = RequestConfig.custom()
.setConnectTimeout(socketTimeout)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,10 @@ public final class ReconServerConfigKeys {
public static final String RECON_OM_DELTA_UPDATE_LIMIT =
"recon.om.delta.update.limit";
public static final long RECON_OM_DELTA_UPDATE_LIMIT_DEFAULT = 50000;
public static final String RECON_OM_DELTA_UPDATE_LOOP_LIMIT =
"recon.om.delta.update.loop.limit";
public static final int RECON_OM_DELTA_UPDATE_LOOP_LIMIT_DEFAULT = 50;

public static final String RECON_OM_DELTA_UPDATE_LAG_THRESHOLD =
"recon.om.delta.update.lag.threshold";
public static final long RECON_OM_DELTA_UPDATE_LAG_THRESHOLD_DEFAULT = 0;

public static final String OZONE_RECON_TASK_THREAD_COUNT_KEY =
"ozone.recon.task.thread.count";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@

import com.google.common.collect.Iterators;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.hadoop.hdds.recon.ReconConfigKeys;
import org.apache.hadoop.hdds.utils.db.RocksDatabase;
import org.apache.hadoop.hdds.utils.db.Table;
Expand Down Expand Up @@ -92,10 +93,10 @@
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY_DEFAULT;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SNAPSHOT_TASK_INTERVAL_DELAY;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SNAPSHOT_TASK_INTERVAL_DEFAULT;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_DELTA_UPDATE_LAG_THRESHOLD;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_DELTA_UPDATE_LAG_THRESHOLD_DEFAULT;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_DELTA_UPDATE_LIMIT;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_DELTA_UPDATE_LIMIT_DEFAULT;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_DELTA_UPDATE_LOOP_LIMIT;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_DELTA_UPDATE_LOOP_LIMIT_DEFAULT;
import static org.apache.hadoop.ozone.recon.ReconUtils.convertNumericToSymbolic;
import static org.apache.ratis.proto.RaftProtos.RaftPeerRole.LEADER;

Expand Down Expand Up @@ -127,8 +128,8 @@ public class OzoneManagerServiceProviderImpl
private ReconUtils reconUtils;
private OzoneManagerSyncMetrics metrics;

private long deltaUpdateLimit;
private int deltaUpdateLoopLimit;
private final long deltaUpdateLimit;
private final long omDBLagThreshold;

private AtomicBoolean isSyncDataFromOMRunning;
private final String threadNamePrefix;
Expand Down Expand Up @@ -181,9 +182,6 @@ public OzoneManagerServiceProviderImpl(

long deltaUpdateLimits = configuration.getLong(RECON_OM_DELTA_UPDATE_LIMIT,
RECON_OM_DELTA_UPDATE_LIMIT_DEFAULT);
int deltaUpdateLoopLimits = configuration.getInt(
RECON_OM_DELTA_UPDATE_LOOP_LIMIT,
RECON_OM_DELTA_UPDATE_LOOP_LIMIT_DEFAULT);

omSnapshotDBParentDir = reconUtils.getReconDbDir(configuration,
OZONE_RECON_OM_SNAPSHOT_DB_DIR);
Expand Down Expand Up @@ -218,7 +216,6 @@ public OzoneManagerServiceProviderImpl(
this.configuration = configuration;
this.metrics = OzoneManagerSyncMetrics.create();
this.deltaUpdateLimit = deltaUpdateLimits;
this.deltaUpdateLoopLimit = deltaUpdateLoopLimits;
this.isSyncDataFromOMRunning = new AtomicBoolean();
this.threadNamePrefix =
reconUtils.getReconNodeDetails(configuration).threadNamePrefix();
Expand All @@ -227,6 +224,8 @@ public OzoneManagerServiceProviderImpl(
.build();
this.reconContext = reconContext;
this.taskStatusUpdaterManager = taskStatusUpdaterManager;
this.omDBLagThreshold = configuration.getLong(RECON_OM_DELTA_UPDATE_LAG_THRESHOLD,
RECON_OM_DELTA_UPDATE_LAG_THRESHOLD_DEFAULT);
}

@Override
Expand Down Expand Up @@ -256,7 +255,7 @@ public void start() {
LOG.error("Failed fetching a full snapshot from Ozone Manager");
}
} catch (IOException e) {
LOG.error("Unexpected IOException occurred while trying to fetch a full snapshot: {}", e);
LOG.error("Unexpected IOException occurred while trying to fetch a full snapshot", e);
throw new RuntimeException(runtimeException);
}
}
Expand Down Expand Up @@ -311,7 +310,7 @@ public boolean triggerSyncDataFromOMImmediately() {
startSyncDataFromOM(0L);
return true;
} else {
LOG.debug("OM DB sync is already running.");
LOG.info("OM DB sync is already running when trying to trigger OM DB sync manually.");
}
return false;
}
Expand Down Expand Up @@ -444,54 +443,46 @@ boolean updateReconOmDBWithNewSnapshot() throws IOException {
/**
* Get Delta updates from OM through RPC call and apply to local OM DB as
* well as accumulate in a buffer.
*
* @param fromSequenceNumber from sequence number to request from.
* @param omdbUpdatesHandler OM DB updates handler to buffer updates.
* @throws IOException when OM RPC request fails.
* @return lag count which tells how much Recon OM DB snapshot is lagging from OM DB.
* @throws IOException when OM RPC request fails.
* @throws RocksDBException when writing to RocksDB fails.
*/
@VisibleForTesting
void getAndApplyDeltaUpdatesFromOM(
Long getAndApplyDeltaUpdatesFromOM(
long fromSequenceNumber, OMDBUpdatesHandler omdbUpdatesHandler)
throws IOException, RocksDBException {
int loopCount = 0;
LOG.info("OriginalFromSequenceNumber : {} ", fromSequenceNumber);
long deltaUpdateCnt = Long.MAX_VALUE;
long inLoopStartSequenceNumber = fromSequenceNumber;
long inLoopLatestSequenceNumber;
while (loopCount < deltaUpdateLoopLimit &&
deltaUpdateCnt >= deltaUpdateLimit) {
if (!innerGetAndApplyDeltaUpdatesFromOM(
inLoopStartSequenceNumber, omdbUpdatesHandler)) {
LOG.error(
"Retrieve OM DB delta update failed for sequence number : {}, " +
"so falling back to full snapshot.", inLoopStartSequenceNumber);
throw new RocksDBException(
"Unable to get delta updates since sequenceNumber - " +
inLoopStartSequenceNumber);
}
inLoopLatestSequenceNumber = getCurrentOMDBSequenceNumber();
deltaUpdateCnt = inLoopLatestSequenceNumber - inLoopStartSequenceNumber;
inLoopStartSequenceNumber = inLoopLatestSequenceNumber;
loopCount++;
ImmutablePair<Boolean, Long> dbUpdatesLatestSeqNumOfOMDB =
innerGetAndApplyDeltaUpdatesFromOM(fromSequenceNumber, omdbUpdatesHandler);
if (!dbUpdatesLatestSeqNumOfOMDB.getLeft()) {
LOG.error(
"Retrieve OM DB delta update failed for sequence number : {}, " +
"so falling back to full snapshot.", fromSequenceNumber);
throw new RocksDBException(
"Unable to get delta updates since sequenceNumber - " +
fromSequenceNumber);
}

omdbUpdatesHandler.setLatestSequenceNumber(getCurrentOMDBSequenceNumber());
LOG.info("Delta updates received from OM : {} loops, {} records", loopCount,
getCurrentOMDBSequenceNumber() - fromSequenceNumber
);
LOG.info("Delta updates received from OM : {} records", getCurrentOMDBSequenceNumber() - fromSequenceNumber);
return dbUpdatesLatestSeqNumOfOMDB.getRight();
}

/**
* Get Delta updates from OM through RPC call and apply to local OM DB as
* well as accumulate in a buffer.
*
* @param fromSequenceNumber from sequence number to request from.
* @param omdbUpdatesHandler OM DB updates handler to buffer updates.
* @throws IOException when OM RPC request fails.
* @return Pair of dbUpdatesSuccess, lag (lag between OM and Recon)
* @throws IOException when OM RPC request fails.
* @throws RocksDBException when writing to RocksDB fails.
*/
@VisibleForTesting
boolean innerGetAndApplyDeltaUpdatesFromOM(long fromSequenceNumber,
OMDBUpdatesHandler omdbUpdatesHandler)
ImmutablePair<Boolean, Long> innerGetAndApplyDeltaUpdatesFromOM(long fromSequenceNumber,
OMDBUpdatesHandler omdbUpdatesHandler)
throws IOException, RocksDBException {
DBUpdatesRequest dbUpdatesRequest = DBUpdatesRequest.newBuilder()
.setSequenceNumber(fromSequenceNumber)
Expand Down Expand Up @@ -528,7 +519,7 @@ boolean innerGetAndApplyDeltaUpdatesFromOM(long fromSequenceNumber,
"isDBUpdateSuccess: {}", numUpdates, getCurrentOMDBSequenceNumber()
- fromSequenceNumber, lag,
null != dbUpdates && dbUpdates.isDBUpdateSuccess());
return null != dbUpdates && dbUpdates.isDBUpdateSuccess();
return new ImmutablePair<>(null != dbUpdates && dbUpdates.isDBUpdateSuccess(), lag);
}

/**
Expand Down Expand Up @@ -572,15 +563,31 @@ public boolean syncDataFromOM() {

// Get updates from OM and apply to local Recon OM DB and update task status in table
reconTaskUpdater.recordRunStart();
getAndApplyDeltaUpdatesFromOM(currentSequenceNumber,
omdbUpdatesHandler);

reconTaskUpdater.setLastTaskRunStatus(0);
reconTaskUpdater.setLastUpdatedSeqNumber(getCurrentOMDBSequenceNumber());
reconTaskUpdater.recordRunCompletion();
// Pass on DB update events to tasks that are listening.
reconTaskController.consumeOMEvents(new OMUpdateEventBatch(
omdbUpdatesHandler.getEvents(), omdbUpdatesHandler.getLatestSequenceNumber()), omMetadataManager);
int loopCount = 0;
long fromSequenceNumber = currentSequenceNumber;
long diffBetweenOMDbAndReconDBSeqNumber = deltaUpdateLimit + 1;
/**
* This loop will continue to fetch and apply OM DB updates and with every
* OM DB fetch request, it will fetch {@code deltaUpdateLimit} count of DB updates.
* It continues to fetch from OM until the lag between the OM DB WAL sequence number
* and the Recon OM DB snapshot WAL sequence number is less than or equal to the lag threshold value.
* In high OM write TPS cluster, this simulates continuous pull from OM without any delay.
*/
while (diffBetweenOMDbAndReconDBSeqNumber > omDBLagThreshold) {
diffBetweenOMDbAndReconDBSeqNumber =
getAndApplyDeltaUpdatesFromOM(currentSequenceNumber, omdbUpdatesHandler);
reconTaskUpdater.setLastTaskRunStatus(0);
reconTaskUpdater.setLastUpdatedSeqNumber(getCurrentOMDBSequenceNumber());
reconTaskUpdater.recordRunCompletion();
// Pass on DB update events to tasks that are listening.
reconTaskController.consumeOMEvents(new OMUpdateEventBatch(
omdbUpdatesHandler.getEvents(), omdbUpdatesHandler.getLatestSequenceNumber()), omMetadataManager);
currentSequenceNumber = getCurrentOMDBSequenceNumber();
LOG.debug("Updated current sequence number: {}", currentSequenceNumber);
loopCount++;
}
LOG.info("Delta updates received from OM : {} loops, {} records", loopCount,
getCurrentOMDBSequenceNumber() - fromSequenceNumber);
} catch (InterruptedException intEx) {
LOG.error("OM DB Delta update sync thread was interrupted.");
// We are updating the table even if it didn't run i.e. got interrupted beforehand
Expand All @@ -594,8 +601,7 @@ public boolean syncDataFromOM() {
metrics.incrNumDeltaRequestsFailed();
reconTaskUpdater.setLastTaskRunStatus(-1);
reconTaskUpdater.recordRunCompletion();
LOG.warn("Unable to get and apply delta updates from OM.",
e.getMessage());
LOG.warn("Unable to get and apply delta updates from OM: {}", e.getMessage());
fullSnapshot = true;
}
}
Expand Down Expand Up @@ -659,7 +665,7 @@ public boolean syncDataFromOM() {
isSyncDataFromOMRunning.set(false);
}
} else {
LOG.info("OM DB sync is already running.");
LOG.info("OM DB sync is already running in syncDataFromOM.");
return false;
}
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.writeDataToOm;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DB_DIR;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SNAPSHOT_DB_DIR;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_DELTA_UPDATE_LAG_THRESHOLD;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_DELTA_UPDATE_LIMIT;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_DELTA_UPDATE_LOOP_LIMIT;
import static org.apache.hadoop.ozone.recon.ReconUtils.createTarFile;
import static org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl.OmSnapshotTaskName.OmDeltaRequest;
import static org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl.OmSnapshotTaskName.OmSnapshotRequest;
Expand Down Expand Up @@ -71,7 +71,6 @@
import org.apache.hadoop.ozone.recon.common.CommonUtils;
import org.apache.hadoop.ozone.recon.metrics.OzoneManagerSyncMetrics;
import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
import org.apache.hadoop.ozone.recon.tasks.OMDBUpdatesHandler;
import org.apache.hadoop.ozone.recon.tasks.OMUpdateEventBatch;
import org.apache.hadoop.ozone.recon.tasks.ReconTaskController;

Expand Down Expand Up @@ -363,17 +362,19 @@ public void testGetAndApplyDeltaUpdatesFromOM(
OMMetadataManager omMetadataManager =
initializeNewOmMetadataManager(dirOmMetadata);

OzoneConfiguration withLimitConfiguration =
new OzoneConfiguration(configuration);
withLimitConfiguration.setLong(RECON_OM_DELTA_UPDATE_LIMIT, 10);
OzoneManagerServiceProviderImpl ozoneManagerServiceProvider =
new OzoneManagerServiceProviderImpl(configuration,
getTestReconOmMetadataManager(omMetadataManager, dirReconMetadata),
getMockTaskController(), new ReconUtils(), getMockOzoneManagerClient(dbUpdatesWrapper),
reconContext, getMockTaskStatusUpdaterManager());

OMDBUpdatesHandler updatesHandler =
new OMDBUpdatesHandler(omMetadataManager);
ozoneManagerServiceProvider.getAndApplyDeltaUpdatesFromOM(
0L, updatesHandler);
long currentReconDBSequenceNumber = ozoneManagerServiceProvider.getCurrentOMDBSequenceNumber();
dbUpdatesWrapper.setLatestSequenceNumber(currentReconDBSequenceNumber + 4);

ozoneManagerServiceProvider.syncDataFromOM();
OzoneManagerSyncMetrics metrics = ozoneManagerServiceProvider.getMetrics();
assertEquals(4.0,
metrics.getAverageNumUpdatesInDeltaRequest(), 0.0);
Expand All @@ -383,7 +384,7 @@ public void testGetAndApplyDeltaUpdatesFromOM(

// Assert GET path --> verify if the OMDBUpdatesHandler picked up the 4
// events ( 1 Vol PUT + 1 Bucket PUT + 2 Key PUTs).
assertEquals(4, updatesHandler.getEvents().size());
assertEquals(currentReconDBSequenceNumber + 4, ozoneManagerServiceProvider.getCurrentOMDBSequenceNumber());

// Assert APPLY path --> Verify if the OM service provider's RocksDB got
// the changes.
Expand Down Expand Up @@ -430,8 +431,8 @@ public void testGetAndApplyDeltaUpdatesFromOMWithLimit(

OzoneConfiguration withLimitConfiguration =
new OzoneConfiguration(configuration);
withLimitConfiguration.setLong(RECON_OM_DELTA_UPDATE_LIMIT, 1);
withLimitConfiguration.setLong(RECON_OM_DELTA_UPDATE_LOOP_LIMIT, 3);
withLimitConfiguration.setLong(RECON_OM_DELTA_UPDATE_LIMIT, 3);
withLimitConfiguration.setLong(RECON_OM_DELTA_UPDATE_LAG_THRESHOLD, 1);
OzoneManagerServiceProviderImpl ozoneManagerServiceProvider =
new OzoneManagerServiceProviderImpl(withLimitConfiguration,
getTestReconOmMetadataManager(omMetadataManager, dirReconMetadata),
Expand All @@ -440,15 +441,17 @@ public void testGetAndApplyDeltaUpdatesFromOMWithLimit(
dbUpdatesWrapper[1], dbUpdatesWrapper[2], dbUpdatesWrapper[3]),
reconContext, getMockTaskStatusUpdaterManager());

long currentReconDBSequenceNumber = ozoneManagerServiceProvider.getCurrentOMDBSequenceNumber();
dbUpdatesWrapper[0].setLatestSequenceNumber(currentReconDBSequenceNumber + 4);
dbUpdatesWrapper[1].setLatestSequenceNumber(currentReconDBSequenceNumber + 4);
dbUpdatesWrapper[2].setLatestSequenceNumber(currentReconDBSequenceNumber + 4);

assertTrue(dbUpdatesWrapper[0].isDBUpdateSuccess());
assertTrue(dbUpdatesWrapper[1].isDBUpdateSuccess());
assertTrue(dbUpdatesWrapper[2].isDBUpdateSuccess());
assertTrue(dbUpdatesWrapper[3].isDBUpdateSuccess());

OMDBUpdatesHandler updatesHandler =
new OMDBUpdatesHandler(omMetadataManager);
ozoneManagerServiceProvider.getAndApplyDeltaUpdatesFromOM(
0L, updatesHandler);
ozoneManagerServiceProvider.syncDataFromOM();

OzoneManagerSyncMetrics metrics = ozoneManagerServiceProvider.getMetrics();
assertEquals(1.0,
Expand All @@ -459,7 +462,7 @@ public void testGetAndApplyDeltaUpdatesFromOMWithLimit(

// Assert GET path --> verify if the OMDBUpdatesHandler picked up the first
// 3 of 4 events ( 1 Vol PUT + 1 Bucket PUT + 2 Key PUTs).
assertEquals(3, updatesHandler.getEvents().size());
assertEquals(currentReconDBSequenceNumber + 3, ozoneManagerServiceProvider.getCurrentOMDBSequenceNumber());

// Assert APPLY path --> Verify if the OM service provider's RocksDB got
// the first 3 changes, last change not applied.
Expand Down