Skip to content
Merged
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.ozone.recon;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Utility class for extracting files from a TAR archive using a multi-threaded approach.
*
* <p>This class utilizes an {@link ExecutorService} to extract files concurrently,
* improving performance when handling large TAR archives.</p>
*
* <p>Usage:</p>
* <pre>
* {@code
* TarExtractor tarExtractor = new TarExtractor(10, "extractor-thread");
* tarExtractor.extractTar(inputStream, outputPath);
* tarExtractor.shutdown();
* }
* </pre>
*/
public class TarExtractor {
private static final Logger LOG =
LoggerFactory.getLogger(TarExtractor.class);

private final ExecutorService executor;
private ThreadFactory threadFactory;

public TarExtractor(int threadPoolSize, String threadNamePrefix) {
this.threadFactory =
new ThreadFactoryBuilder().setNameFormat("FetchOMDBTar-%d" + threadNamePrefix)
.build();
this.executor =
new ThreadPoolExecutor(0, threadPoolSize, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), threadFactory);
}

public void extractTar(InputStream tarStream, Path outputDir)
throws IOException, InterruptedException, ExecutionException {
List<Future<Void>> futures = new ArrayList<>();

try (TarArchiveInputStream tarInput = new TarArchiveInputStream(tarStream)) {
TarArchiveEntry entry;
while ((entry = tarInput.getNextTarEntry()) != null) {
if (!entry.isDirectory()) {
byte[] fileData = readEntryData(tarInput, entry.getSize());

// Submit extraction as a task
TarArchiveEntry finalEntry = entry;
futures.add(executor.submit(() -> {
writeFile(outputDir, finalEntry.getName(), fileData);
return null;
}));
} else {
File dir = new File(outputDir.toFile(), entry.getName());
if (!dir.exists() && !dir.mkdirs()) {
throw new IOException("Failed to create directory: " + dir);
}
}
}
}

// Wait for all tasks to complete
for (Future<Void> future : futures) {
future.get();
}
}

private byte[] readEntryData(TarArchiveInputStream tarInput, long size) throws IOException {
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
byte[] fileData = new byte[(int) size];
int bytesRead;
while (size > 0 && (bytesRead = tarInput.read(fileData, 0, (int) Math.min(fileData.length, size))) != -1) {
buffer.write(fileData, 0, bytesRead);
size -= bytesRead;
}
return buffer.toByteArray();
}

private void writeFile(Path outputDir, String fileName, byte[] fileData) {
try {
File outputFile = new File(outputDir.toFile(), fileName);
Path parentDir = outputFile.toPath().getParent();
if (parentDir != null && !Files.exists(parentDir)) {
Files.createDirectories(parentDir);
}

try (InputStream fis = new ByteArrayInputStream(fileData);
OutputStream fos = Files.newOutputStream(outputFile.toPath())) {
byte[] buffer = new byte[8192]; // Use a buffer for efficient reading/writing
int bytesRead;
while ((bytesRead = fis.read(buffer)) != -1) {
fos.write(buffer, 0, bytesRead);
}
}
} catch (IOException e) {
throw new RuntimeException("Error writing file: " + fileName, e);
}
}

public void shutdown() {
executor.shutdown();
try {
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
LOG.warn("Tar Extractor Executor Service did not terminate in time.");
}
} catch (InterruptedException e) {
LOG.warn("Interrupted during shutdown. Forcing shutdown of Tar Extractor Executor Service...");
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@
import java.util.stream.Collectors;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
Expand All @@ -87,6 +86,7 @@
import org.apache.hadoop.ozone.recon.ReconContext;
import org.apache.hadoop.ozone.recon.ReconServerConfigKeys;
import org.apache.hadoop.ozone.recon.ReconUtils;
import org.apache.hadoop.ozone.recon.TarExtractor;
import org.apache.hadoop.ozone.recon.metrics.OzoneManagerSyncMetrics;
import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
import org.apache.hadoop.ozone.recon.spi.OzoneManagerServiceProvider;
Expand All @@ -113,6 +113,8 @@ public class OzoneManagerServiceProviderImpl
LoggerFactory.getLogger(OzoneManagerServiceProviderImpl.class);
private URLConnectionFactory connectionFactory;

private int omDBTarProcessorThreadCount; // Number of parallel workers

private File omSnapshotDBParentDir = null;
private File reconDbDir = null;
private String omDBSnapshotUrl;
Expand All @@ -134,6 +136,7 @@ public class OzoneManagerServiceProviderImpl
private ThreadFactory threadFactory;
private ReconContext reconContext;
private ReconTaskStatusUpdaterManager taskStatusUpdaterManager;
private TarExtractor tarExtractor;

/**
* OM Snapshot related task names.
Expand Down Expand Up @@ -220,10 +223,12 @@ public OzoneManagerServiceProviderImpl(
this.threadFactory =
new ThreadFactoryBuilder().setNameFormat(threadNamePrefix + "SyncOM-%d")
.build();
this.omDBTarProcessorThreadCount = Math.max(64, Runtime.getRuntime().availableProcessors());
this.reconContext = reconContext;
this.taskStatusUpdaterManager = taskStatusUpdaterManager;
this.omDBLagThreshold = configuration.getLong(RECON_OM_DELTA_UPDATE_LAG_THRESHOLD,
RECON_OM_DELTA_UPDATE_LAG_THRESHOLD_DEFAULT);
this.tarExtractor = new TarExtractor(omDBTarProcessorThreadCount, threadNamePrefix);
}

@Override
Expand Down Expand Up @@ -393,28 +398,27 @@ private boolean isOmSpnegoEnabled() {
* @return DBCheckpoint instance.
*/
@VisibleForTesting
DBCheckpoint getOzoneManagerDBSnapshot() {
String snapshotFileName = RECON_OM_SNAPSHOT_DB + "_" +
System.currentTimeMillis();
File targetFile = new File(omSnapshotDBParentDir, snapshotFileName +
".tar");
public DBCheckpoint getOzoneManagerDBSnapshot() {
String snapshotFileName = RECON_OM_SNAPSHOT_DB + "_" + System.currentTimeMillis();
Path untarredDbDir = Paths.get(omSnapshotDBParentDir.getAbsolutePath(), snapshotFileName);

try {
SecurityUtil.doAsLoginUser(() -> {
try (InputStream inputStream = reconUtils.makeHttpCall(
connectionFactory, getOzoneManagerSnapshotUrl(),
isOmSpnegoEnabled()).getInputStream()) {
FileUtils.copyInputStreamToFile(inputStream, targetFile);
connectionFactory, getOzoneManagerSnapshotUrl(), isOmSpnegoEnabled()).getInputStream()) {
tarExtractor.extractTar(inputStream, untarredDbDir);
} catch (IOException | InterruptedException e) {
reconContext.updateHealthStatus(new AtomicBoolean(false));
reconContext.updateErrors(ReconContext.ErrorCode.GET_OM_DB_SNAPSHOT_FAILED);
throw new RuntimeException("Error while extracting OM DB Snapshot TAR.", e);
}
return null;
});
// Untar the checkpoint file.
Path untarredDbDir = Paths.get(omSnapshotDBParentDir.getAbsolutePath(), snapshotFileName);
reconUtils.untarCheckpointFile(targetFile, untarredDbDir);

// Validate the presence of required SST files
// Validate extracted files
File[] sstFiles = untarredDbDir.toFile().listFiles((dir, name) -> name.endsWith(".sst"));
if (sstFiles == null || sstFiles.length == 0) {
LOG.warn("No SST files found in the OM snapshot directory: {}", untarredDbDir);
if (sstFiles != null && sstFiles.length > 0) {
LOG.warn("Number of SST files found in the OM snapshot directory: {} - {}", untarredDbDir, sstFiles.length);
}

List<String> sstFileNames = Arrays.stream(sstFiles)
Expand All @@ -426,14 +430,17 @@ connectionFactory, getOzoneManagerSnapshotUrl(),
// RocksDB.
reconContext.updateHealthStatus(new AtomicBoolean(true));
reconContext.getErrors().remove(ReconContext.ErrorCode.GET_OM_DB_SNAPSHOT_FAILED);

return new RocksDBCheckpoint(untarredDbDir);

} catch (IOException e) {
LOG.error("Unable to obtain Ozone Manager DB Snapshot. ", e);
LOG.error("Unable to obtain Ozone Manager DB Snapshot.", e);
reconContext.updateHealthStatus(new AtomicBoolean(false));
reconContext.updateErrors(ReconContext.ErrorCode.GET_OM_DB_SNAPSHOT_FAILED);
} finally {
FileUtils.deleteQuietly(targetFile);
tarExtractor.shutdown();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tarExtractor should be shut down in stop(), not after getting a single DB snapshot, if you want to use it more than once.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @adoroszlai for your review. Good catch. Fixed and pushed the change.

}

return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyBoolean;
Expand Down Expand Up @@ -179,7 +180,11 @@ public void testUpdateReconOmDBWithNewSnapshotFailure(
reconOMMetadataManager, reconTaskController, reconUtilsMock, ozoneManagerProtocol,
reconContext, getMockTaskStatusUpdaterManager());

assertFalse(ozoneManagerServiceProvider.updateReconOmDBWithNewSnapshot());
Exception exception = assertThrows(RuntimeException.class, () -> {
ozoneManagerServiceProvider.updateReconOmDBWithNewSnapshot();
});

assertTrue(exception.getCause() instanceof IOException);

// Verifying if context error GET_OM_DB_SNAPSHOT_FAILED is added
assertTrue(reconContext.getErrors().contains(ReconContext.ErrorCode.GET_OM_DB_SNAPSHOT_FAILED));
Expand Down