diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java index 822cce29adb7..b3b453e7c49c 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java @@ -277,6 +277,12 @@ public final class OzoneConfigKeys { OZONE_SNAPSHOT_SST_FILTERING_SERVICE_TIMEOUT_DEFAULT = "300s"; // 300s for default + public static final String OZONE_SNAPSHOT_DEFRAG_SERVICE_TIMEOUT = + "ozone.snapshot.defrag.service.timeout"; + public static final String + OZONE_SNAPSHOT_DEFRAG_SERVICE_TIMEOUT_DEFAULT = "300s"; + // 300s for default + public static final String OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL = "ozone.snapshot.deleting.service.interval"; public static final String diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java index 1aab4fc28dd5..47357493a272 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java @@ -208,10 +208,11 @@ public final class OzoneConsts { public static final String OM_SLD_VERSION = "version"; public static final String OM_SLD_CHECKSUM = "checksum"; public static final String OM_SLD_IS_SST_FILTERED = "isSSTFiltered"; - public static final String OM_SLD_UNCOMPACTED_SST_FILE_LIST = "uncompactedSSTFileList"; - public static final String OM_SLD_LAST_COMPACTION_TIME = "lastCompactionTime"; - public static final String OM_SLD_NEEDS_COMPACTION = "needsCompaction"; - public static final String OM_SLD_COMPACTED_SST_FILE_LIST = "compactedSSTFileList"; + public static final String OM_SLD_NOT_DEFRAGGED_SST_FILE_LIST = "notDefraggedSSTFileList"; + public static final String OM_SLD_LAST_DEFRAG_TIME = "lastDefragTime"; + public static final String OM_SLD_NEEDS_DEFRAGMENTATION = "needsDefragmentation"; + public static final String OM_SLD_DEFRAGGED_SST_FILE_LIST = "defraggedSSTFileList"; + public static final String OM_SLD_DB_CHECKPOINT_DIR = "dbCheckpointDir"; // YAML fields for .container files public static final String CONTAINER_ID = "containerID"; @@ -509,6 +510,7 @@ public final class OzoneConsts { public static final String OM_SNAPSHOT_DIR = "db.snapshots"; public static final String OM_SNAPSHOT_CHECKPOINT_DIR = OM_SNAPSHOT_DIR + OM_KEY_PREFIX + "checkpointState"; + public static final String OM_SNAPSHOT_CHECKPOINT_DEFRAGED_DIR = "checkpointStateDefragged"; public static final String OM_SNAPSHOT_DIFF_DIR = OM_SNAPSHOT_DIR + OM_KEY_PREFIX + "diffState"; diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStoreBuilder.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStoreBuilder.java index a561023f5300..11fbe72d4b97 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStoreBuilder.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStoreBuilder.java @@ -216,6 +216,9 @@ public RDBStore build() throws RocksDatabaseException { ManagedWriteOptions writeOptions = new ManagedWriteOptions(); writeOptions.setSync(rocksDBConfiguration.getSyncOption()); + // Disable WAL to reduce disk write, for testing only + writeOptions.setDisableWAL(true); + File dbFile = getDBFile(); if (!dbFile.getParentFile().exists()) { throw new RocksDatabaseException("The DB 
destination directory should exist."); @@ -399,6 +402,9 @@ private ManagedDBOptions getDefaultDBOptions( // RocksDB log settings. dbOptions.setMaxLogFileSize(rocksDBConfiguration.getMaxLogFileSize()); dbOptions.setKeepLogFileNum(rocksDBConfiguration.getKeepLogFileNum()); + // Disable WAL archival (for more accurate RocksDB size measurements when there aren't too many keys) + rocksDBConfiguration.setWalTTL(0); + rocksDBConfiguration.setWalSizeLimit(0); // Apply WAL settings. dbOptions.setWalTtlSeconds(rocksDBConfiguration.getWalTTL()); diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileWriter.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileWriter.java index e84854cae443..5aa561ba9486 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileWriter.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileWriter.java @@ -28,7 +28,7 @@ /** * DumpFileWriter using rocksdb sst files. */ -class RDBSstFileWriter implements Closeable { +public class RDBSstFileWriter implements Closeable { private ManagedSstFileWriter sstFileWriter; private File sstFile; @@ -36,7 +36,7 @@ class RDBSstFileWriter implements Closeable { private ManagedOptions emptyOption = new ManagedOptions(); private final ManagedEnvOptions emptyEnvOptions = new ManagedEnvOptions(); - RDBSstFileWriter(File externalFile) throws RocksDatabaseException { + public RDBSstFileWriter(File externalFile) throws RocksDatabaseException { this.sstFileWriter = new ManagedSstFileWriter(emptyEnvOptions, emptyOption); this.keyCounter = new AtomicLong(0); this.sstFile = externalFile; @@ -60,6 +60,17 @@ public void put(byte[] key, byte[] value) throws RocksDatabaseException { } } + public void delete(byte[] key) throws RocksDatabaseException { + try { + sstFileWriter.delete(key); + keyCounter.incrementAndGet(); + } catch (RocksDBException e) { + closeOnFailure(); + throw new RocksDatabaseException("Failed to delete key (length=" + key.length + + "), sstFile=" + sstFile.getAbsolutePath(), e); + } + } + @Override public void close() throws RocksDatabaseException { if (sstFileWriter != null) { diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java index e3853a84211c..c005cc8080ed 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java @@ -139,6 +139,7 @@ public class RDBStore implements DBStore { checkpointsParentDir = checkpointsParentDirPath.toString(); Files.createDirectories(checkpointsParentDirPath); + // TODO: scm, dn, etc. don't need this. 
Only om does Path snapshotsParentDirPath = Paths.get(dbLocation.getParent(), OM_SNAPSHOT_CHECKPOINT_DIR); snapshotsParentDir = snapshotsParentDirPath.toString(); diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java index cf0c84f375e3..9d12f54f9f55 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java @@ -237,7 +237,7 @@ public boolean isClosed() { * * @see ManagedCheckpoint */ - final class RocksCheckpoint implements Closeable { + public final class RocksCheckpoint implements Closeable { private final ManagedCheckpoint checkpoint; private RocksCheckpoint() { @@ -599,7 +599,7 @@ public List getLiveFilesMetaData() throws RocksDatabaseExcepti } } - RocksCheckpoint createCheckpoint() { + public RocksCheckpoint createCheckpoint() { return new RocksCheckpoint(); } @@ -650,7 +650,7 @@ public Collection getExtraColumnFamilies() { return Collections.unmodifiableCollection(columnFamilies.values()); } - byte[] get(ColumnFamily family, byte[] key) throws RocksDatabaseException { + public byte[] get(ColumnFamily family, byte[] key) throws RocksDatabaseException { try (UncheckedAutoCloseable ignored = acquire()) { return db.get().get(family.getHandle(), key); } catch (RocksDBException e) { diff --git a/hadoop-hdds/rocks-native/pom.xml b/hadoop-hdds/rocks-native/pom.xml index 0c7e8fa7e2da..40132eb6aa63 100644 --- a/hadoop-hdds/rocks-native/pom.xml +++ b/hadoop-hdds/rocks-native/pom.xml @@ -326,10 +326,6 @@ **/*.class - **/lib*.dylib - **/lib*.so - **/lib*.jnilib - **/lib*.dll diff --git a/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/NativeLibraryLoader.java b/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/NativeLibraryLoader.java index 39bb0b3ca56b..70cdda2d40c0 100644 --- a/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/NativeLibraryLoader.java +++ b/hadoop-hdds/rocks-native/src/main/java/org/apache/hadoop/hdds/utils/NativeLibraryLoader.java @@ -125,7 +125,8 @@ public synchronized boolean loadLibrary(final String libraryName, final List, List> files = copyResourceFromJarToTemp(libraryName, dependentFiles); diff --git a/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/ozone/admin/om/OMAdmin.java b/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/ozone/admin/om/OMAdmin.java index d536b81be140..2cbf32f45b2a 100644 --- a/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/ozone/admin/om/OMAdmin.java +++ b/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/ozone/admin/om/OMAdmin.java @@ -59,7 +59,8 @@ UpdateRangerSubcommand.class, TransferOmLeaderSubCommand.class, FetchKeySubCommand.class, - LeaseSubCommand.class + LeaseSubCommand.class, + TriggerSnapshotDefragSubCommand.class }) @MetaInfServices(AdminSubcommand.class) public class OMAdmin implements AdminSubcommand { diff --git a/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/ozone/admin/om/TriggerSnapshotDefragSubCommand.java b/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/ozone/admin/om/TriggerSnapshotDefragSubCommand.java new file mode 100644 index 000000000000..e2946011290c --- /dev/null +++ b/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/ozone/admin/om/TriggerSnapshotDefragSubCommand.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or 
more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.admin.om; + +import java.util.concurrent.Callable; +import org.apache.hadoop.hdds.cli.HddsVersionProvider; +import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol; +import picocli.CommandLine; + +/** + * Handler of ozone admin om triggerSnapshotDefrag command. + */ +@CommandLine.Command( + name = "triggerSnapshotDefrag", + description = "Triggers the Snapshot Defragmentation Service to run " + + "immediately. This command manually initiates the snapshot " + + "defragmentation process which compacts snapshot data and removes " + + "fragmentation to improve storage efficiency.", + mixinStandardHelpOptions = true, + versionProvider = HddsVersionProvider.class +) +public class TriggerSnapshotDefragSubCommand implements Callable { + + @CommandLine.ParentCommand + private OMAdmin parent; + + @CommandLine.Option( + names = {"-id", "--service-id"}, + description = "Ozone Manager Service ID" + ) + private String omServiceId; + + @CommandLine.Option( + names = {"-host", "--service-host"}, + description = "Ozone Manager Host" + ) + private String omHost; + + @CommandLine.Option( + names = {"--no-wait"}, + description = "Do not wait for the defragmentation task to complete. " + + "The command will return immediately after triggering the task.", + defaultValue = "false" + ) + private boolean noWait; + + @Override + public Void call() throws Exception { + boolean forceHA = false; + + try (OzoneManagerProtocol client = parent.createOmClient(omServiceId, omHost, forceHA)) { + System.out.println("Triggering Snapshot Defragmentation Service..."); + boolean result = client.triggerSnapshotDefrag(noWait); + + if (noWait) { + System.out.println("Snapshot defragmentation task has been triggered " + + "successfully and is running in the background."); + } else { + if (result) { + System.out.println("Snapshot defragmentation completed successfully."); + } else { + System.out.println("Snapshot defragmentation task failed or was " + + "interrupted."); + } + } + } catch (Exception e) { + System.err.println("Failed to trigger snapshot defragmentation: " + + e.getMessage()); + throw e; + } + + return null; + } +} diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java index 81588381769c..d92881579e1a 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java @@ -267,6 +267,7 @@ public static boolean isReadOnly( // write to OM DB. And therefore it doesn't need a OMClientRequest. // Although indirectly the Ranger sync service task could invoke write // operation SetRangerServiceVersion. 
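A usage note on the subcommand above: its own Javadoc describes it as the handler of the "ozone admin om triggerSnapshotDefrag" command, so an example invocation (the service id "omservice" below is only a placeholder) would be: ozone admin om triggerSnapshotDefrag --service-id=omservice --no-wait. With --no-wait the CLI returns as soon as the task is triggered; without it, the call blocks until triggerSnapshotDefrag(noWait=false) reports success or failure.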
+ case SnapshotDefrag: case GetKeyInfo: case SnapshotDiff: case CancelSnapshotDiff: diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java index 2bfe30efab0f..2dc1408668a7 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java @@ -426,11 +426,21 @@ public final class OMConfigKeys { "ozone.snapshot.deleting.limit.per.task"; public static final int SNAPSHOT_DELETING_LIMIT_PER_TASK_DEFAULT = 10; + // Snapshot defragmentation service configuration + public static final String SNAPSHOT_DEFRAG_LIMIT_PER_TASK = + "ozone.snapshot.defrag.limit.per.task"; + public static final int SNAPSHOT_DEFRAG_LIMIT_PER_TASK_DEFAULT = 1; + public static final String OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL = "ozone.snapshot.filtering.service.interval"; public static final String OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL_DEFAULT = "60s"; + public static final String OZONE_SNAPSHOT_DEFRAG_SERVICE_INTERVAL = + "ozone.snapshot.defrag.service.interval"; + public static final String + OZONE_SNAPSHOT_DEFRAG_SERVICE_INTERVAL_DEFAULT = "60s"; + public static final String OZONE_SNAPSHOT_CHECKPOINT_DIR_CREATION_POLL_TIMEOUT = "ozone.om.snapshot.checkpoint.dir.creation.poll.timeout"; @@ -558,13 +568,12 @@ public final class OMConfigKeys { public static final String OZONE_OM_SNAPSHOT_DIFF_REPORT_MAX_PAGE_SIZE = "ozone.om.snapshot.diff.max.page.size"; + public static final int OZONE_OM_SNAPSHOT_DIFF_REPORT_MAX_PAGE_SIZE_DEFAULT = 1000; public static final String OZONE_OM_SNAPSHOT_DB_MAX_OPEN_FILES = "ozone.om.snapshot.db.max.open.files"; public static final int OZONE_OM_SNAPSHOT_DB_MAX_OPEN_FILES_DEFAULT = 100; - public static final int OZONE_OM_SNAPSHOT_DIFF_REPORT_MAX_PAGE_SIZE_DEFAULT - = 1000; public static final String OZONE_OM_SNAPSHOT_DIFF_THREAD_POOL_SIZE = "ozone.om.snapshot.diff.thread.pool.size"; diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/QuotaUtil.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/QuotaUtil.java index db546aecdc53..388ee719a2f1 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/QuotaUtil.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/QuotaUtil.java @@ -19,6 +19,7 @@ import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.EC; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.RATIS; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.STAND_ALONE; import org.apache.hadoop.hdds.client.ECReplicationConfig; import org.apache.hadoop.hdds.client.RatisReplicationConfig; @@ -59,6 +60,9 @@ public static long getReplicatedSize( fullStripes * rc.getParity() * rc.getEcChunkSize() + partialFirstChunk * rc.getParity(); return dataSize + replicationOverhead; + } else if (repConfig.getReplicationType() == STAND_ALONE) { + // For replication type STAND_ALONE, the replicated size is the same as data size. + return dataSize; } else { LOG.warn("Unknown replication type '{}'. 
Returning original data size.", repConfig.getReplicationType()); diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java index 3bcf190662af..57ea7c76c29f 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java @@ -459,6 +459,18 @@ ListOpenFilesResult listOpenFiles(String path, int maxKeys, String contToken) */ boolean triggerRangerBGSync(boolean noWait) throws IOException; + /** + * Triggers Snapshot Defragmentation Service immediately. + * + * Requires Ozone administrator privilege. + * + * @param noWait set to true if client won't wait for the result. + * @return true if noWait is true or when task completed successfully, + * false otherwise. + * @throws IOException OMException (e.g. PERMISSION_DENIED) + */ + boolean triggerSnapshotDefrag(boolean noWait) throws IOException; + /** * Initiate metadata upgrade finalization. * This method when called, initiates finalization of Ozone Manager metadata diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java index 671a93a486ec..c8eb46487007 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java @@ -219,6 +219,8 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SetTimesRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SetVolumePropertyRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SetVolumePropertyResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotDefragRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotDefragResponse; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotInfoRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.TenantAssignAdminRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.TenantAssignUserAccessIdRequest; @@ -1926,6 +1928,22 @@ public boolean triggerRangerBGSync(boolean noWait) throws IOException { return resp.getRunSuccess(); } + @Override + public boolean triggerSnapshotDefrag(boolean noWait) throws IOException { + SnapshotDefragRequest req = SnapshotDefragRequest.newBuilder() + .setNoWait(noWait) + .build(); + + OMRequest omRequest = createOMRequest(Type.SnapshotDefrag) + .setSnapshotDefragRequest(req) + .build(); + + SnapshotDefragResponse resp = handleError(submitRequest(omRequest)) + .getSnapshotDefragResponse(); + + return resp.getRunSuccess(); + } + @Override public StatusAndMessages finalizeUpgrade(String upgradeClientID) throws IOException { diff --git a/hadoop-ozone/dist/pom.xml b/hadoop-ozone/dist/pom.xml index ecd51b458373..9b433e475ddd 100644 --- a/hadoop-ozone/dist/pom.xml +++ b/hadoop-ozone/dist/pom.xml @@ -171,6 +171,25 @@ + + org.apache.maven.plugins + maven-antrun-plugin + + + + run + + prepare-package + + + + + + + + + + org.apache.maven.plugins 
maven-resources-plugin diff --git a/hadoop-ozone/dist/src/main/compose/ozone/docker-compose.yaml b/hadoop-ozone/dist/src/main/compose/ozone/docker-compose.yaml index dee24e570021..7131b9b5217f 100644 --- a/hadoop-ozone/dist/src/main/compose/ozone/docker-compose.yaml +++ b/hadoop-ozone/dist/src/main/compose/ozone/docker-compose.yaml @@ -18,8 +18,6 @@ x-common-config: &common-config image: ${OZONE_RUNNER_IMAGE}:${OZONE_RUNNER_VERSION} - volumes: - - ../..:/opt/hadoop env_file: - docker-config @@ -36,6 +34,9 @@ services: environment: <<: *replication OZONE_OPTS: + volumes: + - ../..:/opt/hadoop + - ../../../data-dn:/data command: ["ozone","datanode"] om: <<: *common-config @@ -43,6 +44,9 @@ services: ENSURE_OM_INITIALIZED: /data/metadata/om/current/VERSION OZONE_OPTS: <<: *replication + volumes: + - ../..:/opt/hadoop # inherited from common-config + - ../../../data-om:/data # om-specific data volume ports: - 9874:9874 - 9862:9862 @@ -57,6 +61,9 @@ services: OZONE-SITE.XML_hdds.scm.safemode.min.datanode: ${OZONE_SAFEMODE_MIN_DATANODES:-1} OZONE_OPTS: <<: *replication + volumes: + - ../..:/opt/hadoop + - ../../../data-scm:/data command: ["ozone","scm"] httpfs: <<: *common-config @@ -65,7 +72,11 @@ services: <<: *replication ports: - 14000:14000 + volumes: + - ../..:/opt/hadoop command: [ "ozone","httpfs" ] + profiles: + - with-httpfs s3g: <<: *common-config environment: @@ -74,7 +85,11 @@ services: ports: - 9878:9878 - 19878:19878 + volumes: + - ../..:/opt/hadoop command: ["ozone","s3g"] + profiles: + - with-s3g recon: <<: *common-config hostname: recon @@ -83,4 +98,10 @@ services: environment: OZONE_OPTS: <<: *replication + volumes: + - ../..:/opt/hadoop command: ["ozone","recon"] + profiles: + # Do not start Recon by default to avoid OM DB being checkpointed every minute + # This service is only started when the profile is enabled + - with-recon diff --git a/hadoop-ozone/dist/src/main/compose/ozone/docker-config b/hadoop-ozone/dist/src/main/compose/ozone/docker-config index f2a9e0447932..08c3ca3a0337 100644 --- a/hadoop-ozone/dist/src/main/compose/ozone/docker-config +++ b/hadoop-ozone/dist/src/main/compose/ozone/docker-config @@ -53,6 +53,18 @@ OZONE-SITE.XML_hdds.scm.replication.over.replicated.interval=5s OZONE-SITE.XML_hdds.scm.wait.time.after.safemode.exit=30s OZONE-SITE.XML_ozone.http.basedir=/tmp/ozone_http +# Increase diff page size to improve efficiency +# Note this is server side setting, client needs to specify larger page size as well, default is 1000 +OZONE-SITE.XML_ozone.om.snapshot.diff.max.page.size=10000 + +OZONE-SITE.XML_hadoop.hdds.db.rocksdb.WAL_ttl_seconds=0 +OZONE-SITE.XML_hadoop.hdds.db.rocksdb.WAL_size_limit_MB=0MB + +# Enable Ozone snapshot defragmentation service but set it to run once per day to +# effectively stop it from running periodically +OZONE-SITE.XML_ozone.snapshot.defrag.service.interval=24h +#OZONE-SITE.XML_ozone.snapshot.defrag.limit.per.task=5 + OZONE-SITE.XML_hdds.container.ratis.datastream.enabled=true OZONE-SITE.XML_ozone.fs.hsync.enabled=true diff --git a/hadoop-ozone/dist/src/shell/ozone/ozone b/hadoop-ozone/dist/src/shell/ozone/ozone index 177f28a548ab..ebd91e65746f 100755 --- a/hadoop-ozone/dist/src/shell/ozone/ozone +++ b/hadoop-ozone/dist/src/shell/ozone/ozone @@ -139,6 +139,7 @@ function ozonecmd_case OZONE_OM_OPTS="${RATIS_OPTS} ${OZONE_OM_OPTS}" OZONE_OM_OPTS="${OZONE_OM_OPTS} -Dlog4j.configurationFile=${OZONE_CONF_DIR}/om-audit-log4j2.properties" 
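The defrag-related properties set in the compose docker-config above map onto the constants this patch adds to OzoneConfigKeys and OMConfigKeys. As an illustration only (the class name SnapshotDefragConfigCheck is invented here; the keys, defaults, and the getTimeDuration/getInt calls are the ones used elsewhere in this diff), the sketch below shows how the settings resolve and that a non-positive interval disables the service, which is why the compose file stretches the interval to 24h instead of setting it to 0:

    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.hdds.conf.OzoneConfiguration;
    import org.apache.hadoop.ozone.OzoneConfigKeys;
    import org.apache.hadoop.ozone.om.OMConfigKeys;

    public class SnapshotDefragConfigCheck {
      public static void main(String[] args) {
        OzoneConfiguration conf = new OzoneConfiguration();
        // "ozone.snapshot.defrag.service.interval", default "60s" in this patch.
        long intervalMs = conf.getTimeDuration(
            OMConfigKeys.OZONE_SNAPSHOT_DEFRAG_SERVICE_INTERVAL,
            OMConfigKeys.OZONE_SNAPSHOT_DEFRAG_SERVICE_INTERVAL_DEFAULT,
            TimeUnit.MILLISECONDS);
        // "ozone.snapshot.defrag.service.timeout", default "300s" in this patch.
        long timeoutMs = conf.getTimeDuration(
            OzoneConfigKeys.OZONE_SNAPSHOT_DEFRAG_SERVICE_TIMEOUT,
            OzoneConfigKeys.OZONE_SNAPSHOT_DEFRAG_SERVICE_TIMEOUT_DEFAULT,
            TimeUnit.MILLISECONDS);
        // "ozone.snapshot.defrag.limit.per.task", default 1 in this patch.
        int limitPerTask = conf.getInt(
            OMConfigKeys.SNAPSHOT_DEFRAG_LIMIT_PER_TASK,
            OMConfigKeys.SNAPSHOT_DEFRAG_LIMIT_PER_TASK_DEFAULT);
        // KeyManagerImpl#isDefragSvcEnabled() treats interval <= 0 as disabled.
        System.out.println("defrag enabled=" + (intervalMs > 0)
            + ", intervalMs=" + intervalMs
            + ", timeoutMs=" + timeoutMs
            + ", limitPerTask=" + limitPerTask);
      }
    }

The same resolution happens in KeyManagerImpl#startSnapshotDefragService later in this diff, so changing these keys in ozone-site.xml (or via the OZONE-SITE.XML_ prefix above) is all that is needed to tune or disable the service.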
OZONE_OM_OPTS="-Dlog4j2.contextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector ${OZONE_OM_OPTS} ${OZONE_MODULE_ACCESS_ARGS}" + JAVA_LIBRARY_PATH="${JAVA_LIBRARY_PATH}:${OZONE_HOME}/share/ozone/lib" OZONE_RUN_ARTIFACT_NAME="ozone-manager" ;; sh | shell) @@ -220,6 +221,7 @@ function ozonecmd_case debug) OZONE_CLASSNAME=org.apache.hadoop.ozone.debug.OzoneDebug OZONE_DEBUG_OPTS="${OZONE_DEBUG_OPTS} ${RATIS_OPTS} ${OZONE_MODULE_ACCESS_ARGS}" + JAVA_LIBRARY_PATH="${JAVA_LIBRARY_PATH}:${OZONE_HOME}/share/ozone/lib" OZONE_RUN_ARTIFACT_NAME="ozone-tools" ;; repair) diff --git a/hadoop-ozone/freon/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java b/hadoop-ozone/freon/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java index ed636fba0cd3..0d545bc672e9 100644 --- a/hadoop-ozone/freon/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java +++ b/hadoop-ozone/freon/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java @@ -68,6 +68,7 @@ import org.apache.hadoop.ozone.client.OzoneVolume; import org.apache.hadoop.ozone.client.io.OzoneInputStream; import org.apache.hadoop.ozone.client.io.OzoneOutputStream; +import org.apache.hadoop.ozone.om.exceptions.OMException; import org.apache.hadoop.ozone.om.helpers.BucketLayout; import org.apache.hadoop.ozone.util.ShutdownHookManager; import org.apache.hadoop.util.Time; @@ -736,13 +737,20 @@ public void run() { } private boolean createVolume(int volumeNumber) { - String volumeName = "vol-" + volumeNumber + "-" - + RandomStringUtils.secure().nextNumeric(5); + String volumeName = "vol-" + volumeNumber; LOG.trace("Creating volume: {}", volumeName); try (AutoCloseable scope = TracingUtil .createActivatedSpan("createVolume")) { long start = System.nanoTime(); - objectStore.createVolume(volumeName); + try { + objectStore.createVolume(volumeName); + } catch (OMException e) { + if (e.getResult() == OMException.ResultCodes.VOLUME_ALREADY_EXISTS) { + LOG.warn("Volume {} already exists, continuing...", volumeName); + } else { + throw e; + } + } long volumeCreationDuration = System.nanoTime() - start; volumeCreationTime.getAndAdd(volumeCreationDuration); histograms.get(FreonOps.VOLUME_CREATE.ordinal()) @@ -767,22 +775,31 @@ private boolean createBucket(int globalBucketNumber) { LOG.error("Could not find volume {}", volumeNumber); return false; } - String bucketName = "bucket-" + bucketNumber + "-" + - RandomStringUtils.secure().nextNumeric(5); + String bucketName = "bucket-" + bucketNumber; LOG.trace("Creating bucket: {} in volume: {}", bucketName, volume.getName()); try (AutoCloseable scope = TracingUtil .createActivatedSpan("createBucket")) { long start = System.nanoTime(); - if (bucketLayout != null) { - BucketArgs bucketArgs = BucketArgs.newBuilder() - .setBucketLayout(bucketLayout) - .build(); - volume.createBucket(bucketName, bucketArgs); - } else { - volume.createBucket(bucketName); + + try { + if (bucketLayout != null) { + BucketArgs bucketArgs = BucketArgs.newBuilder() + .setBucketLayout(bucketLayout) + .build(); + volume.createBucket(bucketName, bucketArgs); + } else { + volume.createBucket(bucketName); + } + } catch (OMException e) { + if (e.getResult() == OMException.ResultCodes.BUCKET_ALREADY_EXISTS) { + LOG.warn("Bucket {} already exists, continuing...", bucketName); + } else { + throw e; + } } + long bucketCreationDuration = System.nanoTime() - start; histograms.get(FreonOps.BUCKET_CREATE.ordinal()) .update(bucketCreationDuration); @@ -809,8 +826,7 @@ private boolean createKey(long 
globalKeyNumber) { } String bucketName = bucket.getName(); String volumeName = bucket.getVolumeName(); - String keyName = "key-" + keyNumber + "-" - + RandomStringUtils.secure().nextNumeric(5); + String keyName = "key-" + String.format("%09d", keyNumber); LOG.trace("Adding key: {} in bucket: {} of volume: {}", keyName, bucketName, volumeName); try { diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestRandomKeyGenerator.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestRandomKeyGenerator.java index 657ece8be81b..0bddf40dbd4d 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestRandomKeyGenerator.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestRandomKeyGenerator.java @@ -94,18 +94,18 @@ void testMultiThread() { RandomKeyGenerator randomKeyGenerator = new RandomKeyGenerator(cluster().getConf()); CommandLine cmd = new CommandLine(randomKeyGenerator); - cmd.execute("--num-of-volumes", "10", + cmd.execute("--num-of-volumes", "1", "--num-of-buckets", "1", - "--num-of-keys", "10", + "--num-of-keys", "10000", "--num-of-threads", "10", - "--key-size", "10KB", + "--key-size", "1B", "--factor", "THREE", "--type", "RATIS" ); - assertEquals(10, randomKeyGenerator.getNumberOfVolumesCreated()); - assertEquals(10, randomKeyGenerator.getNumberOfBucketsCreated()); - assertEquals(100, randomKeyGenerator.getNumberOfKeysAdded()); + assertEquals(1, randomKeyGenerator.getNumberOfVolumesCreated()); + assertEquals(1, randomKeyGenerator.getNumberOfBucketsCreated()); + assertEquals(10000, randomKeyGenerator.getNumberOfKeysAdded()); } @Test diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDefragService2.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDefragService2.java new file mode 100644 index 000000000000..a927ab4668cd --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDefragService2.java @@ -0,0 +1,424 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.ozone.om.snapshot; + +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL; +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DB_PROFILE; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DEFAULT_BUCKET_LAYOUT; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_FILESYSTEM_SNAPSHOT_ENABLED_KEY; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIFF_REPORT_MAX_PAGE_SIZE; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DEFRAG_SERVICE_INTERVAL; +import static org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_DEFRAG_LIMIT_PER_TASK; +import static org.apache.ozone.test.LambdaTestUtils.await; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Stream; +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.hadoop.hdds.client.StandaloneReplicationConfig; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.StorageType; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.utils.db.DBProfile; +import org.apache.hadoop.hdds.utils.db.Table; +import org.apache.hadoop.hdds.utils.db.TableIterator; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.client.BucketArgs; +import org.apache.hadoop.ozone.client.ObjectStore; +import org.apache.hadoop.ozone.client.OzoneBucket; +import org.apache.hadoop.ozone.client.OzoneClient; +import org.apache.hadoop.ozone.client.OzoneVolume; +import org.apache.hadoop.ozone.client.io.OzoneOutputStream; +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.om.OMStorage; +import org.apache.hadoop.ozone.om.OmSnapshotLocalDataYaml; +import org.apache.hadoop.ozone.om.OmSnapshotManager; +import org.apache.hadoop.ozone.om.OzoneManager; +import org.apache.hadoop.ozone.om.SnapshotDefragService; +import org.apache.hadoop.ozone.om.helpers.BucketLayout; +import org.apache.hadoop.ozone.om.helpers.SnapshotInfo; +import org.apache.hadoop.ozone.om.upgrade.OMLayoutFeature; +import org.apache.ozone.test.GenericTestUtils; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.event.Level; + +/** + * Test SnapshotDefragService functionality using MiniOzoneCluster. 
+ */ +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public class TestSnapshotDefragService2 { // TODO: Rename this to TestSnapshotDefragService + + private static final Logger LOG = LoggerFactory.getLogger(TestSnapshotDefragService2.class); + + private MiniOzoneCluster cluster; + private OzoneClient client; + private ObjectStore store; + private OzoneManager ozoneManager; + private SnapshotDefragService defragService; + + @BeforeAll + void setup() throws Exception { + // Enable debug logging for SnapshotDefragService + GenericTestUtils.setLogLevel(LoggerFactory.getLogger(SnapshotDefragService.class), Level.DEBUG); + + OzoneConfiguration conf = new OzoneConfiguration(); + conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 30, TimeUnit.SECONDS); + conf.setEnum(HDDS_DB_PROFILE, DBProfile.TEST); + conf.setBoolean(OZONE_FILESYSTEM_SNAPSHOT_ENABLED_KEY, true); + conf.set(OZONE_DEFAULT_BUCKET_LAYOUT, BucketLayout.OBJECT_STORE.name()); + conf.setInt(OMStorage.TESTING_INIT_LAYOUT_VERSION_KEY, + OMLayoutFeature.SNAPSHOT_DEFRAGMENTATION.layoutVersion()); + conf.setInt(OZONE_OM_SNAPSHOT_DIFF_REPORT_MAX_PAGE_SIZE, 1000); + + conf.setInt(OZONE_SNAPSHOT_DEFRAG_SERVICE_INTERVAL, 1000000); +// conf.setInt(OZONE_SNAPSHOT_DEFRAG_SERVICE_TIMEOUT, 300); + conf.setInt(SNAPSHOT_DEFRAG_LIMIT_PER_TASK, 5); + + conf.setQuietMode(false); + + // Create MiniOzoneCluster + cluster = MiniOzoneCluster.newBuilder(conf).build(); + cluster.waitForClusterToBeReady(); + + client = cluster.newClient(); + store = client.getObjectStore(); + ozoneManager = cluster.getOzoneManager(); + + // Create SnapshotDefragService for manual triggering + defragService = new SnapshotDefragService( + 1000000, // interval + TimeUnit.MILLISECONDS, + 3000000, // service timeout + ozoneManager, + conf + ); + } + + @AfterAll + public void cleanup() throws Exception { + if (defragService != null) { + defragService.shutdown(); + } + if (client != null) { + client.close(); + } + if (cluster != null) { + cluster.shutdown(); + } + } + + /** + * Create keys in a bucket. + */ + private void createKeys(String volumeName, String bucketName, int keyCount) throws Exception { + OzoneVolume volume = store.getVolume(volumeName); + OzoneBucket bucket = volume.getBucket(bucketName); + + for (int i = 0; i < keyCount; i++) { + String keyName = "key-" + i; + // Prepend zeros to work around SstFileWriter limitation: + // Caused by: org.rocksdb.RocksDBException: Keys must be added in strict ascending order. + // at org.rocksdb.SstFileWriter.put(Native Method) +// String keyName = "key-" + String.format("%09d", i); + + String data = RandomStringUtils.randomAlphabetic(10); + + try (OzoneOutputStream outputStream = bucket.createKey(keyName, 0, + StandaloneReplicationConfig.getInstance(HddsProtos.ReplicationFactor.ONE), // TODO: Use RATIS ONE instead + java.util.Collections.emptyMap(), java.util.Collections.emptyMap())) { +// outputStream.write(data.getBytes()); + } + } + LOG.info("Created {} keys in bucket {}/{}", keyCount, volumeName, bucketName); + } + + /** + * Create a snapshot and wait for it to be available. 
+ */ + private void createSnapshot(String volumeName, String bucketName, String snapshotName) + throws Exception { + // Get existing checkpoint directories before creating snapshot + Set existingCheckpoints = getExistingCheckpointDirectories(); + + store.createSnapshot(volumeName, bucketName, snapshotName); + + // Wait for snapshot to be created + GenericTestUtils.waitFor(() -> { + try { + SnapshotInfo snapshotInfo = ozoneManager.getMetadataManager() + .getSnapshotInfoTable() + .get(SnapshotInfo.getTableKey(volumeName, bucketName, snapshotName)); + return snapshotInfo != null; + } catch (IOException e) { + return false; + } + }, 1000, 30000); + + // Wait for checkpoint DB to be created + waitForCheckpointDB(snapshotName, existingCheckpoints); + + LOG.info("Created snapshot: {}", snapshotName); + } + + /** + * Get existing checkpoint directories before snapshot creation. + */ + private Set getExistingCheckpointDirectories() { + String metadataDir = ozoneManager.getConfiguration().get("ozone.om.db.dirs"); + if (metadataDir == null) { + metadataDir = ozoneManager.getConfiguration().get("ozone.metadata.dirs"); + } + + String checkpointStateDir = metadataDir + "/db.snapshots/checkpointState"; + File checkpointParentDir = new File(checkpointStateDir); + + Set existingDirs = new java.util.HashSet<>(); + if (checkpointParentDir.exists()) { + File[] checkpointDirs = checkpointParentDir.listFiles(File::isDirectory); + if (checkpointDirs != null) { + for (File dir : checkpointDirs) { + existingDirs.add(dir.getName()); + } + } + } + + LOG.debug("Existing checkpoint directories: {}", existingDirs); + return existingDirs; + } + + /** + * Wait for checkpoint DB to be created under checkpointState directory. + */ + private void waitForCheckpointDB(String snapshotName, Set existingCheckpoints) throws Exception { + String metadataDir = ozoneManager.getConfiguration().get("ozone.om.db.dirs"); + if (metadataDir == null) { + metadataDir = ozoneManager.getConfiguration().get("ozone.metadata.dirs"); + } + + String checkpointStateDir = metadataDir + "/db.snapshots/checkpointState"; + File checkpointParentDir = new File(checkpointStateDir); + + LOG.info("Waiting for new checkpoint DB to be created under: {}", checkpointStateDir); + + GenericTestUtils.waitFor(() -> { + if (!checkpointParentDir.exists()) { + LOG.debug("CheckpointState directory does not exist yet: {}", checkpointStateDir); + return false; + } + + // List all directories in checkpointState + File[] checkpointDirs = checkpointParentDir.listFiles(File::isDirectory); + if (checkpointDirs == null || checkpointDirs.length == 0) { + LOG.debug("No checkpoint directories found in: {}", checkpointStateDir); + return false; + } + + // Look for new checkpoint directories that weren't there before + for (File checkpointDir : checkpointDirs) { + String dirName = checkpointDir.getName(); + + // Skip if this directory existed before snapshot creation + if (existingCheckpoints.contains(dirName)) { + continue; + } + + // Check if the new directory contains database files + File[] dbFiles = checkpointDir.listFiles(); + if (dbFiles != null && dbFiles.length > 0) { + for (File dbFile : dbFiles) { + if (dbFile.isFile() && (dbFile.getName().endsWith(".sst") || + dbFile.getName().equals("CURRENT") || + dbFile.getName().startsWith("MANIFEST"))) { + LOG.info("New checkpoint DB found for snapshot {} in directory: {}", + snapshotName, checkpointDir.getAbsolutePath()); + return true; + } + } + } + + LOG.debug("New checkpoint directory found but no DB files yet: {}", 
checkpointDir.getAbsolutePath()); + } + + LOG.debug("Waiting for new checkpoint DB files to appear in checkpointState directories"); + return false; + }, 1000, 60000); // Wait up to 60 seconds for checkpoint DB creation + + LOG.info("Checkpoint DB created successfully for snapshot: {}", snapshotName); + } + + /** + * Lists the contents of the db.snapshots directory. + */ + private void printSnapshotDirectoryListing(String description) { + LOG.info("=== {} ===", description); + String metadataDir = ozoneManager.getConfiguration().get("ozone.om.db.dirs"); + if (metadataDir == null) { + metadataDir = ozoneManager.getConfiguration().get("ozone.metadata.dirs"); + } + + String snapshotDir = metadataDir + "/db.snapshots"; + File snapshotsDir = new File(snapshotDir); + + if (!snapshotsDir.exists()) { + LOG.info("Snapshots directory does not exist: {}", snapshotDir); + return; + } + + try (Stream paths = Files.walk(Paths.get(snapshotDir))) { + paths.sorted() + .forEach(path -> { + File file = path.toFile(); + String relativePath = Paths.get(snapshotDir).relativize(path).toString(); + if (file.isDirectory()) { + LOG.info("Directory: {}/", relativePath.isEmpty() ? "." : relativePath); + } else { + LOG.info("File: {} (size: {} bytes)", relativePath, file.length()); + } + }); + } catch (IOException e) { + LOG.error("Error listing snapshot directory: {}", snapshotDir, e); + } + } + + /** + * Mark a snapshot as needing defragmentation by updating its YAML metadata. + */ + private void markSnapshotAsNeedingDefragmentation(SnapshotInfo snapshotInfo) throws IOException { + String snapshotPath = OmSnapshotManager.getSnapshotPath( + ozoneManager.getConfiguration(), snapshotInfo); + File yamlFile = new File(snapshotPath + ".yaml"); + + if (yamlFile.exists()) { + OmSnapshotLocalDataYaml yamlData = OmSnapshotLocalDataYaml.getFromYamlFile(yamlFile); + yamlData.setNeedsDefragmentation(true); + yamlData.writeToYaml(yamlFile); + LOG.info("Marked snapshot {} as needing defragmentation", snapshotInfo.getName()); + } else { + LOG.warn("YAML file not found for snapshot {}: {}", snapshotInfo.getName(), yamlFile.getPath()); + } + } + + /** + * Trigger the SnapshotDefragService by starting it and waiting for it to process snapshots. + */ + private void triggerSnapshotDefragService() throws Exception { + LOG.info("Triggering SnapshotDefragService ..."); + + // Mark all snapshots as needing defragmentation first + OMMetadataManager metadataManager = ozoneManager.getMetadataManager(); + try (TableIterator> iterator = + metadataManager.getSnapshotInfoTable().iterator()) { + iterator.seekToFirst(); + while (iterator.hasNext()) { + Table.KeyValue entry = iterator.next(); + SnapshotInfo snapshotInfo = entry.getValue(); + markSnapshotAsNeedingDefragmentation(snapshotInfo); + } + } + + long initialDefragCount = defragService.getSnapshotsDefraggedCount().get(); + LOG.info("Initial defragmented count: {}", initialDefragCount); + + // Start the service + defragService.resume(); // HACK + defragService.triggerSnapshotDefragOnce(); + + // Wait for the service to process snapshots + try { + await(30000, 1000, () -> { + long currentCount = defragService.getSnapshotsDefraggedCount().get(); + LOG.info("Current defragmented count: {}", currentCount); + return currentCount > initialDefragCount; + }); + } catch (TimeoutException e) { + LOG.warn("Timeout waiting for defragmentation to complete, continuing with test"); + } + + LOG.info("SnapshotDefragService execution completed. 
Snapshots defragmented: {}", + defragService.getSnapshotsDefraggedCount().get()); + } + + @Test + public void testSnapshotDefragmentation() throws Exception { + String volumeName = "test-volume"; + String bucketName = "test-bucket"; + + // Create volume and bucket + store.createVolume(volumeName); + OzoneVolume volume = store.getVolume(volumeName); + // TODO: Test FSO bucket as well, default is LEGACY / OBJECT_STORE + BucketArgs.Builder bb = new BucketArgs.Builder() + .setStorageType(StorageType.SSD) + .setVersioning(false) + .setBucketLayout(BucketLayout.FILE_SYSTEM_OPTIMIZED); + volume.createBucket(bucketName, bb.build()); + + LOG.info("Starting snapshot defragmentation test..."); + + // Print initial state + printSnapshotDirectoryListing("Initial state - no snapshots"); + + // Step 1: Create 2 keys, then create snap-1 + createKeys(volumeName, bucketName, 11); + createSnapshot(volumeName, bucketName, "snap-1"); + printSnapshotDirectoryListing("After creating snap-1"); + + // Step 2: Create 2 more keys, then create snap-2 + createKeys(volumeName, bucketName, 11); // TODO: This actually overwrites the previous keys + createSnapshot(volumeName, bucketName, "snap-2"); + printSnapshotDirectoryListing("After creating snap-2"); + + // Step 3: Create 2 more keys, then create snap-3 + createKeys(volumeName, bucketName, 11); // TODO: This actually overwrites the previous keys + createSnapshot(volumeName, bucketName, "snap-3"); + printSnapshotDirectoryListing("After creating snap-3"); + + // Step 4: Trigger SnapshotDefragService + triggerSnapshotDefragService(); + printSnapshotDirectoryListing("After SnapshotDefragService execution"); + + // Verify that the snapshots still exist + SnapshotInfo snap1 = ozoneManager.getMetadataManager() + .getSnapshotInfoTable() + .get(SnapshotInfo.getTableKey(volumeName, bucketName, "snap-1")); + SnapshotInfo snap2 = ozoneManager.getMetadataManager() + .getSnapshotInfoTable() + .get(SnapshotInfo.getTableKey(volumeName, bucketName, "snap-2")); + SnapshotInfo snap3 = ozoneManager.getMetadataManager() + .getSnapshotInfoTable() + .get(SnapshotInfo.getTableKey(volumeName, bucketName, "snap-3")); + + assertNotNull(snap1, "Snapshot snap-1 should exist"); + assertNotNull(snap2, "Snapshot snap-2 should exist"); + assertNotNull(snap3, "Snapshot snap-3 should exist"); + + LOG.info("Test completed successfully"); + } +} diff --git a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto index ee9535ac393a..8afe55105d3a 100644 --- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto +++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto @@ -156,6 +156,8 @@ enum Type { PutObjectTagging = 140; GetObjectTagging = 141; DeleteObjectTagging = 142; + + SnapshotDefrag = 144; } enum SafeMode { @@ -304,6 +306,8 @@ message OMRequest { optional PutObjectTaggingRequest putObjectTaggingRequest = 141; optional DeleteObjectTaggingRequest deleteObjectTaggingRequest = 142; repeated SetSnapshotPropertyRequest SetSnapshotPropertyRequests = 143; + + optional SnapshotDefragRequest SnapshotDefragRequest = 144; } message OMResponse { @@ -437,6 +441,8 @@ message OMResponse { optional GetObjectTaggingResponse getObjectTaggingResponse = 140; optional PutObjectTaggingResponse putObjectTaggingResponse = 141; optional DeleteObjectTaggingResponse deleteObjectTaggingResponse = 142; + + optional SnapshotDefragResponse SnapshotDefragResponse = 144; } enum Status { @@ -1577,6 +1583,14 @@ message 
RangerBGSyncResponse { optional bool runSuccess = 1; } +message SnapshotDefragRequest { + optional bool noWait = 1; +} + +message SnapshotDefragResponse { + optional bool runSuccess = 1; +} + message FinalizeUpgradeRequest { required string upgradeClientId = 1; } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java index d69839a1c8db..fb55f8e8473a 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java @@ -346,6 +346,12 @@ DeleteKeysResult getPendingDeletionSubFiles(long volumeId, */ SstFilteringService getSnapshotSstFilteringService(); + /** + * Returns the instance of the Snapshot Defragmentation service. + * @return Background service. + */ + SnapshotDefragService getSnapshotDefragService(); + /** * Returns the instance of Snapshot Deleting service. * @return Background service. diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java index 10c29b960ea3..7c04242fcd1f 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java @@ -30,6 +30,8 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DEFRAG_SERVICE_TIMEOUT; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DEFRAG_SERVICE_TIMEOUT_DEFAULT; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL_DEFAULT; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DELETING_SERVICE_TIMEOUT; @@ -58,6 +60,8 @@ import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_TIMEOUT_DEFAULT; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DEEP_CLEANING_ENABLED; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DEEP_CLEANING_ENABLED_DEFAULT; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DEFRAG_SERVICE_INTERVAL; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DEFRAG_SERVICE_INTERVAL_DEFAULT; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL_DEFAULT; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION; @@ -200,6 +204,7 @@ public class KeyManagerImpl implements KeyManager { private KeyDeletingService keyDeletingService; private SstFilteringService snapshotSstFilteringService; + private SnapshotDefragService snapshotDefragService; private SnapshotDeletingService snapshotDeletingService; private final KeyProviderCryptoExtension kmsProvider; @@ -308,6 +313,11 @@ public void start(OzoneConfiguration configuration) { startSnapshotSstFilteringService(configuration); } + if (snapshotDefragService == null &&
+ ozoneManager.isFilesystemSnapshotEnabled()) { + startSnapshotDefragService(configuration); + } + if (snapshotDeletingService == null && ozoneManager.isFilesystemSnapshotEnabled()) { @@ -391,6 +401,42 @@ public void stopSnapshotSstFilteringService() { } } + /** + * Start the snapshot defrag service if interval is not set to disabled value. + * @param conf + */ + public void startSnapshotDefragService(OzoneConfiguration conf) { + if (isDefragSvcEnabled()) { + long serviceInterval = conf.getTimeDuration( + OZONE_SNAPSHOT_DEFRAG_SERVICE_INTERVAL, + OZONE_SNAPSHOT_DEFRAG_SERVICE_INTERVAL_DEFAULT, + TimeUnit.MILLISECONDS); + long serviceTimeout = conf.getTimeDuration( + OZONE_SNAPSHOT_DEFRAG_SERVICE_TIMEOUT, + OZONE_SNAPSHOT_DEFRAG_SERVICE_TIMEOUT_DEFAULT, + TimeUnit.MILLISECONDS); + + snapshotDefragService = + new SnapshotDefragService(serviceInterval, TimeUnit.MILLISECONDS, + serviceTimeout, ozoneManager, conf); + snapshotDefragService.start(); + } else { + LOG.info("SnapshotDefragService is disabled. Snapshot defragmentation will not run periodically."); + } + } + + /** + * Stop the snapshot defrag service if it is running. + */ + public void stopSnapshotDefragService() { + if (snapshotDefragService != null) { + snapshotDefragService.shutdown(); + snapshotDefragService = null; + } else { + LOG.info("SnapshotDefragService is already stopped or not started."); + } + } + private void startCompactionService(OzoneConfiguration configuration, boolean isCompactionServiceEnabled) { if (compactionService == null && isCompactionServiceEnabled) { @@ -417,7 +463,7 @@ KeyProviderCryptoExtension getKMSProvider() { } @Override - public void stop() throws IOException { + public void stop() { if (keyDeletingService != null) { keyDeletingService.shutdown(); keyDeletingService = null; @@ -434,6 +480,10 @@ public void stop() throws IOException { snapshotSstFilteringService.shutdown(); snapshotSstFilteringService = null; } + if (snapshotDefragService != null) { + snapshotDefragService.shutdown(); + snapshotDefragService = null; + } if (snapshotDeletingService != null) { snapshotDeletingService.shutdown(); snapshotDeletingService = null; @@ -448,6 +498,15 @@ public void stop() throws IOException { } } + /** + * Get the SnapshotDefragService instance. 
+ * + * @return SnapshotDefragService instance, or null if not initialized + */ + public SnapshotDefragService getSnapshotDefragService() { + return snapshotDefragService; + } + private OmBucketInfo getBucketInfo(String volumeName, String bucketName) throws IOException { String bucketKey = metadataManager.getBucketKey(volumeName, bucketName); @@ -992,7 +1051,16 @@ public boolean isSstFilteringSvcEnabled() { // any interval <= 0 causes IllegalArgumentException from scheduleWithFixedDelay return serviceInterval > 0; } - + + public boolean isDefragSvcEnabled() { + long serviceInterval = ozoneManager.getConfiguration() + .getTimeDuration(OZONE_SNAPSHOT_DEFRAG_SERVICE_INTERVAL, + OZONE_SNAPSHOT_DEFRAG_SERVICE_INTERVAL_DEFAULT, + TimeUnit.MILLISECONDS); + // any interval <= 0 causes IllegalArgumentException from scheduleWithFixedDelay + return serviceInterval > 0; + } + @Override public OmMultipartUploadList listMultipartUploads(String volumeName, String bucketName, diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotLocalData.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotLocalData.java index 7b3c7e9de80e..875167e1130c 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotLocalData.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotLocalData.java @@ -35,8 +35,8 @@ */ public abstract class OmSnapshotLocalData { - // Version of the snapshot local data. 0 indicates uncompacted snapshot. - // compacted snapshots will have version > 0. + // Version of the snapshot local data. 0 indicates not defragged snapshot. + // defragged snapshots will have version > 0. private int version; // Checksum of the YAML representation @@ -45,18 +45,18 @@ public abstract class OmSnapshotLocalData { // Whether SST is filtered private boolean isSSTFiltered; - // Map of Table to uncompacted SST file list on snapshot create - private Map> uncompactedSSTFileList; + // Map of Table to not defragged SST file list on snapshot create + private Map> notDefraggedSSTFileList; - // Time of last compaction, in epoch milliseconds - private long lastCompactionTime; + // Time of last defragmentation, in epoch milliseconds + private long lastDefragTime; - // Whether the snapshot needs compaction - private boolean needsCompaction; + // Whether the snapshot needs defragmentation + private boolean needsDefragmentation; - // Map of version to compacted SST file list + // Map of version to defragged SST file list // Map> - private Map>> compactedSSTFileList; + private Map>> defraggedSSTFileList; public static final Charset CHARSET_ENCODING = StandardCharsets.UTF_8; private static final String DUMMY_CHECKSUM = new String(new byte[64], CHARSET_ENCODING); @@ -64,13 +64,13 @@ public abstract class OmSnapshotLocalData { /** * Creates a OmSnapshotLocalData object with default values. */ - public OmSnapshotLocalData(Map> uncompactedSSTFileList) { + public OmSnapshotLocalData(Map> notDefraggedSSTFileList) { this.isSSTFiltered = false; - this.uncompactedSSTFileList = uncompactedSSTFileList != null ? uncompactedSSTFileList : new HashMap<>(); - this.lastCompactionTime = 0L; - this.needsCompaction = false; - this.compactedSSTFileList = new HashMap<>(); - this.version = 0; + this.notDefraggedSSTFileList = notDefraggedSSTFileList != null ? 
notDefraggedSSTFileList : new HashMap<>(); + this.lastDefragTime = -1L; + this.needsDefragmentation = true; + this.defraggedSSTFileList = new HashMap<>(); + this.version = 1; setChecksumTo0ByteArray(); } @@ -81,24 +81,24 @@ public OmSnapshotLocalData(Map> uncompactedSSTFileList) { public OmSnapshotLocalData(OmSnapshotLocalData source) { // Copy primitive fields directly this.isSSTFiltered = source.isSSTFiltered; - this.lastCompactionTime = source.lastCompactionTime; - this.needsCompaction = source.needsCompaction; + this.lastDefragTime = source.lastDefragTime; + this.needsDefragmentation = source.needsDefragmentation; this.checksum = source.checksum; this.version = source.version; - // Deep copy for uncompactedSSTFileList - this.uncompactedSSTFileList = new HashMap<>(); + // Deep copy for notDefraggedSSTFileList + this.notDefraggedSSTFileList = new HashMap<>(); for (Map.Entry> entry : - source.uncompactedSSTFileList.entrySet()) { - this.uncompactedSSTFileList.put( + source.notDefraggedSSTFileList.entrySet()) { + this.notDefraggedSSTFileList.put( entry.getKey(), new HashSet<>(entry.getValue())); } - // Deep copy for compactedSSTFileList - this.compactedSSTFileList = new HashMap<>(); + // Deep copy for defraggedSSTFileList + this.defraggedSSTFileList = new HashMap<>(); for (Map.Entry>> versionEntry : - source.compactedSSTFileList.entrySet()) { + source.defraggedSSTFileList.entrySet()) { Map> tableMap = new HashMap<>(); for (Map.Entry> tableEntry : @@ -108,7 +108,7 @@ public OmSnapshotLocalData(OmSnapshotLocalData source) { new HashSet<>(tableEntry.getValue())); } - this.compactedSSTFileList.put(versionEntry.getKey(), tableMap); + this.defraggedSSTFileList.put(versionEntry.getKey(), tableMap); } } @@ -129,91 +129,91 @@ public void setSstFiltered(boolean sstFiltered) { } /** - * Returns the uncompacted SST file list. - * @return Map of Table to uncompacted SST file list + * Returns the not defragged SST file list. + * @return Map of Table to not defragged SST file list */ - public Map> getUncompactedSSTFileList() { - return Collections.unmodifiableMap(this.uncompactedSSTFileList); + public Map> getNotDefraggedSSTFileList() { + return Collections.unmodifiableMap(this.notDefraggedSSTFileList); } /** - * Sets the uncompacted SST file list. - * @param uncompactedSSTFileList Map of Table to uncompacted SST file list + * Sets the not defragged SST file list. + * @param notDefraggedSSTFileList Map of Table to not defragged SST file list */ - public void setUncompactedSSTFileList( - Map> uncompactedSSTFileList) { - this.uncompactedSSTFileList.clear(); - this.uncompactedSSTFileList.putAll(uncompactedSSTFileList); + public void setNotDefraggedSSTFileList( + Map> notDefraggedSSTFileList) { + this.notDefraggedSSTFileList.clear(); + this.notDefraggedSSTFileList.putAll(notDefraggedSSTFileList); } /** - * Adds an entry to the uncompacted SST file list. + * Adds an entry to the not defragged SST file list. * @param table Table name * @param sstFiles SST file name */ - public void addUncompactedSSTFileList(String table, Set sstFiles) { - this.uncompactedSSTFileList.computeIfAbsent(table, k -> new HashSet<>()) + public void addNotDefraggedSSTFileList(String table, Set sstFiles) { + this.notDefraggedSSTFileList.computeIfAbsent(table, k -> new HashSet<>()) .addAll(sstFiles); } /** - * Returns the last compaction time, in epoch milliseconds. - * @return Timestamp of the last compaction + * Returns the last defragmentation time, in epoch milliseconds. 
+ * @return Timestamp of the last defragmentation */ - public long getLastCompactionTime() { - return lastCompactionTime; + public long getLastDefragTime() { + return lastDefragTime; } /** - * Sets the last compaction time, in epoch milliseconds. - * @param lastCompactionTime Timestamp of the last compaction + * Sets the last defragmentation time, in epoch milliseconds. + * @param lastDefragTime Timestamp of the last defragmentation */ - public void setLastCompactionTime(Long lastCompactionTime) { - this.lastCompactionTime = lastCompactionTime; + public void setLastDefragTime(Long lastDefragTime) { + this.lastDefragTime = lastDefragTime; } /** - * Returns whether the snapshot needs compaction. - * @return true if the snapshot needs compaction, false otherwise + * Returns whether the snapshot needs defragmentation. + * @return true if the snapshot needs defragmentation, false otherwise */ - public boolean getNeedsCompaction() { - return needsCompaction; + public boolean getNeedsDefragmentation() { + return needsDefragmentation; } /** - * Sets whether the snapshot needs compaction. - * @param needsCompaction true if the snapshot needs compaction, false otherwise + * Sets whether the snapshot needs defragmentation. + * @param needsDefragmentation true if the snapshot needs defragmentation, false otherwise */ - public void setNeedsCompaction(boolean needsCompaction) { - this.needsCompaction = needsCompaction; + public void setNeedsDefragmentation(boolean needsDefragmentation) { + this.needsDefragmentation = needsDefragmentation; } /** - * Returns the compacted SST file list. - * @return Map of version to compacted SST file list + * Returns the defragged SST file list. + * @return Map of version to defragged SST file list */ - public Map>> getCompactedSSTFileList() { - return Collections.unmodifiableMap(this.compactedSSTFileList); + public Map>> getDefraggedSSTFileList() { + return Collections.unmodifiableMap(this.defraggedSSTFileList); } /** - * Sets the compacted SST file list. - * @param compactedSSTFileList Map of version to compacted SST file list + * Sets the defragged SST file list. + * @param defraggedSSTFileList Map of version to defragged SST file list */ - public void setCompactedSSTFileList( - Map>> compactedSSTFileList) { - this.compactedSSTFileList.clear(); - this.compactedSSTFileList.putAll(compactedSSTFileList); + public void setDefraggedSSTFileList( + Map>> defraggedSSTFileList) { + this.defraggedSSTFileList.clear(); + this.defraggedSSTFileList.putAll(defraggedSSTFileList); } /** - * Adds an entry to the compacted SST file list. + * Adds an entry to the defragged SST file list. 
* @param ver Version number (TODO: to be clarified) * @param table Table name * @param sstFiles SST file name */ - public void addCompactedSSTFileList(Integer ver, String table, Set sstFiles) { - this.compactedSSTFileList.computeIfAbsent(ver, k -> Maps.newHashMap()) + public void addDefraggedSSTFileList(Integer ver, String table, Set sstFiles) { + this.defraggedSSTFileList.computeIfAbsent(ver, k -> Maps.newHashMap()) .computeIfAbsent(table, k -> new HashSet<>()) .addAll(sstFiles); } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotLocalDataYaml.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotLocalDataYaml.java index 3ed915edfdef..bc53a1b1b0e7 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotLocalDataYaml.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotLocalDataYaml.java @@ -127,7 +127,7 @@ public Object construct(Node node) { Map nodes = constructMapping(mnode); Map> uncompactedSSTFileList = - (Map>) nodes.get(OzoneConsts.OM_SLD_UNCOMPACTED_SST_FILE_LIST); + (Map>) nodes.get(OzoneConsts.OM_SLD_NOT_DEFRAGGED_SST_FILE_LIST); OmSnapshotLocalDataYaml snapshotLocalData = new OmSnapshotLocalDataYaml(uncompactedSSTFileList); // Set version from YAML @@ -138,7 +138,7 @@ public Object construct(Node node) { snapshotLocalData.setSstFiltered((Boolean) nodes.getOrDefault(OzoneConsts.OM_SLD_IS_SST_FILTERED, false)); // Handle potential Integer/Long type mismatch from YAML parsing - Object lastCompactionTimeObj = nodes.getOrDefault(OzoneConsts.OM_SLD_LAST_COMPACTION_TIME, -1L); + Object lastCompactionTimeObj = nodes.getOrDefault(OzoneConsts.OM_SLD_LAST_DEFRAG_TIME, -1L); long lastCompactionTime; if (lastCompactionTimeObj instanceof Number) { lastCompactionTime = ((Number) lastCompactionTimeObj).longValue(); @@ -146,16 +146,19 @@ public Object construct(Node node) { throw new IllegalArgumentException("Invalid type for lastCompactionTime: " + lastCompactionTimeObj.getClass().getName() + ". 
Expected Number type."); } - snapshotLocalData.setLastCompactionTime(lastCompactionTime); + snapshotLocalData.setLastDefragTime(lastCompactionTime); - snapshotLocalData.setNeedsCompaction((Boolean) nodes.getOrDefault(OzoneConsts.OM_SLD_NEEDS_COMPACTION, false)); + snapshotLocalData.setNeedsDefragmentation( + (Boolean) nodes.getOrDefault(OzoneConsts.OM_SLD_NEEDS_DEFRAGMENTATION, false)); Map>> compactedSSTFileList = - (Map>>) nodes.get(OzoneConsts.OM_SLD_COMPACTED_SST_FILE_LIST); + (Map>>) nodes.get(OzoneConsts.OM_SLD_DEFRAGGED_SST_FILE_LIST); if (compactedSSTFileList != null) { - snapshotLocalData.setCompactedSSTFileList(compactedSSTFileList); + snapshotLocalData.setDefraggedSSTFileList(compactedSSTFileList); } + // TODO: Add OM_SLD_DB_CHECKPOINT_DIR field + String checksum = (String) nodes.get(OzoneConsts.OM_SLD_CHECKSUM); if (checksum != null) { snapshotLocalData.setChecksum(checksum); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java index 89c920f9a325..02e5536f4588 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java @@ -3549,6 +3549,53 @@ public boolean triggerRangerBGSync(boolean noWait) throws IOException { } } + @Override + public boolean triggerSnapshotDefrag(boolean noWait) throws IOException { + + // OM should be leader and ready. + if (!isLeaderReady()) { + throw new OMException("OM is not leader or not ready", INVALID_REQUEST); + } + + final UserGroupInformation ugi = getRemoteUser(); + // Check Ozone admin privilege + if (!isAdmin(ugi)) { + throw new OMException("Only Ozone admins are allowed to trigger " + + "snapshot defragmentation manually", PERMISSION_DENIED); + } + + // Get the SnapshotDefragService from KeyManager + final SnapshotDefragService defragService = keyManager.getSnapshotDefragService(); + if (defragService == null) { + throw new OMException("Snapshot defragmentation service is not initialized", + FEATURE_NOT_ENABLED); + } + + // Trigger Snapshot Defragmentation + if (noWait) { + final Thread t = new Thread(() -> { + try { + defragService.triggerSnapshotDefragOnce(); + } catch (Exception e) { + LOG.error("Error during snapshot defragmentation", e); + } + }, threadPrefix + "SnapshotDefrag"); + t.start(); + LOG.info("User '{}' manually triggered Snapshot Defragmentation without waiting" + + " in a new thread, tid = {}", ugi, t.getId()); + return true; + } else { + LOG.info("User '{}' manually triggered Snapshot Defragmentation and is waiting", ugi); + try { + defragService.triggerSnapshotDefragOnce(); + return true; + } catch (Exception e) { + LOG.error("Error during snapshot defragmentation", e); + return false; + } + } + } + @Override public StatusAndMessages finalizeUpgrade(String upgradeClientID) throws IOException { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotDefragService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotDefragService.java new file mode 100644 index 000000000000..5573bbab9eee --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotDefragService.java @@ -0,0 +1,1111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om; + +import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_CHECKPOINT_DEFRAGED_DIR; +import static org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_DEFRAG_LIMIT_PER_TASK; +import static org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_DEFRAG_LIMIT_PER_TASK_DEFAULT; +import static org.apache.hadoop.ozone.om.OmSnapshotManager.COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT; +import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.FILE_TABLE; +import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.KEY_TABLE; +import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.FlatResource.SNAPSHOT_GC_LOCK; + +import com.google.common.annotations.VisibleForTesting; +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.utils.BackgroundService; +import org.apache.hadoop.hdds.utils.BackgroundTask; +import org.apache.hadoop.hdds.utils.BackgroundTaskQueue; +import org.apache.hadoop.hdds.utils.BackgroundTaskResult; +import org.apache.hadoop.hdds.utils.BackgroundTaskResult.EmptyTaskResult; +import org.apache.hadoop.hdds.utils.db.DBStore; +import org.apache.hadoop.hdds.utils.db.DBStoreBuilder; +import org.apache.hadoop.hdds.utils.db.RDBStore; +import org.apache.hadoop.hdds.utils.db.RocksDatabase; +import org.apache.hadoop.hdds.utils.db.RocksDatabaseException; +import org.apache.hadoop.hdds.utils.db.Table; +import org.apache.hadoop.hdds.utils.db.TableIterator; +import org.apache.hadoop.hdds.utils.db.managed.ManagedCompactRangeOptions; +import org.apache.hadoop.hdds.utils.db.managed.ManagedRawSSTFileReader; +import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator; +import org.apache.hadoop.hdds.utils.db.managed.ManagedWriteBatch; +import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffReportEntry; +import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType; +import org.apache.hadoop.ozone.lock.BootstrapStateHandler; +import org.apache.hadoop.ozone.om.exceptions.OMException; +import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; +import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; +import org.apache.hadoop.ozone.om.helpers.SnapshotInfo; +import org.apache.hadoop.ozone.om.lock.OMLockDetails; +import org.apache.hadoop.ozone.snapshot.SnapshotDiffReportOzone; +import org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse; +import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Background service for 
defragmenting snapshots in the active snapshot chain. + * When snapshots are taken, they capture the entire OM RocksDB state but may contain + * fragmented data. This service defragments snapshots by creating new compacted + * RocksDB instances with only the necessary data for tracked column families. + *
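+ * Defragmented DB instances are written under the checkpointStateDefragged directory,
+ * next to the snapshot's original checkpointState checkpoint; the original checkpoint
+ * DB is left in place for the time being.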

+ * The service processes snapshots in the active chain sequentially, starting with + * the first non-defragmented snapshot. For the first snapshot in the chain, it + * performs a full defragmentation by copying all keys. For subsequent snapshots, + * it uses incremental defragmentation based on diffs from the previous defragmented + * snapshot. + */ +public class SnapshotDefragService extends BackgroundService + implements BootstrapStateHandler { + + private static final Logger LOG = + LoggerFactory.getLogger(SnapshotDefragService.class); + + // Use only a single thread for snapshot defragmentation to avoid conflicts + private static final int DEFRAG_CORE_POOL_SIZE = 1; + + private static final String CHECKPOINT_STATE_DEFRAGGED_DIR = OM_SNAPSHOT_CHECKPOINT_DEFRAGED_DIR; + + private final OzoneManager ozoneManager; + private final AtomicLong runCount = new AtomicLong(0); + + // Number of snapshots to be processed in a single iteration + private final long snapshotLimitPerTask; + + private final AtomicLong snapshotsDefraggedCount; + private final AtomicBoolean running; + + private final BootstrapStateHandler.Lock lock = new BootstrapStateHandler.Lock(); + + public SnapshotDefragService(long interval, TimeUnit unit, long serviceTimeout, + OzoneManager ozoneManager, OzoneConfiguration configuration) { + super("SnapshotDefragService", interval, unit, DEFRAG_CORE_POOL_SIZE, + serviceTimeout, ozoneManager.getThreadNamePrefix()); + this.ozoneManager = ozoneManager; + this.snapshotLimitPerTask = configuration + .getLong(SNAPSHOT_DEFRAG_LIMIT_PER_TASK, + SNAPSHOT_DEFRAG_LIMIT_PER_TASK_DEFAULT); + snapshotsDefraggedCount = new AtomicLong(0); + running = new AtomicBoolean(false); + } + + @Override + public void start() { + running.set(true); + super.start(); + } + + @VisibleForTesting + public void pause() { + running.set(false); + } + + @VisibleForTesting + public void resume() { + running.set(true); + } + + /** + * Checks if rocks-tools native library is available. + */ + private boolean isRocksToolsNativeLibAvailable() { + try { + return ManagedRawSSTFileReader.tryLoadLibrary(); + } catch (Exception e) { + LOG.warn("Failed to check native code availability", e); + return false; + } + } + + /** + * Checks if a snapshot needs defragmentation by examining its YAML metadata. + */ + private boolean needsDefragmentation(SnapshotInfo snapshotInfo) { + String snapshotPath = OmSnapshotManager.getSnapshotPath( + ozoneManager.getConfiguration(), snapshotInfo); + + try { + // Read YAML metadata using the correct API + File yamlFile = new File(snapshotPath + ".yaml"); + OmSnapshotLocalDataYaml yamlData = OmSnapshotLocalDataYaml.getFromYamlFile(yamlFile); + + // Check if snapshot needs compaction (defragmentation) + boolean needsDefrag = yamlData.getNeedsDefragmentation(); + LOG.debug("Snapshot {} needsDefragmentation field value: {}", + snapshotInfo.getName(), needsDefrag); + + return needsDefrag; + } catch (IOException e) { + LOG.warn("Failed to read YAML metadata for snapshot {}, assuming defrag needed", + snapshotInfo.getName(), e); + return true; + } + } + + /** + * Finds the first active snapshot in the chain that needs defragmentation. 
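+ * Snapshots in SNAPSHOT_DELETED state are skipped; returns the first remaining snapshot
+ * whose local-data YAML still reports needsDefragmentation, or null if none is found.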
+ */ + private SnapshotInfo findFirstSnapshotNeedingDefrag( + Table snapshotInfoTable) throws IOException { + + LOG.debug("Searching for first snapshot needing defragmentation in active chain"); + + try (TableIterator> iterator = + snapshotInfoTable.iterator()) { + iterator.seekToFirst(); + + while (iterator.hasNext()) { + Table.KeyValue keyValue = iterator.next(); + SnapshotInfo snapshotInfo = keyValue.getValue(); + + // Skip deleted snapshots + if (snapshotInfo.getSnapshotStatus() == SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED) { + LOG.debug("Skipping deleted snapshot: {}", snapshotInfo.getName()); + continue; + } + + // Check if this snapshot needs defragmentation + if (needsDefragmentation(snapshotInfo)) { + LOG.info("Found snapshot needing defragmentation: {} (ID: {})", + snapshotInfo.getName(), snapshotInfo.getSnapshotId()); + return snapshotInfo; + } + + LOG.debug("Snapshot {} already defragmented, continuing search", + snapshotInfo.getName()); + } + } + + LOG.debug("No snapshots found needing defragmentation"); + return null; + } + + /** + * Finds the previous defragmented snapshot in the chain. + */ + private SnapshotInfo findPreviousDefraggedSnapshot(SnapshotInfo currentSnapshot, + Table snapshotInfoTable) throws IOException { + + LOG.debug("Searching for previous defragmented snapshot before: {}", + currentSnapshot.getName()); + + // Walk backwards through the snapshot chain using pathPreviousSnapshotId + String previousSnapshotId = currentSnapshot.getPathPreviousSnapshotId() != null ? + currentSnapshot.getPathPreviousSnapshotId().toString() : null; + + while (previousSnapshotId != null) { + try (TableIterator> iterator = + snapshotInfoTable.iterator()) { + iterator.seekToFirst(); + + while (iterator.hasNext()) { + Table.KeyValue keyValue = iterator.next(); + SnapshotInfo snapshotInfo = keyValue.getValue(); + + if (snapshotInfo.getSnapshotId().toString().equals(previousSnapshotId)) { + if (!needsDefragmentation(snapshotInfo)) { + LOG.info("Found previous defragmented snapshot: {} (ID: {})", + snapshotInfo.getName(), snapshotInfo.getSnapshotId()); + return snapshotInfo; + } + + // Continue searching with this snapshot's previous + previousSnapshotId = snapshotInfo.getPathPreviousSnapshotId() != null ? + snapshotInfo.getPathPreviousSnapshotId().toString() : null; + break; + } + } + } + } + + LOG.debug("No previous defragmented snapshot found"); + return null; + } + + /** + * Performs full defragmentation for the first snapshot in the chain. + * This is a simplified implementation that demonstrates the concept. 
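+ * Steps: build a fresh RocksDB under checkpointStateDefragged, copy every key of the
+ * tracked column families in write batches, compact the new DB, then verify it against
+ * the original snapshot checkpoint.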
+ */ + private void performFullDefragmentation(SnapshotInfo snapshotInfo, + OmSnapshot omSnapshot) throws IOException { + + String snapshotPath = OmSnapshotManager.getSnapshotPath( + ozoneManager.getConfiguration(), snapshotInfo); + + // For defraggedDbPath, we need to go up to the parent directory and use checkpointStateDefragged + String parentDir = Paths.get(snapshotPath).getParent().getParent().toString(); + String checkpointDirName = Paths.get(snapshotPath).getFileName().toString(); + String defraggedDbPath = Paths.get(parentDir, CHECKPOINT_STATE_DEFRAGGED_DIR, checkpointDirName).toString(); + + LOG.info("Starting full defragmentation for snapshot: {} at path: {}", + snapshotInfo.getName(), snapshotPath); + LOG.info("Target defragmented DB path: {}", defraggedDbPath); + + // Create defragmented directory + Files.createDirectories(Paths.get(defraggedDbPath)); + + // TODO: Get snapshot checkpoint DB via SnapshotCache + RDBStore originalStore = (RDBStore) omSnapshot.getMetadataManager().getStore(); + RocksDatabase originalDb = originalStore.getDb(); + + LOG.info("Starting defragmentation process for snapshot: {}", snapshotInfo.getName()); + LOG.info("Original DB path: {}", snapshotPath); + LOG.info("Defragmented DB path: {}", defraggedDbPath); + + // Implement actual RocksDB defragmentation + try { + // 1. Create a new RocksDB instance at defraggedDbPath + DBStoreBuilder defraggedDbBuilder = DBStoreBuilder.newBuilder(ozoneManager.getConfiguration()) + .setName(checkpointDirName) + .setPath(Paths.get(defraggedDbPath).getParent()) + .setCreateCheckpointDirs(false); + + // Add all the tracked column families to the new DB + for (String cfName : COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT) { + defraggedDbBuilder.addTable(cfName); + LOG.debug("Added column family {} to defragmented DB", cfName); + } + + // TODO: Add DELETED_TABLE, DELETED_FILE_TABLE and DELETED_DIR_TABLE + + // Build the new defragmented database + DBStore defraggedStore = defraggedDbBuilder.build(); + RocksDatabase defraggedDb = ((RDBStore) defraggedStore).getDb(); + + LOG.info("Created new defragmented DB instance for snapshot: {}", snapshotInfo.getName()); + + // 2. & 3. 
Iterate through tracked column families and copy all key-value pairs + long totalKeysCopied = 0; + for (String cfName : COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT) { + LOG.info("Starting defragmentation of column family: {} for snapshot: {}", + cfName, snapshotInfo.getName()); + + RocksDatabase.ColumnFamily originalCf = originalDb.getColumnFamily(cfName); + RocksDatabase.ColumnFamily defraggedCf = defraggedDb.getColumnFamily(cfName); + + if (originalCf == null) { + LOG.warn("Column family {} not found in original DB, skipping", cfName); + continue; + } + + if (defraggedCf == null) { + LOG.warn("Column family {} not found in defragmented DB, skipping", cfName); + continue; + } + + long cfKeysCopied = 0; + try (ManagedWriteBatch writeBatch = new ManagedWriteBatch(); + ManagedRocksIterator iterator = originalDb.newIterator(originalCf)) { + + iterator.get().seekToFirst(); + + while (iterator.get().isValid()) { + byte[] key = iterator.get().key(); + byte[] value = iterator.get().value(); + + // Add to batch for efficient writing + defraggedCf.batchPut(writeBatch, key, value); + cfKeysCopied++; + + // Commit batch every 1000 keys to avoid memory issues + if (cfKeysCopied % 1000 == 0) { + defraggedDb.batchWrite(writeBatch); + writeBatch.clear(); + LOG.debug("Copied {} keys for column family {} so far", cfKeysCopied, cfName); + } + + iterator.get().next(); + } + + // Commit any remaining keys in the batch + if (writeBatch.count() > 0) { + defraggedDb.batchWrite(writeBatch); + } + + totalKeysCopied += cfKeysCopied; + LOG.info("Completed copying {} keys for column family: {} in snapshot: {}", + cfKeysCopied, cfName, snapshotInfo.getName()); + } + } + + LOG.info("Copied total of {} keys across all column families for snapshot: {}", + totalKeysCopied, snapshotInfo.getName()); + + // 4. Perform compaction on the new DB to ensure it's fully defragmented + LOG.info("Starting compaction of defragmented DB for snapshot: {}", snapshotInfo.getName()); + try (ManagedCompactRangeOptions compactOptions = new ManagedCompactRangeOptions()) { + compactOptions.setChangeLevel(true); + compactOptions.setTargetLevel(1); + defraggedDb.compactDB(compactOptions); + } + LOG.info("Completed compaction of defragmented DB for snapshot: {}", snapshotInfo.getName()); + + // 5. Verify data integrity between original and defragmented DBs + verifyDbIntegrity(originalDb, defraggedDb, snapshotInfo); + + // Close the defragmented DB + defraggedStore.close(); + + // TODO: Rename om.db to the om.db-UUID (at least for full defrag) + + LOG.info("Successfully completed full defragmentation for snapshot: {} with {} keys copied", + snapshotInfo.getName(), totalKeysCopied); + + } catch (RocksDatabaseException e) { + LOG.error("RocksDB error during defragmentation of snapshot: {}", snapshotInfo.getName(), e); + throw new IOException("Failed to defragment snapshot: " + snapshotInfo.getName(), e); + } catch (Exception e) { + LOG.error("Unexpected error during defragmentation of snapshot: {}", snapshotInfo.getName(), e); + throw new IOException("Failed to defragment snapshot: " + snapshotInfo.getName(), e); + } + } + + /** + * Performs incremental defragmentation using diff from previous defragmented snapshot. 
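+ * A checkpoint of the previous defragmented DB is created directly at the target
+ * location, the snapshot diff between the two snapshots is applied on top of it in
+ * write batches, and the result is flushed and verified against the current snapshot
+ * checkpoint.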
+ */ + private void performIncrementalDefragmentation(SnapshotInfo currentSnapshot, + SnapshotInfo previousDefraggedSnapshot, OmSnapshot currentOmSnapshot) { + + String currentSnapshotPath = OmSnapshotManager.getSnapshotPath( + ozoneManager.getConfiguration(), currentSnapshot); + String previousSnapshotPath = OmSnapshotManager.getSnapshotPath( + ozoneManager.getConfiguration(), previousDefraggedSnapshot); + + // Fix path construction similar to performFullDefragmentation + String previousParentDir = Paths.get(previousSnapshotPath).getParent().getParent().toString(); + String previousCheckpointDirName = Paths.get(previousSnapshotPath).getFileName().toString(); + String previousDefraggedDbPath = Paths.get(previousParentDir, CHECKPOINT_STATE_DEFRAGGED_DIR, + previousCheckpointDirName).toString(); + + String currentParentDir = Paths.get(currentSnapshotPath).getParent().getParent().toString(); + String currentCheckpointDirName = Paths.get(currentSnapshotPath).getFileName().toString(); + String currentDefraggedDbPath = Paths.get(currentParentDir, CHECKPOINT_STATE_DEFRAGGED_DIR, + currentCheckpointDirName).toString(); + + LOG.info("Starting incremental defragmentation for snapshot: {} using previous: {}", + currentSnapshot.getName(), previousDefraggedSnapshot.getName()); + LOG.info("Previous defragmented DB: {}", previousDefraggedDbPath); + LOG.info("Current target DB: {}", currentDefraggedDbPath); + + // Note: Don't create target directory - RocksDB createCheckpoint() will create it + // and will fail with "Directory exists" if we create it first + + try { + // Check if previous defragmented DB exists + if (!Files.exists(Paths.get(previousDefraggedDbPath))) { + LOG.warn("Previous defragmented DB not found at {}, falling back to full defragmentation", + previousDefraggedDbPath); + // TODO: Throw exception instead of falling back + performFullDefragmentation(currentSnapshot, currentOmSnapshot); + return; + } + + // 1. Create a checkpoint from the previous defragmented DB directly at target location + LOG.info("Creating checkpoint from previous defragmented DB directly to target location"); + + // Open the previous defragmented DB to create checkpoint. + // TODO: via SnapshotCache or something equivalent for lock protection + DBStoreBuilder previousDbBuilder = DBStoreBuilder.newBuilder(ozoneManager.getConfiguration()) + .setName(previousCheckpointDirName) + .setPath(Paths.get(previousDefraggedDbPath).getParent()) + .setOpenReadOnly(true) + .setCreateCheckpointDirs(false) + .disableDefaultCFAutoCompaction(true); + + // Add tracked column families + for (String cfName : COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT) { + previousDbBuilder.addTable(cfName); + } + + try (DBStore previousDefraggedStore = previousDbBuilder.build()) { + RocksDatabase previousDefraggedDb = ((RDBStore) previousDefraggedStore).getDb(); + + // Create checkpoint directly at the target location + try (RocksDatabase.RocksCheckpoint checkpoint = previousDefraggedDb.createCheckpoint()) { + checkpoint.createCheckpoint(Paths.get(currentDefraggedDbPath)); + LOG.info("Created checkpoint directly at target: {}", currentDefraggedDbPath); + } + } + + // 2. 
Open the checkpoint as our working defragmented DB and apply incremental changes + DBStoreBuilder currentDbBuilder = DBStoreBuilder.newBuilder(ozoneManager.getConfiguration()) + .setName(currentCheckpointDirName) + .setPath(Paths.get(currentDefraggedDbPath).getParent()) + .setCreateCheckpointDirs(false) + // Disable compaction for defragmented snapshot DB + .disableDefaultCFAutoCompaction(true); + + // Add tracked column families + for (String cfName : COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT) { + currentDbBuilder.addTable(cfName); + } + + // Build DB from the checkpoint + DBStore currentDefraggedStore = currentDbBuilder.build(); + RocksDatabase currentDefraggedDb = ((RDBStore) currentDefraggedStore).getDb(); + + LOG.info("Opened checkpoint as working defragmented DB for incremental update (compaction disabled)"); + + // 3. Apply incremental changes from current snapshot + RDBStore currentSnapshotStore = (RDBStore) currentOmSnapshot.getMetadataManager().getStore(); + RocksDatabase currentSnapshotDb = currentSnapshotStore.getDb(); + + long incrementalKeysCopied = applyIncrementalChanges(currentSnapshotDb, currentDefraggedStore, + currentSnapshot, previousDefraggedSnapshot); + + LOG.info("Applied {} incremental changes for snapshot: {}", + incrementalKeysCopied, currentSnapshot.getName()); + + // 4. Flush each table to exactly one SST file + LOG.info("Flushing defragged DB for snapshot: {}", currentSnapshot.getName()); + currentDefraggedStore.flushDB(); +// for (String cfName : COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT) { +// try { +// Table table = currentDefraggedStore.getTable(cfName); +// if (table != null) { +// ((RDBStore) currentDefraggedStore).flushDB(cfName); +// LOG.debug("Flushed table {} to SST file", cfName); +// } +// } catch (Exception e) { +// LOG.warn("Failed to flush table {}: {}", cfName, e.getMessage(), e); +// } +// } + + // 4. Perform compaction on the updated DB (commented out) +// LOG.info("Starting compaction of incrementally defragmented DB for snapshot: {}", +// currentSnapshot.getName()); +// try (ManagedCompactRangeOptions compactOptions = new ManagedCompactRangeOptions()) { +// compactOptions.setChangeLevel(true); +// compactOptions.setTargetLevel(1); +// currentDefraggedDb.compactDB(compactOptions); +// } +// LOG.info("Completed compaction of incrementally defragmented DB"); + + // 5. Verify data integrity + verifyDbIntegrity(currentSnapshotDb, currentDefraggedDb, currentSnapshot); + + // Close the defragmented DB. TODO: Close in finally block instead + currentDefraggedStore.close(); + + LOG.info("Successfully completed incremental defragmentation for snapshot: {} with {} incremental changes", + currentSnapshot.getName(), incrementalKeysCopied); + + } catch (RocksDatabaseException e) { + LOG.error("RocksDB error during incremental defragmentation of snapshot: {}", + currentSnapshot.getName(), e); +// LOG.warn("Falling back to full defragmentation due to error"); +// performFullDefragmentation(currentSnapshot, currentOmSnapshot); + } catch (Exception e) { + LOG.error("Unexpected error during incremental defragmentation of snapshot: {}", + currentSnapshot.getName(), e); +// LOG.warn("Falling back to full defragmentation due to error"); +// performFullDefragmentation(currentSnapshot, currentOmSnapshot); + } + } + + /** + * Applies incremental changes by using snapshotDiff to compute the diff list, + * then iterating that diff list against the current snapshot checkpoint DB. 
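+ * The diff is fetched page by page, and the method waits while the diff job is still
+ * queued or in progress.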
+ * Uses direct batch writing to the defragmented DB instead of SST files to avoid sorting requirements. + */ + @SuppressWarnings("checkstyle:MethodLength") + private long applyIncrementalChanges(RocksDatabase currentSnapshotDb, DBStore targetStore, + SnapshotInfo currentSnapshot, SnapshotInfo previousSnapshot) throws RocksDatabaseException { + + LOG.info("Applying incremental changes for snapshot: {} since: {} using direct batch writing", + currentSnapshot.getName(), previousSnapshot.getName()); + + long totalChanges = 0; + + try { + // Use snapshotDiff to compute the diff list between previous and current snapshot + LOG.info("Computing snapshot diff between {} and {}", + previousSnapshot.getName(), currentSnapshot.getName()); + + SnapshotDiffResponse diffResponse; + try { + // Call snapshotDiff to get the diff list + diffResponse = ozoneManager.snapshotDiff( + currentSnapshot.getVolumeName(), + currentSnapshot.getBucketName(), + previousSnapshot.getName(), + currentSnapshot.getName(), + null, // token - start from beginning + 0, // pageSize - get all diffs at once + false, // forceFullDiff + false // disableNativeDiff + ); + + // Wait for snapshotDiff computation to complete if it's still in progress + while (diffResponse.getJobStatus() == SnapshotDiffResponse.JobStatus.IN_PROGRESS || + diffResponse.getJobStatus() == SnapshotDiffResponse.JobStatus.QUEUED) { + LOG.info("Snapshot diff computation in progress, waiting {} ms...", + diffResponse.getWaitTimeInMs()); + // TODO: This can be improved by triggering snapdiff first, before any locks are grabbed, + // so that we don't have to wait here + Thread.sleep(diffResponse.getWaitTimeInMs()); + + // Poll for updated status + diffResponse = ozoneManager.snapshotDiff( + currentSnapshot.getVolumeName(), + currentSnapshot.getBucketName(), + previousSnapshot.getName(), + currentSnapshot.getName(), + null, // token + 0, // pageSize + false, // forceFullDiff + false // disableNativeDiff + ); + } + + if (diffResponse.getJobStatus() != SnapshotDiffResponse.JobStatus.DONE) { + throw new RocksDatabaseException("Snapshot diff computation failed with status: " + + diffResponse.getJobStatus() + ", reason: " + diffResponse.getReason()); + } + + LOG.info("Snapshot diff computation completed successfully"); + + } catch (Exception e) { + throw new RocksDatabaseException("Failed to compute snapshot diff", e); + } + + SnapshotDiffReportOzone diffReport = diffResponse.getSnapshotDiffReport(); + if (diffReport == null || diffReport.getDiffList() == null) { + LOG.info("No differences found between snapshots, no changes to apply"); + return 0; + } + + LOG.info("Found {} differences to process from first page of snapshotDiff list", diffReport.getDiffList().size()); + + // Get table references for target database + RDBStore targetRdbStore = (RDBStore) targetStore; + RocksDatabase targetDb = targetRdbStore.getDb(); + + int nextToken = 0; + while (diffReport.getDiffList() != null && !diffReport.getDiffList().isEmpty()) { + final List diffList = diffReport.getDiffList(); + + // Group diff entries by column family and process each CF separately + // TODO: Use bucket layout to determine which column families to process + for (String cfName : COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT) { + RocksDatabase.ColumnFamily currentCf = currentSnapshotDb.getColumnFamily(cfName); + RocksDatabase.ColumnFamily targetCf = targetDb.getColumnFamily(cfName); + + if (currentCf == null || targetCf == null) { + LOG.warn("Column family {} not found, skipping incremental changes", cfName); + continue; 
+ } + + long cfChanges = 0; + + LOG.debug("Processing {} diff entries for column family {}", diffList.size(), cfName); + + // Use batch writing for efficient operations (no sorting required) + try (ManagedWriteBatch writeBatch = new ManagedWriteBatch()) { + + // Iterate through the diff list and process each entry + for (DiffReportEntry diffEntry : diffList) { + String sourcePath = new String(diffEntry.getSourcePath(), StandardCharsets.UTF_8); + + // Extract the key from the path using volume and bucket from snapshot context + byte[] key = extractKeyFromPath(sourcePath, cfName, + currentSnapshot.getVolumeName(), currentSnapshot.getBucketName()); + if (key == null) { + continue; // Skip if this entry doesn't belong to current column family + } + + DiffType diffType = diffEntry.getType(); + + switch (diffType) { + case CREATE: + case MODIFY: + // Key was created or modified - get current value and add to batch + byte[] currentValue = currentSnapshotDb.get(currentCf, key); + if (currentValue != null) { + targetCf.batchPut(writeBatch, key, currentValue); + cfChanges++; + } + break; + + case DELETE: + // Key was deleted - add deletion to batch + targetCf.batchDelete(writeBatch, key); + cfChanges++; + break; + + case RENAME: + // Handle rename - delete old key and create new key + if (diffEntry.getTargetPath() != null) { + String targetPath = new String(diffEntry.getTargetPath(), StandardCharsets.UTF_8); + byte[] newKey = extractKeyFromPath(targetPath, cfName, + currentSnapshot.getVolumeName(), currentSnapshot.getBucketName()); + + if (newKey != null) { + // Delete old key + targetCf.batchDelete(writeBatch, key); + + // Add new key with current value + byte[] newValue = currentSnapshotDb.get(currentCf, newKey); + if (newValue != null) { + targetCf.batchPut(writeBatch, newKey, newValue); + } + cfChanges += 2; // Count both delete and put + } + } + break; + + default: + LOG.warn("Unknown diff type: {}, skipping entry", diffType); + break; + } + + // Commit batch every 1000 operations to avoid memory issues + if (cfChanges % 1000 == 0 && writeBatch.count() > 0) { + targetDb.batchWrite(writeBatch); + writeBatch.clear(); + LOG.debug("Committed batch for column family {} after {} changes", cfName, cfChanges); + } + } + + // Commit any remaining operations in the batch + if (writeBatch.count() > 0) { + targetDb.batchWrite(writeBatch); + LOG.debug("Final batch commit for column family {} with {} total changes", cfName, cfChanges); + } + + } catch (Exception e) { + LOG.error("Error processing column family {} for snapshot {}: {}", + cfName, currentSnapshot.getName(), e.getMessage(), e); + throw new RocksDatabaseException("Failed to apply incremental changes for column family: " + cfName, e); + } + + totalChanges += cfChanges; + LOG.info("Applied {} incremental changes for column family: {}", cfChanges, cfName); + } + + // Get next page of differences if available + nextToken += diffList.size(); + LOG.debug("Retrieving next page of snapshot diff with token: {}", nextToken); + diffResponse = ozoneManager.snapshotDiff( + currentSnapshot.getVolumeName(), + currentSnapshot.getBucketName(), + previousSnapshot.getName(), + currentSnapshot.getName(), + String.valueOf(nextToken), // token + 0, // pageSize + false, // forceFullDiff + false // disableNativeDiff + ); + + if (diffResponse.getJobStatus() != SnapshotDiffResponse.JobStatus.DONE) { + throw new RocksDatabaseException("Expecting DONE but got unexpected snapshot diff status: " + + diffResponse.getJobStatus() + ", reason: " + diffResponse.getReason()); + } + 
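+ // The enclosing while loop ends once a fetched diff report page comes back empty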
+ LOG.info("Retrieved next page of snapshot diff, size: {}", + diffResponse.getSnapshotDiffReport().getDiffList().size()); + diffReport = diffResponse.getSnapshotDiffReport(); + } + + } catch (Exception e) { + throw new RocksDatabaseException("Failed to apply incremental changes", e); + } + + LOG.info("Applied {} total incremental changes using direct batch writing", totalChanges); + return totalChanges; + } + + /** + * Extracts the database key from a diff report path for a specific column family. + * This method converts paths from snapshot diff reports into database keys. + */ + private byte[] extractKeyFromPath(String path, String columnFamily, String volume, String bucket) { + try { + if (KEY_TABLE.equals(columnFamily)) { + // For keyTable, use OmMetadataManagerImpl#getOzoneKey + // Path in diff report contains just the key part (after volume/bucket) + String dbKey = ozoneManager.getMetadataManager().getOzoneKey(volume, bucket, path); + return dbKey.getBytes(StandardCharsets.UTF_8); + } else if (FILE_TABLE.equals(columnFamily)) { // TODO: FSO code path not tested + // For fileTable, use OmMetadataManagerImpl#getOzoneKeyFSO + // Path in diff report contains just the key part (after volume/bucket) + OmKeyArgs keyArgs = new OmKeyArgs.Builder() + .setVolumeName(volume) + .setBucketName(bucket) + .setKeyName(path) + .build(); + OmKeyInfo keyInfo = ozoneManager.lookupKey(keyArgs); + if (keyInfo == null) { + // TODO: Handle FSO case properly. key might not exist in current DB + // Should use source/target DB depending on diff type + return null; + } + String dbKey = ozoneManager.getMetadataManager().getOzoneKeyFSO(volume, bucket, + keyInfo.getParentObjectID() + "/" + path); + return dbKey.getBytes(StandardCharsets.UTF_8); + } + + // If we can't extract a valid key for this column family, return null + // This will cause the entry to be skipped for this column family + return null; + + } catch (Exception e) { + LOG.warn("Failed to extract key from path: {} for column family: {}, volume: {}, bucket: {}, error: {}", + path, columnFamily, volume, bucket, e.getMessage(), e); + return null; + } + } + + /** + * Verifies DB integrity by comparing key counts and spot-checking keys/values + * between the original and defragmented databases. 
+ */ + private void verifyDbIntegrity(RocksDatabase originalDb, RocksDatabase defraggedDb, + SnapshotInfo snapshotInfo) { + + LOG.info("Starting DB integrity verification for snapshot: {}", snapshotInfo.getName()); + + boolean verificationPassed = true; + long totalOriginalKeys = 0; + long totalDefraggedKeys = 0; + + for (String cfName : COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT) { + LOG.debug("Verifying column family: {} for snapshot: {}", + cfName, snapshotInfo.getName()); + + RocksDatabase.ColumnFamily originalCf = originalDb.getColumnFamily(cfName); + RocksDatabase.ColumnFamily defraggedCf = defraggedDb.getColumnFamily(cfName); + + if (originalCf == null) { + LOG.warn("Column family {} not found in original DB, skipping verification", cfName); + continue; + } + + if (defraggedCf == null) { + LOG.error("Column family {} not found in defragmented DB, verification failed", cfName); + verificationPassed = false; + continue; + } + + try { + // Count keys in original DB + long originalKeyCount = 0; + try (ManagedRocksIterator originalIterator = originalDb.newIterator(originalCf)) { + originalIterator.get().seekToFirst(); + while (originalIterator.get().isValid()) { + originalKeyCount++; + originalIterator.get().next(); + } + } + + // Count keys in defragmented DB + long defraggedKeyCount = 0; + try (ManagedRocksIterator defraggedIterator = defraggedDb.newIterator(defraggedCf)) { + defraggedIterator.get().seekToFirst(); + while (defraggedIterator.get().isValid()) { + defraggedKeyCount++; + defraggedIterator.get().next(); + } + } + + totalOriginalKeys += originalKeyCount; + totalDefraggedKeys += defraggedKeyCount; + + // Verify key counts match + if (originalKeyCount != defraggedKeyCount) { + LOG.error("Key count mismatch for column family {}: original={}, defragmented={}", + cfName, originalKeyCount, defraggedKeyCount); + verificationPassed = false; + } else { + LOG.info("Key count verification passed for column family {}: {} keys", + cfName, originalKeyCount); + } + + // Full verification - check every single key-value pair + long fullCheckCount = 0; + long fullCheckErrors = 0; + + try (ManagedRocksIterator originalIterator = originalDb.newIterator(originalCf)) { + originalIterator.get().seekToFirst(); + + while (originalIterator.get().isValid()) { + byte[] originalKey = originalIterator.get().key(); + byte[] originalValue = originalIterator.get().value(); + + // Get the same key from defragmented DB + byte[] defraggedValue = defraggedDb.get(defraggedCf, originalKey); + + if (defraggedValue == null) { + LOG.error("Key missing in defragmented DB for column family {}: key #{}", + cfName, fullCheckCount); + verificationPassed = false; + fullCheckErrors++; + } else if (!java.util.Arrays.equals(originalValue, defraggedValue)) { + LOG.error("Value mismatch for column family {}: key #{}", + cfName, fullCheckCount); + verificationPassed = false; + fullCheckErrors++; + } + + fullCheckCount++; + + // Log progress every 10,000 keys to avoid log spam + if (fullCheckCount % 10000 == 0) { + LOG.debug("Full verification progress for column family {}: checked {} keys, {} errors so far", + cfName, fullCheckCount, fullCheckErrors); + } + + if (fullCheckErrors > 10) { + LOG.warn("Too many errors found during full verification for column family {}, stopping further checks", + cfName); + break; // Stop if too many errors to avoid flooding logs + } + + originalIterator.get().next(); + } + } + + if (fullCheckErrors == 0) { + LOG.info("Full verification PASSED for column family {}: all {} keys verified successfully", + 
cfName, fullCheckCount); + } else { + LOG.error("Full verification FAILED for column family {}: {} errors found out of {} keys checked", + cfName, fullCheckErrors, fullCheckCount); + } + + } catch (Exception e) { + LOG.error("Error during verification of column family {} for snapshot {}: {}", + cfName, snapshotInfo.getName(), e.getMessage(), e); + verificationPassed = false; + } + } + + // Log final verification results + if (verificationPassed) { + LOG.info("DB integrity verification PASSED for snapshot: {} " + + "(total original keys: {}, total defragmented keys: {})", + snapshotInfo.getName(), totalOriginalKeys, totalDefraggedKeys); + } else { + LOG.error("DB integrity verification FAILED for snapshot: {} " + + "(total original keys: {}, total defragmented keys: {})", + snapshotInfo.getName(), totalOriginalKeys, totalDefraggedKeys); + // Consider throwing an exception here if verification failure should halt the process + // throw new IOException("DB integrity verification failed for snapshot: " + snapshotInfo.getName()); + } + } + + /** + * Updates snapshot metadata to point to the new defragmented DB location. + */ + private void updateSnapshotMetadata(SnapshotInfo snapshotInfo) throws IOException { + String snapshotPath = OmSnapshotManager.getSnapshotPath( + ozoneManager.getConfiguration(), snapshotInfo); + + LOG.info("Updating snapshot metadata for: {} at path: {}", + snapshotInfo.getName(), snapshotPath); + + try { + // Read current YAML data using the correct API + File yamlFile = new File(snapshotPath + ".yaml"); + OmSnapshotLocalDataYaml yamlData = OmSnapshotLocalDataYaml.getFromYamlFile(yamlFile); + + // Mark as defragmented by setting needsCompaction to false + yamlData.setNeedsDefragmentation(false); + + // Write updated YAML data + yamlData.writeToYaml(yamlFile); + + LOG.info("Successfully updated metadata for snapshot: {}, " + + "marked as defragmented (needsCompaction=false)", + snapshotInfo.getName()); + + } catch (IOException e) { + LOG.error("Failed to update metadata for snapshot: {}", snapshotInfo.getName(), e); + throw e; + } + } + + private final class SnapshotDefragTask implements BackgroundTask { + + @Override + public BackgroundTaskResult call() throws Exception { + // Check OM leader and readiness + if (shouldRun()) { + final long count = runCount.incrementAndGet(); + if (LOG.isDebugEnabled()) { + LOG.debug("Initiating Snapshot Defragmentation Task: run # {}", count); + } + triggerSnapshotDefragOnce(); + } + + return EmptyTaskResult.newResult(); + } + } + + public synchronized boolean triggerSnapshotDefragOnce() throws IOException { + // Check if rocks-tools native lib is available + if (!isRocksToolsNativeLibAvailable()) { + LOG.warn("Rocks-tools native library is not available. 
" + + "Stopping SnapshotDefragService."); + return false; + } + + Optional snapshotManager = Optional.ofNullable(ozoneManager) + .map(OzoneManager::getOmSnapshotManager); + if (!snapshotManager.isPresent()) { + LOG.debug("OmSnapshotManager not available, skipping defragmentation task"); + return false; + } + + Table snapshotInfoTable = + ozoneManager.getMetadataManager().getSnapshotInfoTable(); + + long snapshotLimit = snapshotLimitPerTask; + + while (snapshotLimit > 0 && running.get()) { + // Find the first snapshot needing defragmentation + SnapshotInfo snapshotToDefrag = findFirstSnapshotNeedingDefrag(snapshotInfoTable); + + if (snapshotToDefrag == null) { + LOG.info("No snapshots found needing defragmentation"); + break; + } + + // Acquire SNAPSHOT_GC_LOCK + OMLockDetails gcLockDetails = ozoneManager.getMetadataManager().getLock() + .acquireWriteLock(SNAPSHOT_GC_LOCK, "defrag-" + snapshotToDefrag.getSnapshotId()); + + if (!gcLockDetails.isLockAcquired()) { + LOG.warn("Failed to acquire SNAPSHOT_GC_LOCK for snapshot: {}", + snapshotToDefrag.getName()); + break; + } + + try { + LOG.info("Processing snapshot defragmentation for: {} (ID: {})", + snapshotToDefrag.getName(), snapshotToDefrag.getSnapshotId()); + + // Get snapshot through SnapshotCache for proper locking + try (UncheckedAutoCloseableSupplier snapshotSupplier = + snapshotManager.get().getActiveSnapshot( + snapshotToDefrag.getVolumeName(), + snapshotToDefrag.getBucketName(), + snapshotToDefrag.getName())) { + + OmSnapshot omSnapshot = snapshotSupplier.get(); + + // Check if this is the first snapshot in the chain + SnapshotInfo previousDefraggedSnapshot = findPreviousDefraggedSnapshot( + snapshotToDefrag, snapshotInfoTable); + + if (previousDefraggedSnapshot == null) { + LOG.info("Performing full defragmentation for first snapshot: {}", + snapshotToDefrag.getName()); + performFullDefragmentation(snapshotToDefrag, omSnapshot); + } else { + LOG.info("Performing incremental defragmentation for snapshot: {} " + + "based on previous defragmented snapshot: {}", + snapshotToDefrag.getName(), previousDefraggedSnapshot.getName()); + performIncrementalDefragmentation(snapshotToDefrag, + previousDefraggedSnapshot, omSnapshot); + } + + // Update snapshot metadata + updateSnapshotMetadata(snapshotToDefrag); + + // Close and evict the original snapshot DB from SnapshotCache + // TODO: Implement proper eviction from SnapshotCache + LOG.info("Defragmentation completed for snapshot: {}", + snapshotToDefrag.getName()); + + snapshotLimit--; + snapshotsDefraggedCount.getAndIncrement(); + + } catch (OMException ome) { + if (ome.getResult() == OMException.ResultCodes.FILE_NOT_FOUND) { + LOG.info("Snapshot {} was deleted during defragmentation", + snapshotToDefrag.getName()); + } else { + LOG.error("OMException during snapshot defragmentation for: {}", + snapshotToDefrag.getName(), ome); + } + } + + } catch (Exception e) { + LOG.error("Exception during snapshot defragmentation for: {}", + snapshotToDefrag.getName(), e); + return false; + } finally { + // Release SNAPSHOT_GC_LOCK + ozoneManager.getMetadataManager().getLock() + .releaseWriteLock(SNAPSHOT_GC_LOCK, "defrag-" + snapshotToDefrag.getSnapshotId()); + LOG.debug("Released SNAPSHOT_GC_LOCK for snapshot: {}", snapshotToDefrag.getName()); + } + } + + return true; + } + + @Override + public BackgroundTaskQueue getTasks() { + BackgroundTaskQueue queue = new BackgroundTaskQueue(); + queue.add(new SnapshotDefragTask()); + return queue; + } + + /** + * Returns true if the service run conditions are 
satisfied, false otherwise. + */ + private boolean shouldRun() { + if (ozoneManager == null) { + // OzoneManager can be null for testing + return true; + } + if (ozoneManager.getOmRatisServer() == null) { + LOG.warn("OzoneManagerRatisServer is not initialized yet"); + return false; + } + // The service only runs if current OM node is ready + return running.get() && ozoneManager.isRunning(); + } + + public AtomicLong getSnapshotsDefraggedCount() { + return snapshotsDefraggedCount; + } + + @Override + public BootstrapStateHandler.Lock getBootstrapStateLock() { + return lock; + } + + @Override + public void shutdown() { + running.set(false); + super.shutdown(); + } +} + diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java index 21c2b5979a72..385e85a3f13d 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java @@ -1193,6 +1193,7 @@ Set getDeltaFiles(OmSnapshot fromSnapshot, recordActivity(jobKey, SST_FILE_DELTA_DAG_WALK); LOG.debug("Calling RocksDBCheckpointDiffer"); try { + // TODO: Use this deltaFiles = differ.getSSTDiffListWithFullPath(toDSI, fromDSI, diffDir).map(HashSet::new); } catch (Exception exception) { recordActivity(jobKey, SST_FILE_DELTA_FULL_DIFF); @@ -1496,7 +1497,7 @@ long generateDiffReport( */ private boolean isKeyModified(OmKeyInfo fromKey, OmKeyInfo toKey) { return !fromKey.isKeyInfoSame(toKey, - false, false, false, false, true) + false, false, true, false, true) || !SnapshotUtils.isBlockLocationInfoSame(fromKey, toKey); } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMLayoutFeature.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMLayoutFeature.java index 7deeef51161c..4dca6c062de0 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMLayoutFeature.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMLayoutFeature.java @@ -45,7 +45,8 @@ public enum OMLayoutFeature implements LayoutFeature { QUOTA(6, "Ozone quota re-calculate"), HBASE_SUPPORT(7, "Full support of hsync, lease recovery and listOpenFiles APIs for HBase"), - DELEGATION_TOKEN_SYMMETRIC_SIGN(8, "Delegation token signed by symmetric key"); + DELEGATION_TOKEN_SYMMETRIC_SIGN(8, "Delegation token signed by symmetric key"), + SNAPSHOT_DEFRAGMENTATION(9, "Snapshot defragmentation"); /////////////////////////////// ///////////////////////////// // Example OM Layout Feature with Actions diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java index bea7785bfbc2..ed0179aa30d5 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java @@ -152,6 +152,8 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ServiceListResponse; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SetSafeModeRequest; import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SetSafeModeResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotDefragRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotDefragResponse; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotDiffRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotDiffResponse; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status; @@ -260,6 +262,11 @@ public OMResponse handleReadRequest(OMRequest request) { request.getRangerBGSyncRequest()); responseBuilder.setRangerBGSyncResponse(rangerBGSyncResponse); break; + case SnapshotDefrag: + SnapshotDefragResponse snapshotDefragResponse = triggerSnapshotDefrag( + request.getSnapshotDefragRequest()); + responseBuilder.setSnapshotDefragResponse(snapshotDefragResponse); + break; case DBUpdates: DBUpdatesResponse dbUpdatesResponse = getOMDBUpdates( request.getDbUpdatesRequest()); @@ -1077,6 +1084,14 @@ private RangerBGSyncResponse triggerRangerBGSync( return RangerBGSyncResponse.newBuilder().setRunSuccess(res).build(); } + private SnapshotDefragResponse triggerSnapshotDefrag( + SnapshotDefragRequest snapshotDefragRequest) throws IOException { + + boolean res = impl.triggerSnapshotDefrag(snapshotDefragRequest.getNoWait()); + + return SnapshotDefragResponse.newBuilder().setRunSuccess(res).build(); + } + private RefetchSecretKeyResponse refetchSecretKey() { UUID uuid = impl.refetchSecretKey(); RefetchSecretKeyResponse response = diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotLocalDataYaml.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotLocalDataYaml.java index 4ab4996961ef..7283cc4f3a33 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotLocalDataYaml.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotLocalDataYaml.java @@ -77,15 +77,15 @@ private File writeToYaml(String snapshotName) throws IOException { dataYaml.setSstFiltered(true); // Set last compaction time - dataYaml.setLastCompactionTime(NOW.toEpochMilli()); + dataYaml.setLastDefragTime(NOW.toEpochMilli()); // Set needs compaction flag - dataYaml.setNeedsCompaction(true); + dataYaml.setNeedsDefragmentation(true); // Add some compacted SST files - dataYaml.addCompactedSSTFileList(1, "table1", Collections.singleton("compacted-sst1")); - dataYaml.addCompactedSSTFileList(1, "table2", Collections.singleton("compacted-sst2")); - dataYaml.addCompactedSSTFileList(2, "table1", Collections.singleton("compacted-sst3")); + dataYaml.addDefraggedSSTFileList(1, "table1", Collections.singleton("compacted-sst1")); + dataYaml.addDefraggedSSTFileList(1, "table2", Collections.singleton("compacted-sst2")); + dataYaml.addDefraggedSSTFileList(2, "table1", Collections.singleton("compacted-sst3")); File yamlFile = new File(testRoot, yamlFilePath); @@ -109,7 +109,7 @@ public void testWriteToYaml() throws IOException { assertEquals(42, snapshotData.getVersion()); assertTrue(snapshotData.getSstFiltered()); - Map> uncompactedFiles = snapshotData.getUncompactedSSTFileList(); + Map> uncompactedFiles = snapshotData.getNotDefraggedSSTFileList(); assertEquals(2, uncompactedFiles.size()); assertEquals(2, uncompactedFiles.get("table1").size()); assertEquals(1, uncompactedFiles.get("table2").size()); @@ -117,10 +117,10 @@ public void 
testWriteToYaml() throws IOException { assertTrue(uncompactedFiles.get("table1").contains("sst2")); assertTrue(uncompactedFiles.get("table2").contains("sst3")); - assertEquals(NOW.toEpochMilli(), snapshotData.getLastCompactionTime()); - assertTrue(snapshotData.getNeedsCompaction()); + assertEquals(NOW.toEpochMilli(), snapshotData.getLastDefragTime()); + assertTrue(snapshotData.getNeedsDefragmentation()); - Map>> compactedFiles = snapshotData.getCompactedSSTFileList(); + Map>> compactedFiles = snapshotData.getDefraggedSSTFileList(); assertEquals(2, compactedFiles.size()); assertTrue(compactedFiles.containsKey(1)); assertTrue(compactedFiles.containsKey(2)); @@ -131,6 +131,44 @@ public void testWriteToYaml() throws IOException { assertTrue(compactedFiles.get(2).get("table1").contains("compacted-sst3")); } + private void setValuesToYaml(File yamlFile) throws IOException { + // Read from YAML file + OmSnapshotLocalDataYaml snapshotData = OmSnapshotLocalDataYaml.getFromYamlFile(yamlFile); + + // Update version + snapshotData.setVersion(1); + // Update SST filtered flag +// snapshotData.setSstFiltered(false); + // Update last defrag time + snapshotData.setLastDefragTime(-1L); + // Update needs defragmentation flag + snapshotData.setNeedsDefragmentation(true); + + // Write updated data back to file + snapshotData.writeToYaml(yamlFile); + + // Read back the updated data + snapshotData = OmSnapshotLocalDataYaml.getFromYamlFile(yamlFile); + + // Verify updated data + assertEquals(1, snapshotData.getVersion()); + assertEquals(-1, snapshotData.getLastDefragTime()); + assertTrue(snapshotData.getNeedsDefragmentation()); + } + + @Test + public void testUpdateYaml() throws IOException { + setValuesToYaml(new File("../../hadoop-ozone/dist/target/" + + "data-om/metadata/db.snapshots/checkpointState/" + + "om.db-d39279ce-cab6-44e0-839a-2baecb8c283a.yaml")); + setValuesToYaml(new File("../../hadoop-ozone/dist/target/" + + "data-om/metadata/db.snapshots/checkpointState/" + + "om.db-77b75627-5534-4db4-88e5-1661aceae92f.yaml")); + setValuesToYaml(new File("../../hadoop-ozone/dist/target/" + + "data-om/metadata/db.snapshots/checkpointState/" + + "om.db-6639d124-6615-4ced-9af6-3dabd680727b.yaml")); + } + @Test public void testUpdateSnapshotDataFile() throws IOException { File yamlFile = writeToYaml("snapshot2"); @@ -141,9 +179,9 @@ public void testUpdateSnapshotDataFile() throws IOException { // Update snapshot data dataYaml.setSstFiltered(false); - dataYaml.setNeedsCompaction(false); - dataYaml.addUncompactedSSTFileList("table3", Collections.singleton("sst4")); - dataYaml.addCompactedSSTFileList(3, "table3", Collections.singleton("compacted-sst4")); + dataYaml.setNeedsDefragmentation(false); + dataYaml.addNotDefraggedSSTFileList("table3", Collections.singleton("sst4")); + dataYaml.addDefraggedSSTFileList(3, "table3", Collections.singleton("compacted-sst4")); // Write updated data back to file dataYaml.writeToYaml(yamlFile); @@ -153,14 +191,14 @@ public void testUpdateSnapshotDataFile() throws IOException { // Verify updated data assertThat(dataYaml.getSstFiltered()).isFalse(); - assertThat(dataYaml.getNeedsCompaction()).isFalse(); - - Map> uncompactedFiles = dataYaml.getUncompactedSSTFileList(); + assertThat(dataYaml.getNeedsDefragmentation()).isFalse(); + // TODO: Fix variable names + Map> uncompactedFiles = dataYaml.getNotDefraggedSSTFileList(); assertEquals(3, uncompactedFiles.size()); assertTrue(uncompactedFiles.containsKey("table3")); assertTrue(uncompactedFiles.get("table3").contains("sst4")); - - Map>> 
compactedFiles = dataYaml.getCompactedSSTFileList(); + // TODO: Fix variable names + Map>> compactedFiles = dataYaml.getDefraggedSSTFileList(); assertEquals(3, compactedFiles.size()); assertTrue(compactedFiles.containsKey(3)); assertTrue(compactedFiles.get(3).containsKey("table3")); @@ -204,9 +242,9 @@ public void testYamlContainsAllFields() throws IOException { assertThat(content).contains(OzoneConsts.OM_SLD_VERSION); assertThat(content).contains(OzoneConsts.OM_SLD_CHECKSUM); assertThat(content).contains(OzoneConsts.OM_SLD_IS_SST_FILTERED); - assertThat(content).contains(OzoneConsts.OM_SLD_UNCOMPACTED_SST_FILE_LIST); - assertThat(content).contains(OzoneConsts.OM_SLD_LAST_COMPACTION_TIME); - assertThat(content).contains(OzoneConsts.OM_SLD_NEEDS_COMPACTION); - assertThat(content).contains(OzoneConsts.OM_SLD_COMPACTED_SST_FILE_LIST); + assertThat(content).contains(OzoneConsts.OM_SLD_NOT_DEFRAGGED_SST_FILE_LIST); + assertThat(content).contains(OzoneConsts.OM_SLD_LAST_DEFRAG_TIME); + assertThat(content).contains(OzoneConsts.OM_SLD_NEEDS_DEFRAGMENTATION); + assertThat(content).contains(OzoneConsts.OM_SLD_DEFRAGGED_SST_FILE_LIST); } } diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotManager.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotManager.java index f2006deac31e..ba1da617ff6a 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotManager.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotManager.java @@ -311,11 +311,11 @@ public void testCreateNewSnapshotLocalYaml() throws IOException { OmSnapshotLocalData localData = OmSnapshotLocalDataYaml.getFromYamlFile(snapshotYaml.toFile()); assertNotNull(localData); assertEquals(0, localData.getVersion()); - assertEquals(expUncompactedSSTFileList, localData.getUncompactedSSTFileList()); + assertEquals(expUncompactedSSTFileList, localData.getNotDefraggedSSTFileList()); assertFalse(localData.getSstFiltered()); - assertEquals(0L, localData.getLastCompactionTime()); - assertFalse(localData.getNeedsCompaction()); - assertTrue(localData.getCompactedSSTFileList().isEmpty()); + assertEquals(0L, localData.getLastDefragTime()); + assertFalse(localData.getNeedsDefragmentation()); + assertTrue(localData.getDefraggedSSTFileList().isEmpty()); // Cleanup Files.delete(snapshotYaml);