diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFsSnapshot.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFsSnapshot.java index 0291b556c6e0..d02319a4cab6 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFsSnapshot.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFsSnapshot.java @@ -47,7 +47,6 @@ import org.apache.commons.io.FileUtils; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.ozone.MiniOzoneCluster; -import org.apache.hadoop.ozone.om.KeyManagerImpl; import org.apache.hadoop.ozone.om.OMConfigKeys; import org.apache.hadoop.ozone.om.OmConfig; import org.apache.hadoop.ozone.om.OzoneManager; @@ -93,7 +92,7 @@ static void initClass() throws Exception { // Enable filesystem snapshot feature for the test regardless of the default conf.setBoolean(OMConfigKeys.OZONE_FILESYSTEM_SNAPSHOT_ENABLED_KEY, true); conf.setTimeDuration(OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL, 1, TimeUnit.SECONDS); - conf.setInt(OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL, KeyManagerImpl.DISABLE_VALUE); + conf.setInt(OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL, -1); conf.setInt(OmConfig.Keys.SERVER_LIST_MAX_SIZE, 20); conf.setInt(OZONE_FS_LISTING_PAGE_SIZE, 30); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java index fb8d6b30146e..c29f7edbf597 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java @@ -412,7 +412,7 @@ public void testInstallIncrementalSnapshot(@TempDir Path tempDir) SnapshotInfo snapshotInfo2 = createOzoneSnapshot(leaderOM, "snap100"); followerOM.getConfiguration().setInt( OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL, - KeyManagerImpl.DISABLE_VALUE); + -1); // Start the inactive OM. Checkpoint installation will happen spontaneously. cluster.startInactiveOM(followerNodeId); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshot.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshot.java index 366f61990f4c..038fc491fe71 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshot.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshot.java @@ -227,7 +227,7 @@ private void init() throws Exception { conf.setBoolean(OMConfigKeys.OZONE_FILESYSTEM_SNAPSHOT_ENABLED_KEY, true); conf.setInt(OMStorage.TESTING_INIT_LAYOUT_VERSION_KEY, OMLayoutFeature.BUCKET_LAYOUT_SUPPORT.layoutVersion()); conf.setTimeDuration(OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL, 1, TimeUnit.SECONDS); - conf.setInt(OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL, KeyManagerImpl.DISABLE_VALUE); + conf.setInt(OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL, -1); if (!disableNativeDiff) { conf.setTimeDuration(OZONE_OM_SNAPSHOT_COMPACTION_DAG_PRUNE_DAEMON_RUN_INTERVAL, 0, TimeUnit.SECONDS); } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/reconfig/TestOmReconfiguration.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/reconfig/TestOmReconfiguration.java index 2711982661d2..7f6543f2c89e 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/reconfig/TestOmReconfiguration.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/reconfig/TestOmReconfiguration.java @@ -17,21 +17,31 @@ package org.apache.hadoop.ozone.reconfig; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_READONLY_ADMINISTRATORS; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_VOLUME_LISTALL_ALLOWED; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_VOLUME_LISTALL_ALLOWED_DEFAULT; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL_DEFAULT; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; import com.google.common.collect.ImmutableSet; import java.util.Set; import org.apache.commons.lang3.RandomStringUtils; import org.apache.hadoop.conf.ReconfigurationException; import org.apache.hadoop.hdds.conf.ReconfigurationHandler; +import org.apache.hadoop.ozone.om.KeyManagerImpl; import org.apache.hadoop.ozone.om.OmConfig; +import org.apache.hadoop.ozone.om.OzoneManager; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -55,6 +65,7 @@ void reconfigurableProperties() { .add(OZONE_READONLY_ADMINISTRATORS) .add(OZONE_DIR_DELETING_SERVICE_INTERVAL) .add(OZONE_THREAD_NUMBER_DIR_DELETION) + .add(OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL) .addAll(new OmConfig().reconfigurableProperties()) .build(); @@ -125,4 +136,56 @@ void unsetAllowListAllVolumes(String newValue) throws ReconfigurationException { assertEquals(OZONE_OM_VOLUME_LISTALL_ALLOWED_DEFAULT, cluster().getOzoneManager().getAllowListAllVolumes()); } + @Test + void sstFilteringServiceInterval() throws ReconfigurationException { + // Tests reconfiguration of SST filtering service interval + final OzoneManager om = cluster().getOzoneManager(); + final KeyManagerImpl keyManagerImpl = (KeyManagerImpl) om.getKeyManager(); + + // Get the original interval value + String originalValue = om.getConfiguration().get(OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL); + // Verify the original value is valid (should be larger than -1) + long originalInterval = om.getConfiguration().getTimeDuration( + OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL, OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL_DEFAULT, + MILLISECONDS); + assertThat(originalInterval).isPositive(); + + // 1. Test reconfiguring to a different valid interval (30 seconds) + // This should restart the SstFilteringService + final String newIntervalValue = "30s"; + getSubject().reconfigurePropertyImpl(OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL, newIntervalValue); + assertEquals(newIntervalValue, om.getConfiguration() + .get(OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL)); + // Verify the service is still enabled with the new interval + assertTrue(keyManagerImpl.isSstFilteringSvcEnabled(), + "SstFilteringService should remain enabled with new interval"); + assertNotNull(keyManagerImpl.getSnapshotSstFilteringService(), + "SstFilteringService should not be null with new interval"); + // Verify the new interval is applied (30 seconds = 30000 milliseconds) + long newInterval = om.getConfiguration().getTimeDuration( + OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL, OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL_DEFAULT, + MILLISECONDS); + assertEquals(30000, newInterval, "New interval should be 30 seconds (30000ms)"); + + // 2. Service should stop when interval is reconfigured to -1 + final String disableValue = String.valueOf(-1); + getSubject().reconfigurePropertyImpl(OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL, disableValue); + assertEquals(disableValue, om.getConfiguration().get(OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL)); + // Verify that the SstFilteringService is stopped + assertFalse(keyManagerImpl.isSstFilteringSvcEnabled(), + "SstFilteringService should be disabled when interval is -1"); + assertNull(keyManagerImpl.getSnapshotSstFilteringService(), + "SstFilteringService should be null when disabled"); + + // Set the interval back to the original value + // Service should be started again when reconfigured to a valid value + getSubject().reconfigurePropertyImpl(OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL, originalValue); + assertEquals(originalValue, om.getConfiguration().get(OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL)); + // Verify that the SstFilteringService is running again + assertTrue(keyManagerImpl.isSstFilteringSvcEnabled(), + "SstFilteringService should be enabled after restoring original interval"); + assertNotNull(keyManagerImpl.getSnapshotSstFilteringService(), + "SstFilteringService should not be null when enabled"); + } + } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java index bd186df16843..ca6913edbbb3 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java @@ -186,7 +186,6 @@ public class KeyManagerImpl implements KeyManager { private static final Logger LOG = LoggerFactory.getLogger(KeyManagerImpl.class); - public static final int DISABLE_VALUE = -1; /** * A SCM block client, used to talk to SCM to allocate block during putKey. @@ -214,7 +213,7 @@ public class KeyManagerImpl implements KeyManager { public KeyManagerImpl(OzoneManager om, ScmClient scmClient, OzoneConfiguration conf, OMPerformanceMetrics metrics) { - this (om, scmClient, om.getMetadataManager(), conf, + this(om, scmClient, om.getMetadataManager(), conf, om.getBlockTokenMgr(), om.getKmsProvider(), metrics); } @@ -306,21 +305,7 @@ public void start(OzoneConfiguration configuration) { if (snapshotSstFilteringService == null && ozoneManager.isFilesystemSnapshotEnabled()) { - - long serviceInterval = configuration.getTimeDuration( - OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL, - OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL_DEFAULT, - TimeUnit.MILLISECONDS); - long serviceTimeout = configuration.getTimeDuration( - OZONE_SNAPSHOT_SST_FILTERING_SERVICE_TIMEOUT, - OZONE_SNAPSHOT_SST_FILTERING_SERVICE_TIMEOUT_DEFAULT, - TimeUnit.MILLISECONDS); - if (isSstFilteringSvcEnabled()) { - snapshotSstFilteringService = - new SstFilteringService(serviceInterval, TimeUnit.MILLISECONDS, - serviceTimeout, ozoneManager, configuration); - snapshotSstFilteringService.start(); - } + startSnapshotSstFilteringService(configuration); } if (snapshotDeletingService == null && @@ -370,6 +355,42 @@ public void start(OzoneConfiguration configuration) { : new CachedDNSToSwitchMapping(newInstance)); } + /** + * Start the snapshot SST filtering service if interval is not set to disabled value. + * @param conf + */ + public void startSnapshotSstFilteringService(OzoneConfiguration conf) { + if (isSstFilteringSvcEnabled()) { + long serviceInterval = conf.getTimeDuration( + OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL, + OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL_DEFAULT, + TimeUnit.MILLISECONDS); + long serviceTimeout = conf.getTimeDuration( + OZONE_SNAPSHOT_SST_FILTERING_SERVICE_TIMEOUT, + OZONE_SNAPSHOT_SST_FILTERING_SERVICE_TIMEOUT_DEFAULT, + TimeUnit.MILLISECONDS); + + snapshotSstFilteringService = + new SstFilteringService(serviceInterval, TimeUnit.MILLISECONDS, + serviceTimeout, ozoneManager, conf); + snapshotSstFilteringService.start(); + } else { + LOG.info("SstFilteringService is disabled."); + } + } + + /** + * Stop the snapshot SST filtering service if it is running. + */ + public void stopSnapshotSstFilteringService() { + if (snapshotSstFilteringService != null) { + snapshotSstFilteringService.shutdown(); + snapshotSstFilteringService = null; + } else { + LOG.info("SstFilteringService is already stopped or not started."); + } + } + private void startCompactionService(OzoneConfiguration configuration, boolean isCompactionServiceEnabled) { if (compactionService == null && isCompactionServiceEnabled) { @@ -963,7 +984,8 @@ public boolean isSstFilteringSvcEnabled() { .getTimeDuration(OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL, OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS); - return serviceInterval != DISABLE_VALUE; + // any interval <= 0 causes IllegalArgumentException from scheduleWithFixedDelay + return serviceInterval > 0; } @Override diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java index a5df1337dc0d..a302a01cbfcc 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java @@ -83,6 +83,7 @@ import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SERVER_DEFAULT_REPLICATION_KEY; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SERVER_DEFAULT_REPLICATION_TYPE_DEFAULT; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SERVER_DEFAULT_REPLICATION_TYPE_KEY; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION; import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.DETECTED_LOOP_IN_BUCKET_LINKS; import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.FEATURE_NOT_ENABLED; @@ -524,6 +525,8 @@ private OzoneManager(OzoneConfiguration conf, StartupOption startupOption) .register(OZONE_READONLY_ADMINISTRATORS, this::reconfOzoneReadOnlyAdmins) .register(OZONE_OM_VOLUME_LISTALL_ALLOWED, this::reconfigureAllowListAllVolumes) + .register(OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL, + this::reconfOzoneSnapshotSSTFilteringServiceInterval) .register(OZONE_KEY_DELETING_LIMIT_PER_TASK, this::reconfOzoneKeyDeletingLimitPerTask) .register(OZONE_DIR_DELETING_SERVICE_INTERVAL, this::reconfOzoneDirDeletingServiceInterval) @@ -5206,6 +5209,26 @@ private String reconfOzoneThreadNumberDirDeletion(String newVal) { return newVal; } + private String reconfOzoneSnapshotSSTFilteringServiceInterval(String newVal) { + boolean wasSstFilteringSvcEnabled = ((KeyManagerImpl) keyManager).isSstFilteringSvcEnabled(); + getConfiguration().set(OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL, newVal); + + if (this.isFilesystemSnapshotEnabled()) { + if (wasSstFilteringSvcEnabled) { + // Sanity check + Preconditions.checkNotNull(keyManager.getSnapshotSstFilteringService(), + "sstFilteringService should not be null when SST filtering service is enabled."); + ((KeyManagerImpl) keyManager).stopSnapshotSstFilteringService(); + } + // Note startSnapshotSstFilteringService checks whether the config is set to a value that enables the service + ((KeyManagerImpl) keyManager).startSnapshotSstFilteringService(getConfiguration()); + } else { + LOG.warn("Ozone filesystem snapshot is not enabled. {} is reconfigured but will not make any difference.", + OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL); + } + return newVal; + } + public void validateReplicationConfig(ReplicationConfig replicationConfig) throws OMException { try {