@@ -80,6 +80,7 @@
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
@@ -233,7 +234,7 @@ public void setUp() throws IOException {
ozoneManager.getMetadataManager().getMetaTable().put(
OzoneConsts.RANGER_OZONE_SERVICE_VERSION_KEY, String.valueOf(v));
return null;
}).when(omRatisServer).submitRequest(any(), any());
}).when(omRatisServer).submitRequest(any(), any(), anyLong());
} catch (ServiceException e) {
throw new RuntimeException(e);
}
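For reference, a self-contained sketch of the updated stub, assuming the mock name omRatisServer from the snippet above and the static Mockito imports shown in the first hunk; the Answer body is an illustration, not part of the patch:

// Illustrative sketch: the stub now matches the three-argument overload, with
// anyLong() covering the new call-id parameter.
doAnswer(invocation -> {
  OMRequest request = invocation.getArgument(0);
  // apply the request to the mocked metadata table, as the test above does
  return null;
}).when(omRatisServer).submitRequest(any(), any(), anyLong());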
@@ -2065,6 +2065,7 @@ private void addOMNodeToPeers(String newOMNodeId) throws IOException {
} catch (IOException e) {
LOG.error("{}: Couldn't add OM {} to peer list.", getOMNodeId(),
newOMNodeId);
return;
}

if (omRatisSnapshotProvider == null) {
@@ -17,7 +17,6 @@
package org.apache.hadoop.ozone.om;

import com.google.common.base.Preconditions;
import com.google.protobuf.RpcController;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.ClientVersion;
import org.apache.hadoop.ozone.om.exceptions.OMException;
@@ -35,15 +34,12 @@
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
import org.apache.hadoop.ozone.om.request.OMClientRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Progressable;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -69,8 +65,6 @@
*/
public class TrashOzoneFileSystem extends FileSystem {

private static final RpcController NULL_RPC_CONTROLLER = null;

private static final int OZONE_FS_ITERATE_BATCH_SIZE = 100;

private static final int OZONE_MAX_LIST_KEYS_SIZE = 10000;
@@ -97,34 +91,15 @@ public TrashOzoneFileSystem(OzoneManager ozoneManager) throws IOException {
ozoneConfiguration = OzoneConfiguration.of(getConf());
}

private RaftClientRequest getRatisRequest(
OzoneManagerProtocolProtos.OMRequest omRequest) {
return RaftClientRequest.newBuilder()
.setClientId(CLIENT_ID)
.setServerId(ozoneManager.getOmRatisServer().getRaftPeerId())
.setGroupId(ozoneManager.getOmRatisServer().getRaftGroupId())
.setCallId(runCount.getAndIncrement())
.setMessage(
Message.valueOf(
OMRatisHelper.convertRequestToByteString(omRequest)))
.setType(RaftClientRequest.writeRequestType())
.build();

}

private void submitRequest(OzoneManagerProtocolProtos.OMRequest omRequest)
throws Exception {
ozoneManager.getMetrics().incNumTrashWriteRequests();
if (ozoneManager.isRatisEnabled()) {
OMClientRequest omClientRequest =
OzoneManagerRatisUtils.createClientRequest(omRequest, ozoneManager);
// perform preExecute here, as the Ratis submit path does not perform preExecute
OMClientRequest omClientRequest = OzoneManagerRatisUtils.createClientRequest(omRequest, ozoneManager);
omRequest = omClientRequest.preExecute(ozoneManager);
RaftClientRequest req = getRatisRequest(omRequest);
ozoneManager.getOmRatisServer().submitRequest(omRequest, req);
} else {
ozoneManager.getOmServerProtocol().
submitRequest(NULL_RPC_CONTROLLER, omRequest);
}
OzoneManagerRatisUtils.submitRequest(ozoneManager, omRequest, CLIENT_ID, runCount.getAndIncrement());
}

@Override
@@ -301,15 +301,23 @@ private RaftClientRequest createRaftRequest(OMRequest omRequest) {
}

/**
* API used internally from OzoneManager Server when requests needs to be
* submitted to ratis, where the crafted RaftClientRequest is passed along.
* API used internally from OzoneManager Server when requests need to be submitted.
* @param omRequest
* @param raftClientRequest
* @param cliId
* @param callId
* @return OMResponse
* @throws ServiceException
*/
public OMResponse submitRequest(OMRequest omRequest,
RaftClientRequest raftClientRequest) throws ServiceException {
public OMResponse submitRequest(OMRequest omRequest, ClientId cliId, long callId) throws ServiceException {
RaftClientRequest raftClientRequest = RaftClientRequest.newBuilder()
.setClientId(cliId)
.setServerId(getRaftPeerId())
.setGroupId(getRaftGroupId())
.setCallId(callId)
.setMessage(Message.valueOf(
OMRatisHelper.convertRequestToByteString(omRequest)))
.setType(RaftClientRequest.writeRequestType())
.build();
RaftClientReply raftClientReply =
submitRequestToRatis(raftClientRequest);
return createOmResponse(omRequest, raftClientReply);
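With this change the server builds the RaftClientRequest itself from the supplied ClientId and call id, so callers no longer assemble Raft messages. A minimal caller sketch, assuming a caller-owned counter for the call id (callIdCounter is an illustrative name, not from the patch):

// Minimal sketch of invoking the new overload; the OMRequest is wrapped into a
// RaftClientRequest inside submitRequest before being submitted to Ratis.
ClientId clientId = ClientId.randomId();
long callId = callIdCounter.getAndIncrement();
OMResponse response = ozoneManager.getOmRatisServer()
    .submitRequest(omRequest, clientId, callId);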
@@ -19,6 +19,7 @@

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import java.io.File;
import java.nio.file.InvalidPathException;
@@ -98,6 +99,7 @@
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.protocol.ClientId;
import org.rocksdb.RocksDBException;

import java.io.IOException;
@@ -117,6 +119,7 @@
public final class OzoneManagerRatisUtils {
private static final Logger LOG = LoggerFactory
.getLogger(OzoneManagerRatisUtils.class);
private static final RpcController NULL_RPC_CONTROLLER = null;

private OzoneManagerRatisUtils() {
}
@@ -502,4 +505,13 @@ public static GrpcTlsConfig createServerTlsConfig(SecurityConfig conf,

return null;
}

public static OzoneManagerProtocolProtos.OMResponse submitRequest(
OzoneManager om, OMRequest omRequest, ClientId clientId, long callId) throws ServiceException {
if (om.isRatisEnabled()) {
return om.getOmRatisServer().submitRequest(omRequest, clientId, callId);
} else {
return om.getOmServerProtocol().submitRequest(NULL_RPC_CONTROLLER, omRequest);
}
}
}
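The new static helper centralizes the dispatch that each background service previously duplicated: with Ratis enabled it forwards to OzoneManagerRatisServer.submitRequest, otherwise it calls the OM server protocol handler directly. A sketch of how a service can use it, where clientId, runCount, and ozoneManager are assumed fields mirroring the services updated below:

// Illustrative usage from a background service (field names are assumptions).
private final ClientId clientId = ClientId.randomId();
private final AtomicLong runCount = new AtomicLong(0);

private void submit(OMRequest omRequest) throws ServiceException {
  // One call replaces the per-service RaftClientRequest construction and the
  // isRatisEnabled() branching that previously lived at every call site.
  OzoneManagerRatisUtils.submitRequest(ozoneManager, omRequest, clientId, runCount.getAndIncrement());
}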
@@ -33,11 +33,11 @@
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.SnapshotChainManager;
import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
import org.apache.hadoop.ozone.om.snapshot.SnapshotUtils;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeletedKeys;
@@ -48,8 +48,6 @@
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
import org.apache.hadoop.util.Time;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.util.Preconditions;

import java.io.IOException;
@@ -247,10 +245,7 @@ private int submitPurgeKeysRequest(List<DeleteBlockGroupResult> results,

// Submit PurgeKeys request to OM
try {
RaftClientRequest raftClientRequest =
createRaftClientRequestForPurge(omRequest);
ozoneManager.getOmRatisServer().submitRequest(omRequest,
raftClientRequest);
OzoneManagerRatisUtils.submitRequest(ozoneManager, omRequest, clientId, runCount.get());
} catch (ServiceException e) {
LOG.error("PurgeKey request failed. Will retry at next run.");
return 0;
@@ -259,20 +254,6 @@
return deletedCount;
}

protected RaftClientRequest createRaftClientRequestForPurge(
OMRequest omRequest) {
return RaftClientRequest.newBuilder()
.setClientId(clientId)
.setServerId(ozoneManager.getOmRatisServer().getRaftPeerId())
.setGroupId(ozoneManager.getOmRatisServer().getRaftGroupId())
.setCallId(runCount.get())
.setMessage(
Message.valueOf(
OMRatisHelper.convertRequestToByteString(omRequest)))
.setType(RaftClientRequest.writeRequestType())
.build();
}

/**
* Parse Volume and Bucket Name from ObjectKey and add it to given map of
* keys to be purged per bucket.
@@ -311,15 +292,7 @@ protected void submitPurgePaths(List<PurgePathRequest> requests,

// Submit Purge paths request to OM
try {
if (isRatisEnabled()) {
RaftClientRequest raftClientRequest =
createRaftClientRequestForPurge(omRequest);
ozoneManager.getOmRatisServer().submitRequest(omRequest,
raftClientRequest);
} else {
getOzoneManager().getOmServerProtocol()
.submitRequest(null, omRequest);
}
OzoneManagerRatisUtils.submitRequest(ozoneManager, omRequest, clientId, runCount.get());
} catch (ServiceException e) {
LOG.error("PurgePaths request failed. Will retry at next run.");
}
@@ -42,8 +42,7 @@
import org.apache.hadoop.ozone.om.OmSnapshot;
import org.apache.hadoop.ozone.om.OmSnapshotManager;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
import org.apache.hadoop.ozone.om.snapshot.ReferenceCounted;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotSize;
@@ -67,8 +66,6 @@
import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -481,24 +478,7 @@ private void updateDeepCleanedSnapshots(List<String> deepCleanedSnapshots) {

public void submitRequest(OMRequest omRequest, ClientId clientId) {
try {
if (isRatisEnabled()) {
OzoneManagerRatisServer server = getOzoneManager().getOmRatisServer();

RaftClientRequest raftClientRequest = RaftClientRequest.newBuilder()
.setClientId(clientId)
.setServerId(server.getRaftPeerId())
.setGroupId(server.getRaftGroupId())
.setCallId(getRunCount().get())
.setMessage(Message.valueOf(
OMRatisHelper.convertRequestToByteString(omRequest)))
.setType(RaftClientRequest.writeRequestType())
.build();

server.submitRequest(omRequest, raftClientRequest);
} else {
getOzoneManager().getOmServerProtocol()
.submitRequest(null, omRequest);
}
OzoneManagerRatisUtils.submitRequest(getOzoneManager(), omRequest, clientId, getRunCount().get());
} catch (ServiceException e) {
LOG.error("Snapshot deep cleaning request failed. " +
"Will retry at next run.", e);
@@ -29,16 +29,13 @@
import org.apache.hadoop.ozone.om.KeyManager;
import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ExpiredMultipartUploadsBucket;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.MultipartUploadsExpiredAbortRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
import org.apache.hadoop.util.Time;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -208,24 +205,7 @@ private OMRequest createRequest(List<ExpiredMultipartUploadsBucket>

private void submitRequest(OMRequest omRequest) {
try {
if (isRatisEnabled()) {
OzoneManagerRatisServer server = ozoneManager.getOmRatisServer();

RaftClientRequest raftClientRequest = RaftClientRequest.newBuilder()
.setClientId(clientId)
.setServerId(server.getRaftPeerId())
.setGroupId(server.getRaftGroupId())
.setCallId(runCount.get())
.setMessage(Message.valueOf(
OMRatisHelper.convertRequestToByteString(omRequest)))
.setType(RaftClientRequest.writeRequestType())
.build();

server.submitRequest(omRequest, raftClientRequest);
} else {
ozoneManager.getOmServerProtocol().submitRequest(null,
omRequest);
}
OzoneManagerRatisUtils.submitRequest(ozoneManager, omRequest, clientId, runCount.get());
} catch (ServiceException e) {
LOG.error("Expired multipart info delete request failed. " +
"Will retry at next run.", e);
@@ -47,20 +47,18 @@
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
import org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException;
import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
import org.apache.hadoop.ozone.om.helpers.OmDBAccessIdInfo;
import org.apache.hadoop.ozone.om.helpers.OmDBTenantState;
import org.apache.hadoop.ozone.om.multitenant.AuthorizerLock;
import org.apache.hadoop.ozone.om.multitenant.InMemoryMultiTenantAccessController;
import org.apache.hadoop.ozone.om.multitenant.MultiTenantAccessController;
import org.apache.hadoop.ozone.om.multitenant.MultiTenantAccessController.Policy;
import org.apache.hadoop.ozone.om.multitenant.MultiTenantAccessController.Role;
import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SetRangerServiceVersionRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -375,19 +373,6 @@ long getRangerOzoneServicePolicyVersion() throws IOException {
return policyVersion;
}

private RaftClientRequest newRaftClientRequest(OMRequest omRequest) {
return RaftClientRequest.newBuilder()
.setClientId(CLIENT_ID)
.setServerId(ozoneManager.getOmRatisServer().getRaftPeerId())
.setGroupId(ozoneManager.getOmRatisServer().getRaftGroupId())
.setCallId(runCount.get())
.setMessage(
Message.valueOf(
OMRatisHelper.convertRequestToByteString(omRequest)))
.setType(RaftClientRequest.writeRequestType())
.build();
}

public void setOMDBRangerServiceVersion(long version)
throws ServiceException {
// OM DB update goes through Ratis
@@ -402,9 +387,7 @@ public void setOMDBRangerServiceVersion(long version)
.build();

try {
RaftClientRequest raftClientRequest = newRaftClientRequest(omRequest);
ozoneManager.getOmRatisServer().submitRequest(omRequest,
raftClientRequest);
OzoneManagerRatisUtils.submitRequest(ozoneManager, omRequest, CLIENT_ID, runCount.get());
} catch (ServiceException e) {
LOG.error("SetRangerServiceVersion request failed. "
+ "Will retry at next run.");