diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java index 6927d77c0331..3b19e42d3935 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java @@ -71,6 +71,8 @@ private ReconConstants() { public static final String CONTAINER_COUNT = "CONTAINER_COUNT"; public static final String TOTAL_KEYS = "TOTAL_KEYS"; public static final String TOTAL_USED_BYTES = "TOTAL_USED_BYTES"; + public static final String STAGING = ".staging_"; + // 1125899906842624L = 1PB public static final long MAX_FILE_SIZE_UPPER_BOUND = 1125899906842624L; diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java index ba2ebe0a7791..fa8b908b2330 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java @@ -434,6 +434,7 @@ public HttpURLConnection makeHttpCall(URLConnectionFactory connectionFactory, */ public File getLastKnownDB(File reconDbDir, String fileNamePrefix) { String lastKnownSnapshotFileName = null; + File lastKnownSnapshotFile = null; long lastKnonwnSnapshotTs = Long.MIN_VALUE; if (reconDbDir != null) { File[] snapshotFiles = reconDbDir.listFiles((dir, name) -> @@ -448,8 +449,17 @@ public File getLastKnownDB(File reconDbDir, String fileNamePrefix) { } long snapshotTimestamp = Long.parseLong(fileNameSplits[1]); if (lastKnonwnSnapshotTs < snapshotTimestamp) { + if (lastKnownSnapshotFile != null) { + try { + FileUtils.deleteDirectory(lastKnownSnapshotFile); + } catch (IOException e) { + log.warn("Error deleting existing om db snapshot directory: {}", + lastKnownSnapshotFile.getAbsolutePath()); + } + } lastKnonwnSnapshotTs = snapshotTimestamp; lastKnownSnapshotFileName = fileName; + lastKnownSnapshotFile = snapshotFile; } } catch (NumberFormatException nfEx) { log.warn("Unknown file found in Recon DB dir : {}", fileName); diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/TarExtractor.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/TarExtractor.java new file mode 100644 index 000000000000..b3bd17bdece4 --- /dev/null +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/TarExtractor.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon; + +import static org.apache.hadoop.ozone.recon.ReconConstants.STAGING; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.commons.compress.archivers.tar.TarArchiveEntry; +import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; +import org.apache.commons.io.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Utility class for extracting files from a TAR archive using a multi-threaded approach. + * + *

This class utilizes an {@link ExecutorService} to extract files concurrently, + * improving performance when handling large TAR archives.

+ * + *

Usage:

+ *
+ * {@code
+ * TarExtractor tarExtractor = new TarExtractor(10, "extractor-thread");
+ * tarExtractor.extractTar(inputStream, outputPath);
+ * tarExtractor.shutdown();
+ * }
+ * 
+ */ +public class TarExtractor { + private static final Logger LOG = + LoggerFactory.getLogger(TarExtractor.class); + + private final AtomicBoolean executorServiceStarted = new AtomicBoolean(false); + private int threadPoolSize; + private ExecutorService executor; + private ThreadFactory threadFactory; + + public TarExtractor(int threadPoolSize, String threadNamePrefix) { + this.threadPoolSize = threadPoolSize; + this.threadFactory = + new ThreadFactoryBuilder().setNameFormat("FetchOMDBTar-%d" + threadNamePrefix) + .build(); + } + + public void extractTar(InputStream tarStream, Path outputDir) + throws IOException, InterruptedException, ExecutionException { + String stagingDirName = STAGING + UUID.randomUUID(); + Path parentDir = outputDir.getParent(); + if (parentDir == null) { + parentDir = outputDir; // Handle null parent case + } + Path stagingDir = parentDir.resolve(stagingDirName); + + Files.createDirectories(stagingDir); // Ensure staging directory exists + + List> futures = new ArrayList<>(); + + try (TarArchiveInputStream tarInput = new TarArchiveInputStream(tarStream)) { + TarArchiveEntry entry; + while ((entry = tarInput.getNextTarEntry()) != null) { + if (!entry.isDirectory()) { + byte[] fileData = readEntryData(tarInput, entry.getSize()); + + // Submit extraction as a task + TarArchiveEntry finalEntry = entry; + futures.add(executor.submit(() -> { + writeFile(stagingDir, finalEntry.getName(), fileData); + return null; + })); + } else { + File dir = new File(stagingDir.toFile(), entry.getName()); + if (!dir.exists() && !dir.mkdirs()) { + throw new IOException("Failed to create directory: " + dir); + } + } + } + } + + // Wait for all tasks to complete + for (Future future : futures) { + future.get(); + } + + // Move staging to outputDir atomically + if (Files.exists(outputDir)) { + FileUtils.deleteDirectory(outputDir.toFile()); // Clean old data + } + try { + Files.move(stagingDir, outputDir, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING); + } catch (IOException e) { + LOG.warn("Atomic move of staging dir : {} to {} failed.", stagingDir, outputDir, e); + } + + LOG.info("Tar extraction completed and moved from staging to: {}", outputDir); + } + + private byte[] readEntryData(TarArchiveInputStream tarInput, long size) throws IOException { + ByteArrayOutputStream buffer = new ByteArrayOutputStream(); + byte[] fileData = new byte[(int) size]; + int bytesRead; + while (size > 0 && (bytesRead = tarInput.read(fileData, 0, (int) Math.min(fileData.length, size))) != -1) { + buffer.write(fileData, 0, bytesRead); + size -= bytesRead; + } + return buffer.toByteArray(); + } + + private void writeFile(Path outputDir, String fileName, byte[] fileData) { + try { + File outputFile = new File(outputDir.toFile(), fileName); + Path parentDir = outputFile.toPath().getParent(); + if (parentDir != null && !Files.exists(parentDir)) { + Files.createDirectories(parentDir); + } + + try (InputStream fis = new ByteArrayInputStream(fileData); + OutputStream fos = Files.newOutputStream(outputFile.toPath())) { + byte[] buffer = new byte[8192]; // Use a buffer for efficient reading/writing + int bytesRead; + while ((bytesRead = fis.read(buffer)) != -1) { + fos.write(buffer, 0, bytesRead); + } + } + } catch (IOException e) { + throw new RuntimeException("Error writing file: " + fileName, e); + } + } + + public void start() { + if (executorServiceStarted.compareAndSet(false, true)) { + this.executor = + new ThreadPoolExecutor(0, threadPoolSize, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), threadFactory); + } + } + + public void stop() { + if (executorServiceStarted.compareAndSet(true, false)) { + executor.shutdown(); + try { + if (!executor.awaitTermination(60, TimeUnit.SECONDS)) { + LOG.warn("Tar Extractor Executor Service did not terminate in time."); + } + } catch (InterruptedException e) { + LOG.warn("Interrupted during shutdown. Forcing shutdown of Tar Extractor Executor Service..."); + executor.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + } +} + diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java index 31ec5f353a5a..d774b21296e7 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java @@ -22,6 +22,7 @@ import static org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_REQUEST_FLUSH; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HTTP_AUTH_TYPE; import static org.apache.hadoop.ozone.recon.ReconConstants.RECON_OM_SNAPSHOT_DB; +import static org.apache.hadoop.ozone.recon.ReconConstants.STAGING; import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_CONNECTION_REQUEST_TIMEOUT; import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_CONNECTION_REQUEST_TIMEOUT_DEFAULT; import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_CONNECTION_TIMEOUT; @@ -87,6 +88,7 @@ import org.apache.hadoop.ozone.recon.ReconContext; import org.apache.hadoop.ozone.recon.ReconServerConfigKeys; import org.apache.hadoop.ozone.recon.ReconUtils; +import org.apache.hadoop.ozone.recon.TarExtractor; import org.apache.hadoop.ozone.recon.metrics.OzoneManagerSyncMetrics; import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager; import org.apache.hadoop.ozone.recon.spi.OzoneManagerServiceProvider; @@ -113,6 +115,8 @@ public class OzoneManagerServiceProviderImpl LoggerFactory.getLogger(OzoneManagerServiceProviderImpl.class); private URLConnectionFactory connectionFactory; + private int omDBTarProcessorThreadCount; // Number of parallel workers + private File omSnapshotDBParentDir = null; private File reconDbDir = null; private String omDBSnapshotUrl; @@ -134,6 +138,7 @@ public class OzoneManagerServiceProviderImpl private ThreadFactory threadFactory; private ReconContext reconContext; private ReconTaskStatusUpdaterManager taskStatusUpdaterManager; + private TarExtractor tarExtractor; /** * OM Snapshot related task names. @@ -220,10 +225,12 @@ public OzoneManagerServiceProviderImpl( this.threadFactory = new ThreadFactoryBuilder().setNameFormat(threadNamePrefix + "SyncOM-%d") .build(); + this.omDBTarProcessorThreadCount = Math.max(64, Runtime.getRuntime().availableProcessors()); this.reconContext = reconContext; this.taskStatusUpdaterManager = taskStatusUpdaterManager; this.omDBLagThreshold = configuration.getLong(RECON_OM_DELTA_UPDATE_LAG_THRESHOLD, RECON_OM_DELTA_UPDATE_LAG_THRESHOLD_DEFAULT); + this.tarExtractor = new TarExtractor(omDBTarProcessorThreadCount, threadNamePrefix); } @Override @@ -236,6 +243,7 @@ public void start() { LOG.info("Starting Ozone Manager Service Provider."); scheduler = Executors.newScheduledThreadPool(1, threadFactory); try { + tarExtractor.start(); omMetadataManager.start(configuration); } catch (IOException ioEx) { LOG.error("Error starting Recon OM Metadata Manager.", ioEx); @@ -330,6 +338,7 @@ private void startSyncDataFromOM(long initialDelay) { private void stopSyncDataFromOMThread() { scheduler.shutdownNow(); + tarExtractor.stop(); LOG.debug("Shutdown the OM DB sync scheduler."); } @@ -341,6 +350,7 @@ public boolean triggerSyncDataFromOMImmediately() { // immediately. stopSyncDataFromOMThread(); scheduler = Executors.newScheduledThreadPool(1, threadFactory); + tarExtractor.start(); startSyncDataFromOM(0L); return true; } else { @@ -355,6 +365,7 @@ public void stop() throws Exception { reconTaskController.stop(); omMetadataManager.stop(); scheduler.shutdownNow(); + tarExtractor.stop(); metrics.unRegister(); connectionFactory.destroy(); } @@ -393,28 +404,58 @@ private boolean isOmSpnegoEnabled() { * @return DBCheckpoint instance. */ @VisibleForTesting - DBCheckpoint getOzoneManagerDBSnapshot() { - String snapshotFileName = RECON_OM_SNAPSHOT_DB + "_" + - System.currentTimeMillis(); - File targetFile = new File(omSnapshotDBParentDir, snapshotFileName + - ".tar"); + public DBCheckpoint getOzoneManagerDBSnapshot() { + String snapshotFileName = RECON_OM_SNAPSHOT_DB + "_" + System.currentTimeMillis(); + Path untarredDbDir = Paths.get(omSnapshotDBParentDir.getAbsolutePath(), snapshotFileName); + + // Before fetching full snapshot again and create a new OM DB snapshot directory, check and delete + // any existing OM DB snapshot directories under recon om db dir location and delete all such + // om db snapshot dirs including the last known om db snapshot dir returned by reconUtils.getLastKnownDB + File lastKnownDB = reconUtils.getLastKnownDB(omSnapshotDBParentDir, RECON_OM_SNAPSHOT_DB); + if (lastKnownDB != null) { + boolean existingOmSnapshotDBDeleted = FileUtils.deleteQuietly(lastKnownDB); + if (existingOmSnapshotDBDeleted) { + LOG.info("Successfully deleted existing OM DB snapshot directory: {}", + lastKnownDB.getAbsolutePath()); + } else { + LOG.warn("Failed to delete existing OM DB snapshot directory: {}", + lastKnownDB.getAbsolutePath()); + } + } + + // Now below cleanup operation will even remove any left over staging dirs in recon om db dir location which + // may be left due to any previous partial extraction of tar entries and during copy sst files process by + // tarExtractor.extractTar + File[] leftOverStagingDirs = omSnapshotDBParentDir.listFiles(f -> f.getName().startsWith(STAGING)); + if (leftOverStagingDirs != null) { + for (File stagingDir : leftOverStagingDirs) { + LOG.warn("Cleaning up leftover staging folder from failed extraction: {}", stagingDir.getAbsolutePath()); + boolean stagingDirDeleted = FileUtils.deleteQuietly(stagingDir); + if (stagingDirDeleted) { + LOG.info("Successfully deleted leftover staging folder: {}", stagingDir.getAbsolutePath()); + } else { + LOG.warn("Failed to delete leftover staging folder: {}", stagingDir.getAbsolutePath()); + } + } + } + try { SecurityUtil.doAsLoginUser(() -> { try (InputStream inputStream = reconUtils.makeHttpCall( - connectionFactory, getOzoneManagerSnapshotUrl(), - isOmSpnegoEnabled()).getInputStream()) { - FileUtils.copyInputStreamToFile(inputStream, targetFile); + connectionFactory, getOzoneManagerSnapshotUrl(), isOmSpnegoEnabled()).getInputStream()) { + tarExtractor.extractTar(inputStream, untarredDbDir); + } catch (IOException | InterruptedException e) { + reconContext.updateHealthStatus(new AtomicBoolean(false)); + reconContext.updateErrors(ReconContext.ErrorCode.GET_OM_DB_SNAPSHOT_FAILED); + throw new RuntimeException("Error while extracting OM DB Snapshot TAR.", e); } return null; }); - // Untar the checkpoint file. - Path untarredDbDir = Paths.get(omSnapshotDBParentDir.getAbsolutePath(), snapshotFileName); - reconUtils.untarCheckpointFile(targetFile, untarredDbDir); - // Validate the presence of required SST files + // Validate extracted files File[] sstFiles = untarredDbDir.toFile().listFiles((dir, name) -> name.endsWith(".sst")); - if (sstFiles == null || sstFiles.length == 0) { - LOG.warn("No SST files found in the OM snapshot directory: {}", untarredDbDir); + if (sstFiles != null && sstFiles.length > 0) { + LOG.info("Number of SST files found in the OM snapshot directory: {} - {}", untarredDbDir, sstFiles.length); } List sstFileNames = Arrays.stream(sstFiles) @@ -428,11 +469,9 @@ connectionFactory, getOzoneManagerSnapshotUrl(), reconContext.getErrors().remove(ReconContext.ErrorCode.GET_OM_DB_SNAPSHOT_FAILED); return new RocksDBCheckpoint(untarredDbDir); } catch (IOException e) { - LOG.error("Unable to obtain Ozone Manager DB Snapshot. ", e); + LOG.error("Unable to obtain Ozone Manager DB Snapshot.", e); reconContext.updateHealthStatus(new AtomicBoolean(false)); reconContext.updateErrors(ReconContext.ErrorCode.GET_OM_DB_SNAPSHOT_FAILED); - } finally { - FileUtils.deleteQuietly(targetFile); } return null; } @@ -784,5 +823,10 @@ public long getCurrentOMDBSequenceNumber() { public OzoneManagerSyncMetrics getMetrics() { return metrics; } + + @VisibleForTesting + public TarExtractor getTarExtractor() { + return tarExtractor; + } } diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java index 6eef1d4c1e46..085f9c12cea1 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java @@ -33,6 +33,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.any; import static org.mockito.Mockito.anyBoolean; @@ -142,6 +143,7 @@ public void testUpdateReconOmDBWithNewSnapshot( assertNull(reconOMMetadataManager.getKeyTable(getBucketLayout()) .get("/sampleVol/bucketOne/key_two")); + ozoneManagerServiceProvider.getTarExtractor().start(); assertTrue(ozoneManagerServiceProvider.updateReconOmDBWithNewSnapshot()); assertNotNull(reconOMMetadataManager.getKeyTable(getBucketLayout()) @@ -179,7 +181,11 @@ public void testUpdateReconOmDBWithNewSnapshotFailure( reconOMMetadataManager, reconTaskController, reconUtilsMock, ozoneManagerProtocol, reconContext, getMockTaskStatusUpdaterManager()); - assertFalse(ozoneManagerServiceProvider.updateReconOmDBWithNewSnapshot()); + Exception exception = assertThrows(RuntimeException.class, () -> { + ozoneManagerServiceProvider.updateReconOmDBWithNewSnapshot(); + }); + + assertTrue(exception.getCause() instanceof IOException); // Verifying if context error GET_OM_DB_SNAPSHOT_FAILED is added assertTrue(reconContext.getErrors().contains(ReconContext.ErrorCode.GET_OM_DB_SNAPSHOT_FAILED)); @@ -217,6 +223,7 @@ public void testUpdateReconOmDBWithNewSnapshotSuccess( reconContext, getMockTaskStatusUpdaterManager()); assertTrue(reconContext.getErrors().contains(ReconContext.ErrorCode.GET_OM_DB_SNAPSHOT_FAILED)); + ozoneManagerServiceProvider.getTarExtractor().start(); assertTrue(ozoneManagerServiceProvider.updateReconOmDBWithNewSnapshot()); assertFalse(reconContext.getErrors().contains(ReconContext.ErrorCode.GET_OM_DB_SNAPSHOT_FAILED)); @@ -258,6 +265,7 @@ public void testReconOmDBCloseAndOpenNewSnapshotDb( new OzoneManagerServiceProviderImpl(configuration, reconOMMetadataManager, reconTaskController, reconUtilsMock, ozoneManagerProtocol, reconContext, getMockTaskStatusUpdaterManager()); + ozoneManagerServiceProvider1.getTarExtractor().start(); assertTrue(ozoneManagerServiceProvider1.updateReconOmDBWithNewSnapshot()); } @@ -270,6 +278,7 @@ public void testReconOmDBCloseAndOpenNewSnapshotDb( new OzoneManagerServiceProviderImpl(configuration, reconOMMetadataManager, reconTaskController, reconUtilsMock, ozoneManagerProtocol, reconContext, getMockTaskStatusUpdaterManager()); + ozoneManagerServiceProvider2.getTarExtractor().start(); assertTrue(ozoneManagerServiceProvider2.updateReconOmDBWithNewSnapshot()); } } @@ -309,6 +318,7 @@ public void testGetOzoneManagerDBSnapshot(@TempDir File dirReconMetadata) reconOMMetadataManager, reconTaskController, reconUtilsMock, ozoneManagerProtocol, reconContext, getMockTaskStatusUpdaterManager()); + ozoneManagerServiceProvider.getTarExtractor().start(); DBCheckpoint checkpoint = ozoneManagerServiceProvider .getOzoneManagerDBSnapshot(); assertNotNull(checkpoint);