Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ private ReconConstants() {
public static final String CONTAINER_COUNT = "CONTAINER_COUNT";
public static final String TOTAL_KEYS = "TOTAL_KEYS";
public static final String TOTAL_USED_BYTES = "TOTAL_USED_BYTES";
public static final String STAGING = ".staging_";


// 1125899906842624L = 1PB
public static final long MAX_FILE_SIZE_UPPER_BOUND = 1125899906842624L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,7 @@ public HttpURLConnection makeHttpCall(URLConnectionFactory connectionFactory,
*/
public File getLastKnownDB(File reconDbDir, String fileNamePrefix) {
String lastKnownSnapshotFileName = null;
File lastKnownSnapshotFile = null;
long lastKnonwnSnapshotTs = Long.MIN_VALUE;
if (reconDbDir != null) {
File[] snapshotFiles = reconDbDir.listFiles((dir, name) ->
Expand All @@ -447,8 +448,17 @@ public File getLastKnownDB(File reconDbDir, String fileNamePrefix) {
}
long snapshotTimestamp = Long.parseLong(fileNameSplits[1]);
if (lastKnonwnSnapshotTs < snapshotTimestamp) {
if (lastKnownSnapshotFile != null) {
try {
FileUtils.deleteDirectory(lastKnownSnapshotFile);
} catch (IOException e) {
log.warn("Error deleting existing om db snapshot directory: {}",
lastKnownSnapshotFile.getAbsolutePath());
}
}
lastKnonwnSnapshotTs = snapshotTimestamp;
lastKnownSnapshotFileName = fileName;
lastKnownSnapshotFile = snapshotFile;
}
} catch (NumberFormatException nfEx) {
log.warn("Unknown file found in Recon DB dir : {}", fileName);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.ozone.recon;

import static org.apache.hadoop.ozone.recon.ReconConstants.STAGING;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Utility class for extracting files from a TAR archive using a multi-threaded approach.
*
* <p>This class utilizes an {@link ExecutorService} to extract files concurrently,
* improving performance when handling large TAR archives.</p>
*
* <p>Usage:</p>
* <pre>
* {@code
* TarExtractor tarExtractor = new TarExtractor(10, "extractor-thread");
* tarExtractor.extractTar(inputStream, outputPath);
* tarExtractor.shutdown();
* }
* </pre>
*/
public class TarExtractor {
private static final Logger LOG =
LoggerFactory.getLogger(TarExtractor.class);

private int threadPoolSize;
private ExecutorService executor;
private ThreadFactory threadFactory;

public TarExtractor(int threadPoolSize, String threadNamePrefix) {
this.threadPoolSize = threadPoolSize;
this.threadFactory =
new ThreadFactoryBuilder().setNameFormat("FetchOMDBTar-%d" + threadNamePrefix)
.build();
start();
}

public void extractTar(InputStream tarStream, Path outputDir)
throws IOException, InterruptedException, ExecutionException {
String stagingDirName = STAGING + UUID.randomUUID();
Path parentDir = outputDir.getParent();
if (parentDir == null) {
parentDir = outputDir; // Handle null parent case
}
Path stagingDir = parentDir.resolve(stagingDirName);

Files.createDirectories(stagingDir); // Ensure staging directory exists

List<Future<Void>> futures = new ArrayList<>();

try (TarArchiveInputStream tarInput = new TarArchiveInputStream(tarStream)) {
TarArchiveEntry entry;
while ((entry = tarInput.getNextTarEntry()) != null) {
if (!entry.isDirectory()) {
byte[] fileData = readEntryData(tarInput, entry.getSize());

// Submit extraction as a task
TarArchiveEntry finalEntry = entry;
futures.add(executor.submit(() -> {
writeFile(stagingDir, finalEntry.getName(), fileData);
return null;
}));
} else {
File dir = new File(stagingDir.toFile(), entry.getName());
if (!dir.exists() && !dir.mkdirs()) {
throw new IOException("Failed to create directory: " + dir);
}
}
}
}

// Wait for all tasks to complete
for (Future<Void> future : futures) {
future.get();
}

// Move staging to outputDir atomically
if (Files.exists(outputDir)) {
FileUtils.deleteDirectory(outputDir.toFile()); // Clean old data
}
try {
Files.move(stagingDir, outputDir, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
} catch (IOException e) {
LOG.warn("Atomic move of staging dir : {} to {} failed.", stagingDir, outputDir, e);
}

LOG.info("Tar extraction completed and moved from staging to: {}", outputDir);
}

private byte[] readEntryData(TarArchiveInputStream tarInput, long size) throws IOException {
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
byte[] fileData = new byte[(int) size];
int bytesRead;
while (size > 0 && (bytesRead = tarInput.read(fileData, 0, (int) Math.min(fileData.length, size))) != -1) {
buffer.write(fileData, 0, bytesRead);
size -= bytesRead;
}
return buffer.toByteArray();
}

private void writeFile(Path outputDir, String fileName, byte[] fileData) {
try {
File outputFile = new File(outputDir.toFile(), fileName);
Path parentDir = outputFile.toPath().getParent();
if (parentDir != null && !Files.exists(parentDir)) {
Files.createDirectories(parentDir);
}

try (InputStream fis = new ByteArrayInputStream(fileData);
OutputStream fos = Files.newOutputStream(outputFile.toPath())) {
byte[] buffer = new byte[8192]; // Use a buffer for efficient reading/writing
int bytesRead;
while ((bytesRead = fis.read(buffer)) != -1) {
fos.write(buffer, 0, bytesRead);
}
}
} catch (IOException e) {
throw new RuntimeException("Error writing file: " + fileName, e);
}
}

public void start() {
this.executor =
new ThreadPoolExecutor(0, threadPoolSize, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), threadFactory);
}

public void stop() {
executor.shutdown();
try {
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
LOG.warn("Tar Extractor Executor Service did not terminate in time.");
}
} catch (InterruptedException e) {
LOG.warn("Interrupted during shutdown. Forcing shutdown of Tar Extractor Executor Service...");
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_REQUEST_FLUSH;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HTTP_AUTH_TYPE;
import static org.apache.hadoop.ozone.recon.ReconConstants.RECON_OM_SNAPSHOT_DB;
import static org.apache.hadoop.ozone.recon.ReconConstants.STAGING;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_CONNECTION_REQUEST_TIMEOUT;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_CONNECTION_REQUEST_TIMEOUT_DEFAULT;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_CONNECTION_TIMEOUT;
Expand Down Expand Up @@ -87,6 +88,7 @@
import org.apache.hadoop.ozone.recon.ReconContext;
import org.apache.hadoop.ozone.recon.ReconServerConfigKeys;
import org.apache.hadoop.ozone.recon.ReconUtils;
import org.apache.hadoop.ozone.recon.TarExtractor;
import org.apache.hadoop.ozone.recon.metrics.OzoneManagerSyncMetrics;
import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
import org.apache.hadoop.ozone.recon.spi.OzoneManagerServiceProvider;
Expand All @@ -113,6 +115,8 @@ public class OzoneManagerServiceProviderImpl
LoggerFactory.getLogger(OzoneManagerServiceProviderImpl.class);
private URLConnectionFactory connectionFactory;

private int omDBTarProcessorThreadCount; // Number of parallel workers

private File omSnapshotDBParentDir = null;
private File reconDbDir = null;
private String omDBSnapshotUrl;
Expand All @@ -134,6 +138,7 @@ public class OzoneManagerServiceProviderImpl
private ThreadFactory threadFactory;
private ReconContext reconContext;
private ReconTaskStatusUpdaterManager taskStatusUpdaterManager;
private TarExtractor tarExtractor;

/**
* OM Snapshot related task names.
Expand Down Expand Up @@ -220,10 +225,12 @@ public OzoneManagerServiceProviderImpl(
this.threadFactory =
new ThreadFactoryBuilder().setNameFormat(threadNamePrefix + "SyncOM-%d")
.build();
this.omDBTarProcessorThreadCount = Math.max(64, Runtime.getRuntime().availableProcessors());
this.reconContext = reconContext;
this.taskStatusUpdaterManager = taskStatusUpdaterManager;
this.omDBLagThreshold = configuration.getLong(RECON_OM_DELTA_UPDATE_LAG_THRESHOLD,
RECON_OM_DELTA_UPDATE_LAG_THRESHOLD_DEFAULT);
this.tarExtractor = new TarExtractor(omDBTarProcessorThreadCount, threadNamePrefix);
}

@Override
Expand Down Expand Up @@ -311,6 +318,7 @@ private void startSyncDataFromOM(long initialDelay) {
OZONE_RECON_OM_SNAPSHOT_TASK_INTERVAL_DEFAULT),
TimeUnit.MILLISECONDS);
LOG.debug("Started the OM DB sync scheduler.");
tarExtractor.start();
scheduler.scheduleWithFixedDelay(() -> {
try {
LOG.info("Last known sequence number before sync: {}", getCurrentOMDBSequenceNumber());
Expand All @@ -330,6 +338,7 @@ private void startSyncDataFromOM(long initialDelay) {

private void stopSyncDataFromOMThread() {
scheduler.shutdownNow();
tarExtractor.stop();
LOG.debug("Shutdown the OM DB sync scheduler.");
}

Expand All @@ -355,6 +364,7 @@ public void stop() throws Exception {
reconTaskController.stop();
omMetadataManager.stop();
scheduler.shutdownNow();
tarExtractor.stop();
metrics.unRegister();
connectionFactory.destroy();
}
Expand Down Expand Up @@ -393,28 +403,46 @@ private boolean isOmSpnegoEnabled() {
* @return DBCheckpoint instance.
*/
@VisibleForTesting
DBCheckpoint getOzoneManagerDBSnapshot() {
String snapshotFileName = RECON_OM_SNAPSHOT_DB + "_" +
System.currentTimeMillis();
File targetFile = new File(omSnapshotDBParentDir, snapshotFileName +
".tar");
public DBCheckpoint getOzoneManagerDBSnapshot() {
String snapshotFileName = RECON_OM_SNAPSHOT_DB + "_" + System.currentTimeMillis();
Path untarredDbDir = Paths.get(omSnapshotDBParentDir.getAbsolutePath(), snapshotFileName);

// Before fetching full snapshot again and create a new OM DB snapshot directory, check and delete
// any existing OM DB snapshot directories under recon om db dir location and delete all such
// om db snapshot dirs including the last known om db snapshot dir returned by reconUtils.getLastKnownDB
File lastKnownDB = reconUtils.getLastKnownDB(omSnapshotDBParentDir, RECON_OM_SNAPSHOT_DB);
if (lastKnownDB != null) {
FileUtils.deleteQuietly(lastKnownDB);
}

// Now below cleanup operation will even remove any left over staging dirs in recon om db dir location which
// may be left due to any previous partial extraction of tar entries and during copy sst files process by
// tarExtractor.extractTar
File[] leftOverStagingDirs = omSnapshotDBParentDir.listFiles(f -> f.getName().startsWith(STAGING));
if (leftOverStagingDirs != null) {
for (File stagingDir : leftOverStagingDirs) {
LOG.warn("Cleaning up leftover staging folder from failed extraction: {}", stagingDir.getAbsolutePath());
FileUtils.deleteQuietly(stagingDir);
}
}

try {
SecurityUtil.doAsLoginUser(() -> {
try (InputStream inputStream = reconUtils.makeHttpCall(
connectionFactory, getOzoneManagerSnapshotUrl(),
isOmSpnegoEnabled()).getInputStream()) {
FileUtils.copyInputStreamToFile(inputStream, targetFile);
connectionFactory, getOzoneManagerSnapshotUrl(), isOmSpnegoEnabled()).getInputStream()) {
tarExtractor.extractTar(inputStream, untarredDbDir);
} catch (IOException | InterruptedException e) {
reconContext.updateHealthStatus(new AtomicBoolean(false));
reconContext.updateErrors(ReconContext.ErrorCode.GET_OM_DB_SNAPSHOT_FAILED);
throw new RuntimeException("Error while extracting OM DB Snapshot TAR.", e);
}
return null;
});
// Untar the checkpoint file.
Path untarredDbDir = Paths.get(omSnapshotDBParentDir.getAbsolutePath(), snapshotFileName);
reconUtils.untarCheckpointFile(targetFile, untarredDbDir);

// Validate the presence of required SST files
// Validate extracted files
File[] sstFiles = untarredDbDir.toFile().listFiles((dir, name) -> name.endsWith(".sst"));
if (sstFiles == null || sstFiles.length == 0) {
LOG.warn("No SST files found in the OM snapshot directory: {}", untarredDbDir);
if (sstFiles != null && sstFiles.length > 0) {
LOG.info("Number of SST files found in the OM snapshot directory: {} - {}", untarredDbDir, sstFiles.length);
}

List<String> sstFileNames = Arrays.stream(sstFiles)
Expand All @@ -426,14 +454,15 @@ connectionFactory, getOzoneManagerSnapshotUrl(),
// RocksDB.
reconContext.updateHealthStatus(new AtomicBoolean(true));
reconContext.getErrors().remove(ReconContext.ErrorCode.GET_OM_DB_SNAPSHOT_FAILED);

return new RocksDBCheckpoint(untarredDbDir);

} catch (IOException e) {
LOG.error("Unable to obtain Ozone Manager DB Snapshot. ", e);
LOG.error("Unable to obtain Ozone Manager DB Snapshot.", e);
reconContext.updateHealthStatus(new AtomicBoolean(false));
reconContext.updateErrors(ReconContext.ErrorCode.GET_OM_DB_SNAPSHOT_FAILED);
} finally {
FileUtils.deleteQuietly(targetFile);
}

return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyBoolean;
Expand Down Expand Up @@ -179,7 +180,11 @@ public void testUpdateReconOmDBWithNewSnapshotFailure(
reconOMMetadataManager, reconTaskController, reconUtilsMock, ozoneManagerProtocol,
reconContext, getMockTaskStatusUpdaterManager());

assertFalse(ozoneManagerServiceProvider.updateReconOmDBWithNewSnapshot());
Exception exception = assertThrows(RuntimeException.class, () -> {
ozoneManagerServiceProvider.updateReconOmDBWithNewSnapshot();
});

assertTrue(exception.getCause() instanceof IOException);

// Verifying if context error GET_OM_DB_SNAPSHOT_FAILED is added
assertTrue(reconContext.getErrors().contains(ReconContext.ErrorCode.GET_OM_DB_SNAPSHOT_FAILED));
Expand Down
Loading