Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.om.KeyManagerImpl;
import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.OmConfig;
import org.apache.hadoop.ozone.om.OzoneManager;
Expand Down Expand Up @@ -93,7 +92,7 @@ static void initClass() throws Exception {
// Enable filesystem snapshot feature for the test regardless of the default
conf.setBoolean(OMConfigKeys.OZONE_FILESYSTEM_SNAPSHOT_ENABLED_KEY, true);
conf.setTimeDuration(OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL, 1, TimeUnit.SECONDS);
conf.setInt(OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL, KeyManagerImpl.DISABLE_VALUE);
conf.setInt(OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL, -1);
conf.setInt(OmConfig.Keys.SERVER_LIST_MAX_SIZE, 20);
conf.setInt(OZONE_FS_LISTING_PAGE_SIZE, 30);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ public void testInstallIncrementalSnapshot(@TempDir Path tempDir)
SnapshotInfo snapshotInfo2 = createOzoneSnapshot(leaderOM, "snap100");
followerOM.getConfiguration().setInt(
OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL,
KeyManagerImpl.DISABLE_VALUE);
-1);
// Start the inactive OM. Checkpoint installation will happen spontaneously.
cluster.startInactiveOM(followerNodeId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ private void init() throws Exception {
conf.setBoolean(OMConfigKeys.OZONE_FILESYSTEM_SNAPSHOT_ENABLED_KEY, true);
conf.setInt(OMStorage.TESTING_INIT_LAYOUT_VERSION_KEY, OMLayoutFeature.BUCKET_LAYOUT_SUPPORT.layoutVersion());
conf.setTimeDuration(OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL, 1, TimeUnit.SECONDS);
conf.setInt(OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL, KeyManagerImpl.DISABLE_VALUE);
conf.setInt(OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL, -1);
if (!disableNativeDiff) {
conf.setTimeDuration(OZONE_OM_SNAPSHOT_COMPACTION_DAG_PRUNE_DAEMON_RUN_INTERVAL, 0, TimeUnit.SECONDS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,31 @@

package org.apache.hadoop.ozone.reconfig;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_READONLY_ADMINISTRATORS;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_VOLUME_LISTALL_ALLOWED;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_VOLUME_LISTALL_ALLOWED_DEFAULT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL_DEFAULT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.google.common.collect.ImmutableSet;
import java.util.Set;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.conf.ReconfigurationException;
import org.apache.hadoop.hdds.conf.ReconfigurationHandler;
import org.apache.hadoop.ozone.om.KeyManagerImpl;
import org.apache.hadoop.ozone.om.OmConfig;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
Expand All @@ -55,6 +65,7 @@ void reconfigurableProperties() {
.add(OZONE_READONLY_ADMINISTRATORS)
.add(OZONE_DIR_DELETING_SERVICE_INTERVAL)
.add(OZONE_THREAD_NUMBER_DIR_DELETION)
.add(OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL)
.addAll(new OmConfig().reconfigurableProperties())
.build();

Expand Down Expand Up @@ -125,4 +136,56 @@ void unsetAllowListAllVolumes(String newValue) throws ReconfigurationException {
assertEquals(OZONE_OM_VOLUME_LISTALL_ALLOWED_DEFAULT, cluster().getOzoneManager().getAllowListAllVolumes());
}

@Test
void sstFilteringServiceInterval() throws ReconfigurationException {
// Tests reconfiguration of SST filtering service interval
final OzoneManager om = cluster().getOzoneManager();
final KeyManagerImpl keyManagerImpl = (KeyManagerImpl) om.getKeyManager();

// Get the original interval value
String originalValue = om.getConfiguration().get(OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL);
// Verify the original value is valid (should be larger than -1)
long originalInterval = om.getConfiguration().getTimeDuration(
OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL, OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL_DEFAULT,
MILLISECONDS);
assertThat(originalInterval).isPositive();

// 1. Test reconfiguring to a different valid interval (30 seconds)
// This should restart the SstFilteringService
final String newIntervalValue = "30s";
getSubject().reconfigurePropertyImpl(OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL, newIntervalValue);
assertEquals(newIntervalValue, om.getConfiguration()
.get(OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL));
// Verify the service is still enabled with the new interval
assertTrue(keyManagerImpl.isSstFilteringSvcEnabled(),
"SstFilteringService should remain enabled with new interval");
assertNotNull(keyManagerImpl.getSnapshotSstFilteringService(),
"SstFilteringService should not be null with new interval");
// Verify the new interval is applied (30 seconds = 30000 milliseconds)
long newInterval = om.getConfiguration().getTimeDuration(
OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL, OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL_DEFAULT,
MILLISECONDS);
assertEquals(30000, newInterval, "New interval should be 30 seconds (30000ms)");

// 2. Service should stop when interval is reconfigured to -1
final String disableValue = String.valueOf(-1);
getSubject().reconfigurePropertyImpl(OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL, disableValue);
assertEquals(disableValue, om.getConfiguration().get(OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL));
// Verify that the SstFilteringService is stopped
assertFalse(keyManagerImpl.isSstFilteringSvcEnabled(),
"SstFilteringService should be disabled when interval is -1");
assertNull(keyManagerImpl.getSnapshotSstFilteringService(),
"SstFilteringService should be null when disabled");

// Set the interval back to the original value
// Service should be started again when reconfigured to a valid value
getSubject().reconfigurePropertyImpl(OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL, originalValue);
assertEquals(originalValue, om.getConfiguration().get(OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL));
// Verify that the SstFilteringService is running again
assertTrue(keyManagerImpl.isSstFilteringSvcEnabled(),
"SstFilteringService should be enabled after restoring original interval");
assertNotNull(keyManagerImpl.getSnapshotSstFilteringService(),
"SstFilteringService should not be null when enabled");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,6 @@
public class KeyManagerImpl implements KeyManager {
private static final Logger LOG =
LoggerFactory.getLogger(KeyManagerImpl.class);
public static final int DISABLE_VALUE = -1;

/**
* A SCM block client, used to talk to SCM to allocate block during putKey.
Expand Down Expand Up @@ -214,7 +213,7 @@ public class KeyManagerImpl implements KeyManager {

public KeyManagerImpl(OzoneManager om, ScmClient scmClient,
OzoneConfiguration conf, OMPerformanceMetrics metrics) {
this (om, scmClient, om.getMetadataManager(), conf,
this(om, scmClient, om.getMetadataManager(), conf,
om.getBlockTokenMgr(), om.getKmsProvider(), metrics);
}

Expand Down Expand Up @@ -306,21 +305,7 @@ public void start(OzoneConfiguration configuration) {

if (snapshotSstFilteringService == null &&
ozoneManager.isFilesystemSnapshotEnabled()) {

long serviceInterval = configuration.getTimeDuration(
OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL,
OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL_DEFAULT,
TimeUnit.MILLISECONDS);
long serviceTimeout = configuration.getTimeDuration(
OZONE_SNAPSHOT_SST_FILTERING_SERVICE_TIMEOUT,
OZONE_SNAPSHOT_SST_FILTERING_SERVICE_TIMEOUT_DEFAULT,
TimeUnit.MILLISECONDS);
if (isSstFilteringSvcEnabled()) {
snapshotSstFilteringService =
new SstFilteringService(serviceInterval, TimeUnit.MILLISECONDS,
serviceTimeout, ozoneManager, configuration);
snapshotSstFilteringService.start();
}
startSnapshotSstFilteringService(configuration);
}

if (snapshotDeletingService == null &&
Expand Down Expand Up @@ -370,6 +355,42 @@ public void start(OzoneConfiguration configuration) {
: new CachedDNSToSwitchMapping(newInstance));
}

/**
* Start the snapshot SST filtering service if interval is not set to disabled value.
* @param conf
*/
public void startSnapshotSstFilteringService(OzoneConfiguration conf) {
if (isSstFilteringSvcEnabled()) {
long serviceInterval = conf.getTimeDuration(
OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL,
OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL_DEFAULT,
TimeUnit.MILLISECONDS);
long serviceTimeout = conf.getTimeDuration(
OZONE_SNAPSHOT_SST_FILTERING_SERVICE_TIMEOUT,
OZONE_SNAPSHOT_SST_FILTERING_SERVICE_TIMEOUT_DEFAULT,
TimeUnit.MILLISECONDS);

snapshotSstFilteringService =
new SstFilteringService(serviceInterval, TimeUnit.MILLISECONDS,
serviceTimeout, ozoneManager, conf);
snapshotSstFilteringService.start();
} else {
LOG.info("SstFilteringService is disabled.");
}
}

/**
* Stop the snapshot SST filtering service if it is running.
*/
public void stopSnapshotSstFilteringService() {
if (snapshotSstFilteringService != null) {
snapshotSstFilteringService.shutdown();
snapshotSstFilteringService = null;
} else {
LOG.info("SstFilteringService is already stopped or not started.");
}
}

private void startCompactionService(OzoneConfiguration configuration,
boolean isCompactionServiceEnabled) {
if (compactionService == null && isCompactionServiceEnabled) {
Expand Down Expand Up @@ -963,7 +984,8 @@ public boolean isSstFilteringSvcEnabled() {
.getTimeDuration(OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL,
OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL_DEFAULT,
TimeUnit.MILLISECONDS);
return serviceInterval != DISABLE_VALUE;
// any interval <= 0 causes IllegalArgumentException from scheduleWithFixedDelay
return serviceInterval > 0;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SERVER_DEFAULT_REPLICATION_KEY;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SERVER_DEFAULT_REPLICATION_TYPE_DEFAULT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SERVER_DEFAULT_REPLICATION_TYPE_KEY;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.DETECTED_LOOP_IN_BUCKET_LINKS;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.FEATURE_NOT_ENABLED;
Expand Down Expand Up @@ -524,6 +525,8 @@ private OzoneManager(OzoneConfiguration conf, StartupOption startupOption)
.register(OZONE_READONLY_ADMINISTRATORS,
this::reconfOzoneReadOnlyAdmins)
.register(OZONE_OM_VOLUME_LISTALL_ALLOWED, this::reconfigureAllowListAllVolumes)
.register(OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL,
this::reconfOzoneSnapshotSSTFilteringServiceInterval)
.register(OZONE_KEY_DELETING_LIMIT_PER_TASK,
this::reconfOzoneKeyDeletingLimitPerTask)
.register(OZONE_DIR_DELETING_SERVICE_INTERVAL, this::reconfOzoneDirDeletingServiceInterval)
Expand Down Expand Up @@ -5206,6 +5209,26 @@ private String reconfOzoneThreadNumberDirDeletion(String newVal) {
return newVal;
}

private String reconfOzoneSnapshotSSTFilteringServiceInterval(String newVal) {
boolean wasSstFilteringSvcEnabled = ((KeyManagerImpl) keyManager).isSstFilteringSvcEnabled();
getConfiguration().set(OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL, newVal);

if (this.isFilesystemSnapshotEnabled()) {
if (wasSstFilteringSvcEnabled) {
// Sanity check
Preconditions.checkNotNull(keyManager.getSnapshotSstFilteringService(),
"sstFilteringService should not be null when SST filtering service is enabled.");
((KeyManagerImpl) keyManager).stopSnapshotSstFilteringService();
}
// Note startSnapshotSstFilteringService checks whether the config is set to a value that enables the service
((KeyManagerImpl) keyManager).startSnapshotSstFilteringService(getConfiguration());
} else {
LOG.warn("Ozone filesystem snapshot is not enabled. {} is reconfigured but will not make any difference.",
OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL);
}
return newVal;
}

public void validateReplicationConfig(ReplicationConfig replicationConfig)
throws OMException {
try {
Expand Down