diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/MultipartCryptoKeyInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/MultipartCryptoKeyInputStream.java
index 530084e0f38c..c7fc21cbb242 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/MultipartCryptoKeyInputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/MultipartCryptoKeyInputStream.java
@@ -65,17 +65,6 @@ public class MultipartCryptoKeyInputStream extends OzoneInputStream
   // can be reset if a new position is seeked.
   private int prevPartIndex = 0;
 
-  // If a read's start/ length position doesn't coincide with a Crypto buffer
-  // boundary, it will be adjusted as reads should happen only at the buffer
-  // boundaries for decryption to happen correctly. In this case, after the
-  // data has been read and decrypted, only the requested data should be
-  // returned to the client. readPositionAdjustedBy and readLengthAdjustedBy
-  // store these adjustment information. Before returning to client, the first
-  // readPositionAdjustedBy number of bytes and the last readLengthAdjustedBy
-  // number of bytes must be discarded.
-  private int readPositionAdjustedBy = 0;
-  private int readLengthAdjustedBy = 0;
-
   public MultipartCryptoKeyInputStream(String keyName,
       List<OzoneCryptoInputStream> inputStreams) {
 
@@ -130,65 +119,7 @@ public int read(byte[] b, int off, int len) throws IOException {
       // Get the current partStream and read data from it
       OzoneCryptoInputStream current = partStreams.get(partIndex);
 
-      // CryptoInputStream reads hadoop.security.crypto.buffer.size number of
-      // bytes (default 8KB) at a time. This needs to be taken into account
-      // in calculating the numBytesToRead.
-      int numBytesToRead = getNumBytesToRead(len, (int)current.getRemaining(),
-          current.getBufferSize());
-      int numBytesRead;
-
-      if (readPositionAdjustedBy != 0 || readLengthAdjustedBy != 0) {
-        // There was some adjustment made in position and/ or length of data
-        // to be read to account for Crypto buffer boundary. Hence, read the
-        // data into a temp buffer and then copy only the requested data into
-        // clients buffer.
-        byte[] tempBuffer = new byte[numBytesToRead];
-        int actualNumBytesRead = current.read(tempBuffer, 0,
-            numBytesToRead);
-        numBytesRead = actualNumBytesRead - readPositionAdjustedBy -
-            readLengthAdjustedBy;
-
-        if (actualNumBytesRead != numBytesToRead) {
-          throw new IOException(String.format("Inconsistent read for key=%s " +
-              "part=%d length=%d numBytesToRead(accounting for Crypto " +
-              "boundaries)=%d numBytesRead(actual)=%d " +
-              "numBytesToBeRead(into client buffer discarding crypto " +
-              "boundary adjustments)=%d",
-              key, partIndex, current.getLength(), numBytesToRead,
-              actualNumBytesRead, numBytesRead));
-        }
-
-        // TODO: Byte array copies are not optimal. If there is a better and
-        //  more optimal solution to copy only a part of read data into
-        //  client buffer, this should be updated.
-        System.arraycopy(tempBuffer, readPositionAdjustedBy, b, off,
-            numBytesRead);
-
-        LOG.debug("OzoneCryptoInputStream for key: {} part: {} read {} bytes " +
-            "instead of {} bytes to account for Crypto buffer boundary. " +
-            "Client buffer will be copied with read data from position {}" +
-            "upto position {}, discarding the extra bytes read to " +
-            "maintain Crypto buffer boundary limits", key, partIndex,
-            actualNumBytesRead, numBytesRead, readPositionAdjustedBy,
-            actualNumBytesRead - readPositionAdjustedBy);
-
-        if (readLengthAdjustedBy > 0) {
-          current.seek(current.getPos() - readLengthAdjustedBy);
-        }
-
-        // Reset readPositionAdjustedBy and readLengthAdjustedBy
-        readPositionAdjustedBy = 0;
-        readLengthAdjustedBy = 0;
-      } else {
-        numBytesRead = current.read(b, off, numBytesToRead);
-        if (numBytesRead != numBytesToRead) {
-          throw new IOException(String.format("Inconsistent read for key=%s " +
-              "part=%d length=%d numBytesToRead=%d numBytesRead=%d",
-              key, partIndex, current.getLength(), numBytesToRead,
-              numBytesRead));
-        }
-      }
-
+      int numBytesRead = current.read(b, off, len);
       totalReadLen += numBytesRead;
       off += numBytesRead;
       len -= numBytesRead;
@@ -202,70 +133,6 @@ public int read(byte[] b, int off, int len) throws IOException {
     return totalReadLen;
   }
 
-  /**
-   * Get number of bytes to read from the current stream based on the length
-   * to be read, number of bytes remaining in the stream and the Crypto buffer
-   * size.
-   * Reads should be performed at the CryptoInputStream Buffer boundaries only.
-   * Otherwise, the decryption will be incorrect.
-   */
-  private int getNumBytesToRead(int lenToRead, int remaining,
-      int cryptoBufferSize) throws IOException {
-
-    Preconditions.checkArgument(readPositionAdjustedBy == 0);
-    Preconditions.checkArgument(readLengthAdjustedBy == 0);
-
-    // Check and adjust position if required
-    adjustReadPosition(cryptoBufferSize);
-    remaining += readPositionAdjustedBy;
-    lenToRead += readPositionAdjustedBy;
-
-    return adjustNumBytesToRead(lenToRead, remaining, cryptoBufferSize);
-  }
-
-  /**
-   * Reads should be performed at the CryptoInputStream Buffer boundary size.
-   * Otherwise, the decryption will be incorrect. Hence, if the position is
-   * not at the boundary limit, we have to adjust the position and might need
-   * to read more data than requested. The extra data will be filtered out
-   * before returning to the client.
-   */
-  private void adjustReadPosition(long cryptoBufferSize) throws IOException {
-    // Position of the buffer in current stream
-    long currentPosOfStream = partStreams.get(partIndex).getPos();
-    int modulus = (int) (currentPosOfStream % cryptoBufferSize);
-    if (modulus != 0) {
-      // Adjustment required.
-      // Update readPositionAdjustedBy and seek to the adjusted position
-      readPositionAdjustedBy = modulus;
-      // Seek current partStream to adjusted position. We do not need to
-      // reset the seeked positions of other streams.
-      partStreams.get(partIndex)
-          .seek(currentPosOfStream - readPositionAdjustedBy);
-      LOG.debug("OzoneCryptoInputStream for key: {} part: {} adjusted " +
-          "position {} by -{} to account for Crypto buffer boundary",
-          key, partIndex, currentPosOfStream, readPositionAdjustedBy);
-    }
-  }
-
-  /**
-   * If the length of data requested does not end at a Crypto Buffer
-   * boundary, the number of bytes to be read must be adjusted accordingly.
-   * The extra data will be filtered out before returning to the client.
-   */
-  private int adjustNumBytesToRead(int lenToRead, int remaining,
-      int cryptoBufferSize) {
-    int numBytesToRead = Math.min(cryptoBufferSize, remaining);
-    if (lenToRead < numBytesToRead) {
-      // Adjustment required; Update readLengthAdjustedBy.
-      readLengthAdjustedBy = numBytesToRead - lenToRead;
-      LOG.debug("OzoneCryptoInputStream for key: {} part: {} adjusted length " +
-          "by +{} to account for Crypto buffer boundary",
-          key, partIndex, readLengthAdjustedBy);
-    }
-    return numBytesToRead;
-  }
-
   /**
    * Seeks the InputStream to the specified position. This involves 2 steps:
    *    1. Updating the partIndex to the partStream corresponding to the
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneCryptoInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneCryptoInputStream.java
index 9d5d888688f4..1d0cb2bb82a2 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneCryptoInputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneCryptoInputStream.java
@@ -19,10 +19,14 @@
 package org.apache.hadoop.ozone.client.io;
 
 import java.io.IOException;
+
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.crypto.CryptoCodec;
 import org.apache.hadoop.crypto.CryptoInputStream;
 import org.apache.hadoop.crypto.CryptoStreamUtils;
 import org.apache.hadoop.fs.Seekable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A CryptoInputStream for Ozone with length. This stream is used to read
@@ -31,16 +35,35 @@
 public class OzoneCryptoInputStream extends CryptoInputStream
     implements Seekable {
 
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OzoneCryptoInputStream.class);
+
   private final long length;
   private final int bufferSize;
+  private final String keyName;
+  private final int partIndex;
+
+  // If a read's start/length position doesn't coincide with a Crypto buffer
+  // boundary, it will be adjusted as reads should happen only at the buffer
+  // boundaries for decryption to happen correctly. In this case, after the
+  // data has been read and decrypted, only the requested data should be
+  // returned to the client. readPositionAdjustedBy and readLengthAdjustedBy
+  // store this adjustment information. Before returning to client, the first
+  // readPositionAdjustedBy number of bytes and the last readLengthAdjustedBy
+  // number of bytes must be discarded.
+  private int readPositionAdjustedBy = 0;
+  private int readLengthAdjustedBy = 0;
 
   public OzoneCryptoInputStream(LengthInputStream in,
-      CryptoCodec codec, byte[] key, byte[] iv) throws IOException {
+      CryptoCodec codec, byte[] key, byte[] iv,
+      String keyName, int partIndex) throws IOException {
     super(in.getWrappedStream(), codec, key, iv);
     this.length = in.getLength();
     // This is the buffer size used while creating the CryptoInputStream
     // internally
     this.bufferSize = CryptoStreamUtils.getBufferSize(codec.getConf());
+    this.keyName = keyName;
+    this.partIndex = partIndex;
   }
 
   public long getLength() {
@@ -54,4 +77,143 @@ public int getBufferSize() {
   public long getRemaining() throws IOException {
     return length - getPos();
   }
+
+  /**
+   * Reads up to {@code len} decrypted bytes into {@code b}. The underlying
+   * read starts on a Crypto buffer boundary and covers at most one Crypto
+   * buffer; bytes read only to satisfy that alignment are discarded before
+   * returning, so a single call may return fewer than {@code len} bytes.
+   *
+   * @return the number of bytes copied into {@code b}
+   * @throws IOException if the underlying read does not return exactly the
+   *     number of bytes requested after alignment
+   */
+  @Override
+  public int read(byte[] b, int off, int len) throws IOException {
+    // CryptoInputStream reads hadoop.security.crypto.buffer.size number of
+    // bytes (default 8KB) at a time. This needs to be taken into account
+    // in calculating the numBytesToRead.
+    // getRemaining() is passed as a long: narrowing it to int here would
+    // overflow for parts with 2 GiB or more left to read.
+    int numBytesToRead = getNumBytesToRead(len, getRemaining(),
+        getBufferSize());
+    int numBytesRead;
+
+    if (readPositionAdjustedBy != 0 || readLengthAdjustedBy != 0) {
+      // There was some adjustment made in position and/or length of data
+      // to be read to account for Crypto buffer boundary. Hence, read the
+      // data into a temp buffer and then copy only the requested data into
+      // client's buffer.
+      byte[] tempBuffer = new byte[numBytesToRead];
+      int actualNumBytesRead = super.read(tempBuffer, 0,
+          numBytesToRead);
+      numBytesRead = actualNumBytesRead - readPositionAdjustedBy -
+          readLengthAdjustedBy;
+
+      if (actualNumBytesRead != numBytesToRead) {
+        throw new IOException(String.format("Inconsistent read for key=%s " +
+            "part=%d length=%d numBytesToRead(accounting for Crypto " +
+            "boundaries)=%d numBytesRead(actual)=%d " +
+            "numBytesToBeRead(into client buffer discarding crypto " +
+            "boundary adjustments)=%d",
+            keyName, partIndex, getLength(), numBytesToRead,
+            actualNumBytesRead, numBytesRead));
+      }
+
+      // TODO: Byte array copies are not optimal. If there is a better and
+      //  more optimal solution to copy only a part of read data into
+      //  client buffer, this should be updated.
+      System.arraycopy(tempBuffer, readPositionAdjustedBy, b, off,
+          numBytesRead);
+
+      LOG.debug("OzoneCryptoInputStream for key: {} part: {} read {} bytes " +
+          "instead of {} bytes to account for Crypto buffer boundary. " +
+          "Client buffer will be copied with read data from position {} " +
+          "up to position {}, discarding the extra bytes read to " +
+          "maintain Crypto buffer boundary limits", keyName, partIndex,
+          actualNumBytesRead, numBytesRead, readPositionAdjustedBy,
+          actualNumBytesRead - readLengthAdjustedBy);
+
+      if (readLengthAdjustedBy > 0) {
+        seek(getPos() - readLengthAdjustedBy);
+      }
+
+      // Reset readPositionAdjustedBy and readLengthAdjustedBy
+      readPositionAdjustedBy = 0;
+      readLengthAdjustedBy = 0;
+    } else {
+      numBytesRead = super.read(b, off, numBytesToRead);
+      if (numBytesRead != numBytesToRead) {
+        throw new IOException(String.format("Inconsistent read for key=%s " +
+            "part=%d length=%d numBytesToRead=%d numBytesRead=%d",
+            keyName, partIndex, getLength(), numBytesToRead,
+            numBytesRead));
+      }
+    }
+    return numBytesRead;
+  }
+
+  /**
+   * Get number of bytes to read from the current stream based on the length
+   * to be read, number of bytes remaining in the stream and the Crypto buffer
+   * size.
+   * Reads should be performed at the CryptoInputStream Buffer boundaries only.
+   * Otherwise, the decryption will be incorrect.
+   */
+  private int getNumBytesToRead(long lenToRead, long remaining,
+      int cryptoBufferSize) throws IOException {
+
+    Preconditions.checkArgument(readPositionAdjustedBy == 0);
+    Preconditions.checkArgument(readLengthAdjustedBy == 0);
+
+    // Check and adjust position if required
+    adjustReadPosition(cryptoBufferSize);
+    remaining += readPositionAdjustedBy;
+    lenToRead += readPositionAdjustedBy;
+
+    return adjustNumBytesToRead(lenToRead, remaining, cryptoBufferSize);
+  }
+
+  /**
+   * Reads should be performed at the CryptoInputStream Buffer boundary size.
+   * Otherwise, the decryption will be incorrect. Hence, if the position is
+   * not at the boundary limit, we have to adjust the position and might need
+   * to read more data than requested. The extra data will be filtered out
+   * before returning to the client.
+   */
+  private void adjustReadPosition(long cryptoBufferSize) throws IOException {
+    // Position of the buffer in current stream
+    long currentPosOfStream = getPos();
+    int modulus = (int) (currentPosOfStream % cryptoBufferSize);
+    if (modulus != 0) {
+      // Adjustment required.
+      // Update readPositionAdjustedBy and seek to the adjusted position
+      readPositionAdjustedBy = modulus;
+      // Seek current partStream to adjusted position. We do not need to
+      // reset the seeked positions of other streams.
+      seek(currentPosOfStream - readPositionAdjustedBy);
+      LOG.debug("OzoneCryptoInputStream for key: {} part: {} adjusted " +
+          "position {} by -{} to account for Crypto buffer boundary",
+          keyName, partIndex, currentPosOfStream, readPositionAdjustedBy);
+    }
+  }
+
+  /**
+   * If the length of data requested does not end at a Crypto Buffer
+   * boundary, the number of bytes to be read must be adjusted accordingly.
+   * The extra data will be filtered out before returning to the client.
+   */
+  private int adjustNumBytesToRead(long lenToRead, long remaining,
+      int cryptoBufferSize) {
+    int numBytesToRead = (int) Math.min(cryptoBufferSize, remaining);
+    if (lenToRead < numBytesToRead) {
+      // Adjustment required; Update readLengthAdjustedBy.
+      readLengthAdjustedBy = (int) (numBytesToRead - lenToRead);
+      LOG.debug("OzoneCryptoInputStream for key: {} part: {} adjusted length " +
+          "by +{} to account for Crypto buffer boundary",
+          keyName, partIndex, readLengthAdjustedBy);
+    }
+    return numBytesToRead;
+  }
+
 }
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index 973f98ff3b9a..f3f87c52a8cb 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -1865,11 +1865,13 @@ private OzoneInputStream createInputStream(
       final KeyProvider.KeyVersion decrypted = getDEK(feInfo);
 
       List<OzoneCryptoInputStream> cryptoInputStreams = new ArrayList<>();
-      for (LengthInputStream lengthInputStream : lengthInputStreams) {
+      for (int i = 0; i < lengthInputStreams.size(); i++) {
+        LengthInputStream lengthInputStream = lengthInputStreams.get(i);
         final OzoneCryptoInputStream ozoneCryptoInputStream =
             new OzoneCryptoInputStream(lengthInputStream,
                 OzoneKMSUtil.getCryptoCodec(conf, feInfo),
-                decrypted.getMaterial(), feInfo.getIV());
+                decrypted.getMaterial(), feInfo.getIV(),
+                keyInfo.getKeyName(), i);
         cryptoInputStreams.add(ozoneCryptoInputStream);
       }
       return new MultipartCryptoKeyInputStream(keyInfo.getKeyName(),