From 3f0a64134d1440fb433eaec03173f5447890c8ba Mon Sep 17 00:00:00 2001 From: Sumit Agrawal Date: Mon, 22 Jul 2024 13:06:14 +0530 Subject: [PATCH 1/7] HDDS-11284. refactor quota repair --- .../apache/hadoop/hdds/utils/db/DBStore.java | 8 + .../apache/hadoop/hdds/utils/db/RDBStore.java | 8 + .../ozone/om/service/QuotaRepairTask.java | 224 ++++++++++-------- .../om/upgrade/QuotaRepairUpgradeAction.java | 3 +- .../ozone/om/service/TestQuotaRepairTask.java | 4 +- 5 files changed, 143 insertions(+), 104 deletions(-) diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java index 19855625237f..28f561bca87c 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java @@ -167,6 +167,14 @@ void move(KEY sourceKey, KEY destKey, VALUE value, */ DBCheckpoint getCheckpoint(boolean flush) throws IOException; + /** + * Get current snapshot of DB store as an artifact stored on + * the local filesystem with relative path. + * @return An object that encapsulates the checkpoint information along with + * location. + */ + DBCheckpoint getCheckpoint(String relatedPath, boolean flush) throws IOException; + /** * Get DB Store location. * @return DB file location. diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java index d5aa961b0e9e..a7dd16a5307d 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java @@ -338,6 +338,14 @@ public DBCheckpoint getCheckpoint(boolean flush) throws IOException { return checkPointManager.createCheckpoint(checkpointsParentDir); } + @Override + public DBCheckpoint getCheckpoint(String relativePath, boolean flush) throws IOException { + if (flush) { + this.flushDB(); + } + return checkPointManager.createCheckpoint(checkpointsParentDir, relativePath); + } + public DBCheckpoint getSnapshot(String name) throws IOException { this.flushLog(true); return checkPointManager.createCheckpoint(snapshotsParentDir, name); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/QuotaRepairTask.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/QuotaRepairTask.java index 8a8ebd06f4f3..205f420e8a2f 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/QuotaRepairTask.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/QuotaRepairTask.java @@ -20,8 +20,10 @@ package org.apache.hadoop.ozone.om.service; import com.google.common.util.concurrent.UncheckedExecutionException; +import java.io.File; import java.io.IOException; import java.io.UncheckedIOException; +import java.nio.file.Paths; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -36,12 +38,17 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.utils.db.BatchOperation; +import org.apache.hadoop.hdds.utils.db.DBCheckpoint; import org.apache.hadoop.hdds.utils.db.Table; import 
org.apache.hadoop.hdds.utils.db.TableIterator; import org.apache.hadoop.hdds.utils.db.cache.CacheKey; import org.apache.hadoop.hdds.utils.db.cache.CacheValue; import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.om.OmMetadataManagerImpl; +import org.apache.hadoop.ozone.om.OzoneManager; import org.apache.hadoop.ozone.om.helpers.BucketLayout; import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; @@ -65,76 +72,127 @@ public class QuotaRepairTask { private static final int BATCH_SIZE = 5000; private static final int TASK_THREAD_CNT = 3; public static final long EPOCH_DEFAULT = -1L; - private final OMMetadataManager metadataManager; - private final Map nameBucketInfoMap = new HashMap<>(); - private final Map idBucketInfoMap = new HashMap<>(); + private final OzoneManager om; private ExecutorService executor; - private final Map keyCountMap = new ConcurrentHashMap<>(); - private final Map fileCountMap - = new ConcurrentHashMap<>(); - private final Map directoryCountMap - = new ConcurrentHashMap<>(); - private final Map oldVolumeKeyNameMap = new HashMap(); - - public QuotaRepairTask(OMMetadataManager metadataManager) { - this.metadataManager = metadataManager; + public QuotaRepairTask(OzoneManager ozoneManager) { + this.om = ozoneManager; } - + public void repair() throws Exception { LOG.info("Starting quota repair task"); - prepareAllVolumeBucketInfo(); + try { + // thread pool with 3 Table type * (1 task each + 3 thread each) + executor = Executors.newFixedThreadPool(12); + + // repair active db + OMMetadataManager activeMetaManager = createActiveDBCheckpoint(om.getMetadataManager(), om.getConfiguration()); + repairActiveDb(activeMetaManager); + // list all snapshot + // repair snapshot dbs + updateOldVolumeQuotaSupport(om.getMetadataManager()); + } finally { + LOG.info("Completed quota repair task"); + executor.shutdown(); + } + } + + private void repairActiveDb(OMMetadataManager metadataManager) throws Exception { + Map nameBucketInfoMap = new HashMap<>(); + Map idBucketInfoMap = new HashMap<>(); + Map oriBucketInfoMap = new HashMap<>(); + prepareAllBucketInfo(nameBucketInfoMap, idBucketInfoMap, oriBucketInfoMap, metadataManager); + + repairCount(nameBucketInfoMap, idBucketInfoMap, metadataManager); + + // update bucket count + updateOldBucketQuotaSupport(metadataManager, nameBucketInfoMap); IOzoneManagerLock lock = metadataManager.getLock(); - // thread pool with 3 Table type * (1 task each + 3 thread each) - executor = Executors.newFixedThreadPool(12); try { nameBucketInfoMap.values().stream().forEach(e -> lock.acquireReadLock( BUCKET_LOCK, e.getVolumeName(), e.getBucketName())); - repairCount(); + try (BatchOperation batchOperation = metadataManager.getStore().initBatchOperation()) { + for (Map.Entry entry : nameBucketInfoMap.entrySet()) { + if (!isChange(oriBucketInfoMap.get(entry.getKey()), entry.getValue())) { + continue; + } + String bucketKey = metadataManager.getBucketKey(entry.getValue().getVolumeName(), + entry.getValue().getBucketName()); + metadataManager.getBucketTable().putWithBatch(batchOperation, bucketKey, entry.getValue()); + metadataManager.getBucketTable().addCacheEntry(new CacheKey<>(bucketKey), + CacheValue.get(EPOCH_DEFAULT, entry.getValue())); + } + metadataManager.getStore().commitBatchOperation(batchOperation); + } } finally { nameBucketInfoMap.values().stream().forEach(e -> lock.releaseReadLock( BUCKET_LOCK, e.getVolumeName(), e.getBucketName())); - executor.shutdown(); - 
LOG.info("Completed quota repair task"); } - updateOldVolumeQuotaSupport(); - - // cleanup epoch added to avoid extra epoch id in cache - ArrayList epochs = new ArrayList<>(); - epochs.add(EPOCH_DEFAULT); - metadataManager.getBucketTable().cleanupCache(epochs); - metadataManager.getVolumeTable().cleanupCache(epochs); } - private void prepareAllVolumeBucketInfo() throws IOException { - try (TableIterator> - iterator = metadataManager.getVolumeTable().iterator()) { + private OMMetadataManager createActiveDBCheckpoint( + OMMetadataManager omMetaManager, OzoneConfiguration conf) throws IOException { + // cleanup + File dbLocation = omMetaManager.getStore().getDbLocation(); + if (dbLocation == null) { + throw new NullPointerException("db location is null"); + } + String tempData = dbLocation.getParent(); + if (tempData == null) { + throw new NullPointerException("parent db dir is null"); + } + File repairTmpPath = Paths.get(tempData, "temp-repair-quota").toFile(); + FileUtils.deleteDirectory(repairTmpPath); + + // create snapshot + DBCheckpoint checkpoint = omMetaManager.getStore().getCheckpoint("temp-repair-quota", true); + return OmMetadataManagerImpl.createCheckpointMetadataManager(conf, checkpoint); + } - OmVolumeArgs omVolumeArgs; + private void prepareAllBucketInfo( + Map nameBucketInfoMap, Map idBucketInfoMap, + Map oriBucketInfoMap, OMMetadataManager metadataManager) throws IOException { + try (TableIterator> + iterator = metadataManager.getBucketTable().iterator()) { while (iterator.hasNext()) { - Table.KeyValue entry = - iterator.next(); - omVolumeArgs = entry.getValue(); - getAllBuckets(omVolumeArgs.getVolume(), omVolumeArgs.getObjectID()); - if (omVolumeArgs.getQuotaInBytes() == OLD_QUOTA_DEFAULT - || omVolumeArgs.getQuotaInNamespace() == OLD_QUOTA_DEFAULT) { - oldVolumeKeyNameMap.put(entry.getKey(), entry.getValue().getVolume()); - } + Table.KeyValue entry = iterator.next(); + OmBucketInfo bucketInfo = entry.getValue(); + String bucketNameKey = buildNamePath(bucketInfo.getVolumeName(), + bucketInfo.getBucketName()); + oriBucketInfoMap.put(bucketNameKey, bucketInfo.copyObject()); + bucketInfo.incrUsedNamespace(-bucketInfo.getUsedNamespace()); + bucketInfo.incrUsedBytes(-bucketInfo.getUsedBytes()); + nameBucketInfoMap.put(bucketNameKey, bucketInfo); + idBucketInfoMap.put(buildIdPath(metadataManager.getVolumeId(bucketInfo.getVolumeName()), + bucketInfo.getObjectID()), bucketInfo); } } } - private void updateOldVolumeQuotaSupport() throws IOException { + private boolean isChange(OmBucketInfo lBucketInfo, OmBucketInfo rBucketInfo) { + if (lBucketInfo.getUsedNamespace() != rBucketInfo.getUsedNamespace() + || lBucketInfo.getUsedBytes() != rBucketInfo.getUsedBytes() + || lBucketInfo.getQuotaInNamespace() != rBucketInfo.getQuotaInNamespace() + || lBucketInfo.getQuotaInBytes() != rBucketInfo.getQuotaInBytes()) { + return true; + } + return false; + } + + private static void updateOldVolumeQuotaSupport(OMMetadataManager metadataManager) throws IOException { LOG.info("Starting volume quota support update"); IOzoneManagerLock lock = metadataManager.getLock(); - try (BatchOperation batchOperation = metadataManager.getStore() - .initBatchOperation()) { - for (Map.Entry volEntry - : oldVolumeKeyNameMap.entrySet()) { - lock.acquireReadLock(VOLUME_LOCK, volEntry.getValue()); + try (TableIterator> + iterator = metadataManager.getVolumeTable().iterator()) { + while (iterator.hasNext()) { + Table.KeyValue entry = iterator.next(); + OmVolumeArgs omVolumeArgs = entry.getValue(); + if 
(!(omVolumeArgs.getQuotaInBytes() == OLD_QUOTA_DEFAULT + || omVolumeArgs.getQuotaInNamespace() == OLD_QUOTA_DEFAULT)) { + continue; + } try { - OmVolumeArgs omVolumeArgs = metadataManager.getVolumeTable().get( - volEntry.getKey()); + lock.acquireReadLock(VOLUME_LOCK, omVolumeArgs.getVolume()); boolean isQuotaReset = false; if (omVolumeArgs.getQuotaInBytes() == OLD_QUOTA_DEFAULT) { omVolumeArgs.setQuotaInBytes(QUOTA_RESET); @@ -146,35 +204,18 @@ private void updateOldVolumeQuotaSupport() throws IOException { } if (isQuotaReset) { metadataManager.getVolumeTable().addCacheEntry( - new CacheKey<>(volEntry.getKey()), - CacheValue.get(EPOCH_DEFAULT, omVolumeArgs)); - metadataManager.getVolumeTable().putWithBatch(batchOperation, - volEntry.getKey(), omVolumeArgs); + new CacheKey<>(entry.getKey()), CacheValue.get(EPOCH_DEFAULT, omVolumeArgs)); + metadataManager.getVolumeTable().put(entry.getKey(), omVolumeArgs); } } finally { - lock.releaseReadLock(VOLUME_LOCK, volEntry.getValue()); + lock.releaseReadLock(VOLUME_LOCK, omVolumeArgs.getVolume()); } } - metadataManager.getStore().commitBatchOperation(batchOperation); } LOG.info("Completed volume quota support update"); } - private void getAllBuckets(String volumeName, long volumeId) - throws IOException { - List bucketList = metadataManager.listBuckets( - volumeName, null, null, Integer.MAX_VALUE, false); - for (OmBucketInfo bucketInfo : bucketList) { - bucketInfo.incrUsedNamespace(-bucketInfo.getUsedNamespace()); - bucketInfo.incrUsedBytes(-bucketInfo.getUsedBytes()); - nameBucketInfoMap.put(buildNamePath(volumeName, - bucketInfo.getBucketName()), bucketInfo); - idBucketInfoMap.put(buildIdPath(volumeId, bucketInfo.getObjectID()), - bucketInfo); - } - } - - private String buildNamePath(String volumeName, String bucketName) { + private static String buildNamePath(String volumeName, String bucketName) { final StringBuilder builder = new StringBuilder(); builder.append(OM_KEY_PREFIX) .append(volumeName) @@ -184,7 +225,7 @@ private String buildNamePath(String volumeName, String bucketName) { return builder.toString(); } - private String buildIdPath(long volumeId, long bucketId) { + private static String buildIdPath(long volumeId, long bucketId) { final StringBuilder builder = new StringBuilder(); builder.append(OM_KEY_PREFIX) .append(volumeId) @@ -194,8 +235,13 @@ private String buildIdPath(long volumeId, long bucketId) { return builder.toString(); } - private void repairCount() throws Exception { - LOG.info("Starting quota repair for all keys, files and directories"); + private void repairCount( + Map nameBucketInfoMap, Map idBucketInfoMap, + OMMetadataManager metadataManager) throws Exception { + LOG.info("Starting quota repair counting for all keys, files and directories"); + Map keyCountMap = new ConcurrentHashMap<>(); + Map fileCountMap = new ConcurrentHashMap<>(); + Map directoryCountMap = new ConcurrentHashMap<>(); try { nameBucketInfoMap.keySet().stream().forEach(e -> keyCountMap.put(e, new CountPair())); @@ -225,30 +271,15 @@ private void repairCount() throws Exception { throw new Exception(ex.getCause()); } - // persist bucket info + // update count to bucket info updateCountToBucketInfo(nameBucketInfoMap, keyCountMap); updateCountToBucketInfo(idBucketInfoMap, fileCountMap); updateCountToBucketInfo(idBucketInfoMap, directoryCountMap); - - // update quota enable flag for old volume and buckets - updateOldBucketQuotaSupport(); - - try (BatchOperation batchOperation = metadataManager.getStore() - .initBatchOperation()) { - for (Map.Entry entry - : 
nameBucketInfoMap.entrySet()) { - String bucketKey = metadataManager.getBucketKey( - entry.getValue().getVolumeName(), - entry.getValue().getBucketName()); - metadataManager.getBucketTable().putWithBatch(batchOperation, - bucketKey, entry.getValue()); - } - metadataManager.getStore().commitBatchOperation(batchOperation); - } - LOG.info("Completed quota repair for all keys, files and directories"); + LOG.info("Completed quota repair counting for all keys, files and directories"); } - private void updateOldBucketQuotaSupport() { + private static void updateOldBucketQuotaSupport( + OMMetadataManager metadataManager, Map nameBucketInfoMap) { for (Map.Entry entry : nameBucketInfoMap.entrySet()) { if (entry.getValue().getQuotaInBytes() == OLD_QUOTA_DEFAULT || entry.getValue().getQuotaInNamespace() == OLD_QUOTA_DEFAULT) { @@ -261,13 +292,6 @@ private void updateOldBucketQuotaSupport() { } OmBucketInfo bucketInfo = builder.build(); entry.setValue(bucketInfo); - - // there is a new value to be updated in bucket cache - String bucketKey = metadataManager.getBucketKey( - bucketInfo.getVolumeName(), bucketInfo.getBucketName()); - metadataManager.getBucketTable().addCacheEntry( - new CacheKey<>(bucketKey), - CacheValue.get(EPOCH_DEFAULT, bucketInfo)); } } } @@ -315,7 +339,7 @@ private void recalculateUsages( } } - private void captureCount( + private static void captureCount( Map prefixUsageMap, BlockingQueue>> q, AtomicBoolean isRunning, boolean haveValue) throws UncheckedIOException { @@ -334,7 +358,7 @@ private void captureCount( } } - private void extractCount( + private static void extractCount( Table.KeyValue kv, Map prefixUsageMap, boolean haveValue) { @@ -357,7 +381,7 @@ private void extractCount( } } - private synchronized void updateCountToBucketInfo( + private static synchronized void updateCountToBucketInfo( Map bucketInfoMap, Map prefixUsageMap) { for (Map.Entry entry : prefixUsageMap.entrySet()) { @@ -370,7 +394,7 @@ private synchronized void updateCountToBucketInfo( } } - private String getVolumeBucketPrefix(String key) { + private static String getVolumeBucketPrefix(String key) { // get bucket prefix with /// // -- as represents name in OBS and id in FSO String prefix = key; diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/QuotaRepairUpgradeAction.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/QuotaRepairUpgradeAction.java index 8a2e4f550e7d..abebfe7e03a1 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/QuotaRepairUpgradeAction.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/QuotaRepairUpgradeAction.java @@ -38,8 +38,7 @@ public void execute(OzoneManager arg) throws Exception { OMConfigKeys.OZONE_OM_UPGRADE_QUOTA_RECALCULATE_ENABLE, OMConfigKeys.OZONE_OM_UPGRADE_QUOTA_RECALCULATE_ENABLE_DEFAULT); if (enabled) { - QuotaRepairTask quotaRepairTask = new QuotaRepairTask( - arg.getMetadataManager()); + QuotaRepairTask quotaRepairTask = new QuotaRepairTask(arg); quotaRepairTask.repair(); } } diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestQuotaRepairTask.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestQuotaRepairTask.java index 1a0db1183311..058aa617d4bd 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestQuotaRepairTask.java +++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestQuotaRepairTask.java @@ -88,7 +88,7 @@ public void testQuotaRepair() throws Exception { assertEquals(0, fsoBucketInfo.getUsedNamespace()); assertEquals(0, fsoBucketInfo.getUsedBytes()); - QuotaRepairTask quotaRepairTask = new QuotaRepairTask(omMetadataManager); + QuotaRepairTask quotaRepairTask = new QuotaRepairTask(ozoneManager); quotaRepairTask.repair(); // 10 files of each type, obs have replication of three and @@ -131,7 +131,7 @@ public void testQuotaRepairForOldVersionVolumeBucket() throws Exception { assertEquals(-2, omVolumeArgs.getQuotaInBytes()); assertEquals(-2, omVolumeArgs.getQuotaInNamespace()); - QuotaRepairTask quotaRepairTask = new QuotaRepairTask(omMetadataManager); + QuotaRepairTask quotaRepairTask = new QuotaRepairTask(ozoneManager); quotaRepairTask.repair(); bucketInfo = omMetadataManager.getBucketTable().get( From 8cc8e485c60599e755fe8cf1c50af934b99c1cdb Mon Sep 17 00:00:00 2001 From: Sumit Agrawal Date: Mon, 5 Aug 2024 20:39:49 +0530 Subject: [PATCH 2/7] update quota repair run parallel --- .../apache/hadoop/hdds/utils/db/DBStore.java | 4 +- .../apache/hadoop/hdds/utils/db/RDBStore.java | 4 +- .../java/org/apache/hadoop/ozone/OmUtils.java | 1 + .../src/main/proto/OmClientProtocol.proto | 21 +- .../ratis/utils/OzoneManagerRatisUtils.java | 3 + .../request/volume/OMQuotaRepairRequest.java | 173 +++++++++++ .../volume/OMQuotaRepairResponse.java | 72 +++++ .../ozone/om/service/QuotaRepairTask.java | 282 +++++++++++------- .../om/upgrade/QuotaRepairUpgradeAction.java | 8 +- .../ozone/om/service/TestQuotaRepairTask.java | 66 +++- 10 files changed, 514 insertions(+), 120 deletions(-) create mode 100644 hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMQuotaRepairRequest.java create mode 100644 hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/volume/OMQuotaRepairResponse.java diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java index 28f561bca87c..3e8ea30a6528 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java @@ -169,11 +169,11 @@ void move(KEY sourceKey, KEY destKey, VALUE value, /** * Get current snapshot of DB store as an artifact stored on - * the local filesystem with relative path. + * the local filesystem with different parent path. * @return An object that encapsulates the checkpoint information along with * location. */ - DBCheckpoint getCheckpoint(String relatedPath, boolean flush) throws IOException; + DBCheckpoint getCheckpoint(String parentDir, boolean flush) throws IOException; /** * Get DB Store location. 
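A minimal usage sketch of the new getCheckpoint(parentDir, flush) overload introduced above — flush the active OM DB, take a checkpoint under a scratch parent directory, and wrap it in a checkpoint-backed OMMetadataManager for offline counting. The API calls and the "temp-repair-quota" directory name are taken from the QuotaRepairTask hunks later in this series; the sketch's own class and method names are illustrative, not part of the patch.

import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;

final class QuotaRepairCheckpointSketch {
  private QuotaRepairCheckpointSketch() { }

  // Create a checkpoint of the active OM DB under a scratch directory and
  // return a read-only metadata manager built on top of that checkpoint.
  static OMMetadataManager checkpointedMetaManager(
      OMMetadataManager activeMetaManager, OzoneConfiguration conf) throws IOException {
    File dbLocation = activeMetaManager.getStore().getDbLocation();
    // Recreate the scratch parent directory next to the active DB.
    File repairTmpPath = Paths.get(dbLocation.getParent(), "temp-repair-quota").toFile();
    FileUtils.deleteDirectory(repairTmpPath);
    FileUtils.forceMkdir(repairTmpPath);
    // flush=true flushes the active DB before the checkpoint is taken.
    DBCheckpoint checkpoint =
        activeMetaManager.getStore().getCheckpoint(repairTmpPath.toString(), true);
    return OmMetadataManagerImpl.createCheckpointMetadataManager(conf, checkpoint);
  }
}

The RDBStore change in the next hunk implements the overload by creating the checkpoint directly under the supplied parent directory, which is why the repair task can pre-clean and re-create that directory before every run.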
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java index a7dd16a5307d..99924f724d54 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java @@ -339,11 +339,11 @@ public DBCheckpoint getCheckpoint(boolean flush) throws IOException { } @Override - public DBCheckpoint getCheckpoint(String relativePath, boolean flush) throws IOException { + public DBCheckpoint getCheckpoint(String parentPath, boolean flush) throws IOException { if (flush) { this.flushDB(); } - return checkPointManager.createCheckpoint(checkpointsParentDir, relativePath); + return checkPointManager.createCheckpoint(parentPath, null); } public DBCheckpoint getSnapshot(String name) throws IOException { diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java index 82030669c93a..bf27d7afb61a 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java @@ -327,6 +327,7 @@ public static boolean isReadOnly( case SetTimes: case AbortExpiredMultiPartUploads: case SetSnapshotProperty: + case QuotaRepair: case UnknownCommand: return false; case EchoRPC: diff --git a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto index 32bba266080a..4bdfa97b935e 100644 --- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto +++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto @@ -147,8 +147,8 @@ enum Type { ListStatusLight = 129; GetSnapshotInfo = 130; RenameSnapshot = 131; - ListOpenFiles = 132; + QuotaRepair = 133; } enum SafeMode { @@ -285,8 +285,8 @@ message OMRequest { optional SetSnapshotPropertyRequest SetSnapshotPropertyRequest = 127; optional SnapshotInfoRequest SnapshotInfoRequest = 128; optional RenameSnapshotRequest RenameSnapshotRequest = 129; - optional ListOpenFilesRequest ListOpenFilesRequest = 130; + optional QuotaRepairRequest QuotaRepairRequest = 131; } message OMResponse { @@ -410,8 +410,8 @@ message OMResponse { optional SnapshotInfoResponse SnapshotInfoResponse = 130; optional OMLockDetailsProto omLockDetails = 131; optional RenameSnapshotResponse RenameSnapshotResponse = 132; - optional ListOpenFilesResponse ListOpenFilesResponse = 133; + optional QuotaRepairResponse QuotaRepairResponse = 134; } enum Status { @@ -2187,6 +2187,21 @@ message SetSafeModeResponse { optional bool response = 1; } +message QuotaRepairRequest { + repeated BucketQuotaCount bucketCount = 1; + required bool supportVolumeOldQuota = 2 [default=false]; +} +message BucketQuotaCount { + required string volName = 1; + required string bucketName = 2; + required int64 diffUsedBytes = 3; + required int64 diffUsedNamespace = 4; + required bool supportOldQuota = 5 [default=false]; +} + +message QuotaRepairResponse { +} + message OMLockDetailsProto { optional bool isLockAcquired = 1; optional uint64 waitLockNanos = 2; diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java index 8ff59e091d88..5dc640c742cc 100644 --- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java @@ -84,6 +84,7 @@ import org.apache.hadoop.ozone.om.request.upgrade.OMFinalizeUpgradeRequest; import org.apache.hadoop.ozone.om.request.upgrade.OMPrepareRequest; import org.apache.hadoop.ozone.om.request.util.OMEchoRPCWriteRequest; +import org.apache.hadoop.ozone.om.request.volume.OMQuotaRepairRequest; import org.apache.hadoop.ozone.om.request.volume.OMVolumeCreateRequest; import org.apache.hadoop.ozone.om.request.volume.OMVolumeDeleteRequest; import org.apache.hadoop.ozone.om.request.volume.OMVolumeSetOwnerRequest; @@ -331,6 +332,8 @@ public static OMClientRequest createClientRequest(OMRequest omRequest, return new OMEchoRPCWriteRequest(omRequest); case AbortExpiredMultiPartUploads: return new S3ExpiredMultipartUploadsAbortRequest(omRequest); + case QuotaRepair: + return new OMQuotaRepairRequest(omRequest); default: throw new OMException("Unrecognized write command type request " + cmdType, OMException.ResultCodes.INVALID_REQUEST); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMQuotaRepairRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMQuotaRepairRequest.java new file mode 100644 index 000000000000..e673ecb4bf76 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMQuotaRepairRequest.java @@ -0,0 +1,173 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.request.volume; + +import com.google.common.base.Preconditions; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.hdds.utils.db.Table; +import org.apache.hadoop.hdds.utils.db.TableIterator; +import org.apache.hadoop.hdds.utils.db.cache.CacheKey; +import org.apache.hadoop.hdds.utils.db.cache.CacheValue; +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.om.OzoneManager; +import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; +import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs; +import org.apache.hadoop.ozone.om.request.OMClientRequest; +import org.apache.hadoop.ozone.om.request.util.OmResponseUtil; +import org.apache.hadoop.ozone.om.response.OMClientResponse; +import org.apache.hadoop.ozone.om.response.volume.OMQuotaRepairResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; +import org.apache.ratis.server.protocol.TermIndex; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.hadoop.ozone.OzoneConsts.OLD_QUOTA_DEFAULT; +import static org.apache.hadoop.ozone.OzoneConsts.QUOTA_RESET; +import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK; +import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_LOCK; + +/** + * Handle OMQuotaRepairRequest Request. 
+ */ +public class OMQuotaRepairRequest extends OMClientRequest { + private static final Logger LOG = + LoggerFactory.getLogger(OMQuotaRepairRequest.class); + + public OMQuotaRepairRequest(OMRequest omRequest) { + super(omRequest); + } + + @Override + @SuppressWarnings("methodlength") + public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, TermIndex termIndex) { + final long transactionLogIndex = termIndex.getIndex(); + OzoneManagerProtocolProtos.QuotaRepairRequest quotaRepairRequest = + getOmRequest().getQuotaRepairRequest(); + Preconditions.checkNotNull(quotaRepairRequest); + + OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager(); + OzoneManagerProtocolProtos.OMResponse.Builder omResponse = OmResponseUtil.getOMResponseBuilder(getOmRequest()); + Map, OmBucketInfo> bucketMap = new HashMap<>(); + OMClientResponse omClientResponse = null; + try { + for (int i = 0; i < quotaRepairRequest.getBucketCountCount(); ++i) { + OzoneManagerProtocolProtos.BucketQuotaCount bucketCountInfo = quotaRepairRequest.getBucketCount(i); + updateBucketInfo(omMetadataManager, bucketCountInfo, transactionLogIndex, bucketMap); + } + Map volUpdateMap = updateOldVolumeQuotaSupport(omMetadataManager, transactionLogIndex); + omResponse.setQuotaRepairResponse( + OzoneManagerProtocolProtos.QuotaRepairResponse.newBuilder().build()); + omClientResponse = new OMQuotaRepairResponse(omResponse.build(), volUpdateMap, bucketMap); + } catch (IOException ex) { + LOG.error("failed to update repair count", ex); + omClientResponse = new OMQuotaRepairResponse(createErrorOMResponse(omResponse, ex)); + } finally { + if (omClientResponse != null) { + omClientResponse.setOmLockDetails(getOmLockDetails()); + } + } + + return omClientResponse; + } + + private void updateBucketInfo( + OMMetadataManager omMetadataManager, OzoneManagerProtocolProtos.BucketQuotaCount bucketCountInfo, + long transactionLogIndex, Map, OmBucketInfo> bucketMap) throws IOException { + // acquire lock. 
+ mergeOmLockDetails(omMetadataManager.getLock().acquireWriteLock( + BUCKET_LOCK, bucketCountInfo.getVolName(), bucketCountInfo.getBucketName())); + boolean acquiredBucketLock = getOmLockDetails().isLockAcquired(); + try { + String bucketKey = omMetadataManager.getBucketKey(bucketCountInfo.getVolName(), + bucketCountInfo.getBucketName()); + OmBucketInfo bucketInfo = omMetadataManager.getBucketTable().get(bucketKey); + if (null == bucketInfo) { + // bucket might be deleted when running repair count parallel + return; + } + bucketInfo.incrUsedBytes(bucketCountInfo.getDiffUsedBytes()); + bucketInfo.incrUsedNamespace(bucketCountInfo.getDiffUsedNamespace()); + if (bucketCountInfo.getSupportOldQuota()) { + OmBucketInfo.Builder builder = bucketInfo.toBuilder(); + if (bucketInfo.getQuotaInBytes() == OLD_QUOTA_DEFAULT) { + builder.setQuotaInBytes(QUOTA_RESET); + } + if (bucketInfo.getQuotaInNamespace() == OLD_QUOTA_DEFAULT) { + builder.setQuotaInNamespace(QUOTA_RESET); + } + bucketInfo = builder.build(); + } + + omMetadataManager.getBucketTable().addCacheEntry( + new CacheKey<>(bucketKey), CacheValue.get(transactionLogIndex, bucketInfo)); + bucketMap.put(Pair.of(bucketCountInfo.getVolName(), bucketCountInfo.getBucketName()), bucketInfo); + } finally { + if (acquiredBucketLock) { + mergeOmLockDetails(omMetadataManager.getLock() + .releaseWriteLock(BUCKET_LOCK, bucketCountInfo.getVolName(), bucketCountInfo.getBucketName())); + } + } + } + + private Map updateOldVolumeQuotaSupport( + OMMetadataManager metadataManager, long transactionLogIndex) throws IOException { + LOG.info("Starting volume quota support update"); + Map volUpdateMap = new HashMap<>(); + try (TableIterator> + iterator = metadataManager.getVolumeTable().iterator()) { + while (iterator.hasNext()) { + Table.KeyValue entry = iterator.next(); + OmVolumeArgs omVolumeArgs = entry.getValue(); + if (!(omVolumeArgs.getQuotaInBytes() == OLD_QUOTA_DEFAULT + || omVolumeArgs.getQuotaInNamespace() == OLD_QUOTA_DEFAULT)) { + continue; + } + mergeOmLockDetails(metadataManager.getLock().acquireWriteLock( + VOLUME_LOCK, omVolumeArgs.getVolume())); + boolean acquiredVolumeLock = getOmLockDetails().isLockAcquired(); + try { + boolean isQuotaReset = false; + if (omVolumeArgs.getQuotaInBytes() == OLD_QUOTA_DEFAULT) { + omVolumeArgs.setQuotaInBytes(QUOTA_RESET); + isQuotaReset = true; + } + if (omVolumeArgs.getQuotaInNamespace() == OLD_QUOTA_DEFAULT) { + omVolumeArgs.setQuotaInNamespace(QUOTA_RESET); + isQuotaReset = true; + } + if (isQuotaReset) { + metadataManager.getVolumeTable().addCacheEntry( + new CacheKey<>(entry.getKey()), CacheValue.get(transactionLogIndex, omVolumeArgs)); + volUpdateMap.put(entry.getKey(), omVolumeArgs); + } + } finally { + if (acquiredVolumeLock) { + mergeOmLockDetails(metadataManager.getLock().releaseWriteLock(VOLUME_LOCK, omVolumeArgs.getVolume())); + } + } + } + } + LOG.info("Completed volume quota support update for volume count {}", volUpdateMap.size()); + return volUpdateMap; + } +} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/volume/OMQuotaRepairResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/volume/OMQuotaRepairResponse.java new file mode 100644 index 000000000000..8fa028d74387 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/volume/OMQuotaRepairResponse.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.response.volume; + +import jakarta.annotation.Nonnull; +import java.io.IOException; +import java.util.Map; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.hdds.utils.db.BatchOperation; +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; +import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs; +import org.apache.hadoop.ozone.om.request.volume.OMQuotaRepairRequest; +import org.apache.hadoop.ozone.om.response.CleanupTableInfo; +import org.apache.hadoop.ozone.om.response.OMClientResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse; + +import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.BUCKET_TABLE; +import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.VOLUME_TABLE; + +/** + * Response for {@link OMQuotaRepairRequest} request. + */ +@CleanupTableInfo(cleanupTables = {VOLUME_TABLE, BUCKET_TABLE}) +public class OMQuotaRepairResponse extends OMClientResponse { + private Map volumeArgsMap; + private Map, OmBucketInfo> volBucketInfoMap; + + /** + * for quota failure response update. + */ + public OMQuotaRepairResponse(@Nonnull OMResponse omResponse) { + super(omResponse); + } + + public OMQuotaRepairResponse( + @Nonnull OMResponse omResponse, Map volumeArgsMap, + Map, OmBucketInfo> volBucketInfoMap) { + super(omResponse); + this.volBucketInfoMap = volBucketInfoMap; + this.volumeArgsMap = volumeArgsMap; + } + + @Override + public void addToDBBatch(OMMetadataManager metadataManager, + BatchOperation batchOp) throws IOException { + for (OmBucketInfo omBucketInfo : volBucketInfoMap.values()) { + metadataManager.getBucketTable().putWithBatch(batchOp, + metadataManager.getBucketKey(omBucketInfo.getVolumeName(), + omBucketInfo.getBucketName()), omBucketInfo); + } + for (OmVolumeArgs volArgs : volumeArgsMap.values()) { + metadataManager.getVolumeTable().putWithBatch(batchOp, volArgs.getVolume(), volArgs); + } + } +} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/QuotaRepairTask.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/QuotaRepairTask.java index 205f420e8a2f..24e7fb424664 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/QuotaRepairTask.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/QuotaRepairTask.java @@ -20,6 +20,7 @@ package org.apache.hadoop.ozone.om.service; import com.google.common.util.concurrent.UncheckedExecutionException; +import com.google.protobuf.ServiceException; import java.io.File; import java.io.IOException; import java.io.UncheckedIOException; @@ -30,6 +31,7 @@ import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -40,28 +42,27 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.io.FileUtils; import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import 
org.apache.hadoop.hdds.utils.db.BatchOperation; import org.apache.hadoop.hdds.utils.db.DBCheckpoint; import org.apache.hadoop.hdds.utils.db.Table; import org.apache.hadoop.hdds.utils.db.TableIterator; -import org.apache.hadoop.hdds.utils.db.cache.CacheKey; -import org.apache.hadoop.hdds.utils.db.cache.CacheValue; import org.apache.hadoop.ozone.om.OMMetadataManager; import org.apache.hadoop.ozone.om.OmMetadataManagerImpl; import org.apache.hadoop.ozone.om.OzoneManager; import org.apache.hadoop.ozone.om.helpers.BucketLayout; +import org.apache.hadoop.ozone.om.helpers.OMRatisHelper; import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; -import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs; -import org.apache.hadoop.ozone.om.lock.IOzoneManagerLock; +import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; +import org.apache.ratis.protocol.ClientId; +import org.apache.ratis.protocol.Message; +import org.apache.ratis.protocol.RaftClientRequest; +import org.codehaus.jackson.map.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import static org.apache.hadoop.ozone.OzoneConsts.OLD_QUOTA_DEFAULT; import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX; -import static org.apache.hadoop.ozone.OzoneConsts.QUOTA_RESET; -import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK; -import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_LOCK; /** * Quota repair task. @@ -71,33 +72,79 @@ public class QuotaRepairTask { QuotaRepairTask.class); private static final int BATCH_SIZE = 5000; private static final int TASK_THREAD_CNT = 3; - public static final long EPOCH_DEFAULT = -1L; + private static final AtomicBoolean IN_PROGRESS = new AtomicBoolean(false); + private static final RepairStatus REPAIR_STATUS = new RepairStatus(); private final OzoneManager om; + private final AtomicLong runCount = new AtomicLong(0); private ExecutorService executor; public QuotaRepairTask(OzoneManager ozoneManager) { this.om = ozoneManager; } - public void repair() throws Exception { - LOG.info("Starting quota repair task"); + public CompletableFuture repair() throws Exception { + // lock in progress operation and reject any other + if (!IN_PROGRESS.compareAndSet(false, true)) { + LOG.info("quota repair task already running"); + return CompletableFuture.supplyAsync(() -> false); + } + REPAIR_STATUS.reset(runCount.get() + 1); + return CompletableFuture.supplyAsync(() -> repairTask()); + } + + public static String getStatus() { + return REPAIR_STATUS.toString(); + } + private boolean repairTask() { + LOG.info("Starting quota repair task {}", REPAIR_STATUS); + OMMetadataManager activeMetaManager = null; try { // thread pool with 3 Table type * (1 task each + 3 thread each) executor = Executors.newFixedThreadPool(12); - + OzoneManagerProtocolProtos.QuotaRepairRequest.Builder builder + = OzoneManagerProtocolProtos.QuotaRepairRequest.newBuilder(); // repair active db - OMMetadataManager activeMetaManager = createActiveDBCheckpoint(om.getMetadataManager(), om.getConfiguration()); - repairActiveDb(activeMetaManager); - // list all snapshot - // repair snapshot dbs + activeMetaManager = createActiveDBCheckpoint(om.getMetadataManager(), om.getConfiguration()); + repairActiveDb(activeMetaManager, builder); + + // TODO: repair snapshots for quota - updateOldVolumeQuotaSupport(om.getMetadataManager()); + // submit request to 
update + ClientId clientId = ClientId.randomId(); + OzoneManagerProtocolProtos.OMRequest omRequest = OzoneManagerProtocolProtos.OMRequest.newBuilder() + .setCmdType(OzoneManagerProtocolProtos.Type.QuotaRepair) + .setQuotaRepairRequest(builder.build()) + .setClientId(clientId.toString()) + .build(); + OzoneManagerProtocolProtos.OMResponse response = submitRequest(omRequest, clientId); + if (response != null && response.getSuccess()) { + LOG.error("update quota repair count response failed"); + REPAIR_STATUS.updateStatus("Response for update DB is failed"); + } else { + REPAIR_STATUS.updateStatus(builder, om.getMetadataManager()); + } + } catch (Exception exp) { + LOG.error("quota repair count failed", exp); + REPAIR_STATUS.updateStatus(exp.toString()); + return false; } finally { - LOG.info("Completed quota repair task"); + LOG.info("Completed quota repair task {}", REPAIR_STATUS); executor.shutdown(); + try { + if (null != activeMetaManager) { + activeMetaManager.stop(); + } + cleanTempCheckPointPath(om.getMetadataManager()); + } catch (Exception exp) { + LOG.error("failed to cleanup", exp); + } + IN_PROGRESS.set(false); } + return true; } - private void repairActiveDb(OMMetadataManager metadataManager) throws Exception { + private void repairActiveDb( + OMMetadataManager metadataManager, + OzoneManagerProtocolProtos.QuotaRepairRequest.Builder builder) throws Exception { Map nameBucketInfoMap = new HashMap<>(); Map idBucketInfoMap = new HashMap<>(); Map oriBucketInfoMap = new HashMap<>(); @@ -105,34 +152,65 @@ private void repairActiveDb(OMMetadataManager metadataManager) throws Exception repairCount(nameBucketInfoMap, idBucketInfoMap, metadataManager); - // update bucket count - updateOldBucketQuotaSupport(metadataManager, nameBucketInfoMap); - IOzoneManagerLock lock = metadataManager.getLock(); + // prepare and submit request to ratis + for (Map.Entry entry : nameBucketInfoMap.entrySet()) { + OmBucketInfo oriBucketInfo = oriBucketInfoMap.get(entry.getKey()); + OmBucketInfo updatedBuckedInfo = entry.getValue(); + boolean oldQuota = oriBucketInfo.getQuotaInBytes() == OLD_QUOTA_DEFAULT + || oriBucketInfo.getQuotaInNamespace() == OLD_QUOTA_DEFAULT; + if (!(oldQuota || isChange(oriBucketInfo, updatedBuckedInfo))) { + continue; + } + OzoneManagerProtocolProtos.BucketQuotaCount.Builder bucketCountBuilder + = OzoneManagerProtocolProtos.BucketQuotaCount.newBuilder(); + bucketCountBuilder.setVolName(updatedBuckedInfo.getVolumeName()); + bucketCountBuilder.setBucketName(updatedBuckedInfo.getBucketName()); + bucketCountBuilder.setDiffUsedBytes(updatedBuckedInfo.getUsedBytes() - oriBucketInfo.getUsedBytes()); + bucketCountBuilder.setDiffUsedNamespace( + updatedBuckedInfo.getUsedNamespace() - oriBucketInfo.getUsedNamespace()); + bucketCountBuilder.setSupportOldQuota(oldQuota); + builder.addBucketCount(bucketCountBuilder.build()); + } + + // update volume to support quota + builder.setSupportVolumeOldQuota(true); + } + + private OzoneManagerProtocolProtos.OMResponse submitRequest( + OzoneManagerProtocolProtos.OMRequest omRequest, ClientId clientId) { try { - nameBucketInfoMap.values().stream().forEach(e -> lock.acquireReadLock( - BUCKET_LOCK, e.getVolumeName(), e.getBucketName())); - try (BatchOperation batchOperation = metadataManager.getStore().initBatchOperation()) { - for (Map.Entry entry : nameBucketInfoMap.entrySet()) { - if (!isChange(oriBucketInfoMap.get(entry.getKey()), entry.getValue())) { - continue; - } - String bucketKey = metadataManager.getBucketKey(entry.getValue().getVolumeName(), - 
entry.getValue().getBucketName()); - metadataManager.getBucketTable().putWithBatch(batchOperation, bucketKey, entry.getValue()); - metadataManager.getBucketTable().addCacheEntry(new CacheKey<>(bucketKey), - CacheValue.get(EPOCH_DEFAULT, entry.getValue())); - } - metadataManager.getStore().commitBatchOperation(batchOperation); + if (om.isRatisEnabled()) { + OzoneManagerRatisServer server = om.getOmRatisServer(); + RaftClientRequest raftClientRequest = RaftClientRequest.newBuilder() + .setClientId(clientId) + .setServerId(om.getOmRatisServer().getRaftPeerId()) + .setGroupId(om.getOmRatisServer().getRaftGroupId()) + .setCallId(runCount.getAndIncrement()) + .setMessage(Message.valueOf(OMRatisHelper.convertRequestToByteString(omRequest))) + .setType(RaftClientRequest.writeRequestType()) + .build(); + return server.submitRequest(omRequest, raftClientRequest); + } else { + return om.getOmServerProtocol().submitRequest( + null, omRequest); } - } finally { - nameBucketInfoMap.values().stream().forEach(e -> lock.releaseReadLock( - BUCKET_LOCK, e.getVolumeName(), e.getBucketName())); + } catch (ServiceException e) { + LOG.error("repair quota count " + omRequest.getCmdType() + " request failed.", e); } + return null; } private OMMetadataManager createActiveDBCheckpoint( OMMetadataManager omMetaManager, OzoneConfiguration conf) throws IOException { // cleanup + String parentPath = cleanTempCheckPointPath(omMetaManager); + + // create snapshot + DBCheckpoint checkpoint = omMetaManager.getStore().getCheckpoint(parentPath, true); + return OmMetadataManagerImpl.createCheckpointMetadataManager(conf, checkpoint); + } + + private static String cleanTempCheckPointPath(OMMetadataManager omMetaManager) throws IOException { File dbLocation = omMetaManager.getStore().getDbLocation(); if (dbLocation == null) { throw new NullPointerException("db location is null"); @@ -143,10 +221,8 @@ private OMMetadataManager createActiveDBCheckpoint( } File repairTmpPath = Paths.get(tempData, "temp-repair-quota").toFile(); FileUtils.deleteDirectory(repairTmpPath); - - // create snapshot - DBCheckpoint checkpoint = omMetaManager.getStore().getCheckpoint("temp-repair-quota", true); - return OmMetadataManagerImpl.createCheckpointMetadataManager(conf, checkpoint); + FileUtils.forceMkdir(repairTmpPath); + return repairTmpPath.toString(); } private void prepareAllBucketInfo( @@ -171,49 +247,11 @@ private void prepareAllBucketInfo( private boolean isChange(OmBucketInfo lBucketInfo, OmBucketInfo rBucketInfo) { if (lBucketInfo.getUsedNamespace() != rBucketInfo.getUsedNamespace() - || lBucketInfo.getUsedBytes() != rBucketInfo.getUsedBytes() - || lBucketInfo.getQuotaInNamespace() != rBucketInfo.getQuotaInNamespace() - || lBucketInfo.getQuotaInBytes() != rBucketInfo.getQuotaInBytes()) { + || lBucketInfo.getUsedBytes() != rBucketInfo.getUsedBytes()) { return true; } return false; } - - private static void updateOldVolumeQuotaSupport(OMMetadataManager metadataManager) throws IOException { - LOG.info("Starting volume quota support update"); - IOzoneManagerLock lock = metadataManager.getLock(); - try (TableIterator> - iterator = metadataManager.getVolumeTable().iterator()) { - while (iterator.hasNext()) { - Table.KeyValue entry = iterator.next(); - OmVolumeArgs omVolumeArgs = entry.getValue(); - if (!(omVolumeArgs.getQuotaInBytes() == OLD_QUOTA_DEFAULT - || omVolumeArgs.getQuotaInNamespace() == OLD_QUOTA_DEFAULT)) { - continue; - } - try { - lock.acquireReadLock(VOLUME_LOCK, omVolumeArgs.getVolume()); - boolean isQuotaReset = false; - if 
(omVolumeArgs.getQuotaInBytes() == OLD_QUOTA_DEFAULT) { - omVolumeArgs.setQuotaInBytes(QUOTA_RESET); - isQuotaReset = true; - } - if (omVolumeArgs.getQuotaInNamespace() == OLD_QUOTA_DEFAULT) { - omVolumeArgs.setQuotaInNamespace(QUOTA_RESET); - isQuotaReset = true; - } - if (isQuotaReset) { - metadataManager.getVolumeTable().addCacheEntry( - new CacheKey<>(entry.getKey()), CacheValue.get(EPOCH_DEFAULT, omVolumeArgs)); - metadataManager.getVolumeTable().put(entry.getKey(), omVolumeArgs); - } - } finally { - lock.releaseReadLock(VOLUME_LOCK, omVolumeArgs.getVolume()); - } - } - } - LOG.info("Completed volume quota support update"); - } private static String buildNamePath(String volumeName, String bucketName) { final StringBuilder builder = new StringBuilder(); @@ -278,24 +316,6 @@ private void repairCount( LOG.info("Completed quota repair counting for all keys, files and directories"); } - private static void updateOldBucketQuotaSupport( - OMMetadataManager metadataManager, Map nameBucketInfoMap) { - for (Map.Entry entry : nameBucketInfoMap.entrySet()) { - if (entry.getValue().getQuotaInBytes() == OLD_QUOTA_DEFAULT - || entry.getValue().getQuotaInNamespace() == OLD_QUOTA_DEFAULT) { - OmBucketInfo.Builder builder = entry.getValue().toBuilder(); - if (entry.getValue().getQuotaInBytes() == OLD_QUOTA_DEFAULT) { - builder.setQuotaInBytes(QUOTA_RESET); - } - if (entry.getValue().getQuotaInNamespace() == OLD_QUOTA_DEFAULT) { - builder.setQuotaInNamespace(QUOTA_RESET); - } - OmBucketInfo bucketInfo = builder.build(); - entry.setValue(bucketInfo); - } - } - } - private void recalculateUsages( Table table, Map prefixUsageMap, String strType, boolean haveValue) throws UncheckedIOException, @@ -428,4 +448,66 @@ public long getNamespace() { return namespace.get(); } } + + /** + * Repair status for last run. 
+ */ + public static class RepairStatus { + private boolean isTriggered = false; + private long taskId = 0; + private long lastRunStartTime = 0; + private long lastRunFinishedTime = 0; + private String errorMsg = null; + private Map> bucketCountDiffMap = new ConcurrentHashMap<>(); + + @Override + public String toString() { + if (!isTriggered) { + return "{}"; + } + Map status = new HashMap<>(); + status.put("taskId", taskId); + status.put("lastRunStartTime", lastRunStartTime); + status.put("lastRunFinishedTime", lastRunFinishedTime); + status.put("errorMsg", errorMsg); + status.put("bucketCountDiffMap", bucketCountDiffMap); + try { + return new ObjectMapper().writeValueAsString(status); + } catch (IOException e) { + LOG.error("error in generating status", e); + return "{}"; + } + } + + public void updateStatus(OzoneManagerProtocolProtos.QuotaRepairRequest.Builder builder, + OMMetadataManager metadataManager) { + isTriggered = true; + lastRunFinishedTime = System.currentTimeMillis(); + errorMsg = ""; + bucketCountDiffMap.clear(); + for (OzoneManagerProtocolProtos.BucketQuotaCount quotaCount : builder.getBucketCountList()) { + String bucketKey = metadataManager.getBucketKey(quotaCount.getVolName(), quotaCount.getBucketName()); + ConcurrentHashMap diffCountMap = new ConcurrentHashMap<>(); + diffCountMap.put("DiffUsedBytes", quotaCount.getDiffUsedBytes()); + diffCountMap.put("DiffUsedNamespace", quotaCount.getDiffUsedNamespace()); + bucketCountDiffMap.put(bucketKey, diffCountMap); + } + } + + public void updateStatus(String errMsg) { + isTriggered = true; + lastRunFinishedTime = System.currentTimeMillis(); + errorMsg = errMsg; + bucketCountDiffMap.clear(); + } + + public void reset(long tskId) { + isTriggered = true; + taskId = tskId; + lastRunStartTime = System.currentTimeMillis(); + lastRunFinishedTime = 0; + errorMsg = ""; + bucketCountDiffMap.clear(); + } + } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/QuotaRepairUpgradeAction.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/QuotaRepairUpgradeAction.java index abebfe7e03a1..80e30406ef66 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/QuotaRepairUpgradeAction.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/QuotaRepairUpgradeAction.java @@ -24,13 +24,12 @@ import org.apache.hadoop.ozone.om.service.QuotaRepairTask; import static org.apache.hadoop.ozone.om.upgrade.OMLayoutFeature.QUOTA; -import static org.apache.hadoop.ozone.upgrade.LayoutFeature.UpgradeActionType.ON_FIRST_UPGRADE_START; +import static org.apache.hadoop.ozone.upgrade.LayoutFeature.UpgradeActionType.ON_FINALIZE; /** - * Quota repair for usages action to be triggered during first upgrade. + * Quota repair for usages action to be triggered after upgrade. 
*/ -@UpgradeActionOm(type = ON_FIRST_UPGRADE_START, feature = - QUOTA) +@UpgradeActionOm(type = ON_FINALIZE, feature = QUOTA) public class QuotaRepairUpgradeAction implements OmUpgradeAction { @Override public void execute(OzoneManager arg) throws Exception { @@ -38,6 +37,7 @@ public void execute(OzoneManager arg) throws Exception { OMConfigKeys.OZONE_OM_UPGRADE_QUOTA_RECALCULATE_ENABLE, OMConfigKeys.OZONE_OM_UPGRADE_QUOTA_RECALCULATE_ENABLE_DEFAULT); if (enabled) { + // just trigger quota repair and status can be checked via CLI QuotaRepairTask quotaRepairTask = new QuotaRepairTask(arg); quotaRepairTask.repair(); } diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestQuotaRepairTask.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestQuotaRepairTask.java index 058aa617d4bd..06b8beacb39a 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestQuotaRepairTask.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestQuotaRepairTask.java @@ -22,8 +22,16 @@ import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE; import static org.junit.jupiter.api.Assertions.assertEquals; - +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.hdds.client.RatisReplicationConfig; +import org.apache.hadoop.hdds.utils.db.BatchOperation; import org.apache.hadoop.hdds.utils.db.cache.CacheKey; import org.apache.hadoop.hdds.utils.db.cache.CacheValue; import org.apache.hadoop.ozone.om.helpers.BucketLayout; @@ -32,6 +40,11 @@ import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs; import org.apache.hadoop.ozone.om.request.OMRequestTestUtils; import org.apache.hadoop.ozone.om.request.key.TestOMKeyRequest; +import org.apache.hadoop.ozone.om.request.volume.OMQuotaRepairRequest; +import org.apache.hadoop.ozone.om.response.OMClientResponse; +import org.apache.hadoop.ozone.om.response.volume.OMQuotaRepairResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; +import org.apache.hadoop.ozone.protocolPB.OzoneManagerProtocolServerSideTranslatorPB; import org.apache.hadoop.util.Time; import org.junit.jupiter.api.Test; @@ -44,6 +57,16 @@ public class TestQuotaRepairTask extends TestOMKeyRequest { @Test public void testQuotaRepair() throws Exception { + when(ozoneManager.isRatisEnabled()).thenReturn(false); + OzoneManagerProtocolProtos.OMResponse respMock = mock(OzoneManagerProtocolProtos.OMResponse.class); + when(respMock.getSuccess()).thenReturn(true); + OzoneManagerProtocolServerSideTranslatorPB serverMock = mock(OzoneManagerProtocolServerSideTranslatorPB.class); + AtomicReference ref = new AtomicReference<>(); + doAnswer(invocation -> { + ref.set(invocation.getArgument(1, OzoneManagerProtocolProtos.OMRequest.class)); + return respMock; + }).when(serverMock).submitRequest(any(), any()); + when(ozoneManager.getOmServerProtocol()).thenReturn(serverMock); OMRequestTestUtils.addVolumeAndBucketToDB(volumeName, bucketName, omMetadataManager, BucketLayout.OBJECT_STORE); @@ -89,8 +112,15 @@ public void testQuotaRepair() throws Exception { 
assertEquals(0, fsoBucketInfo.getUsedBytes()); QuotaRepairTask quotaRepairTask = new QuotaRepairTask(ozoneManager); - quotaRepairTask.repair(); - + CompletableFuture repair = quotaRepairTask.repair(); + Boolean repairStatus = repair.get(); + assertTrue(repairStatus); + + OMQuotaRepairRequest omQuotaRepairRequest = new OMQuotaRepairRequest(ref.get()); + OMClientResponse omClientResponse = omQuotaRepairRequest.validateAndUpdateCache(ozoneManager, 1); + BatchOperation batchOperation = omMetadataManager.getStore().initBatchOperation(); + ((OMQuotaRepairResponse)omClientResponse).addToDBBatch(omMetadataManager, batchOperation); + omMetadataManager.getStore().commitBatchOperation(batchOperation); // 10 files of each type, obs have replication of three and // fso have replication of one OmBucketInfo obsUpdateBucketInfo = omMetadataManager.getBucketTable().get( @@ -105,6 +135,16 @@ public void testQuotaRepair() throws Exception { @Test public void testQuotaRepairForOldVersionVolumeBucket() throws Exception { + when(ozoneManager.isRatisEnabled()).thenReturn(false); + OzoneManagerProtocolProtos.OMResponse respMock = mock(OzoneManagerProtocolProtos.OMResponse.class); + when(respMock.getSuccess()).thenReturn(true); + OzoneManagerProtocolServerSideTranslatorPB serverMock = mock(OzoneManagerProtocolServerSideTranslatorPB.class); + AtomicReference ref = new AtomicReference<>(); + doAnswer(invocation -> { + ref.set(invocation.getArgument(1, OzoneManagerProtocolProtos.OMRequest.class)); + return respMock; + }).when(serverMock).submitRequest(any(), any()); + when(ozoneManager.getOmServerProtocol()).thenReturn(serverMock); // add volume with -2 value OmVolumeArgs omVolumeArgs = OmVolumeArgs.newBuilder().setCreationTime(Time.now()) @@ -117,13 +157,14 @@ public void testQuotaRepairForOldVersionVolumeBucket() throws Exception { new CacheKey<>(omMetadataManager.getVolumeKey(volumeName)), CacheValue.get(1L, omVolumeArgs)); - // add bucket with -2 value + // add bucket with -2 value and add to db OMRequestTestUtils.addBucketToDB(volumeName, bucketName, omMetadataManager, -2); + String bucketKey = omMetadataManager.getBucketKey(volumeName, bucketName); + omMetadataManager.getBucketTable().put(bucketKey, omMetadataManager.getBucketTable().get(bucketKey)); // pre check for quota flag - OmBucketInfo bucketInfo = omMetadataManager.getBucketTable().get( - omMetadataManager.getBucketKey(volumeName, bucketName)); + OmBucketInfo bucketInfo = omMetadataManager.getBucketTable().get(bucketKey); assertEquals(-2, bucketInfo.getQuotaInBytes()); omVolumeArgs = omMetadataManager.getVolumeTable().get( @@ -132,10 +173,17 @@ public void testQuotaRepairForOldVersionVolumeBucket() throws Exception { assertEquals(-2, omVolumeArgs.getQuotaInNamespace()); QuotaRepairTask quotaRepairTask = new QuotaRepairTask(ozoneManager); - quotaRepairTask.repair(); - + CompletableFuture repair = quotaRepairTask.repair(); + Boolean repairStatus = repair.get(); + assertTrue(repairStatus); + + OMQuotaRepairRequest omQuotaRepairRequest = new OMQuotaRepairRequest(ref.get()); + OMClientResponse omClientResponse = omQuotaRepairRequest.validateAndUpdateCache(ozoneManager, 1); + BatchOperation batchOperation = omMetadataManager.getStore().initBatchOperation(); + ((OMQuotaRepairResponse)omClientResponse).addToDBBatch(omMetadataManager, batchOperation); + omMetadataManager.getStore().commitBatchOperation(batchOperation); bucketInfo = omMetadataManager.getBucketTable().get( - omMetadataManager.getBucketKey(volumeName, bucketName)); + bucketKey); assertEquals(-1, 
     OmVolumeArgs volArgsVerify = omMetadataManager.getVolumeTable()
         .get(omMetadataManager.getVolumeKey(volumeName));

From 4e66ad96cc35a9f448ab13e67a2ddea4e43bdfd3 Mon Sep 17 00:00:00 2001
From: Sumit Agrawal
Date: Tue, 6 Aug 2024 12:37:23 +0530
Subject: [PATCH 3/7] quota repair trigger at leader node

---
 .../om/upgrade/QuotaRepairUpgradeAction.java | 15 +++++++++++++--
 1 file changed, 13 insertions(+), 2 deletions(-)

diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/QuotaRepairUpgradeAction.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/QuotaRepairUpgradeAction.java
index 80e30406ef66..52b5f3fdbe81 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/QuotaRepairUpgradeAction.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/QuotaRepairUpgradeAction.java
@@ -21,7 +21,11 @@
 import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMLeaderNotReadyException;
+import org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException;
 import org.apache.hadoop.ozone.om.service.QuotaRepairTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 import static org.apache.hadoop.ozone.om.upgrade.OMLayoutFeature.QUOTA;
 import static org.apache.hadoop.ozone.upgrade.LayoutFeature.UpgradeActionType.ON_FINALIZE;
@@ -31,6 +35,7 @@
  */
 @UpgradeActionOm(type = ON_FINALIZE, feature = QUOTA)
 public class QuotaRepairUpgradeAction implements OmUpgradeAction {
+  private static final Logger LOG = LoggerFactory.getLogger(QuotaRepairUpgradeAction.class);
   @Override
   public void execute(OzoneManager arg) throws Exception {
     boolean enabled = arg.getConfiguration().getBoolean(
@@ -38,8 +43,14 @@ public void execute(OzoneManager arg) throws Exception {
         OMConfigKeys.OZONE_OM_UPGRADE_QUOTA_RECALCULATE_ENABLE_DEFAULT);
     if (enabled) {
       // just trigger quota repair and status can be checked via CLI
-      QuotaRepairTask quotaRepairTask = new QuotaRepairTask(arg);
-      quotaRepairTask.repair();
+      try {
+        arg.checkLeaderStatus();
+        QuotaRepairTask quotaRepairTask = new QuotaRepairTask(arg);
+        quotaRepairTask.repair();
+      } catch (OMNotLeaderException | OMLeaderNotReadyException ex) {
+        // on leader node, repair will be triggered where finalize is called
+        LOG.warn("Unable to start quota repair as this is not a leader node");
+      }
     }
   }
 }

From 067787db1bd156b86f40f50521d6150d1ada5b00 Mon Sep 17 00:00:00 2001
From: Sumit Agrawal
Date: Tue, 6 Aug 2024 19:02:18 +0530
Subject: [PATCH 4/7] volume old quota update with flag

---
 .../ozone/om/request/volume/OMQuotaRepairRequest.java | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMQuotaRepairRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMQuotaRepairRequest.java
index e673ecb4bf76..3ce74f5315c2 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMQuotaRepairRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMQuotaRepairRequest.java
@@ -20,6 +20,7 @@
 import com.google.common.base.Preconditions;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.commons.lang3.tuple.Pair;
@@ -74,7 +75,12 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, TermIn
       OzoneManagerProtocolProtos.BucketQuotaCount bucketCountInfo = quotaRepairRequest.getBucketCount(i);
       updateBucketInfo(omMetadataManager, bucketCountInfo, transactionLogIndex, bucketMap);
     }
-    Map<String, OmVolumeArgs> volUpdateMap = updateOldVolumeQuotaSupport(omMetadataManager, transactionLogIndex);
+    Map<String, OmVolumeArgs> volUpdateMap;
+    if (quotaRepairRequest.getSupportVolumeOldQuota()) {
+      volUpdateMap = updateOldVolumeQuotaSupport(omMetadataManager, transactionLogIndex);
+    } else {
+      volUpdateMap = Collections.emptyMap();
+    }
     omResponse.setQuotaRepairResponse(
         OzoneManagerProtocolProtos.QuotaRepairResponse.newBuilder().build());
     omClientResponse = new OMQuotaRepairResponse(omResponse.build(), volUpdateMap, bucketMap);

From 3f4f7a427585f29e8a51f2bea6fcd8a92cf6d768 Mon Sep 17 00:00:00 2001
From: Sumit Agrawal
Date: Wed, 14 Aug 2024 16:34:30 +0530
Subject: [PATCH 5/7] review fix

---
 .../apache/hadoop/ozone/om/service/QuotaRepairTask.java | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/QuotaRepairTask.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/QuotaRepairTask.java
index 24e7fb424664..b3e64c98c5dd 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/QuotaRepairTask.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/QuotaRepairTask.java
@@ -98,8 +98,8 @@ private boolean repairTask() {
     LOG.info("Starting quota repair task {}", REPAIR_STATUS);
     OMMetadataManager activeMetaManager = null;
     try {
-      // thread pool with 3 Table type * (1 task each + 3 thread each)
-      executor = Executors.newFixedThreadPool(12);
+      // thread pool with 3 Table type * (1 task each + 3 thread for each task)
+      executor = Executors.newFixedThreadPool(3 * (1 + TASK_THREAD_CNT));
       OzoneManagerProtocolProtos.QuotaRepairRequest.Builder builder =
           OzoneManagerProtocolProtos.QuotaRepairRequest.newBuilder();
       // repair active db
@@ -116,9 +116,10 @@ private boolean repairTask() {
           .setClientId(clientId.toString())
           .build();
       OzoneManagerProtocolProtos.OMResponse response = submitRequest(omRequest, clientId);
-      if (response != null && response.getSuccess()) {
+      if (response != null && !response.getSuccess()) {
         LOG.error("update quota repair count response failed");
         REPAIR_STATUS.updateStatus("Response for update DB is failed");
+        return false;
       } else {
         REPAIR_STATUS.updateStatus(builder, om.getMetadataManager());
       }

From 77a67caf33bd4add9bc4cf6444ed9acd17d61952 Mon Sep 17 00:00:00 2001
From: Sumit Agrawal
Date: Tue, 20 Aug 2024 09:02:25 +0530
Subject: [PATCH 6/7] fix review comment

---
 .../om/request/volume/OMQuotaRepairRequest.java    | 12 ++++++++++++
 .../ozone/om/upgrade/QuotaRepairUpgradeAction.java |  5 +++--
 2 files changed, 15 insertions(+), 2 deletions(-)

diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMQuotaRepairRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMQuotaRepairRequest.java
index 3ce74f5315c2..e307a1f95fd2 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMQuotaRepairRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMQuotaRepairRequest.java
@@ -30,6 +30,7 @@
 import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.request.OMClientRequest;
@@ -38,6 +39,7 @@
 import org.apache.hadoop.ozone.om.response.volume.OMQuotaRepairResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,6 +60,16 @@ public OMQuotaRepairRequest(OMRequest omRequest) {
     super(omRequest);
   }

+  @Override
+  public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
+    UserGroupInformation ugi = createUGIForApi();
+    if (ozoneManager.getAclsEnabled() && !ozoneManager.isAdmin(ugi)) {
+      throw new OMException("Access denied for user " + ugi + ". Admin privilege is required for quota repair.",
+          OMException.ResultCodes.ACCESS_DENIED);
+    }
+    return super.preExecute(ozoneManager);
+  }
+
   @Override
   @SuppressWarnings("methodlength")
   public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, TermIndex termIndex) {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/QuotaRepairUpgradeAction.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/QuotaRepairUpgradeAction.java
index 52b5f3fdbe81..4157dd0a7a1a 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/QuotaRepairUpgradeAction.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/QuotaRepairUpgradeAction.java
@@ -48,8 +48,9 @@ public void execute(OzoneManager arg) throws Exception {
         QuotaRepairTask quotaRepairTask = new QuotaRepairTask(arg);
         quotaRepairTask.repair();
       } catch (OMNotLeaderException | OMLeaderNotReadyException ex) {
-        // on leader node, repair will be triggered where finalize is called
-        LOG.warn("Unable to start quota repair as this is not a leader node");
+        // on leader node, repair will be triggered where finalize is called. For other nodes, it will be ignored.
+        // This can be triggered on a need basis via the CLI tool.
+ LOG.warn("Skip quota repair operation during upgrade on the node as this is not a leader node."); } } } From 2c749fe8b6cd0a8c9267d61d22583a0133f88951 Mon Sep 17 00:00:00 2001 From: Sumit Agrawal Date: Tue, 20 Aug 2024 13:14:55 +0530 Subject: [PATCH 7/7] test fix --- .../hadoop/ozone/om/upgrade/QuotaRepairUpgradeAction.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/QuotaRepairUpgradeAction.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/QuotaRepairUpgradeAction.java index 4157dd0a7a1a..446c7382d50b 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/QuotaRepairUpgradeAction.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/QuotaRepairUpgradeAction.java @@ -44,7 +44,9 @@ public void execute(OzoneManager arg) throws Exception { if (enabled) { // just trigger quota repair and status can be checked via CLI try { - arg.checkLeaderStatus(); + if (arg.isRatisEnabled()) { + arg.checkLeaderStatus(); + } QuotaRepairTask quotaRepairTask = new QuotaRepairTask(arg); quotaRepairTask.repair(); } catch (OMNotLeaderException | OMLeaderNotReadyException ex) {