Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,10 @@ long computeBufferData() {
return totalDataLen;
}

public long getDataSize() {
return keyArgs.getDataSize();
}

@Override
public Map<String, String> getMetadata() {
return this.keyArgs.getMetadata();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -438,4 +438,7 @@ public Map<String, String> getMetadata() {
return null;
}

long getDataSize() {
return keyArgs.getDataSize();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,14 @@ public final class ECKeyOutputStream extends KeyOutputStream
private final Future<Boolean> flushFuture;
private final AtomicLong flushCheckpoint;

/**
* Indicates if an atomic write is required. When set to true,
* the amount of data written must match the declared size during the commit.
* A mismatch will prevent the commit from succeeding.
* This is essential for operations like S3 put to ensure atomicity.
*/
private boolean atomicKeyCreation;

private enum StripeWriteStatus {
SUCCESS,
FAILED
Expand Down Expand Up @@ -155,6 +163,7 @@ private ECKeyOutputStream(Builder builder) {
flushExecutor.submit(() -> s3CredentialsProvider.set(s3Auth));
this.flushFuture = this.flushExecutor.submit(this::flushStripeFromQueue);
this.flushCheckpoint = new AtomicLong(0);
this.atomicKeyCreation = builder.getAtomicKeyCreation();
}

/**
Expand Down Expand Up @@ -512,6 +521,12 @@ public void close() throws IOException {
Preconditions.checkArgument(writeOffset == offset,
"Expected writeOffset= " + writeOffset
+ " Expected offset=" + offset);
if (atomicKeyCreation) {
long expectedSize = blockOutputStreamEntryPool.getDataSize();
Preconditions.checkState(expectedSize == offset, String.format(
"Expected: %d and actual %d write sizes do not match",
expectedSize, offset));
}
blockOutputStreamEntryPool.commitKey(offset);
}
} catch (ExecutionException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,14 @@ enum StreamAction {

private long clientID;

/**
* Indicates if an atomic write is required. When set to true,
* the amount of data written must match the declared size during the commit.
* A mismatch will prevent the commit from succeeding.
* This is essential for operations like S3 put to ensure atomicity.
*/
private boolean atomicKeyCreation;

@VisibleForTesting
public List<BlockDataStreamOutputEntry> getStreamEntries() {
return blockDataStreamOutputEntryPool.getStreamEntries();
Expand Down Expand Up @@ -109,7 +117,8 @@ public KeyDataStreamOutput(
OzoneManagerProtocol omClient, int chunkSize,
String requestId, ReplicationConfig replicationConfig,
String uploadID, int partNumber, boolean isMultipart,
boolean unsafeByteBufferConversion
boolean unsafeByteBufferConversion,
boolean atomicKeyCreation
) {
super(HddsClientUtils.getRetryPolicyByException(
config.getMaxRetryCount(), config.getRetryInterval()));
Expand All @@ -130,6 +139,7 @@ public KeyDataStreamOutput(
// encrypted bucket.
this.writeOffset = 0;
this.clientID = handler.getId();
this.atomicKeyCreation = atomicKeyCreation;
}

/**
Expand Down Expand Up @@ -387,6 +397,12 @@ public void close() throws IOException {
if (!isException()) {
Preconditions.checkArgument(writeOffset == offset);
}
if (atomicKeyCreation) {
long expectedSize = blockDataStreamOutputEntryPool.getDataSize();
Preconditions.checkArgument(expectedSize == offset,
String.format("Expected: %d and actual %d write sizes do not match",
expectedSize, offset));
}
blockDataStreamOutputEntryPool.commitKey(offset);
} finally {
blockDataStreamOutputEntryPool.cleanup();
Expand Down Expand Up @@ -422,6 +438,7 @@ public static class Builder {
private boolean unsafeByteBufferConversion;
private OzoneClientConfig clientConfig;
private ReplicationConfig replicationConfig;
private boolean atomicKeyCreation = false;

public Builder setMultipartUploadID(String uploadID) {
this.multipartUploadID = uploadID;
Expand Down Expand Up @@ -474,6 +491,11 @@ public Builder setReplicationConfig(ReplicationConfig replConfig) {
return this;
}

public Builder setAtomicKeyCreation(boolean atomicKey) {
this.atomicKeyCreation = atomicKey;
return this;
}

public KeyDataStreamOutput build() {
return new KeyDataStreamOutput(
clientConfig,
Expand All @@ -486,7 +508,8 @@ public KeyDataStreamOutput build() {
multipartUploadID,
multipartNumber,
isMultipartKey,
unsafeByteBufferConversion);
unsafeByteBufferConversion,
atomicKeyCreation);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,14 @@ enum StreamAction {

private long clientID;

/**
* Indicates if an atomic write is required. When set to true,
* the amount of data written must match the declared size during the commit.
* A mismatch will prevent the commit from succeeding.
* This is essential for operations like S3 put to ensure atomicity.
*/
private boolean atomicKeyCreation;

public KeyOutputStream(ReplicationConfig replicationConfig,
ContainerClientMetrics clientMetrics) {
this.replication = replicationConfig;
Expand Down Expand Up @@ -142,7 +150,8 @@ public KeyOutputStream(
String requestId, ReplicationConfig replicationConfig,
String uploadID, int partNumber, boolean isMultipart,
boolean unsafeByteBufferConversion,
ContainerClientMetrics clientMetrics
ContainerClientMetrics clientMetrics,
boolean atomicKeyCreation
) {
this.config = config;
this.replication = replicationConfig;
Expand All @@ -163,6 +172,7 @@ public KeyOutputStream(
this.isException = false;
this.writeOffset = 0;
this.clientID = handler.getId();
this.atomicKeyCreation = atomicKeyCreation;
}

/**
Expand Down Expand Up @@ -555,6 +565,12 @@ public synchronized void close() throws IOException {
if (!isException) {
Preconditions.checkArgument(writeOffset == offset);
}
if (atomicKeyCreation) {
long expectedSize = blockOutputStreamEntryPool.getDataSize();
Preconditions.checkState(expectedSize == offset,
String.format("Expected: %d and actual %d write sizes do not match",
expectedSize, offset));
}
blockOutputStreamEntryPool.commitKey(offset);
} finally {
blockOutputStreamEntryPool.cleanup();
Expand Down Expand Up @@ -591,6 +607,7 @@ public static class Builder {
private OzoneClientConfig clientConfig;
private ReplicationConfig replicationConfig;
private ContainerClientMetrics clientMetrics;
private boolean atomicKeyCreation = false;

public String getMultipartUploadID() {
return multipartUploadID;
Expand Down Expand Up @@ -677,6 +694,11 @@ public Builder setReplicationConfig(ReplicationConfig replConfig) {
return this;
}

public Builder setAtomicKeyCreation(boolean atomicKey) {
this.atomicKeyCreation = atomicKey;
return this;
}

public Builder setClientMetrics(ContainerClientMetrics clientMetrics) {
this.clientMetrics = clientMetrics;
return this;
Expand All @@ -686,6 +708,10 @@ public ContainerClientMetrics getClientMetrics() {
return clientMetrics;
}

public boolean getAtomicKeyCreation() {
return atomicKeyCreation;
}

public KeyOutputStream build() {
return new KeyOutputStream(
clientConfig,
Expand All @@ -698,7 +724,8 @@ public KeyOutputStream build() {
multipartNumber,
isMultipartKey,
unsafeByteBufferConversion,
clientMetrics);
clientMetrics,
atomicKeyCreation);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,30 +59,35 @@ public synchronized void close() throws IOException {
}

public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() {
KeyDataStreamOutput keyDataStreamOutput = getKeyDataStreamOutput();
if (keyDataStreamOutput != null) {
return keyDataStreamOutput.getCommitUploadPartInfo();
}
// Otherwise return null.
return null;
}

public KeyDataStreamOutput getKeyDataStreamOutput() {
if (byteBufferStreamOutput instanceof OzoneOutputStream) {
OutputStream outputStream =
((OzoneOutputStream) byteBufferStreamOutput).getOutputStream();
if (outputStream instanceof KeyDataStreamOutput) {
return ((KeyDataStreamOutput)
outputStream).getCommitUploadPartInfo();
return ((KeyDataStreamOutput) outputStream);
} else if (outputStream instanceof CryptoOutputStream) {
OutputStream wrappedStream =
((CryptoOutputStream) outputStream).getWrappedStream();
if (wrappedStream instanceof KeyDataStreamOutput) {
return ((KeyDataStreamOutput) wrappedStream)
.getCommitUploadPartInfo();
return ((KeyDataStreamOutput) wrappedStream);
}
} else if (outputStream instanceof CipherOutputStreamOzone) {
OutputStream wrappedStream =
((CipherOutputStreamOzone) outputStream).getWrappedStream();
if (wrappedStream instanceof KeyDataStreamOutput) {
return ((KeyDataStreamOutput) wrappedStream)
.getCommitUploadPartInfo();
return ((KeyDataStreamOutput) wrappedStream);
}
}
} else if (byteBufferStreamOutput instanceof KeyDataStreamOutput) {
return ((KeyDataStreamOutput)
byteBufferStreamOutput).getCommitUploadPartInfo();
return ((KeyDataStreamOutput) byteBufferStreamOutput);
}
// Otherwise return null.
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,29 +123,38 @@ public void hsync() throws IOException {
}

public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() {
KeyOutputStream keyOutputStream = getKeyOutputStream();
if (keyOutputStream != null) {
return keyOutputStream.getCommitUploadPartInfo();
}
// Otherwise return null.
return null;
}

public OutputStream getOutputStream() {
return outputStream;
}

public KeyOutputStream getKeyOutputStream() {
if (outputStream instanceof KeyOutputStream) {
return ((KeyOutputStream) outputStream).getCommitUploadPartInfo();
return ((KeyOutputStream) outputStream);
} else if (outputStream instanceof CryptoOutputStream) {
OutputStream wrappedStream =
((CryptoOutputStream) outputStream).getWrappedStream();
if (wrappedStream instanceof KeyOutputStream) {
return ((KeyOutputStream) wrappedStream).getCommitUploadPartInfo();
return ((KeyOutputStream) wrappedStream);
}
} else if (outputStream instanceof CipherOutputStreamOzone) {
OutputStream wrappedStream =
((CipherOutputStreamOzone) outputStream).getWrappedStream();
if (wrappedStream instanceof KeyOutputStream) {
return ((KeyOutputStream)wrappedStream).getCommitUploadPartInfo();
return ((KeyOutputStream)wrappedStream);
}
}
// Otherwise return null.
return null;
}

public OutputStream getOutputStream() {
return outputStream;
}

@Override
public Map<String, String> getMetadata() {
if (outputStream instanceof CryptoOutputStream) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1008,6 +1008,9 @@ OzoneKey headObject(String volumeName, String bucketName,
*/
void setThreadLocalS3Auth(S3Auth s3Auth);


void setIsS3Request(boolean isS3Request);

/**
* Gets the S3 Authentication information that is attached to the thread.
* @return S3 Authentication information.
Expand Down
Loading