Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public enum OzoneManagerVersion implements ComponentVersion {
OBJECT_TAG(5, "OzoneManager version that supports object tags"),

ATOMIC_REWRITE_KEY(6, "OzoneManager version that supports rewriting key as atomic operation"),
HBASE_SUPPORT(7, "OzoneManager version that supports HBase integration"),

FUTURE_VERSION(-1, "Used internally in the client when the server side is "
+ " newer and an unknown server version has arrived to the client.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.ozone.OzoneManagerVersion;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
Expand Down Expand Up @@ -107,6 +108,7 @@ enum StreamAction {
*/
private boolean atomicKeyCreation;
private ContainerClientMetrics clientMetrics;
private OzoneManagerVersion ozoneManagerVersion;

public KeyOutputStream(ReplicationConfig replicationConfig, BlockOutputStreamEntryPool blockOutputStreamEntryPool) {
this.replication = replicationConfig;
Expand Down Expand Up @@ -157,6 +159,7 @@ public KeyOutputStream(Builder b) {
this.atomicKeyCreation = b.getAtomicKeyCreation();
this.streamBufferArgs = b.getStreamBufferArgs();
this.clientMetrics = b.getClientMetrics();
this.ozoneManagerVersion = b.ozoneManagerVersion;
}

/**
Expand Down Expand Up @@ -457,6 +460,11 @@ public void hsync() throws IOException {
throw new UnsupportedOperationException("The replication factor = "
+ replication.getRequiredNodes() + " <= 1");
}
if (ozoneManagerVersion.compareTo(OzoneManagerVersion.HBASE_SUPPORT) < 0) {
throw new UnsupportedOperationException("Hsync API requires OM version "
+ OzoneManagerVersion.HBASE_SUPPORT + " or later. Current OM version "
+ ozoneManagerVersion);
}
checkNotClosed();
final long hsyncPos = writeOffset;

Expand Down Expand Up @@ -599,6 +607,7 @@ public static class Builder {
private boolean atomicKeyCreation = false;
private StreamBufferArgs streamBufferArgs;
private Supplier<ExecutorService> executorServiceSupplier;
private OzoneManagerVersion ozoneManagerVersion;

public String getMultipartUploadID() {
return multipartUploadID;
Expand Down Expand Up @@ -721,6 +730,15 @@ public Supplier<ExecutorService> getExecutorServiceSupplier() {
return executorServiceSupplier;
}

public Builder setOmVersion(OzoneManagerVersion omVersion) {
this.ozoneManagerVersion = omVersion;
return this;
}

public OzoneManagerVersion getOmVersion() {
return ozoneManagerVersion;
}

public KeyOutputStream build() {
return new KeyOutputStream(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.DeleteTenantState;
import org.apache.hadoop.ozone.om.helpers.ErrorInfo;
import org.apache.hadoop.ozone.om.helpers.LeaseKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
Expand Down Expand Up @@ -1313,4 +1315,26 @@ List<OzoneSnapshotDiff> listSnapshotDiffJobs(String volumeName,
* */
void setTimes(OzoneObj obj, String keyName, long mtime, long atime)
throws IOException;

/**
* Start the lease recovery of a file.
*
* @param volumeName - The volume name.
* @param bucketName - The bucket name.
* @param keyName - The key user want to recover.
* @param force - force recover the file.
* @return LeaseKeyInfo KeyInfo of file under recovery
* @throws IOException if an error occurs
*/
LeaseKeyInfo recoverLease(String volumeName, String bucketName, String keyName, boolean force) throws IOException;

/**
* Recovery and commit a key. This will make the change from the client visible. The client
* is identified by the clientID.
*
* @param args the key to commit
* @param clientID the client identification
* @throws IOException
*/
void recoverKey(OmKeyArgs args, long clientID) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@
import org.apache.hadoop.ozone.om.helpers.DeleteTenantState;
import org.apache.hadoop.ozone.om.helpers.ErrorInfo;
import org.apache.hadoop.ozone.om.helpers.KeyInfoWithVolumeContext;
import org.apache.hadoop.ozone.om.helpers.LeaseKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.OmDeleteKeys;
Expand Down Expand Up @@ -336,7 +337,7 @@ public XceiverClientFactory getXceiverClientManager() {
return xceiverClientManager;
}

private OzoneManagerVersion getOmVersion(ServiceInfoEx info) {
public static OzoneManagerVersion getOmVersion(ServiceInfoEx info) {
OzoneManagerVersion version = OzoneManagerVersion.CURRENT;
for (ServiceInfo si : info.getServiceInfoList()) {
if (si.getNodeType() == HddsProtos.NodeType.OM) {
Expand Down Expand Up @@ -2562,7 +2563,8 @@ private KeyOutputStream.Builder createKeyOutputStream(
.setAtomicKeyCreation(isS3GRequest.get())
.setClientMetrics(clientMetrics)
.setExecutorServiceSupplier(writeExecutor)
.setStreamBufferArgs(streamBufferArgs);
.setStreamBufferArgs(streamBufferArgs)
.setOmVersion(omVersion);
}

@Override
Expand Down Expand Up @@ -2684,6 +2686,28 @@ public void setTimes(OzoneObj obj, String keyName, long mtime, long atime)
ozoneManagerClient.setTimes(builder.build(), mtime, atime);
}

@Override
public LeaseKeyInfo recoverLease(String volumeName, String bucketName,
String keyName, boolean force)
throws IOException {
if (omVersion.compareTo(OzoneManagerVersion.HBASE_SUPPORT) < 0) {
throw new UnsupportedOperationException("Lease recovery API requires OM version "
+ OzoneManagerVersion.HBASE_SUPPORT + " or later. Current OM version "
+ omVersion);
}
return ozoneManagerClient.recoverLease(volumeName, bucketName, keyName, force);
}

@Override
public void recoverKey(OmKeyArgs args, long clientID) throws IOException {
if (omVersion.compareTo(OzoneManagerVersion.HBASE_SUPPORT) < 0) {
throw new UnsupportedOperationException("Lease recovery API requires OM version "
+ OzoneManagerVersion.HBASE_SUPPORT + " or later. Current OM version "
+ omVersion);
}
ozoneManagerClient.recoverKey(args, clientID);
}

private static ExecutorService createThreadPoolExecutor(
int corePoolSize, int maximumPoolSize, String threadNameFormat) {
return new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
import org.apache.hadoop.ozone.client.rpc.RpcClient;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.om.exceptions.OMException;
Expand Down Expand Up @@ -703,8 +704,8 @@ public LeaseKeyInfo recoverFilePrepare(final String pathStr, boolean force) thro
incrementCounter(Statistic.INVOCATION_RECOVER_FILE_PREPARE, 1);

try {
return ozoneClient.getProxy().getOzoneManagerClient().recoverLease(
volume.getName(), bucket.getName(), pathStr, force);
ClientProtocol clientProtocol = ozoneClient.getProxy();
return clientProtocol.recoverLease(volume.getName(), bucket.getName(), pathStr, force);
} catch (OMException ome) {
if (ome.getResult() == NOT_A_FILE) {
throw new FileNotFoundException("Path is not a file. " + ome.getMessage());
Expand All @@ -720,7 +721,8 @@ public LeaseKeyInfo recoverFilePrepare(final String pathStr, boolean force) thro
public void recoverFile(OmKeyArgs keyArgs) throws IOException {
incrementCounter(Statistic.INVOCATION_RECOVER_FILE, 1);

ozoneClient.getProxy().getOzoneManagerClient().recoverKey(keyArgs, 0L);
ClientProtocol clientProtocol = ozoneClient.getProxy();
clientProtocol.recoverKey(keyArgs, 0L);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1395,7 +1395,8 @@ public LeaseKeyInfo recoverFilePrepare(final String pathStr, boolean force) thro

try {
OzoneBucket bucket = getBucket(ofsPath, false);
return ozoneClient.getProxy().getOzoneManagerClient().recoverLease(
ClientProtocol clientProtocol = ozoneClient.getProxy();
return clientProtocol.recoverLease(
bucket.getVolumeName(), bucket.getName(), ofsPath.getKeyName(), force);
} catch (OMException ome) {
if (ome.getResult() == NOT_A_FILE) {
Expand All @@ -1414,7 +1415,8 @@ public LeaseKeyInfo recoverFilePrepare(final String pathStr, boolean force) thro
public void recoverFile(OmKeyArgs keyArgs) throws IOException {
incrementCounter(Statistic.INVOCATION_RECOVER_FILE, 1);

ozoneClient.getProxy().getOzoneManagerClient().recoverKey(keyArgs, 0L);
ClientProtocol clientProtocol = ozoneClient.getProxy();
clientProtocol.recoverKey(keyArgs, 0L);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
import org.apache.hadoop.ozone.om.helpers.DeleteTenantState;
import org.apache.hadoop.ozone.om.helpers.ErrorInfo;
import org.apache.hadoop.ozone.om.helpers.LeaseKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
Expand Down Expand Up @@ -768,4 +770,15 @@ public void setTimes(OzoneObj obj, String keyName, long mtime, long atime)
throws IOException {
}

@Override
public LeaseKeyInfo recoverLease(String volumeName, String bucketName,
String keyName, boolean force) throws IOException {
return null;
}

@Override
public void recoverKey(OmKeyArgs args, long clientID) throws IOException {

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@
import org.apache.hadoop.hdds.cli.HddsVersionProvider;
import org.apache.hadoop.hdds.server.JsonUtils;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.OzoneManagerVersion;
import org.apache.hadoop.ozone.client.rpc.RpcClient;
import org.apache.hadoop.ozone.om.helpers.ListOpenFilesResult;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
import org.apache.hadoop.ozone.om.helpers.ServiceInfoEx;
import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
import picocli.CommandLine;

Expand Down Expand Up @@ -112,6 +115,13 @@ public Void call() throws Exception {

OzoneManagerProtocol ozoneManagerClient =
parent.createOmClient(omServiceId, omHost, false);
ServiceInfoEx serviceInfoEx = ozoneManagerClient.getServiceInfo();
final OzoneManagerVersion omVersion = RpcClient.getOmVersion(serviceInfoEx);
if (omVersion.compareTo(OzoneManagerVersion.HBASE_SUPPORT) < 0) {
System.err.println("Error: This command requires OzoneManager version "
+ OzoneManagerVersion.HBASE_SUPPORT.name() + " or later.");
return null;
}

ListOpenFilesResult res =
ozoneManagerClient.listOpenFiles(pathPrefix, limit, startItem);
Expand Down