diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java index a837feafbf29..c8bf7db7fe5d 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java @@ -22,7 +22,9 @@ import java.io.InputStream; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.function.Function; import java.util.stream.Collectors; @@ -96,6 +98,47 @@ public static LengthInputStream getFromOmKeyInfo(OmKeyInfo keyInfo, return new LengthInputStream(keyInputStream, keyInputStream.length); } + public static List<LengthInputStream> getStreamsFromKeyInfo(OmKeyInfo keyInfo, + XceiverClientFactory xceiverClientFactory, boolean verifyChecksum, + Function<OmKeyInfo, OmKeyInfo> retryFunction) { + List<OmKeyLocationInfo> keyLocationInfos = keyInfo + .getLatestVersionLocations().getBlocksLatestVersionOnly(); + + List<LengthInputStream> lengthInputStreams = new ArrayList<>(); + + // Iterate through each block info in keyLocationInfos and assign it the + // corresponding part in the partsToBlocksMap. Also increment each part's + // length accordingly. + Map<Integer, List<OmKeyLocationInfo>> partsToBlocksMap = new HashMap<>(); + Map<Integer, Long> partsLengthMap = new HashMap<>(); + + for (OmKeyLocationInfo omKeyLocationInfo: keyLocationInfos) { + int partNumber = omKeyLocationInfo.getPartNumber(); + + if (!partsToBlocksMap.containsKey(partNumber)) { + partsToBlocksMap.put(partNumber, new ArrayList<>()); + partsLengthMap.put(partNumber, 0L); + } + // Add Block to corresponding partNumber in partsToBlocksMap + partsToBlocksMap.get(partNumber).add(omKeyLocationInfo); + // Update the part length + partsLengthMap.put(partNumber, + partsLengthMap.get(partNumber) + omKeyLocationInfo.getLength()); + } + + // Create a KeyInputStream for each part. + for (Map.Entry<Integer, List<OmKeyLocationInfo>> entry : + partsToBlocksMap.entrySet()) { + KeyInputStream keyInputStream = new KeyInputStream(); + keyInputStream.initialize(keyInfo, entry.getValue(), + xceiverClientFactory, verifyChecksum, retryFunction); + lengthInputStreams.add(new LengthInputStream(keyInputStream, + partsLengthMap.get(entry.getKey()))); + } + + return lengthInputStreams; + } + private synchronized void initialize(OmKeyInfo keyInfo, List<OmKeyLocationInfo> blockInfos, XceiverClientFactory xceiverClientFactory, diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/MultipartCryptoKeyInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/MultipartCryptoKeyInputStream.java new file mode 100644 index 000000000000..07c046425482 --- /dev/null +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/MultipartCryptoKeyInputStream.java @@ -0,0 +1,375 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p>
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.client.io; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.crypto.CryptoInputStream; +import org.apache.hadoop.fs.CanUnbuffer; +import org.apache.hadoop.fs.FSExceptionMessages; +import org.apache.hadoop.fs.Seekable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.EOFException; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + +public class MultipartCryptoKeyInputStream extends OzoneInputStream + implements Seekable, CanUnbuffer { + + private static final Logger LOG = + LoggerFactory.getLogger(MultipartCryptoKeyInputStream.class); + + private static final int EOF = -1; + + private String key; + private long length = 0L; + private boolean closed = false; + + // List of OzoneCryptoInputStream, one for each part of the key + private List<OzoneCryptoInputStream> partStreams; + + // partOffsets[i] stores the index of the first data byte in + // partStreams[i] w.r.t. the whole key data. + // For example, let’s say the part size is 200 bytes and part[0] stores + // data from indices 0 - 199, part[1] from indices 200 - 399 and so on. + // Then, partOffsets[0] = 0 (the offset of the first byte of data in + // part[0]), partOffsets[1] = 200 and so on. + private long[] partOffsets; + + // Index of the partStream corresponding to the current position of the + // MultipartCryptoKeyInputStream. + private int partIndex = 0; + + // Tracks the partIndex corresponding to the last seeked position so that it + // can be reset if a new position is seeked. + private int prevPartIndex = 0; + + // If a read's start position or length doesn't coincide with a Crypto buffer + // boundary, it will be adjusted, as reads should happen only at the buffer + // boundaries for decryption to happen correctly. In this case, after the + // data has been read and decrypted, only the requested data should be + // returned to the client. readPositionAdjustedBy and readLengthAdjustedBy + // store this adjustment information. Before returning to the client, the + // first readPositionAdjustedBy bytes and the last readLengthAdjustedBy + // bytes must be discarded.
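+ // A worked example with the default 8 KB (8192-byte) crypto buffer: a + // read of 4000 bytes at position 10000 is widened to the full buffer + // starting at 8192, i.e. readPositionAdjustedBy = 1808 and + // readLengthAdjustedBy = 2384; of the 8192 decrypted bytes, only bytes + // 1808..5807 are copied back to the client.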
+ private int readPositionAdjustedBy = 0; + private int readLengthAdjustedBy = 0; + + public MultipartCryptoKeyInputStream(String keyName, + List<OzoneCryptoInputStream> inputStreams) { + + Preconditions.checkNotNull(inputStreams); + + this.key = keyName; + this.partStreams = inputStreams; + + // Calculate and update the partOffsets + this.partOffsets = new long[inputStreams.size()]; + int i = 0; + for (OzoneCryptoInputStream ozoneCryptoInputStream : inputStreams) { + this.partOffsets[i++] = length; + length += ozoneCryptoInputStream.getLength(); + } + } + + /** + * {@inheritDoc} + */ + @Override + public int read() throws IOException { + byte[] buf = new byte[1]; + if (read(buf, 0, 1) == EOF) { + return EOF; + } + return Byte.toUnsignedInt(buf[0]); + } + + /** + * {@inheritDoc} + */ + @Override + public int read(byte[] b, int off, int len) throws IOException { + checkOpen(); + if (b == null) { + throw new NullPointerException(); + } + if (off < 0 || len < 0 || len > b.length - off) { + throw new IndexOutOfBoundsException(); + } + if (len == 0) { + return 0; + } + int totalReadLen = 0; + while (len > 0) { + if (partStreams.size() == 0 || + (partStreams.size() - 1 <= partIndex && + partStreams.get(partIndex).getRemaining() == 0)) { + return totalReadLen == 0 ? EOF : totalReadLen; + } + + // Get the current partStream and read data from it + OzoneCryptoInputStream current = partStreams.get(partIndex); + // CryptoInputStream reads hadoop.security.crypto.buffer.size number of + // bytes (default 8KB) at a time. This needs to be taken into account + // in calculating the numBytesToRead. + int numBytesToRead = getNumBytesToRead(len, (int)current.getRemaining(), + current.getBufferSize()); + int numBytesRead; + + if (readPositionAdjustedBy != 0 || readLengthAdjustedBy != 0) { + // There was some adjustment made in position and/or length of data + // to be read to account for Crypto buffer boundary. Hence, read the + // data into a temp buffer and then copy only the requested data into + // the client's buffer. + byte[] tempBuffer = new byte[numBytesToRead]; + int actualNumBytesRead = current.read(tempBuffer, 0, + numBytesToRead); + numBytesRead = actualNumBytesRead - readPositionAdjustedBy - + readLengthAdjustedBy; + + if (actualNumBytesRead != numBytesToRead) { + throw new IOException(String.format("Inconsistent read for key=%s " + + "part=%d length=%d numBytesToRead(accounting for Crypto " + + "boundaries)=%d numBytesRead(actual)=%d " + + "numBytesToBeRead(into client buffer discarding crypto " + + "boundary adjustments)=%d", + key, partIndex, current.getLength(), numBytesToRead, + actualNumBytesRead, numBytesRead)); + } + + // TODO: Byte array copies are not optimal. If there is a better and + // more optimal solution to copy only a part of read data into + // client buffer, this should be updated. + System.arraycopy(tempBuffer, readPositionAdjustedBy, b, off, + numBytesRead); + + LOG.debug("OzoneCryptoInputStream for key: {} part: {} read {} bytes " + + "instead of {} bytes to account for Crypto buffer boundary. " + + "Client buffer will be copied with read data from position {} " + + "up to position {}, discarding the extra bytes read to " + + "maintain Crypto buffer boundary limits", key, partIndex, + actualNumBytesRead, numBytesRead, readPositionAdjustedBy, + actualNumBytesRead - readPositionAdjustedBy); + + // Reset readPositionAdjustedBy and readLengthAdjustedBy + readPositionAdjustedBy = 0; + readLengthAdjustedBy = 0; + } else { + numBytesRead = current.read(b, off, numBytesToRead); + if (numBytesRead != numBytesToRead) { + throw new IOException(String.format("Inconsistent read for key=%s " + + "part=%d length=%d numBytesToRead=%d numBytesRead=%d", + key, partIndex, current.getLength(), numBytesToRead, + numBytesRead)); + } + } + + totalReadLen += numBytesRead; + off += numBytesRead; + len -= numBytesRead; + + if (current.getRemaining() <= 0 && + ((partIndex + 1) < partStreams.size())) { + partIndex += 1; + } + + } + return totalReadLen; + } + + /** + * Get the number of bytes to read from the current stream based on the + * length to be read, the number of bytes remaining in the stream and the + * Crypto buffer size. + * Reads should be performed at the CryptoInputStream Buffer boundaries only. + * Otherwise, the decryption will be incorrect. + */ + private int getNumBytesToRead(int lenToRead, int remaining, + int cryptoBufferSize) throws IOException { + + Preconditions.checkArgument(readPositionAdjustedBy == 0); + Preconditions.checkArgument(readLengthAdjustedBy == 0); + + // Check and adjust position if required + adjustReadPosition(cryptoBufferSize); + remaining += readPositionAdjustedBy; + lenToRead += readPositionAdjustedBy; + + return adjustNumBytesToRead(lenToRead, remaining, cryptoBufferSize); + } + + /** + * Reads should be performed at the CryptoInputStream Buffer boundary size. + * Otherwise, the decryption will be incorrect. Hence, if the position is + * not at the boundary limit, we have to adjust the position and might need + * to read more data than requested. The extra data will be filtered out + * before returning to the client. + */ + private void adjustReadPosition(long cryptoBufferSize) throws IOException { + // Position of the buffer in current stream + long currentPosOfStream = partStreams.get(partIndex).getPos(); + int modulus = (int) (currentPosOfStream % cryptoBufferSize); + if (modulus != 0) { + // Adjustment required. + // Update readPositionAdjustedBy and seek to the adjusted position + readPositionAdjustedBy = modulus; + // Seek current partStream to adjusted position. We do not need to + // reset the seeked positions of other streams. + partStreams.get(partIndex) + .seek(currentPosOfStream - readPositionAdjustedBy); + LOG.debug("OzoneCryptoInputStream for key: {} part: {} adjusted " + + "position {} by -{} to account for Crypto buffer boundary", + key, partIndex, currentPosOfStream, readPositionAdjustedBy); + } + } + + /** + * If the length of data requested does not end at a Crypto Buffer + * boundary, the number of bytes to be read must be adjusted accordingly. + * The extra data will be filtered out before returning to the client. + */ + private int adjustNumBytesToRead(int lenToRead, int remaining, + int cryptoBufferSize) { + int numBytesToRead = Math.min(cryptoBufferSize, remaining); + if (lenToRead < numBytesToRead) { + // Adjustment required; update readLengthAdjustedBy.
+ readLengthAdjustedBy = numBytesToRead - lenToRead; + LOG.debug("OzoneCryptoInputStream for key: {} part: {} adjusted length " + + "by +{} to account for Crypto buffer boundary", + key, partIndex, readLengthAdjustedBy); + } + return numBytesToRead; + } + + /** + * Seeks the InputStream to the specified position. This involves 2 steps: + * 1. Updating the partIndex to the partStream corresponding to the + * seeked position. + * 2. Seeking the corresponding partStream to the adjusted position. + * + * For example, let’s say the part sizes are 200 bytes and part[0] stores + * data from indices 0 - 199, part[1] from indices 200 - 399 and so on. + * Let’s say we seek to position 240. In the first step, the partIndex + * would be updated to 1 as indices 200 - 399 reside in partStreams[1]. In + * the second step, partStreams[1] would be seeked to position 40 (= + * 240 - partOffsets[1] (= 200)). + */ + @Override + public void seek(long pos) throws IOException { + if (pos == 0 && length == 0) { + // It is possible for length and pos to be zero in which case + // seek should return instead of throwing exception + return; + } + if (pos < 0 || pos > length) { + throw new EOFException("EOF encountered at pos: " + pos); + } + + // 1. Update the partIndex + if (partIndex >= partStreams.size()) { + partIndex = Arrays.binarySearch(partOffsets, pos); + } else if (pos < partOffsets[partIndex]) { + partIndex = + Arrays.binarySearch(partOffsets, 0, partIndex, pos); + } else if (pos >= partOffsets[partIndex] + partStreams + .get(partIndex).getLength()) { + partIndex = Arrays.binarySearch(partOffsets, partIndex + 1, + partStreams.size(), pos); + } + if (partIndex < 0) { + // Binary search returns -insertionPoint - 1 if element is not present + // in the array. insertionPoint is the point at which element would be + // inserted in the sorted array. We need to adjust the partIndex + // accordingly so that partIndex = insertionPoint - 1 + partIndex = -partIndex - 2; + } + + // Reset the previous partStream's position + partStreams.get(prevPartIndex).seek(0); + + // Reset all the partStreams above the partIndex. We do this to reset + // any previous reads which might have updated the higher part + // streams position. + for (int index = partIndex + 1; index < partStreams.size(); index++) { + partStreams.get(index).seek(0); + } + // 2. Seek the partStream to the adjusted position + partStreams.get(partIndex).seek(pos - partOffsets[partIndex]); + prevPartIndex = partIndex; + } + + @Override + public synchronized long getPos() throws IOException { + checkOpen(); + return length == 0 ? 0 : partOffsets[partIndex] + + partStreams.get(partIndex).getPos(); + } + + @Override + public boolean seekToNewSource(long targetPos) throws IOException { + return false; + } + + @Override + public int available() throws IOException { + checkOpen(); + long remaining = length - getPos(); + return remaining <= Integer.MAX_VALUE ? (int) remaining : Integer.MAX_VALUE; + } + + @Override + public void unbuffer() { + for (CryptoInputStream cryptoInputStream : partStreams) { + cryptoInputStream.unbuffer(); + } + } + + @Override + public long skip(long n) throws IOException { + if (n <= 0) { + return 0; + } + + long toSkip = Math.min(n, length - getPos()); + seek(getPos() + toSkip); + return toSkip; + } + + @Override + public synchronized void close() throws IOException { + closed = true; + for (OzoneCryptoInputStream partStream : partStreams) { + partStream.close(); + } + } + + /** + * Verify that the input stream is open.
Non-blocking; this gives + * the last state of the {@link #closed} field. + * @throws IOException if the connection is closed. + */ + private void checkOpen() throws IOException { + if (closed) { + throw new IOException( + ": " + FSExceptionMessages.STREAM_IS_CLOSED + " Key: " + key); + } + } +} diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneCryptoInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneCryptoInputStream.java new file mode 100644 index 000000000000..9d5d888688f4 --- /dev/null +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneCryptoInputStream.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p>
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.client.io; + +import java.io.IOException; +import org.apache.hadoop.crypto.CryptoCodec; +import org.apache.hadoop.crypto.CryptoInputStream; +import org.apache.hadoop.crypto.CryptoStreamUtils; +import org.apache.hadoop.fs.Seekable; + +/** + * A CryptoInputStream for Ozone with length. This stream is used to read + * Keys in Encrypted Buckets. + */ +public class OzoneCryptoInputStream extends CryptoInputStream + implements Seekable { + + private final long length; + private final int bufferSize; + + public OzoneCryptoInputStream(LengthInputStream in, + CryptoCodec codec, byte[] key, byte[] iv) throws IOException { + super(in.getWrappedStream(), codec, key, iv); + this.length = in.getLength(); + // This is the buffer size used while creating the CryptoInputStream + // internally + this.bufferSize = CryptoStreamUtils.getBufferSize(codec.getConf()); + } + + public long getLength() { + return length; + } + + public int getBufferSize() { + return bufferSize; + } + + public long getRemaining() throws IOException { + return length - getPos(); + } +} diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneInputStream.java index f01975c94d8e..fb3928d071eb 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneInputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneInputStream.java @@ -30,6 +30,9 @@ public class OzoneInputStream extends InputStream implements CanUnbuffer { private final InputStream inputStream; + public OzoneInputStream() { + inputStream = null; + } /** * Constructs OzoneInputStream with KeyInputStream. * diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java index e4a7d6a100ce..bf88b6fd38de 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java @@ -17,6 +17,7 @@ package org.apache.hadoop.ozone.client.io; +import org.apache.hadoop.crypto.CryptoOutputStream; import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo; import java.io.IOException; @@ -63,6 +64,12 @@ public synchronized void close() throws IOException { public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() { if (outputStream instanceof KeyOutputStream) { return ((KeyOutputStream) outputStream).getCommitUploadPartInfo(); + } else if (outputStream instanceof CryptoOutputStream) { + OutputStream wrappedStream = + ((CryptoOutputStream) outputStream).getWrappedStream(); + if (wrappedStream instanceof KeyOutputStream) { + return ((KeyOutputStream) wrappedStream).getCommitUploadPartInfo(); + } } // Otherwise return null. 
return null; diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java index d23560c07dae..ed85a32adb13 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java @@ -67,6 +67,8 @@ import org.apache.hadoop.ozone.client.io.KeyInputStream; import org.apache.hadoop.ozone.client.io.KeyOutputStream; import org.apache.hadoop.ozone.client.io.LengthInputStream; +import org.apache.hadoop.ozone.client.io.MultipartCryptoKeyInputStream; +import org.apache.hadoop.ozone.client.io.OzoneCryptoInputStream; import org.apache.hadoop.ozone.client.io.OzoneInputStream; import org.apache.hadoop.ozone.client.io.OzoneOutputStream; import org.apache.hadoop.ozone.client.protocol.ClientProtocol; @@ -942,7 +944,17 @@ public OzoneOutputStream createMultipartKey(String volumeName, keyOutputStream.addPreallocateBlocks( openKey.getKeyInfo().getLatestVersionLocations(), openKey.getOpenVersion()); - return new OzoneOutputStream(keyOutputStream); + FileEncryptionInfo feInfo = keyOutputStream.getFileEncryptionInfo(); + if (feInfo != null) { + KeyProvider.KeyVersion decrypted = getDEK(feInfo); + final CryptoOutputStream cryptoOut = + new CryptoOutputStream(keyOutputStream, + OzoneKMSUtil.getCryptoCodec(conf, feInfo), + decrypted.getMaterial(), feInfo.getIV()); + return new OzoneOutputStream(cryptoOut); + } else { + return new OzoneOutputStream(keyOutputStream); + } } @Override @@ -1192,23 +1204,18 @@ public List<OzoneAcl> getAcl(OzoneObj obj) throws IOException { private OzoneInputStream createInputStream( OmKeyInfo keyInfo, Function<OmKeyInfo, OmKeyInfo> retryFunction) throws IOException { - LengthInputStream lengthInputStream = KeyInputStream - .getFromOmKeyInfo(keyInfo, xceiverClientManager, - clientConfig.isChecksumVerify(), retryFunction); + // When the key is not MPU, or the key is MPU but encryption is not enabled. + // Need to revisit for GDPR.
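+ // Three cases follow: (1) feInfo == null: a plain key, possibly GDPR + // protected; (2) an encrypted non-multipart key; (3) an encrypted + // multipart key, which must be decrypted part by part.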
FileEncryptionInfo feInfo = keyInfo.getFileEncryptionInfo(); - if (feInfo != null) { - final KeyProvider.KeyVersion decrypted = getDEK(feInfo); - final CryptoInputStream cryptoIn = - new CryptoInputStream(lengthInputStream.getWrappedStream(), - OzoneKMSUtil.getCryptoCodec(conf, feInfo), - decrypted.getMaterial(), feInfo.getIV()); - return new OzoneInputStream(cryptoIn); - } else { - try{ - GDPRSymmetricKey gk; - Map<String, String> keyInfoMetadata = keyInfo.getMetadata(); - if(Boolean.valueOf(keyInfoMetadata.get(OzoneConsts.GDPR_FLAG))){ - gk = new GDPRSymmetricKey( + + if (feInfo == null) { + LengthInputStream lengthInputStream = KeyInputStream + .getFromOmKeyInfo(keyInfo, xceiverClientManager, + clientConfig.isChecksumVerify(), retryFunction); + try { + Map<String, String> keyInfoMetadata = keyInfo.getMetadata(); + if (Boolean.valueOf(keyInfoMetadata.get(OzoneConsts.GDPR_FLAG))) { + GDPRSymmetricKey gk = new GDPRSymmetricKey( keyInfoMetadata.get(OzoneConsts.GDPR_SECRET), keyInfoMetadata.get(OzoneConsts.GDPR_ALGORITHM) ); @@ -1216,11 +1223,39 @@ private OzoneInputStream createInputStream( return new OzoneInputStream( new CipherInputStream(lengthInputStream, gk.getCipher())); } - }catch (Exception ex){ + } catch (Exception ex) { throw new IOException(ex); } + return new OzoneInputStream(lengthInputStream.getWrappedStream()); + } else if (!keyInfo.getLatestVersionLocations().isMultipartKey()) { + // Regular Key with FileEncryptionInfo + LengthInputStream lengthInputStream = KeyInputStream + .getFromOmKeyInfo(keyInfo, xceiverClientManager, + clientConfig.isChecksumVerify(), retryFunction); + final KeyProvider.KeyVersion decrypted = getDEK(feInfo); + final CryptoInputStream cryptoIn = + new CryptoInputStream(lengthInputStream.getWrappedStream(), + OzoneKMSUtil.getCryptoCodec(conf, feInfo), + decrypted.getMaterial(), feInfo.getIV()); + return new OzoneInputStream(cryptoIn); + } else { + // Multipart Key with FileEncryptionInfo + List<LengthInputStream> lengthInputStreams = KeyInputStream + .getStreamsFromKeyInfo(keyInfo, xceiverClientManager, + clientConfig.isChecksumVerify(), retryFunction); + final KeyProvider.KeyVersion decrypted = getDEK(feInfo); + + List<OzoneCryptoInputStream> cryptoInputStreams = new ArrayList<>(); + for (LengthInputStream lengthInputStream : lengthInputStreams) { + final OzoneCryptoInputStream ozoneCryptoInputStream = + new OzoneCryptoInputStream(lengthInputStream, + OzoneKMSUtil.getCryptoCodec(conf, feInfo), + decrypted.getMaterial(), feInfo.getIV()); + cryptoInputStreams.add(ozoneCryptoInputStream); + } + return new MultipartCryptoKeyInputStream(keyInfo.getKeyName(), + cryptoInputStreams); } - return new OzoneInputStream(lengthInputStream.getWrappedStream()); } private OzoneOutputStream createOutputStream(OpenKeySession openKey, diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java index 648176860cb7..43213a98490f 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java @@ -145,9 +145,12 @@ public void updateModifcationTime() { * * @param locationInfoList list of locationInfo */ - public void updateLocationInfoList(List<OmKeyLocationInfo> locationInfoList) { + public void updateLocationInfoList(List<OmKeyLocationInfo> locationInfoList, + boolean isMpu) { long latestVersion = getLatestVersionLocations().getVersion(); OmKeyLocationInfoGroup keyLocationInfoGroup = getLatestVersionLocations(); + +
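+ // Record whether this location group belongs to a multipart key so + // that the read path can rebuild one stream per part (see + // KeyInputStream#getStreamsFromKeyInfo).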
keyLocationInfoGroup.setMultipartKey(isMpu); // Updates the latest locationList in the latest version only with // given locationInfoList here. // TODO : The original allocated list and the updated list here may vary @@ -161,6 +164,8 @@ public void updateLocationInfoList(List<OmKeyLocationInfo> locationInfoList) { keyLocationInfoGroup.addAll(latestVersion, locationInfoList); } + + /** * Append a set of blocks to the latest version. Note that these blocks are * part of the latest version, not a new version. diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfo.java index 4eac8bec7093..d1a721a476fa 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfo.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfo.java @@ -42,21 +42,26 @@ public final class OmKeyLocationInfo { private Pipeline pipeline; + // PartNumber is set for Multipart upload Keys. + private int partNumber = -1; + private OmKeyLocationInfo(BlockID blockID, Pipeline pipeline, long length, - long offset) { + long offset, int partNumber) { this.blockID = blockID; this.pipeline = pipeline; this.length = length; this.offset = offset; + this.partNumber = partNumber; } private OmKeyLocationInfo(BlockID blockID, Pipeline pipeline, long length, - long offset, Token<OzoneBlockTokenIdentifier> token) { + long offset, Token<OzoneBlockTokenIdentifier> token, int partNumber) { this.blockID = blockID; this.pipeline = pipeline; this.length = length; this.offset = offset; this.token = token; + this.partNumber = partNumber; } public void setCreateVersion(long version) { @@ -111,6 +116,14 @@ public void setPipeline(Pipeline pipeline) { this.pipeline = pipeline; } + public void setPartNumber(int partNumber) { + this.partNumber = partNumber; + } + + public int getPartNumber() { + return partNumber; + } + + /** * Builder of OmKeyLocationInfo.
*/ @@ -120,6 +133,7 @@ public static class Builder { private long offset; private Token<OzoneBlockTokenIdentifier> token; private Pipeline pipeline; + private int partNumber; public Builder setBlockID(BlockID blockId) { this.blockID = blockId; @@ -147,8 +161,14 @@ public Builder setToken(Token<OzoneBlockTokenIdentifier> bToken) { return this; } + public Builder setPartNumber(int partNum) { + this.partNumber = partNum; + return this; + } + public OmKeyLocationInfo build() { - return new OmKeyLocationInfo(blockID, pipeline, length, offset, token); + return new OmKeyLocationInfo(blockID, pipeline, length, offset, token, + partNumber); } } @@ -161,7 +181,7 @@ public KeyLocation getProtobuf(boolean ignorePipeline, int clientVersion) { .setBlockID(blockID.getProtobuf()) .setLength(length) .setOffset(offset) - .setCreateVersion(createVersion); + .setCreateVersion(createVersion).setPartNumber(partNumber); if (this.token != null) { builder.setToken(OzonePBHelper.protoFromToken(token)); } @@ -189,7 +209,7 @@ public static OmKeyLocationInfo getFromProtobuf(KeyLocation keyLocation) { BlockID.getFromProtobuf(keyLocation.getBlockID()), getPipeline(keyLocation), keyLocation.getLength(), - keyLocation.getOffset()); + keyLocation.getOffset(), keyLocation.getPartNumber()); if(keyLocation.hasToken()) { info.token = (Token<OzoneBlockTokenIdentifier>) OzonePBHelper.tokenFromProto(keyLocation.getToken()); @@ -199,14 +219,15 @@ public static OmKeyLocationInfo getFromProtobuf(KeyLocation keyLocation) { } @Override - public String toString() { + public String toString() { return "{blockID={containerID=" + blockID.getContainerID() + ", localID=" + blockID.getLocalID() + "}" + ", length=" + length + ", offset=" + offset + ", token=" + token + ", pipeline=" + pipeline + - ", createVersion=" + createVersion + '}'; + ", createVersion=" + createVersion + ", partNumber=" + partNumber + + '}'; } @Override diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfoGroup.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfoGroup.java index 440b1cb3f9ef..a93bcf24e441 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfoGroup.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfoGroup.java @@ -32,9 +32,15 @@ public class OmKeyLocationInfoGroup { private final long version; private final Map<Long, List<OmKeyLocationInfo>> locationVersionMap; + private boolean isMultipartKey; public OmKeyLocationInfoGroup(long version, List<OmKeyLocationInfo> locations) { + this(version, locations, false); + } + + public OmKeyLocationInfoGroup(long version, + List<OmKeyLocationInfo> locations, boolean isMultipartKey) { this.version = version; locationVersionMap = new HashMap<>(); for (OmKeyLocationInfo info : locations) { @@ -44,14 +50,30 @@ public OmKeyLocationInfoGroup(long version, } //prevent NPE this.locationVersionMap.putIfAbsent(version, new ArrayList<>()); + this.isMultipartKey = isMultipartKey; + } public OmKeyLocationInfoGroup(long version, Map<Long, List<OmKeyLocationInfo>> locations) { + this(version, locations, false); + } + + public OmKeyLocationInfoGroup(long version, + Map<Long, List<OmKeyLocationInfo>> locations, boolean isMultipartKey) { this.version = version; this.locationVersionMap = locations; //prevent NPE this.locationVersionMap.putIfAbsent(version, new ArrayList<>()); + this.isMultipartKey = isMultipartKey; + } + + public void setMultipartKey(boolean isMpu) { + this.isMultipartKey = isMpu; + } + + public boolean isMultipartKey() { + return isMultipartKey; } /** @@ -83,7 +105,7 @@ public List<OmKeyLocationInfo> getLocationList(Long versionToFetch) { public KeyLocationList
getProtobuf(boolean ignorePipeline, int clientVersion) { KeyLocationList.Builder builder = KeyLocationList.newBuilder() - .setVersion(version); + .setVersion(version).setIsMultipartKey(isMultipartKey); List<KeyLocation> keyLocationList = new ArrayList<>(); for (List<OmKeyLocationInfo> locationList : locationVersionMap.values()) { @@ -100,7 +122,9 @@ public static OmKeyLocationInfoGroup getFromProtobuf( keyLocationList.getVersion(), keyLocationList.getKeyLocationsList().stream() .map(OmKeyLocationInfo::getFromProtobuf) - .collect(Collectors.groupingBy(OmKeyLocationInfo::getCreateVersion)) + .collect(Collectors.groupingBy( + OmKeyLocationInfo::getCreateVersion)), + keyLocationList.getIsMultipartKey() ); } @@ -141,6 +165,7 @@ void addAll(long versionToAdd, List<OmKeyLocationInfo> locationInfoList) { public String toString() { StringBuilder sb = new StringBuilder(); sb.append("version:").append(version).append(" "); + sb.append("isMultipartKey:").append(isMultipartKey); for (List<OmKeyLocationInfo> kliList : locationVersionMap.values()) { for(OmKeyLocationInfo kli: kliList) { sb.append(kli.getLocalID()).append(" || "); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java index 7aced89cc808..49243eeac794 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java @@ -22,10 +22,16 @@ import java.nio.charset.StandardCharsets; import java.security.NoSuchAlgorithmException; import java.time.Instant; +import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.Random; +import java.util.TreeMap; import java.util.UUID; +import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.crypto.key.KeyProvider; import org.apache.hadoop.crypto.key.kms.KMSClientProvider; import org.apache.hadoop.crypto.key.kms.server.MiniKMS; @@ -48,6 +54,7 @@ import org.apache.hadoop.ozone.client.OzoneKey; import org.apache.hadoop.ozone.client.OzoneKeyDetails; import org.apache.hadoop.ozone.client.OzoneVolume; +import org.apache.hadoop.ozone.client.io.MultipartCryptoKeyInputStream; import org.apache.hadoop.ozone.client.io.OzoneInputStream; import org.apache.hadoop.ozone.client.io.OzoneOutputStream; import org.apache.hadoop.ozone.om.OMMetadataManager; @@ -55,21 +62,25 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; +import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo; +import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo; +import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo; import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo; import org.apache.hadoop.test.GenericTestUtils; import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS; +import static org.apache.hadoop.hdds.client.ReplicationFactor.ONE; +import static org.apache.hadoop.hdds.client.ReplicationType.STAND_ALONE; + import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Test; /** * This class is to test all the public facing APIs of Ozone Client.
*/ -@Ignore -public class TestOzoneAtRestEncryption extends TestOzoneRpcClient { +public class TestOzoneAtRestEncryption { private static MiniOzoneCluster cluster = null; private static MiniKMS miniKMS; @@ -83,15 +94,15 @@ public class TestOzoneAtRestEncryption extends TestOzoneRpcClient { private static File testDir; private static OzoneConfiguration conf; private static final String TEST_KEY = "key1"; + private static final Random RANDOM = new Random(); + private static final int MPU_PART_MIN_SIZE = 256 * 1024; // 256KB + private static final int BLOCK_SIZE = 64 * 1024; // 64KB + private static final int CHUNK_SIZE = 16 * 1024; // 16KB + private static final int DEFAULT_CRYPTO_BUFFER_SIZE = 8 * 1024; // 8KB + // (this is the default Crypto Buffer size as determined by the config + // hadoop.security.crypto.buffer.size) - /** - * Create a MiniOzoneCluster for testing. - *
- * Ozone is made active by setting OZONE_ENABLED = true - * - * @throws IOException - */ @BeforeClass public static void init() throws Exception { testDir = GenericTestUtils.getTestDir( @@ -115,6 +126,9 @@ public static void init() throws Exception { cluster = MiniOzoneCluster.newBuilder(conf) .setNumDatanodes(10) .setScmId(SCM_ID) + .setBlockSize(BLOCK_SIZE) + .setChunkSize(CHUNK_SIZE) + .setStreamBufferSizeUnit(StorageUnit.BYTES) .setCertificateClient(certificateClientTest) .build(); cluster.getOzoneManager().startSecretManager(); @@ -124,6 +138,7 @@ public static void init() throws Exception { storageContainerLocationClient = cluster.getStorageContainerLocationClient(); ozoneManager = cluster.getOzoneManager(); + ozoneManager.setMinMultipartUploadPartSize(MPU_PART_MIN_SIZE); TestOzoneRpcClient.setCluster(cluster); TestOzoneRpcClient.setOzClient(ozClient); TestOzoneRpcClient.setOzoneManager(ozoneManager); @@ -136,11 +151,9 @@ public static void init() throws Exception { createKey(TEST_KEY, cluster.getOzoneManager().getKmsProvider(), conf); } - - - /** - * Close OzoneClient and shutdown MiniOzoneCluster. - */ + /** + * Close OzoneClient and shutdown MiniOzoneCluster. + */ @AfterClass public static void shutdown() throws IOException { if(ozClient != null) { @@ -271,6 +284,14 @@ public void testKeyWithEncryptionAndGdpr() throws Exception { OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager(); String objectKey = omMetadataManager.getOzoneKey(volumeName, bucketName, keyName); + + GenericTestUtils.waitFor(() -> { + try { + return omMetadataManager.getDeletedTable().isExist(objectKey); + } catch (IOException e) { + return false; + } + }, 500, 100000); RepeatedOmKeyInfo deletedKeys = omMetadataManager.getDeletedTable().get(objectKey); Map<String, String> deletedKeyMetadata = @@ -323,4 +344,151 @@ private static void createKey(String keyName, KeyProvider provider.createKey(keyName, options); provider.flush(); } + + @Test + public void testMPUwithOnePart() throws Exception { + testMultipartUploadWithEncryption(1); + } + + @Test + public void testMPUwithTwoParts() throws Exception { + testMultipartUploadWithEncryption(2); + } + + public void testMultipartUploadWithEncryption(int numParts) throws Exception { + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + String keyName = "mpu_test_key_" + numParts; + + store.createVolume(volumeName); + OzoneVolume volume = store.getVolume(volumeName); + BucketArgs bucketArgs = BucketArgs.newBuilder() + .setBucketEncryptionKey(TEST_KEY).build(); + volume.createBucket(bucketName, bucketArgs); + OzoneBucket bucket = volume.getBucket(bucketName); + + // Initiate multipart upload + String uploadID = initiateMultipartUpload(bucket, keyName, STAND_ALONE, + ONE); + + // Upload Parts + Map<Integer, String> partsMap = new TreeMap<>(); + List<byte[]> partsData = new ArrayList<>(); + int keySize = 0; + for (int i = 1; i <= numParts; i++) { + // Generate random data with different sizes for each part. + // Adding a random int with a cap at 8K (the default crypto buffer + // size) to get parts whose last byte does not coincide with crypto + // buffer boundary.
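+ // With the constants above, part i is MPU_PART_MIN_SIZE * i plus up + // to 8K - 1 random bytes, so part ends rarely land on a crypto + // buffer boundary.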
+ byte[] data = generateRandomData((MPU_PART_MIN_SIZE * i) + + RANDOM.nextInt(DEFAULT_CRYPTO_BUFFER_SIZE)); + String partName = uploadPart(bucket, keyName, uploadID, i, data); + partsMap.put(i, partName); + partsData.add(data); + keySize += data.length; + } + + // Combine the parts data into 1 byte array for verification + byte[] inputData = new byte[keySize]; + int dataCopied = 0; + for (int i = 1; i <= numParts; i++) { + byte[] partBytes = partsData.get(i - 1); + System.arraycopy(partBytes, 0, inputData, dataCopied, partBytes.length); + dataCopied += partBytes.length; + } + + // Complete MPU + completeMultipartUpload(bucket, keyName, uploadID, partsMap); + + // Read different data lengths and starting from different offsets and + // verify the data matches. + Random random = new Random(); + int randomSize = random.nextInt(keySize / 2); + int randomOffset = random.nextInt(keySize - randomSize); + + int[] readDataSizes = {keySize, keySize / 3 + 1, BLOCK_SIZE, + BLOCK_SIZE * 2 + 1, CHUNK_SIZE, CHUNK_SIZE / 4 - 1, + DEFAULT_CRYPTO_BUFFER_SIZE, DEFAULT_CRYPTO_BUFFER_SIZE / 2, 1, + randomSize}; + + int[] readFromPositions = {0, DEFAULT_CRYPTO_BUFFER_SIZE + 10, CHUNK_SIZE, + BLOCK_SIZE - DEFAULT_CRYPTO_BUFFER_SIZE + 1, BLOCK_SIZE, keySize / 3, + keySize - 1, randomOffset}; + + // Create an input stream to read the data + OzoneInputStream inputStream = bucket.readKey(keyName); + Assert.assertTrue(inputStream instanceof MultipartCryptoKeyInputStream); + MultipartCryptoKeyInputStream cryptoInputStream = + (MultipartCryptoKeyInputStream) inputStream; + + for (int readDataLen : readDataSizes) { + for (int readFromPosition : readFromPositions) { + // Skip combinations where offset + read length exceeds the key size + if (readFromPosition + readDataLen > keySize) { + continue; + } + + byte[] readData = new byte[readDataLen]; + cryptoInputStream.seek(readFromPosition); + inputStream.read(readData, 0, readDataLen); + + assertReadContent(inputData, readData, readFromPosition); + } + } + } + + private static byte[] generateRandomData(int length) { + byte[] bytes = new byte[length]; + RANDOM.nextBytes(bytes); + return bytes; + } + + private String initiateMultipartUpload(OzoneBucket bucket, String keyName, + ReplicationType replicationType, ReplicationFactor replicationFactor) + throws Exception { + OmMultipartInfo multipartInfo = bucket.initiateMultipartUpload(keyName, + replicationType, replicationFactor); + + String uploadID = multipartInfo.getUploadID(); + Assert.assertNotNull(uploadID); + return uploadID; + } + + private String uploadPart(OzoneBucket bucket, String keyName, + String uploadID, int partNumber, byte[] data) throws Exception { + OzoneOutputStream ozoneOutputStream = bucket.createMultipartKey(keyName, + data.length, partNumber, uploadID); + ozoneOutputStream.write(data, 0, data.length); + ozoneOutputStream.close(); + + OmMultipartCommitUploadPartInfo omMultipartCommitUploadPartInfo = + ozoneOutputStream.getCommitUploadPartInfo(); + + Assert.assertNotNull(omMultipartCommitUploadPartInfo); + Assert.assertNotNull(omMultipartCommitUploadPartInfo.getPartName()); + return omMultipartCommitUploadPartInfo.getPartName(); + } + + private void completeMultipartUpload(OzoneBucket bucket, String keyName, + String uploadID, Map<Integer, String> partsMap) throws Exception { + OmMultipartUploadCompleteInfo omMultipartUploadCompleteInfo = bucket + .completeMultipartUpload(keyName, uploadID, partsMap); + + Assert.assertNotNull(omMultipartUploadCompleteInfo); + Assert.assertEquals(omMultipartUploadCompleteInfo.getBucket(), bucket +
.getName()); + Assert.assertEquals(omMultipartUploadCompleteInfo.getVolume(), bucket + .getVolumeName()); + Assert.assertEquals(omMultipartUploadCompleteInfo.getKey(), keyName); + Assert.assertNotNull(omMultipartUploadCompleteInfo.getHash()); + } + + private static void assertReadContent(byte[] inputData, byte[] readData, + int offset) { + byte[] inputDataForComparison = Arrays.copyOfRange(inputData, offset, + offset + readData.length); + Assert.assertArrayEquals("Read data does not match input data at offset " + + offset + " and length " + readData.length, + inputDataForComparison, readData); + } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java index 4508ade10150..cd32f4f1fc56 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java @@ -89,6 +89,7 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; +import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup; import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo; import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo; import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo; @@ -3069,7 +3070,6 @@ private byte[] generateData(int size, byte val) { return chars; } - private void doMultipartUpload(OzoneBucket bucket, String keyName, byte val) throws Exception { // Initiate Multipart upload request @@ -3098,11 +3098,9 @@ private void doMultipartUpload(OzoneBucket bucket, String keyName, byte val) partsMap.put(3, partName); length += part3.getBytes(UTF_8).length; - // Complete multipart upload request completeMultipartUpload(bucket, keyName, uploadID, partsMap); - //Now Read the key which has been completed multipart upload. byte[] fileContent = new byte[data.length + data.length + part3.getBytes( UTF_8).length]; @@ -3122,8 +3120,19 @@ private void doMultipartUpload(OzoneBucket bucket, String keyName, byte val) sb.append(part2); sb.append(part3); Assert.assertEquals(sb.toString(), new String(fileContent, UTF_8)); - } + String ozoneKey = ozoneManager.getMetadataManager() + .getOzoneKey(bucket.getVolumeName(), bucket.getName(), keyName); + OmKeyInfo omKeyInfo = ozoneManager.getMetadataManager().getKeyTable() + .get(ozoneKey); + + OmKeyLocationInfoGroup latestVersionLocations = + omKeyInfo.getLatestVersionLocations(); + Assert.assertEquals(true, latestVersionLocations.isMultipartKey()); + latestVersionLocations.getBlocksLatestVersionOnly() + .forEach(omKeyLocationInfo -> + Assert.assertTrue(omKeyLocationInfo.getPartNumber() != -1)); + } private String initiateMultipartUpload(OzoneBucket bucket, String keyName, ReplicationType replicationType, ReplicationFactor replicationFactor) diff --git a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto index 38b8c501d565..da4372540831 100644 --- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto +++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto @@ -747,12 +747,15 @@ message KeyLocation { // get the up to date pipeline information. 
This will need o3fs // provide not only a OM delegation token but also a SCM delegation token optional hadoop.hdds.Pipeline pipeline = 7; + + optional int32 partNumber = 9 [default = -1]; } message KeyLocationList { optional uint64 version = 1; repeated KeyLocation keyLocations = 2; optional FileEncryptionInfoProto fileEncryptionInfo = 3; + optional bool isMultipartKey = 4 [default = false]; } message KeyInfo { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java index 7cc61ecfad36..4a968fd5910f 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java @@ -621,7 +621,7 @@ public void commitKey(OmKeyArgs args, long clientID) throws IOException { keyInfo.setModificationTime(Time.now()); //update the block length for each block - keyInfo.updateLocationInfoList(locationInfoList); + keyInfo.updateLocationInfoList(locationInfoList, false); metadataManager.getStore().move( openKey, objectKey, @@ -1116,7 +1116,7 @@ public OmMultipartCommitUploadPartInfo commitMultipartUploadPart( // set the data size and location info list keyInfo.setDataSize(omKeyArgs.getDataSize()); - keyInfo.updateLocationInfoList(omKeyArgs.getLocationInfoList()); + keyInfo.updateLocationInfoList(omKeyArgs.getLocationInfoList(), true); partName = metadataManager.getOzoneKey(volumeName, bucketName, keyName) + clientID; diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java index ae3f20aa1968..0b4b8ea69311 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java @@ -325,6 +325,8 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl // execution, we can get from ozoneManager. 
private long maxUserVolumeCount; + private int minMultipartUploadPartSize = OzoneConsts.OM_MULTIPART_MIN_SIZE; + private final ScmClient scmClient; private final long scmBlockSize; private final int preallocateBlocksMax; @@ -3797,4 +3799,13 @@ private OmVolumeArgs createS3VolumeInfo(String s3Volume, return omVolumeArgs.build(); } + public int getMinMultipartUploadPartSize() { + return minMultipartUploadPartSize; + } + + @VisibleForTesting + public void setMinMultipartUploadPartSize(int partSizeForTest) { + this.minMultipartUploadPartSize = partSizeForTest; + } + } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java index 48d71e789e88..fe72ea225238 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java @@ -186,7 +186,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, // Update the block length for each block List<OmKeyLocationInfo> allocatedLocationInfoList = omKeyInfo.getLatestVersionLocations().getLocationList(); - omKeyInfo.updateLocationInfoList(locationInfoList); + omKeyInfo.updateLocationInfoList(locationInfoList, false); // Set the UpdateID to current transactionLogIndex omKeyInfo.setUpdateID(trxnLogIndex, ozoneManager.isRatisEnabled()); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java index f9be746d4188..55f499054f90 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java @@ -169,7 +169,11 @@ public OMRequest preExecute(OzoneManager ozoneManager) throws IOException { newKeyArgs.setKeyName(keyPath); - generateRequiredEncryptionInfo(keyArgs, newKeyArgs, ozoneManager); + if (keyArgs.getIsMultipartKey()) { + getFileEncryptionInfoForMpuKey(keyArgs, newKeyArgs, ozoneManager); + } else { + generateRequiredEncryptionInfo(keyArgs, newKeyArgs, ozoneManager); + } newCreateKeyRequest = createKeyRequest.toBuilder().setKeyArgs(newKeyArgs) diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java index bb671df94c41..814e0651e90d 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java @@ -546,6 +546,37 @@ protected void generateRequiredEncryptionInfo(KeyArgs keyArgs, } } + protected void getFileEncryptionInfoForMpuKey(KeyArgs keyArgs, + KeyArgs.Builder newKeyArgs, OzoneManager ozoneManager) + throws IOException { + + String volumeName = keyArgs.getVolumeName(); + String bucketName = keyArgs.getBucketName(); + + boolean acquireLock = false; + OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager(); + + if (ozoneManager.getKmsProvider() != null) { + acquireLock = omMetadataManager.getLock().acquireReadLock( + BUCKET_LOCK, volumeName, bucketName); + try { + OmKeyInfo omKeyInfo =
omMetadataManager.getOpenKeyTable().get( + omMetadataManager.getMultipartKey(volumeName, bucketName, + keyArgs.getKeyName(), keyArgs.getMultipartUploadID())); + + if (omKeyInfo != null && omKeyInfo.getFileEncryptionInfo() != null) { + newKeyArgs.setFileEncryptionInfo( + OMPBHelper.convert(omKeyInfo.getFileEncryptionInfo())); + } + } finally { + if (acquireLock) { + omMetadataManager.getLock().releaseReadLock( + BUCKET_LOCK, volumeName, bucketName); + } + } + } + } + /** * Get FileEncryptionInfoProto from KeyArgs. * @param keyArgs diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequest.java index c399cad53aab..b5c9fdffbf54 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequest.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequest.java @@ -23,7 +23,6 @@ import org.apache.hadoop.ozone.audit.OMAction; import org.apache.hadoop.ozone.om.OMMetadataManager; import org.apache.hadoop.ozone.om.OzoneManager; -import org.apache.hadoop.ozone.om.exceptions.OMException; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup; import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo; @@ -38,6 +37,7 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.MultipartInfoInitiateResponse; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse; +import org.apache.hadoop.ozone.protocolPB.OMPBHelper; import org.apache.hadoop.util.Time; import org.apache.hadoop.hdds.utils.UniqueId; import org.apache.hadoop.hdds.utils.db.cache.CacheKey; @@ -51,7 +51,6 @@ import java.util.Map; import java.util.UUID; -import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.NOT_SUPPORTED_OPERATION; import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK; /** @@ -80,6 +79,8 @@ public OMRequest preExecute(OzoneManager ozoneManager) throws IOException { .setKeyName(validateAndNormalizeKey( ozoneManager.getEnableFileSystemPaths(), keyArgs.getKeyName())); + generateRequiredEncryptionInfo(keyArgs, newKeyArgs, ozoneManager); + return getOmRequest().toBuilder() .setUserInfo(getUserInfo()) .setInitiateMultiPartUploadRequest( @@ -133,17 +134,6 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, validateBucketAndVolume(omMetadataManager, volumeName, bucketName); - // If KMS is configured and TDE is enabled on bucket, throw MPU not - // supported. - if (ozoneManager.getKmsProvider() != null) { - if (omMetadataManager.getBucketTable().get( - omMetadataManager.getBucketKey(volumeName, bucketName)) - .getEncryptionKeyInfo() != null) { - throw new OMException("MultipartUpload is not yet supported on " + - "encrypted buckets", NOT_SUPPORTED_OPERATION); - } - } - // We are adding uploadId to key, because if multiple users try to // perform multipart upload on the same key, each will try to upload, who // ever finally commit the key, we see that key in ozone. 
Suppose if we @@ -190,6 +180,8 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, .setAcls(OzoneAclUtil.fromProtobuf(keyArgs.getAclsList())) .setObjectID(objectID) .setUpdateID(transactionLogIndex) + .setFileEncryptionInfo(keyArgs.hasFileEncryptionInfo() ? + OMPBHelper.convert(keyArgs.getFileEncryptionInfo()) : null) .build(); // Add to cache diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCommitPartRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCommitPartRequest.java index e0ec44295f17..d529f92c7f75 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCommitPartRequest.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCommitPartRequest.java @@ -155,7 +155,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, omKeyInfo.setDataSize(keyArgs.getDataSize()); omKeyInfo.updateLocationInfoList(keyArgs.getKeyLocationsList().stream() .map(OmKeyLocationInfo::getFromProtobuf) - .collect(Collectors.toList())); + .collect(Collectors.toList()), true); // Set Modification time omKeyInfo.setModificationTime(keyArgs.getModificationTime()); // Set the UpdateID to current transactionLogIndex diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java index dff022ba80bf..c4de2fb002b5 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java @@ -54,7 +54,6 @@ import com.google.common.base.Optional; import org.apache.commons.codec.digest.DigestUtils; -import static org.apache.hadoop.ozone.OzoneConsts.OM_MULTIPART_MIN_SIZE; import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.NOT_A_FILE; import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK; import org.slf4j.Logger; @@ -217,12 +216,12 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, // Except for last part all parts should have minimum size. if (currentPartCount != partsListSize) { - if (currentPartKeyInfo.getDataSize() < OM_MULTIPART_MIN_SIZE) { - LOG.error("MultipartUpload: {} Part number: {} size {} is less " - + "than minimum part size {}", ozoneKey, - partKeyInfo.getPartNumber(), - currentPartKeyInfo.getDataSize(), - OzoneConsts.OM_MULTIPART_MIN_SIZE); + if (currentPartKeyInfo.getDataSize() < + ozoneManager.getMinMultipartUploadPartSize()) { + LOG.error("MultipartUpload: {} Part number: {} size {} is less" + + " than minimum part size {}", ozoneKey, + partKeyInfo.getPartNumber(), currentPartKeyInfo.getDataSize(), + ozoneManager.getMinMultipartUploadPartSize()); throw new OMException( failureMessage(requestedVolume, requestedBucket, keyName) + ". Entity too small.", @@ -233,6 +232,11 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, // As all part keys will have only one version. OmKeyLocationInfoGroup currentKeyInfoGroup = currentPartKeyInfo .getKeyLocationVersions().get(0); + + // Set partNumber in each block. 
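+ // The partNumber stamped on each block here is what + // KeyInputStream#getStreamsFromKeyInfo uses on the read path to group + // blocks back into their parts.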
@@ -248,7 +252,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
         if (omKeyInfo == null) {
           // This is a newly added key, it does not have any versions.
           OmKeyLocationInfoGroup keyLocationInfoGroup = new
-              OmKeyLocationInfoGroup(0, partLocationInfos);
+              OmKeyLocationInfoGroup(0, partLocationInfos, true);
 
           // Get the objectID of the key from OpenKeyTable
           OmKeyInfo dbOpenKeyInfo = omMetadataManager.getOpenKeyTable()
@@ -262,6 +266,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
               .setCreationTime(keyArgs.getModificationTime())
               .setModificationTime(keyArgs.getModificationTime())
               .setDataSize(dataSize)
+              .setFileEncryptionInfo(dbOpenKeyInfo.getFileEncryptionInfo())
               .setOmKeyLocationInfos(
                   Collections.singletonList(keyLocationInfoGroup))
               .setAcls(OzoneAclUtil.fromProtobuf(keyArgs.getAclsList()));
@@ -277,7 +282,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
           // But now as versioning is not supported, just following the commit
           // key approach. When versioning support comes, then we can uncomment
           // below code keyInfo.addNewVersion(locations);
-          omKeyInfo.updateLocationInfoList(partLocationInfos);
+          omKeyInfo.updateLocationInfoList(partLocationInfos, true);
           omKeyInfo.setModificationTime(keyArgs.getModificationTime());
           omKeyInfo.setDataSize(dataSize);
         }
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3InitiateMultipartUploadRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3InitiateMultipartUploadRequest.java
index 33fd1cd215b2..088b232b82e9 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3InitiateMultipartUploadRequest.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3InitiateMultipartUploadRequest.java
@@ -21,14 +21,6 @@
 
 import java.util.UUID;
 
-import com.google.common.base.Optional;
-import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
-import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
-import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
-import org.apache.hadoop.ozone.om.helpers.BucketEncryptionKeyInfo;
-import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
-import org.apache.hadoop.ozone.om.request.OMClientRequest;
-import org.apache.hadoop.util.Time;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -36,12 +28,6 @@
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
-import org.mockito.Mockito;
-
-import static org.apache.hadoop.crypto.CipherSuite.AES_CTR_NOPADDING;
-import static org.apache.hadoop.crypto.CryptoProtocolVersion.ENCRYPTION_ZONES;
-import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.NOT_SUPPORTED_OPERATION;
-import static org.mockito.Mockito.when;
 
 /**
  * Tests S3 Initiate Multipart Upload request.
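
Copying the FileEncryptionInfo from the open key into the completed key (setFileEncryptionInfo above) is what keeps the object decryptable after completion: the EDEK minted at initiate time must survive into the key info the reader fetches. On the read side, the client unwraps that EDEK through the KMS roughly as follows (illustrative sketch using Hadoop's standard EDEK API; "kmsProvider" is a stand-in, and the real wiring lives in the Ozone RPC client):

    // Sketch: unwrap the per-key EDEK recorded at initiate time.
    FileEncryptionInfo feInfo = keyInfo.getFileEncryptionInfo();
    if (feInfo != null) {
      KeyProviderCryptoExtension.EncryptedKeyVersion ekv =
          KeyProviderCryptoExtension.EncryptedKeyVersion.createForDecryption(
              feInfo.getKeyName(), feInfo.getEzKeyVersionName(),
              feInfo.getIV(), feInfo.getEncryptedDataEncryptionKey());
      KeyProvider.KeyVersion dek = kmsProvider.decryptEncryptedKey(ekv);
      // dek.getMaterial() then keys a decrypting stream over each part.
    }
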
@@ -132,7 +118,6 @@ public void testValidateAndUpdateCacheWithBucketNotFound() throws Exception {
     Assert.assertNull(omMetadataManager.getOpenKeyTable().get(multipartKey));
     Assert.assertNull(omMetadataManager.getMultipartInfoTable()
         .get(multipartKey));
-
   }
 
   @Test
@@ -141,7 +126,6 @@ public void testValidateAndUpdateCacheWithVolumeNotFound() throws Exception {
     String bucketName = UUID.randomUUID().toString();
     String keyName = UUID.randomUUID().toString();
 
-
     OMRequest modifiedRequest = doPreExecuteInitiateMPU(volumeName, bucketName,
         keyName);
 
@@ -162,49 +146,5 @@ public void testValidateAndUpdateCacheWithVolumeNotFound() throws Exception {
     Assert.assertNull(omMetadataManager.getOpenKeyTable().get(multipartKey));
     Assert.assertNull(omMetadataManager.getMultipartInfoTable()
         .get(multipartKey));
-
-
   }
-
-  @Test
-  public void testMPUNotSupported() throws Exception {
-    String volumeName = UUID.randomUUID().toString();
-    String bucketName = UUID.randomUUID().toString();
-    String keyName = UUID.randomUUID().toString();
-
-    when(ozoneManager.getKmsProvider())
-        .thenReturn(Mockito.mock(KeyProviderCryptoExtension.class));
-
-    TestOMRequestUtils.addVolumeToDB(volumeName, omMetadataManager);
-
-    // Set encryption info and create bucket
-    OmBucketInfo omBucketInfo =
-        OmBucketInfo.newBuilder().setVolumeName(volumeName)
-            .setBucketName(bucketName).setCreationTime(Time.now())
-            .setBucketEncryptionKey(new BucketEncryptionKeyInfo.Builder()
-                .setKeyName("dummy").setSuite(AES_CTR_NOPADDING)
-                .setVersion(ENCRYPTION_ZONES).build())
-            .build();
-
-    String bucketKey = omMetadataManager.getBucketKey(volumeName, bucketName);
-
-    omMetadataManager.getBucketTable().put(bucketKey, omBucketInfo);
-
-    omMetadataManager.getBucketTable().addCacheEntry(new CacheKey<>(bucketKey),
-        new CacheValue<>(Optional.of(omBucketInfo), 100L));
-
-    OMRequest modifiedRequest = doPreExecuteInitiateMPU(volumeName, bucketName,
-        keyName);
-
-    OMClientRequest omClientRequest =
-        new S3InitiateMultipartUploadRequest(modifiedRequest);
-
-    OMClientResponse omClientResponse =
-        omClientRequest.validateAndUpdateCache(ozoneManager, 1L,
-            ozoneManagerDoubleBufferHelper);
-
-    Assert.assertNotNull(omClientResponse.getOMResponse());
-    Assert.assertEquals(NOT_SUPPORTED_OPERATION,
-        omClientResponse.getOMResponse().getStatus());
-  }
 }
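
With the NOT_SUPPORTED_OPERATION guard and its testMPUNotSupported test removed, a multipart upload against a TDE-enabled bucket is expected to run end to end. A hedged client-side sketch of that flow (volume, bucket, and key names are illustrative; the calls are the standard OzoneBucket MPU API):

    // Sketch: one-part MPU against an encrypted bucket.
    OzoneBucket bucket = client.getObjectStore()
        .getVolume("vol1").getBucket("encryptedBucket");
    OmMultipartInfo info = bucket.initiateMultipartUpload(
        "key1", ReplicationType.RATIS, ReplicationFactor.THREE);

    OzoneOutputStream part = bucket.createMultipartKey(
        "key1", data.length, 1, info.getUploadID());
    part.write(data);  // encrypted transparently with the bucket's key
    part.close();

    Map<Integer, String> partsMap = new TreeMap<>();
    partsMap.put(1, part.getCommitUploadPartInfo().getPartName());
    bucket.completeMultipartUpload("key1", info.getUploadID(), partsMap);
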