Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
Expand Down Expand Up @@ -277,9 +276,7 @@ public static LengthInputStream getFromOmKeyInfo(
OmKeyLocationInfo omKeyLocationInfo = keyLocationInfos.get(i);
BlockID blockID = omKeyLocationInfo.getBlockID();
long containerID = blockID.getContainerID();
ContainerWithPipeline containerWithPipeline =
storageContainerLocationClient.getContainerWithPipeline(containerID);
Pipeline pipeline = containerWithPipeline.getPipeline();
Pipeline pipeline = omKeyLocationInfo.getPipeline();

// irrespective of the container state, we will always read via Standalone
// protocol.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
.ChecksumType;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.hdds.scm.storage.BufferPool;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
Expand Down Expand Up @@ -70,7 +68,6 @@ public class KeyOutputStream extends OutputStream {
private final ArrayList<BlockOutputStreamEntry> streamEntries;
private int currentStreamIndex;
private final OzoneManagerProtocol omClient;
private final StorageContainerLocationProtocol scmClient;
private final OmKeyArgs keyArgs;
private final long openID;
private final XceiverClientManager xceiverClientManager;
Expand All @@ -95,7 +92,6 @@ public class KeyOutputStream extends OutputStream {
public KeyOutputStream() {
streamEntries = new ArrayList<>();
omClient = null;
scmClient = null;
keyArgs = null;
openID = -1;
xceiverClientManager = null;
Expand Down Expand Up @@ -129,6 +125,7 @@ public List<OmKeyLocationInfo> getLocationInfoList() throws IOException {
new OmKeyLocationInfo.Builder().setBlockID(streamEntry.getBlockID())
.setLength(streamEntry.getCurrentPosition()).setOffset(0)
.setToken(streamEntry.getToken())
.setPipeline(streamEntry.getPipeline())
.build();
LOG.debug("block written " + streamEntry.getBlockID() + ", length "
+ streamEntry.getCurrentPosition() + " bcsID "
Expand All @@ -142,7 +139,6 @@ public List<OmKeyLocationInfo> getLocationInfoList() throws IOException {
@SuppressWarnings("parameternumber")
public KeyOutputStream(OpenKeySession handler,
XceiverClientManager xceiverClientManager,
StorageContainerLocationProtocol scmClient,
OzoneManagerProtocol omClient, int chunkSize,
String requestId, ReplicationFactor factor, ReplicationType type,
long bufferFlushSize, long bufferMaxSize, long size, long watchTimeout,
Expand All @@ -151,7 +147,6 @@ public KeyOutputStream(OpenKeySession handler,
this.streamEntries = new ArrayList<>();
this.currentStreamIndex = 0;
this.omClient = omClient;
this.scmClient = scmClient;
OmKeyInfo info = handler.getKeyInfo();
// Retrieve the file encryption key info, null if file is not in
// encrypted bucket.
Expand Down Expand Up @@ -212,15 +207,14 @@ public void addPreallocateBlocks(OmKeyLocationInfoGroup version,

private void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo)
throws IOException {
ContainerWithPipeline containerWithPipeline = scmClient
.getContainerWithPipeline(subKeyInfo.getContainerID());
Preconditions.checkNotNull(subKeyInfo.getPipeline());
UserGroupInformation.getCurrentUser().addToken(subKeyInfo.getToken());
BlockOutputStreamEntry.Builder builder =
new BlockOutputStreamEntry.Builder()
.setBlockID(subKeyInfo.getBlockID())
.setKey(keyArgs.getKeyName())
.setXceiverClientManager(xceiverClientManager)
.setPipeline(containerWithPipeline.getPipeline())
.setPipeline(subKeyInfo.getPipeline())
.setRequestId(requestID)
.setChunkSize(chunkSize)
.setLength(subKeyInfo.getLength())
Expand Down Expand Up @@ -601,7 +595,6 @@ public ExcludeList getExcludeList() {
public static class Builder {
private OpenKeySession openHandler;
private XceiverClientManager xceiverManager;
private StorageContainerLocationProtocol scmClient;
private OzoneManagerProtocol omClient;
private int chunkSize;
private String requestID;
Expand Down Expand Up @@ -638,11 +631,6 @@ public Builder setXceiverClientManager(XceiverClientManager manager) {
return this;
}

public Builder setScmClient(StorageContainerLocationProtocol client) {
this.scmClient = client;
return this;
}

public Builder setOmClient(
OzoneManagerProtocol client) {
this.omClient = client;
Expand Down Expand Up @@ -705,7 +693,7 @@ public Builder setIsMultipartKey(boolean isMultipart) {
}

public KeyOutputStream build() throws IOException {
return new KeyOutputStream(openHandler, xceiverManager, scmClient,
return new KeyOutputStream(openHandler, xceiverManager,
omClient, chunkSize, requestID, factor, type, streamBufferFlushSize,
streamBufferMaxSize, blockSize, watchTimeout, checksumType,
bytesPerChecksum, multipartUploadID, multipartNumber, isMultipartKey);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,6 @@ public OzoneOutputStream createKey(
new KeyOutputStream.Builder()
.setHandler(openKey)
.setXceiverClientManager(xceiverClientManager)
.setScmClient(storageContainerLocationClient)
.setOmClient(ozoneManagerClient)
.setChunkSize(chunkSize)
.setRequestID(requestId)
Expand Down Expand Up @@ -860,7 +859,6 @@ public OzoneOutputStream createMultipartKey(String volumeName,
new KeyOutputStream.Builder()
.setHandler(openKey)
.setXceiverClientManager(xceiverClientManager)
.setScmClient(storageContainerLocationClient)
.setOmClient(ozoneManagerClient)
.setChunkSize(chunkSize)
.setRequestID(requestId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,14 @@ public final class OmKeyArgs implements Auditable {
private final String multipartUploadID;
private final int multipartUploadPartNumber;
private Map<String, String> metadata;
private boolean refreshPipeline;

@SuppressWarnings("parameternumber")
private OmKeyArgs(String volumeName, String bucketName, String keyName,
long dataSize, ReplicationType type, ReplicationFactor factor,
List<OmKeyLocationInfo> locationInfoList, boolean isMultipart,
String uploadID, int partNumber,
Map<String, String> metadataMap) {
Map<String, String> metadataMap, boolean refreshPipeline) {
this.volumeName = volumeName;
this.bucketName = bucketName;
this.keyName = keyName;
Expand All @@ -62,6 +63,7 @@ private OmKeyArgs(String volumeName, String bucketName, String keyName,
this.multipartUploadID = uploadID;
this.multipartUploadPartNumber = partNumber;
this.metadata = metadataMap;
this.refreshPipeline = refreshPipeline;
}

public boolean getIsMultipartKey() {
Expand Down Expand Up @@ -120,6 +122,10 @@ public List<OmKeyLocationInfo> getLocationInfoList() {
return locationInfoList;
}

public boolean getRefreshPipeline() {
return refreshPipeline;
}

@Override
public Map<String, String> toAuditMap() {
Map<String, String> auditMap = new LinkedHashMap<>();
Expand Down Expand Up @@ -159,6 +165,7 @@ public static class Builder {
private String multipartUploadID;
private int multipartUploadPartNumber;
private Map<String, String> metadata = new HashMap<>();
private boolean refreshPipeline;

public Builder setVolumeName(String volume) {
this.volumeName = volume;
Expand Down Expand Up @@ -220,10 +227,15 @@ public Builder addAllMetadata(Map<String, String> metadatamap) {
return this;
}

public Builder setRefreshPipeline(boolean refresh) {
this.refreshPipeline = refresh;
return this;
}

public OmKeyArgs build() {
return new OmKeyArgs(volumeName, bucketName, keyName, dataSize, type,
factor, locationInfoList, isMultipartKey, multipartUploadID,
multipartUploadPartNumber, metadata);
multipartUploadPartNumber, metadata, refreshPipeline);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package org.apache.hadoop.ozone.om.helpers;

import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.UnknownPipelineStateException;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyLocation;
import org.apache.hadoop.security.token.Token;
Expand All @@ -35,15 +37,20 @@ public final class OmKeyLocationInfo {
// the version number indicating when this block was added
private long createVersion;

private OmKeyLocationInfo(BlockID blockID, long length, long offset) {
private Pipeline pipeline;

private OmKeyLocationInfo(BlockID blockID, Pipeline pipeline, long length,
long offset) {
this.blockID = blockID;
this.pipeline = pipeline;
this.length = length;
this.offset = offset;
}

private OmKeyLocationInfo(BlockID blockID, long length, long offset,
Token<OzoneBlockTokenIdentifier> token) {
private OmKeyLocationInfo(BlockID blockID, Pipeline pipeline, long length,
long offset, Token<OzoneBlockTokenIdentifier> token) {
this.blockID = blockID;
this.pipeline = pipeline;
this.length = length;
this.offset = offset;
this.token = token;
Expand All @@ -69,6 +76,10 @@ public long getLocalID() {
return blockID.getLocalID();
}

public Pipeline getPipeline() {
return pipeline;
}

public long getLength() {
return length;
}
Expand All @@ -92,6 +103,11 @@ public Token<OzoneBlockTokenIdentifier> getToken() {
public void setToken(Token<OzoneBlockTokenIdentifier> token) {
this.token = token;
}

public void setPipeline(Pipeline pipeline) {
this.pipeline = pipeline;
}

/**
* Builder of OmKeyLocationInfo.
*/
Expand All @@ -100,12 +116,18 @@ public static class Builder {
private long length;
private long offset;
private Token<OzoneBlockTokenIdentifier> token;
private Pipeline pipeline;

public Builder setBlockID(BlockID blockId) {
this.blockID = blockId;
return this;
}

public Builder setPipeline(Pipeline pipeline) {
this.pipeline = pipeline;
return this;
}

public Builder setLength(long len) {
this.length = len;
return this;
Expand All @@ -123,9 +145,9 @@ public Builder setToken(Token<OzoneBlockTokenIdentifier> bToken) {

public OmKeyLocationInfo build() {
if (token == null) {
return new OmKeyLocationInfo(blockID, length, offset);
return new OmKeyLocationInfo(blockID, pipeline, length, offset);
} else {
return new OmKeyLocationInfo(blockID, length, offset, token);
return new OmKeyLocationInfo(blockID, pipeline, length, offset, token);
}
}
}
Expand All @@ -139,12 +161,27 @@ public KeyLocation getProtobuf() {
if (this.token != null) {
builder.setToken(this.token.toTokenProto());
}
try {
builder.setPipeline(pipeline.getProtobufMessage());
} catch (UnknownPipelineStateException e) {
//TODO: fix me: we should not return KeyLocation without pipeline.
}
return builder.build();
}

private static Pipeline getPipeline(KeyLocation keyLocation) {
try {
return keyLocation.hasPipeline() ?
Pipeline.getFromProtobuf(keyLocation.getPipeline()) : null;
} catch (UnknownPipelineStateException e) {
return null;
}
}

public static OmKeyLocationInfo getFromProtobuf(KeyLocation keyLocation) {
OmKeyLocationInfo info = new OmKeyLocationInfo(
BlockID.getFromProtobuf(keyLocation.getBlockID()),
getPipeline(keyLocation),
keyLocation.getLength(),
keyLocation.getOffset());
if(keyLocation.hasToken()) {
Expand All @@ -161,6 +198,7 @@ public String toString() {
", length=" + length +
", offset=" + offset +
", token=" + token +
", pipeline=" + pipeline +
", createVersion=" + createVersion + '}';
}
}
7 changes: 7 additions & 0 deletions hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,13 @@ message KeyLocation {
// indicated at which version this block gets created.
optional uint64 createVersion = 5;
optional hadoop.common.TokenProto token = 6;
// Walk around to include pipeline info for client read/write
// without talking to scm.
// NOTE: the pipeline info may change after pipeline close.
// So eventually, we will have to change back to call scm to
// get the up to date pipeline information. This will need o3fs
// provide not only a OM delegation token but also a SCM delegation token
optional hadoop.hdds.Pipeline pipeline = 7;
}

message KeyLocationList {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,6 @@ public OutputStream newKeyWriter(KeyArgs args) throws IOException,
new KeyOutputStream.Builder()
.setHandler(openKey)
.setXceiverClientManager(xceiverClientManager)
.setScmClient(storageContainerLocationClient)
.setOmClient(ozoneManagerClient)
.setChunkSize(chunkSize)
.setRequestID(args.getRequestID())
Expand Down
Loading