Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,8 @@ public static boolean isReadOnly(
case PrintCompactionLogDag:
case GetSnapshotInfo:
case GetServerDefaults:
case GetQuotaRepairStatus:
case StartQuotaRepair:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This StartQuotaRepair is a write operation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It trigger a repair activity, but not directly a write operation to ratis --> and as part of this, it further trigger a write operation to ratis based on repair to be done.
Since can not be submitted to Ratis as no direct write action, so path of Read is followed.
This is similar to "RangerBGSync", "SnapshotDiff" where trigger happens.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, thanks for the explanation.

return true;
case CreateVolume:
case SetVolumeProperty:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1153,4 +1153,17 @@ boolean setSafeMode(SafeModeAction action, boolean isChecked)
* @throws IOException
*/
OzoneFsServerDefaults getServerDefaults() throws IOException;

/**
* Get status of last triggered quota repair in OM.
* @return String
* @throws IOException
*/
String getQuotaRepairStatus() throws IOException;

/**
* start quota repair in OM.
* @throws IOException
*/
void startQuotaRepair(List<String> buckets) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -2579,6 +2579,30 @@ public OzoneFsServerDefaults getServerDefaults()
serverDefaultsResponse.getServerDefaults());
}

@Override
public String getQuotaRepairStatus() throws IOException {
OzoneManagerProtocolProtos.GetQuotaRepairStatusRequest quotaRepairStatusRequest =
OzoneManagerProtocolProtos.GetQuotaRepairStatusRequest.newBuilder()
.build();

OMRequest omRequest = createOMRequest(Type.GetQuotaRepairStatus)
.setGetQuotaRepairStatusRequest(quotaRepairStatusRequest).build();

OzoneManagerProtocolProtos.GetQuotaRepairStatusResponse quotaRepairStatusResponse
= handleError(submitRequest(omRequest)).getGetQuotaRepairStatusResponse();
return quotaRepairStatusResponse.getStatus();
}

@Override
public void startQuotaRepair(List<String> buckets) throws IOException {
OzoneManagerProtocolProtos.StartQuotaRepairRequest startQuotaRepairRequest =
OzoneManagerProtocolProtos.StartQuotaRepairRequest.newBuilder()
.build();
OMRequest omRequest = createOMRequest(Type.StartQuotaRepair)
.setStartQuotaRepairRequest(startQuotaRepairRequest).build();
handleError(submitRequest(omRequest));
}

private SafeMode toProtoBuf(SafeModeAction action) {
switch (action) {
case ENTER:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,13 @@
import org.apache.hadoop.ozone.debug.DBScanner;
import org.apache.hadoop.ozone.debug.RDBParser;
import org.apache.hadoop.ozone.om.OMStorage;
import org.apache.hadoop.ozone.repair.OzoneRepair;
import org.apache.hadoop.ozone.repair.RDBRepair;
import org.apache.hadoop.ozone.repair.TransactionInfoRepair;
import org.apache.hadoop.ozone.repair.quota.QuotaRepair;
import org.apache.hadoop.ozone.repair.quota.QuotaStatus;
import org.apache.hadoop.ozone.repair.quota.QuotaTrigger;
import org.apache.ozone.test.GenericTestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
Expand All @@ -38,6 +43,7 @@
import java.util.regex.Pattern;

import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static java.nio.charset.StandardCharsets.UTF_8;
Expand Down Expand Up @@ -130,4 +136,28 @@ private String[] parseScanOutput(String output) throws IOException {
throw new IllegalStateException("Failed to scan and find raft's highest term and index from TransactionInfo table");
}

@Test
public void testQuotaRepair() throws Exception {
CommandLine cmd = new CommandLine(new OzoneRepair()).addSubcommand(new CommandLine(new QuotaRepair())
.addSubcommand(new QuotaStatus()).addSubcommand(new QuotaTrigger()));

String[] args = new String[] {"quota", "status", "--service-host", conf.get(OZONE_OM_ADDRESS_KEY)};
int exitCode = cmd.execute(args);
assertEquals(0, exitCode);
args = new String[] {"quota", "start", "--service-host", conf.get(OZONE_OM_ADDRESS_KEY)};
exitCode = cmd.execute(args);
assertEquals(0, exitCode);
GenericTestUtils.waitFor(() -> {
out.reset();
// verify quota trigger is completed having non-zero lastRunFinishedTime
String[] targs = new String[]{"quota", "status", "--service-host", conf.get(OZONE_OM_ADDRESS_KEY)};
cmd.execute(targs);
try {
return !out.toString(DEFAULT_ENCODING).contains("\"lastRunFinishedTime\":\"\"");
} catch (Exception ex) {
// do nothing
}
return false;
}, 1000, 10000);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@ enum Type {
ListOpenFiles = 132;
QuotaRepair = 133;
GetServerDefaults = 134;
GetQuotaRepairStatus = 135;
StartQuotaRepair = 136;
}

enum SafeMode {
Expand Down Expand Up @@ -291,6 +293,8 @@ message OMRequest {
optional ListOpenFilesRequest ListOpenFilesRequest = 130;
optional QuotaRepairRequest QuotaRepairRequest = 131;
optional ServerDefaultsRequest ServerDefaultsRequest = 132;
optional GetQuotaRepairStatusRequest GetQuotaRepairStatusRequest = 133;
optional StartQuotaRepairRequest StartQuotaRepairRequest = 134;
}

message OMResponse {
Expand Down Expand Up @@ -419,6 +423,8 @@ message OMResponse {
optional ListOpenFilesResponse ListOpenFilesResponse = 133;
optional QuotaRepairResponse QuotaRepairResponse = 134;
optional ServerDefaultsResponse ServerDefaultsResponse = 135;
optional GetQuotaRepairStatusResponse GetQuotaRepairStatusResponse = 136;
optional StartQuotaRepairResponse StartQuotaRepairResponse = 137;
}

enum Status {
Expand Down Expand Up @@ -2226,6 +2232,17 @@ message ServerDefaultsResponse {
required FsServerDefaultsProto serverDefaults = 1;
}

message GetQuotaRepairStatusRequest {
}
message GetQuotaRepairStatusResponse {
optional string status = 1;
}
message StartQuotaRepairRequest {
repeated string buckets = 1;
}
message StartQuotaRepairResponse {
}

message OMLockDetailsProto {
optional bool isLockAcquired = 1;
optional uint64 waitLockNanos = 2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@
import org.apache.hadoop.ozone.om.s3.S3SecretCacheProvider;
import org.apache.hadoop.ozone.om.s3.S3SecretStoreProvider;
import org.apache.hadoop.ozone.om.service.OMRangerBGSyncService;
import org.apache.hadoop.ozone.om.service.QuotaRepairTask;
import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils;
import org.apache.hadoop.ozone.om.snapshot.ReferenceCounted;
import org.apache.hadoop.ozone.om.upgrade.OMLayoutFeature;
Expand Down Expand Up @@ -4751,6 +4752,18 @@ public OzoneFsServerDefaults getServerDefaults() {
return serverDefaults;
}

@Override
public String getQuotaRepairStatus() throws IOException {
checkAdminUserPrivilege("quota repair status");
return QuotaRepairTask.getStatus();
}

@Override
public void startQuotaRepair(List<String> buckets) throws IOException {
checkAdminUserPrivilege("start quota repair");
new QuotaRepairTask(this).repair(buckets);
}

/**
* Write down Layout version of a finalized feature to DB on finalization.
* @param lvm OMLayoutVersionManager
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.io.UncheckedIOException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -48,6 +49,7 @@
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
Expand All @@ -74,27 +76,31 @@ public class QuotaRepairTask {
private static final int TASK_THREAD_CNT = 3;
private static final AtomicBoolean IN_PROGRESS = new AtomicBoolean(false);
private static final RepairStatus REPAIR_STATUS = new RepairStatus();
private static final AtomicLong RUN_CNT = new AtomicLong(0);
private final OzoneManager om;
private final AtomicLong runCount = new AtomicLong(0);
private ExecutorService executor;
public QuotaRepairTask(OzoneManager ozoneManager) {
this.om = ozoneManager;
}

public CompletableFuture<Boolean> repair() throws Exception {
public CompletableFuture<Boolean> repair() throws IOException {
return repair(Collections.emptyList());
}

public CompletableFuture<Boolean> repair(List<String> buckets) throws IOException {
// lock in progress operation and reject any other
if (!IN_PROGRESS.compareAndSet(false, true)) {
LOG.info("quota repair task already running");
return CompletableFuture.supplyAsync(() -> false);
throw new OMException("Quota repair is already running", OMException.ResultCodes.QUOTA_ERROR);
}
REPAIR_STATUS.reset(runCount.get() + 1);
return CompletableFuture.supplyAsync(() -> repairTask());
REPAIR_STATUS.reset(RUN_CNT.get() + 1);
return CompletableFuture.supplyAsync(() -> repairTask(buckets));
}

public static String getStatus() {
return REPAIR_STATUS.toString();
}
private boolean repairTask() {
private boolean repairTask(List<String> buckets) {
LOG.info("Starting quota repair task {}", REPAIR_STATUS);
OMMetadataManager activeMetaManager = null;
try {
Expand All @@ -104,7 +110,7 @@ private boolean repairTask() {
= OzoneManagerProtocolProtos.QuotaRepairRequest.newBuilder();
// repair active db
activeMetaManager = createActiveDBCheckpoint(om.getMetadataManager(), om.getConfiguration());
repairActiveDb(activeMetaManager, builder);
repairActiveDb(activeMetaManager, builder, buckets);

// TODO: repair snapshots for quota

Expand All @@ -116,12 +122,12 @@ private boolean repairTask() {
.setClientId(clientId.toString())
.build();
OzoneManagerProtocolProtos.OMResponse response = submitRequest(omRequest, clientId);
if (response != null && !response.getSuccess()) {
if (response != null && response.getSuccess()) {
REPAIR_STATUS.updateStatus(builder, om.getMetadataManager());
} else {
LOG.error("update quota repair count response failed");
REPAIR_STATUS.updateStatus("Response for update DB is failed");
return false;
} else {
REPAIR_STATUS.updateStatus(builder, om.getMetadataManager());
}
} catch (Exception exp) {
LOG.error("quota repair count failed", exp);
Expand All @@ -145,11 +151,15 @@ private boolean repairTask() {

private void repairActiveDb(
OMMetadataManager metadataManager,
OzoneManagerProtocolProtos.QuotaRepairRequest.Builder builder) throws Exception {
OzoneManagerProtocolProtos.QuotaRepairRequest.Builder builder,
List<String> buckets) throws Exception {
Map<String, OmBucketInfo> nameBucketInfoMap = new HashMap<>();
Map<String, OmBucketInfo> idBucketInfoMap = new HashMap<>();
Map<String, OmBucketInfo> oriBucketInfoMap = new HashMap<>();
prepareAllBucketInfo(nameBucketInfoMap, idBucketInfoMap, oriBucketInfoMap, metadataManager);
prepareAllBucketInfo(nameBucketInfoMap, idBucketInfoMap, oriBucketInfoMap, metadataManager, buckets);
if (nameBucketInfoMap.isEmpty()) {
throw new OMException("no matching buckets", OMException.ResultCodes.BUCKET_NOT_FOUND);
}

repairCount(nameBucketInfoMap, idBucketInfoMap, metadataManager);

Expand All @@ -174,31 +184,36 @@ private void repairActiveDb(
}

// update volume to support quota
builder.setSupportVolumeOldQuota(true);
if (buckets.isEmpty()) {
builder.setSupportVolumeOldQuota(true);
} else {
builder.setSupportVolumeOldQuota(false);
}
}

private OzoneManagerProtocolProtos.OMResponse submitRequest(
OzoneManagerProtocolProtos.OMRequest omRequest, ClientId clientId) {
OzoneManagerProtocolProtos.OMRequest omRequest, ClientId clientId) throws Exception {
try {
if (om.isRatisEnabled()) {
OzoneManagerRatisServer server = om.getOmRatisServer();
RaftClientRequest raftClientRequest = RaftClientRequest.newBuilder()
.setClientId(clientId)
.setServerId(om.getOmRatisServer().getRaftPeerId())
.setGroupId(om.getOmRatisServer().getRaftGroupId())
.setCallId(runCount.getAndIncrement())
.setCallId(RUN_CNT.getAndIncrement())
.setMessage(Message.valueOf(OMRatisHelper.convertRequestToByteString(omRequest)))
.setType(RaftClientRequest.writeRequestType())
.build();
return server.submitRequest(omRequest, raftClientRequest);
} else {
RUN_CNT.getAndIncrement();
return om.getOmServerProtocol().submitRequest(
null, omRequest);
}
} catch (ServiceException e) {
LOG.error("repair quota count " + omRequest.getCmdType() + " request failed.", e);
throw e;
}
return null;
}

private OMMetadataManager createActiveDBCheckpoint(
Expand Down Expand Up @@ -228,24 +243,42 @@ private static String cleanTempCheckPointPath(OMMetadataManager omMetaManager) t

private void prepareAllBucketInfo(
Map<String, OmBucketInfo> nameBucketInfoMap, Map<String, OmBucketInfo> idBucketInfoMap,
Map<String, OmBucketInfo> oriBucketInfoMap, OMMetadataManager metadataManager) throws IOException {
Map<String, OmBucketInfo> oriBucketInfoMap, OMMetadataManager metadataManager,
List<String> buckets) throws IOException {
if (!buckets.isEmpty()) {
for (String bucketkey : buckets) {
OmBucketInfo bucketInfo = metadataManager.getBucketTable().get(bucketkey);
if (null == bucketInfo) {
continue;
}
populateBucket(nameBucketInfoMap, idBucketInfoMap, oriBucketInfoMap, metadataManager, bucketInfo);
}
return;
}
try (TableIterator<String, ? extends Table.KeyValue<String, OmBucketInfo>>
iterator = metadataManager.getBucketTable().iterator()) {
while (iterator.hasNext()) {
Table.KeyValue<String, OmBucketInfo> entry = iterator.next();
OmBucketInfo bucketInfo = entry.getValue();
String bucketNameKey = buildNamePath(bucketInfo.getVolumeName(),
bucketInfo.getBucketName());
oriBucketInfoMap.put(bucketNameKey, bucketInfo.copyObject());
bucketInfo.incrUsedNamespace(-bucketInfo.getUsedNamespace());
bucketInfo.incrUsedBytes(-bucketInfo.getUsedBytes());
nameBucketInfoMap.put(bucketNameKey, bucketInfo);
idBucketInfoMap.put(buildIdPath(metadataManager.getVolumeId(bucketInfo.getVolumeName()),
bucketInfo.getObjectID()), bucketInfo);
populateBucket(nameBucketInfoMap, idBucketInfoMap, oriBucketInfoMap, metadataManager, bucketInfo);
}
}
}

private static void populateBucket(
Map<String, OmBucketInfo> nameBucketInfoMap, Map<String, OmBucketInfo> idBucketInfoMap,
Map<String, OmBucketInfo> oriBucketInfoMap, OMMetadataManager metadataManager,
OmBucketInfo bucketInfo) throws IOException {
String bucketNameKey = buildNamePath(bucketInfo.getVolumeName(),
bucketInfo.getBucketName());
oriBucketInfoMap.put(bucketNameKey, bucketInfo.copyObject());
bucketInfo.incrUsedNamespace(-bucketInfo.getUsedNamespace());
bucketInfo.incrUsedBytes(-bucketInfo.getUsedBytes());
nameBucketInfoMap.put(bucketNameKey, bucketInfo);
idBucketInfoMap.put(buildIdPath(metadataManager.getVolumeId(bucketInfo.getVolumeName()),
bucketInfo.getObjectID()), bucketInfo);
}

private boolean isChange(OmBucketInfo lBucketInfo, OmBucketInfo rBucketInfo) {
if (lBucketInfo.getUsedNamespace() != rBucketInfo.getUsedNamespace()
|| lBucketInfo.getUsedBytes() != rBucketInfo.getUsedBytes()) {
Expand Down Expand Up @@ -468,8 +501,9 @@ public String toString() {
}
Map<String, Object> status = new HashMap<>();
status.put("taskId", taskId);
status.put("lastRunStartTime", lastRunStartTime);
status.put("lastRunFinishedTime", lastRunFinishedTime);
status.put("lastRunStartTime", lastRunStartTime > 0 ? new java.util.Date(lastRunStartTime).toString() : "");
status.put("lastRunFinishedTime", lastRunFinishedTime > 0 ? new java.util.Date(lastRunFinishedTime).toString()
: "");
status.put("errorMsg", errorMsg);
status.put("bucketCountDiffMap", bucketCountDiffMap);
try {
Expand Down
Loading