From 7bf6a2bcac65c1f9974b9c6e592a20ddbb6a3766 Mon Sep 17 00:00:00 2001 From: Xinli shang Date: Tue, 15 Mar 2022 13:34:28 -0700 Subject: [PATCH] Add Parquet encryption functionality into Presto. Co-authored-by: ggershinsky Summary: This ports the parquet-mr decryption functionality from apache/parquet-mr@65b95fb --- .../presto/delta/DeltaPageSourceProvider.java | 30 +- .../parquet/ParquetPageSourceFactory.java | 31 +- .../parquet/BenchmarkParquetPageSource.java | 4 +- .../presto/hudi/HudiParquetPageSources.java | 35 +- .../iceberg/IcebergPageSourceProvider.java | 35 +- .../facebook/presto/parquet/DataPageV1.java | 1 + .../facebook/presto/parquet/DataPageV2.java | 1 + .../presto/parquet/DictionaryPage.java | 1 + .../com/facebook/presto/parquet/Page.java | 4 + .../cache/CachingParquetMetadataSource.java | 9 +- .../presto/parquet/cache/MetadataReader.java | 239 ++++++++++-- .../parquet/cache/ParquetMetadataSource.java | 5 +- .../parquet/predicate/PredicateUtils.java | 13 +- .../presto/parquet/reader/PageReader.java | 48 ++- .../parquet/reader/ParquetColumnChunk.java | 65 +++- .../presto/parquet/reader/ParquetReader.java | 41 +- .../crypto/HiddenColumnChunkMetaData.java | 75 ++++ .../parquet/crypto/HiddenColumnException.java | 29 ++ .../parquet/BenchmarkParquetReader.java | 4 +- .../parquet/reader/EncryptDecryptUtil.java | 97 +++++ .../parquet/reader/EncryptionTestFile.java | 54 +++ .../reader/EncryptionTestFileBuilder.java | 199 ++++++++++ .../parquet/reader/MockInputStreamTail.java | 113 ++++++ .../parquet/reader/MockParquetDataSource.java | 109 ++++++ .../presto/parquet/reader/TestEncryption.java | 351 ++++++++++++++++++ .../reader/TestHiddenColumnChunkMetaData.java | 66 ++++ 26 files changed, 1550 insertions(+), 109 deletions(-) create mode 100644 presto-parquet/src/main/java/org/apache/parquet/crypto/HiddenColumnChunkMetaData.java create mode 100644 presto-parquet/src/main/java/org/apache/parquet/crypto/HiddenColumnException.java create mode 100644 presto-parquet/src/test/java/com/facebook/presto/parquet/reader/EncryptDecryptUtil.java create mode 100644 presto-parquet/src/test/java/com/facebook/presto/parquet/reader/EncryptionTestFile.java create mode 100644 presto-parquet/src/test/java/com/facebook/presto/parquet/reader/EncryptionTestFileBuilder.java create mode 100644 presto-parquet/src/test/java/com/facebook/presto/parquet/reader/MockInputStreamTail.java create mode 100644 presto-parquet/src/test/java/com/facebook/presto/parquet/reader/MockParquetDataSource.java create mode 100644 presto-parquet/src/test/java/com/facebook/presto/parquet/reader/TestEncryption.java create mode 100644 presto-parquet/src/test/java/com/facebook/presto/parquet/reader/TestHiddenColumnChunkMetaData.java diff --git a/presto-delta/src/main/java/com/facebook/presto/delta/DeltaPageSourceProvider.java b/presto-delta/src/main/java/com/facebook/presto/delta/DeltaPageSourceProvider.java index 63b81e7608ed6..9f3a51dc9b968 100644 --- a/presto-delta/src/main/java/com/facebook/presto/delta/DeltaPageSourceProvider.java +++ b/presto-delta/src/main/java/com/facebook/presto/delta/DeltaPageSourceProvider.java @@ -33,7 +33,6 @@ import com.facebook.presto.parquet.RichColumnDescriptor; import com.facebook.presto.parquet.cache.MetadataReader; import com.facebook.presto.parquet.predicate.Predicate; -import com.facebook.presto.parquet.reader.ColumnIndexFilterUtils; import com.facebook.presto.parquet.reader.ParquetReader; import com.facebook.presto.spi.ColumnHandle; import com.facebook.presto.spi.ConnectorPageSource; @@ -53,6 +52,9 @@ import
org.apache.hadoop.fs.Path; import org.apache.hadoop.security.AccessControlException; import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.crypto.DecryptionPropertiesFactory; +import org.apache.parquet.crypto.FileDecryptionProperties; +import org.apache.parquet.crypto.InternalFileDecryptor; import org.apache.parquet.hadoop.metadata.BlockMetaData; import org.apache.parquet.hadoop.metadata.FileMetaData; import org.apache.parquet.hadoop.metadata.ParquetMetadata; @@ -95,8 +97,10 @@ import static com.facebook.presto.parquet.ParquetTypeUtils.getSubfieldType; import static com.facebook.presto.parquet.ParquetTypeUtils.lookupColumnByName; import static com.facebook.presto.parquet.ParquetTypeUtils.nestedColumnPath; +import static com.facebook.presto.parquet.cache.MetadataReader.findFirstNonHiddenColumnId; import static com.facebook.presto.parquet.predicate.PredicateUtils.buildPredicate; import static com.facebook.presto.parquet.predicate.PredicateUtils.predicateMatches; +import static com.facebook.presto.parquet.reader.ColumnIndexFilterUtils.getColumnIndexStore; import static com.facebook.presto.spi.StandardErrorCode.PERMISSION_DENIED; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Strings.nullToEmpty; @@ -219,9 +223,13 @@ private static ConnectorPageSource createParquetPageSource( ParquetDataSource dataSource = null; try { FSDataInputStream inputStream = hdfsEnvironment.getFileSystem(user, path, configuration).open(path); - dataSource = buildHdfsParquetDataSource(inputStream, path, stats); - ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, fileSize).getParquetMetadata(); - + // Lambda expression below requires final variable, so we define a new variable parquetDataSource. + final ParquetDataSource parquetDataSource = buildHdfsParquetDataSource(inputStream, path, stats); + dataSource = parquetDataSource; + DecryptionPropertiesFactory cryptoFactory = DecryptionPropertiesFactory.loadFactory(configuration); + FileDecryptionProperties fileDecryptionProperties = (cryptoFactory == null) ? null : cryptoFactory.getFileDecryptionProperties(configuration, path); + Optional fileDecryptor = (fileDecryptionProperties == null) ? 
Optional.empty() : Optional.of(new InternalFileDecryptor(fileDecryptionProperties)); + ParquetMetadata parquetMetadata = hdfsEnvironment.doAs(user, () -> MetadataReader.readFooter(parquetDataSource, fileSize, fileDecryptor).getParquetMetadata()); FileMetaData fileMetaData = parquetMetadata.getFileMetaData(); MessageType fileSchema = fileMetaData.getSchema(); @@ -237,9 +245,12 @@ private static ConnectorPageSource createParquetPageSource( ImmutableList.Builder footerBlocks = ImmutableList.builder(); for (BlockMetaData block : parquetMetadata.getBlocks()) { - long firstDataPage = block.getColumns().get(0).getFirstDataPageOffset(); - if (firstDataPage >= start && firstDataPage < start + length) { - footerBlocks.add(block); + Optional firstIndex = findFirstNonHiddenColumnId(block); + if (firstIndex.isPresent()) { + long firstDataPage = block.getColumns().get(firstIndex.get()).getFirstDataPageOffset(); + if (firstDataPage >= start && firstDataPage < start + length) { + footerBlocks.add(block); + } } } @@ -250,7 +261,7 @@ private static ConnectorPageSource createParquetPageSource( ImmutableList.Builder blocks = ImmutableList.builder(); List blockIndexStores = new ArrayList<>(); for (BlockMetaData block : footerBlocks.build()) { - Optional columnIndexStore = ColumnIndexFilterUtils.getColumnIndexStore(parquetPredicate, finalDataSource, block, descriptorsByPath, columnIndexFilterEnabled); + Optional columnIndexStore = getColumnIndexStore(parquetPredicate, finalDataSource, block, descriptorsByPath, columnIndexFilterEnabled); if (predicateMatches(parquetPredicate, block, finalDataSource, descriptorsByPath, parquetTupleDomain, columnIndexStore, columnIndexFilterEnabled)) { blocks.add(block); blockIndexStores.add(columnIndexStore.orElse(null)); @@ -268,7 +279,8 @@ private static ConnectorPageSource createParquetPageSource( verificationEnabled, parquetPredicate, blockIndexStores, - columnIndexFilterEnabled); + columnIndexFilterEnabled, + fileDecryptor); ImmutableList.Builder namesBuilder = ImmutableList.builder(); ImmutableList.Builder typesBuilder = ImmutableList.builder(); diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSourceFactory.java b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSourceFactory.java index 04cea290e2ba4..1e6f418c18d7f 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSourceFactory.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSourceFactory.java @@ -51,6 +51,9 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.security.AccessControlException; import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.crypto.DecryptionPropertiesFactory; +import org.apache.parquet.crypto.FileDecryptionProperties; +import org.apache.parquet.crypto.InternalFileDecryptor; import org.apache.parquet.hadoop.metadata.BlockMetaData; import org.apache.parquet.hadoop.metadata.FileMetaData; import org.apache.parquet.hadoop.metadata.ParquetMetadata; @@ -114,6 +117,7 @@ import static com.facebook.presto.parquet.ParquetTypeUtils.getSubfieldType; import static com.facebook.presto.parquet.ParquetTypeUtils.lookupColumnByName; import static com.facebook.presto.parquet.ParquetTypeUtils.nestedColumnPath; +import static com.facebook.presto.parquet.cache.MetadataReader.findFirstNonHiddenColumnId; import static com.facebook.presto.parquet.predicate.PredicateUtils.buildPredicate; import static com.facebook.presto.parquet.predicate.PredicateUtils.predicateMatches; import 
static com.facebook.presto.spi.StandardErrorCode.PERMISSION_DENIED; @@ -122,6 +126,7 @@ import static java.lang.String.format; import static java.util.Objects.requireNonNull; import static org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category.PRIMITIVE; +import static org.apache.parquet.crypto.DecryptionPropertiesFactory.loadFactory; import static org.apache.parquet.io.ColumnIOConverter.constructField; import static org.apache.parquet.io.ColumnIOConverter.findNestedColumnIO; @@ -238,12 +243,18 @@ public static ConnectorPageSource createParquetPageSource( ParquetDataSource dataSource = null; try { FSDataInputStream inputStream = hdfsEnvironment.getFileSystem(user, path, configuration).openFile(path, hiveFileContext); - dataSource = buildHdfsParquetDataSource(inputStream, path, stats); - ParquetMetadata parquetMetadata = parquetMetadataSource.getParquetMetadata( - dataSource, + // Lambda expression below requires final variable, so we define a new variable parquetDataSource. + final ParquetDataSource parquetDataSource = buildHdfsParquetDataSource(inputStream, path, stats); + dataSource = parquetDataSource; + DecryptionPropertiesFactory cryptoFactory = loadFactory(configuration); + FileDecryptionProperties fileDecryptionProperties = (cryptoFactory == null) ? null : cryptoFactory.getFileDecryptionProperties(configuration, path); + Optional fileDecryptor = (fileDecryptionProperties == null) ? Optional.empty() : Optional.of(new InternalFileDecryptor(fileDecryptionProperties)); + ParquetMetadata parquetMetadata = hdfsEnvironment.doAs(user, () -> parquetMetadataSource.getParquetMetadata( + parquetDataSource, fileSize, hiveFileContext.isCacheable(), - hiveFileContext.getModificationTime()).getParquetMetadata(); + hiveFileContext.getModificationTime(), + fileDecryptor).getParquetMetadata()); if (!columns.isEmpty() && columns.stream().allMatch(hiveColumnHandle -> hiveColumnHandle.getColumnType() == AGGREGATED)) { return new AggregatedParquetPageSource(columns, parquetMetadata, typeManager, functionResolution); @@ -264,9 +275,12 @@ public static ConnectorPageSource createParquetPageSource( ImmutableList.Builder footerBlocks = ImmutableList.builder(); for (BlockMetaData block : parquetMetadata.getBlocks()) { - long firstDataPage = block.getColumns().get(0).getFirstDataPageOffset(); - if (firstDataPage >= start && firstDataPage < start + length) { - footerBlocks.add(block); + Optional firstIndex = findFirstNonHiddenColumnId(block); + if (firstIndex.isPresent()) { + long firstDataPage = block.getColumns().get(firstIndex.get()).getFirstDataPageOffset(); + if (firstDataPage >= start && firstDataPage < start + length) { + footerBlocks.add(block); + } } } @@ -308,7 +322,8 @@ public static ConnectorPageSource createParquetPageSource( verificationEnabled, parquetPredicate, blockIndexStores, - columnIndexFilterEnabled); + columnIndexFilterEnabled, + fileDecryptor); ImmutableList.Builder namesBuilder = ImmutableList.builder(); ImmutableList.Builder typesBuilder = ImmutableList.builder(); diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/parquet/BenchmarkParquetPageSource.java b/presto-hive/src/test/java/com/facebook/presto/hive/parquet/BenchmarkParquetPageSource.java index 6d7581a53642a..b8d0baa459369 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/parquet/BenchmarkParquetPageSource.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/parquet/BenchmarkParquetPageSource.java @@ -267,7 +267,7 @@ public void setup() PageFunctionCompiler pageFunctionCompiler = 
new PageFunctionCompiler(metadata, 0); pageProcessor = new ExpressionCompiler(metadata, pageFunctionCompiler).compilePageProcessor(testSession.getSqlFunctionProperties(), filterConjunction(), projections).get(); - parquetMetadata = MetadataReader.readFooter(new FileParquetDataSource(parquetFile), parquetFile.length()).getParquetMetadata(); + parquetMetadata = MetadataReader.readFooter(new FileParquetDataSource(parquetFile), parquetFile.length(), Optional.empty()).getParquetMetadata(); } @TearDown @@ -290,7 +290,7 @@ ParquetPageSource createParquetPageSource() fields.add(ColumnIOConverter.constructField(getTypeFromTypeSignature(), messageColumnIO.getChild(i))); } - ParquetReader parquetReader = new ParquetReader(messageColumnIO, parquetMetadata.getBlocks(), Optional.empty(), dataSource, newSimpleAggregatedMemoryContext(), new DataSize(16, MEGABYTE), batchReadEnabled, enableVerification, null, null, false); + ParquetReader parquetReader = new ParquetReader(messageColumnIO, parquetMetadata.getBlocks(), Optional.empty(), dataSource, newSimpleAggregatedMemoryContext(), new DataSize(16, MEGABYTE), batchReadEnabled, enableVerification, null, null, false, Optional.empty()); return new ParquetPageSource(parquetReader, Collections.nCopies(channelCount, type), fields, columnNames, new RuntimeStats()); } diff --git a/presto-hudi/src/main/java/com/facebook/presto/hudi/HudiParquetPageSources.java b/presto-hudi/src/main/java/com/facebook/presto/hudi/HudiParquetPageSources.java index 1d7f4dfcc745a..5e5ffba650f1c 100644 --- a/presto-hudi/src/main/java/com/facebook/presto/hudi/HudiParquetPageSources.java +++ b/presto-hudi/src/main/java/com/facebook/presto/hudi/HudiParquetPageSources.java @@ -30,7 +30,6 @@ import com.facebook.presto.parquet.RichColumnDescriptor; import com.facebook.presto.parquet.cache.MetadataReader; import com.facebook.presto.parquet.predicate.Predicate; -import com.facebook.presto.parquet.reader.ColumnIndexFilterUtils; import com.facebook.presto.parquet.reader.ParquetReader; import com.facebook.presto.spi.ConnectorPageSource; import com.facebook.presto.spi.PrestoException; @@ -42,6 +41,9 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.crypto.DecryptionPropertiesFactory; +import org.apache.parquet.crypto.FileDecryptionProperties; +import org.apache.parquet.crypto.InternalFileDecryptor; import org.apache.parquet.hadoop.metadata.BlockMetaData; import org.apache.parquet.hadoop.metadata.FileMetaData; import org.apache.parquet.hadoop.metadata.ParquetMetadata; @@ -65,8 +67,10 @@ import static com.facebook.presto.parquet.ParquetTypeUtils.getColumnIO; import static com.facebook.presto.parquet.ParquetTypeUtils.getDescriptors; import static com.facebook.presto.parquet.ParquetTypeUtils.getParquetTypeByName; +import static com.facebook.presto.parquet.cache.MetadataReader.findFirstNonHiddenColumnId; import static com.facebook.presto.parquet.predicate.PredicateUtils.buildPredicate; import static com.facebook.presto.parquet.predicate.PredicateUtils.predicateMatches; +import static com.facebook.presto.parquet.reader.ColumnIndexFilterUtils.getColumnIndexStore; import static com.google.common.collect.ImmutableList.toImmutableList; import static java.lang.String.format; import static java.util.stream.Collectors.toList; @@ -110,8 +114,15 @@ public static ConnectorPageSource createParquetPageSource( modificationTime, false); FSDataInputStream inputStream = filesystem.openFile(path, hiveFileContext); - 
dataSource = buildHdfsParquetDataSource(inputStream, path, fileFormatDataSourceStats); - ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, fileSize).getParquetMetadata(); + // Lambda expression below requires final variable, so we define a new variable parquetDataSource. + final ParquetDataSource parquetDataSource = buildHdfsParquetDataSource(inputStream, path, fileFormatDataSourceStats); + dataSource = parquetDataSource; + DecryptionPropertiesFactory cryptoFactory = DecryptionPropertiesFactory.loadFactory(configuration); + FileDecryptionProperties fileDecryptionProperties = (cryptoFactory == null) ? + null : cryptoFactory.getFileDecryptionProperties(configuration, path); + Optional fileDecryptor = (fileDecryptionProperties == null) ? + Optional.empty() : Optional.of(new InternalFileDecryptor(fileDecryptionProperties)); + ParquetMetadata parquetMetadata = hdfsEnvironment.doAs(user, () -> MetadataReader.readFooter(parquetDataSource, fileSize, fileDecryptor).getParquetMetadata()); FileMetaData fileMetaData = parquetMetadata.getFileMetaData(); MessageType fileSchema = fileMetaData.getSchema(); List parquetFields = regularColumns.stream() @@ -127,12 +138,15 @@ public static ConnectorPageSource createParquetPageSource( List blocks = new ArrayList<>(); List blockIndexStores = new ArrayList<>(); for (BlockMetaData block : parquetMetadata.getBlocks()) { - long firstDataPage = block.getColumns().get(0).getFirstDataPageOffset(); - Optional columnIndexStore = ColumnIndexFilterUtils.getColumnIndexStore(parquetPredicate, finalDataSource, block, descriptorsByPath, columnIndexFilterEnabled); - if ((firstDataPage >= start) && (firstDataPage < (start + length)) && - predicateMatches(parquetPredicate, block, dataSource, descriptorsByPath, parquetTupleDomain, columnIndexStore, columnIndexFilterEnabled)) { - blocks.add(block); - blockIndexStores.add(columnIndexStore.orElse(null)); + Optional firstIndex = findFirstNonHiddenColumnId(block); + if (firstIndex.isPresent()) { + long firstDataPage = block.getColumns().get(firstIndex.get()).getFirstDataPageOffset(); + Optional columnIndexStore = getColumnIndexStore(parquetPredicate, finalDataSource, block, descriptorsByPath, columnIndexFilterEnabled); + if ((firstDataPage >= start) && (firstDataPage < (start + length)) && + predicateMatches(parquetPredicate, block, dataSource, descriptorsByPath, parquetTupleDomain, columnIndexStore, columnIndexFilterEnabled)) { + blocks.add(block); + blockIndexStores.add(columnIndexStore.orElse(null)); + } } } @@ -148,7 +162,8 @@ public static ConnectorPageSource createParquetPageSource( verificationEnabled, parquetPredicate, blockIndexStores, - columnIndexFilterEnabled); + columnIndexFilterEnabled, + fileDecryptor); ImmutableList.Builder namesBuilder = ImmutableList.builder(); ImmutableList.Builder prestoTypes = ImmutableList.builder(); diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSourceProvider.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSourceProvider.java index 523e6ef085a98..f85de4228e292 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSourceProvider.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSourceProvider.java @@ -54,7 +54,6 @@ import com.facebook.presto.parquet.RichColumnDescriptor; import com.facebook.presto.parquet.cache.MetadataReader; import com.facebook.presto.parquet.predicate.Predicate; -import com.facebook.presto.parquet.reader.ColumnIndexFilterUtils; import 
com.facebook.presto.parquet.reader.ParquetReader; import com.facebook.presto.spi.ColumnHandle; import com.facebook.presto.spi.ConnectorPageSource; @@ -76,6 +75,9 @@ import org.apache.hadoop.hdfs.BlockMissingException; import org.apache.iceberg.FileFormat; import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.crypto.DecryptionPropertiesFactory; +import org.apache.parquet.crypto.FileDecryptionProperties; +import org.apache.parquet.crypto.InternalFileDecryptor; import org.apache.parquet.hadoop.metadata.BlockMetaData; import org.apache.parquet.hadoop.metadata.FileMetaData; import org.apache.parquet.hadoop.metadata.ParquetMetadata; @@ -122,8 +124,10 @@ import static com.facebook.presto.parquet.ParquetTypeUtils.getColumnIO; import static com.facebook.presto.parquet.ParquetTypeUtils.getDescriptors; import static com.facebook.presto.parquet.ParquetTypeUtils.getParquetTypeByName; +import static com.facebook.presto.parquet.cache.MetadataReader.findFirstNonHiddenColumnId; import static com.facebook.presto.parquet.predicate.PredicateUtils.buildPredicate; import static com.facebook.presto.parquet.predicate.PredicateUtils.predicateMatches; +import static com.facebook.presto.parquet.reader.ColumnIndexFilterUtils.getColumnIndexStore; import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableMap.toImmutableMap; @@ -306,8 +310,15 @@ private static ConnectorPageSource createParquetPageSource( modificationTime, false); FSDataInputStream inputStream = fileSystem.openFile(path, hiveFileContext); - dataSource = buildHdfsParquetDataSource(inputStream, path, fileFormatDataSourceStats); - ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, fileSize).getParquetMetadata(); + // Lambda expression below requires final variable, so we define a new variable parquetDataSource. + final ParquetDataSource parquetDataSource = buildHdfsParquetDataSource(inputStream, path, fileFormatDataSourceStats); + dataSource = parquetDataSource; + DecryptionPropertiesFactory cryptoFactory = DecryptionPropertiesFactory.loadFactory(configuration); + FileDecryptionProperties fileDecryptionProperties = (cryptoFactory == null) ? + null : cryptoFactory.getFileDecryptionProperties(configuration, path); + Optional fileDecryptor = (fileDecryptionProperties == null) ? 
+ Optional.empty() : Optional.of(new InternalFileDecryptor(fileDecryptionProperties)); + ParquetMetadata parquetMetadata = hdfsEnvironment.doAs(user, () -> MetadataReader.readFooter(parquetDataSource, fileSize, fileDecryptor).getParquetMetadata()); FileMetaData fileMetaData = parquetMetadata.getFileMetaData(); MessageType fileSchema = fileMetaData.getSchema(); @@ -335,12 +346,15 @@ private static ConnectorPageSource createParquetPageSource( List blocks = new ArrayList<>(); List blockIndexStores = new ArrayList<>(); for (BlockMetaData block : parquetMetadata.getBlocks()) { - long firstDataPage = block.getColumns().get(0).getFirstDataPageOffset(); - Optional columnIndexStore = ColumnIndexFilterUtils.getColumnIndexStore(parquetPredicate, finalDataSource, block, descriptorsByPath, columnIndexFilterEnabled); - if ((firstDataPage >= start) && (firstDataPage < (start + length)) && - predicateMatches(parquetPredicate, block, dataSource, descriptorsByPath, parquetTupleDomain, columnIndexStore, columnIndexFilterEnabled)) { - blocks.add(block); - blockIndexStores.add(columnIndexStore.orElse(null)); + Optional firstIndex = findFirstNonHiddenColumnId(block); + if (firstIndex.isPresent()) { + long firstDataPage = block.getColumns().get(firstIndex.get()).getFirstDataPageOffset(); + Optional columnIndexStore = getColumnIndexStore(parquetPredicate, finalDataSource, block, descriptorsByPath, columnIndexFilterEnabled); + if ((firstDataPage >= start) && (firstDataPage < (start + length)) && + predicateMatches(parquetPredicate, block, dataSource, descriptorsByPath, parquetTupleDomain, columnIndexStore, columnIndexFilterEnabled)) { + blocks.add(block); + blockIndexStores.add(columnIndexStore.orElse(null)); + } } } @@ -356,7 +370,8 @@ private static ConnectorPageSource createParquetPageSource( verificationEnabled, parquetPredicate, blockIndexStores, - columnIndexFilterEnabled); + columnIndexFilterEnabled, + fileDecryptor); ImmutableList.Builder namesBuilder = ImmutableList.builder(); ImmutableList.Builder prestoTypes = ImmutableList.builder(); diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/DataPageV1.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/DataPageV1.java index 0f172651e98f8..35f902208ef7d 100755 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/DataPageV1.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/DataPageV1.java @@ -46,6 +46,7 @@ public DataPageV1( this.valuesEncoding = valuesEncoding; } + @Override public Slice getSlice() { return slice; diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/DataPageV2.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/DataPageV2.java index 569d715cacd93..b62a0abf4bb37 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/DataPageV2.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/DataPageV2.java @@ -80,6 +80,7 @@ public ParquetEncoding getDataEncoding() return dataEncoding; } + @Override public Slice getSlice() { return slice; diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/DictionaryPage.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/DictionaryPage.java index da5581a108b62..37c22695d57a9 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/DictionaryPage.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/DictionaryPage.java @@ -42,6 +42,7 @@ public DictionaryPage(Slice slice, int uncompressedSize, int dictionarySize, Par this.encoding = 
requireNonNull(encoding, "encoding is null"); } + @Override public Slice getSlice() { return slice; diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/Page.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/Page.java index b5b917af2aa57..8b043c95b1db1 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/Page.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/Page.java @@ -13,6 +13,8 @@ */ package com.facebook.presto.parquet; +import io.airlift.slice.Slice; + public abstract class Page { protected final int compressedSize; @@ -33,4 +35,6 @@ public int getUncompressedSize() { return uncompressedSize; } + + public abstract Slice getSlice(); } diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/cache/CachingParquetMetadataSource.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/cache/CachingParquetMetadataSource.java index f1c829114e397..0378cb002efbb 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/cache/CachingParquetMetadataSource.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/cache/CachingParquetMetadataSource.java @@ -18,8 +18,10 @@ import com.facebook.presto.parquet.ParquetDataSourceId; import com.google.common.cache.Cache; import com.google.common.util.concurrent.UncheckedExecutionException; +import org.apache.parquet.crypto.InternalFileDecryptor; import java.io.IOException; +import java.util.Optional; import java.util.concurrent.ExecutionException; import static com.google.common.base.Throwables.throwIfInstanceOf; @@ -42,14 +44,15 @@ public ParquetFileMetadata getParquetMetadata( ParquetDataSource parquetDataSource, long fileSize, boolean cacheable, - long modificationTime) + long modificationTime, + Optional fileDecryptor) throws IOException { try { if (cacheable) { ParquetFileMetadata fileMetadataCache = cache.get( parquetDataSource.getId(), - () -> delegate.getParquetMetadata(parquetDataSource, fileSize, cacheable, modificationTime)); + () -> delegate.getParquetMetadata(parquetDataSource, fileSize, cacheable, modificationTime, fileDecryptor)); if (fileMetadataCache.getModificationTime() == modificationTime) { return fileMetadataCache; } @@ -57,7 +60,7 @@ public ParquetFileMetadata getParquetMetadata( cache.invalidate(parquetDataSource.getId()); } } - return delegate.getParquetMetadata(parquetDataSource, fileSize, cacheable, modificationTime); + return delegate.getParquetMetadata(parquetDataSource, fileSize, cacheable, modificationTime, fileDecryptor); } catch (ExecutionException | UncheckedExecutionException e) { throwIfInstanceOf(e.getCause(), IOException.class); diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/cache/MetadataReader.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/cache/MetadataReader.java index 463f4ce984889..af20fb40afad6 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/cache/MetadataReader.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/cache/MetadataReader.java @@ -15,17 +15,33 @@ import com.facebook.presto.parquet.ParquetCorruptionException; import com.facebook.presto.parquet.ParquetDataSource; +import com.facebook.presto.parquet.ParquetDataSourceId; +import io.airlift.slice.BasicSliceInput; import io.airlift.slice.Slice; +import org.apache.parquet.crypto.AesCipher; +import org.apache.parquet.crypto.AesGcmEncryptor; +import org.apache.parquet.crypto.HiddenColumnChunkMetaData; +import org.apache.parquet.crypto.InternalColumnDecryptionSetup; +import 
org.apache.parquet.crypto.InternalFileDecryptor; +import org.apache.parquet.crypto.KeyAccessDeniedException; +import org.apache.parquet.crypto.ModuleCipherFactory.ModuleType; +import org.apache.parquet.crypto.ParquetCryptoRuntimeException; +import org.apache.parquet.crypto.TagVerificationException; +import org.apache.parquet.format.BlockCipher.Decryptor; import org.apache.parquet.format.ColumnChunk; +import org.apache.parquet.format.ColumnCryptoMetaData; import org.apache.parquet.format.ColumnMetaData; import org.apache.parquet.format.ConvertedType; import org.apache.parquet.format.Encoding; +import org.apache.parquet.format.EncryptionWithColumnKey; +import org.apache.parquet.format.FileCryptoMetaData; import org.apache.parquet.format.FileMetaData; import org.apache.parquet.format.KeyValue; import org.apache.parquet.format.RowGroup; import org.apache.parquet.format.SchemaElement; import org.apache.parquet.format.Statistics; import org.apache.parquet.format.Type; +import org.apache.parquet.format.Util; import org.apache.parquet.format.converter.ParquetMetadataConverter; import org.apache.parquet.hadoop.metadata.BlockMetaData; import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; @@ -40,6 +56,7 @@ import org.apache.parquet.schema.Type.Repetition; import org.apache.parquet.schema.Types; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -50,42 +67,43 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Optional; import java.util.Set; import static com.facebook.presto.parquet.ParquetValidationUtils.validateParquet; +import static com.google.common.base.Preconditions.checkArgument; import static io.airlift.slice.Slices.wrappedBuffer; import static java.lang.Math.min; import static java.lang.Math.toIntExact; import static java.lang.String.format; import static java.nio.charset.StandardCharsets.US_ASCII; +import static org.apache.parquet.crypto.AesCipher.GCM_TAG_LENGTH; +import static org.apache.parquet.crypto.AesCipher.NONCE_LENGTH; +import static org.apache.parquet.format.Util.readFileCryptoMetaData; import static org.apache.parquet.format.Util.readFileMetaData; +import static org.apache.parquet.hadoop.ParquetFileWriter.EF_MAGIC_STR; +import static org.apache.parquet.hadoop.ParquetFileWriter.MAGIC_STR; public final class MetadataReader implements ParquetMetadataSource { - private static final Slice MAGIC = wrappedBuffer("PAR1".getBytes(US_ASCII)); + private static final Slice MAGIC = wrappedBuffer(MAGIC_STR.getBytes(US_ASCII)); + private static final Slice EMAGIC = wrappedBuffer(EF_MAGIC_STR.getBytes(US_ASCII)); private static final int POST_SCRIPT_SIZE = Integer.BYTES + MAGIC.length(); private static final int EXPECTED_FOOTER_SIZE = 16 * 1024; private static final ParquetMetadataConverter PARQUET_METADATA_CONVERTER = new ParquetMetadataConverter(); private static final long MODIFICATION_TIME_NOT_SET = 0L; - public static ParquetFileMetadata readFooter(ParquetDataSource parquetDataSource, long fileSize) + public static ParquetFileMetadata readFooter(ParquetDataSource parquetDataSource, long fileSize, Optional<InternalFileDecryptor> fileDecryptor) throws IOException { - return readFooter(parquetDataSource, fileSize, MODIFICATION_TIME_NOT_SET); + return readFooter(parquetDataSource, fileSize, MODIFICATION_TIME_NOT_SET, fileDecryptor); } - public static ParquetFileMetadata readFooter(ParquetDataSource parquetDataSource, long fileSize, long modificationTime)
+ public static ParquetFileMetadata readFooter(ParquetDataSource parquetDataSource, long fileSize, long modificationTime, Optional<InternalFileDecryptor> fileDecryptor) throws IOException { - // Parquet File Layout: - // - // MAGIC - // variable: Data - // variable: Metadata - // 4 bytes: MetadataLength - // MAGIC - + // Parquet File Layout: https://github.com/apache/parquet-format/blob/master/Encryption.md validateParquet(fileSize >= MAGIC.length() + POST_SCRIPT_SIZE, "%s is not a valid Parquet File", parquetDataSource.getId()); // EXPECTED_FOOTER_SIZE is an int, so this will never fail @@ -94,9 +112,10 @@ public static ParquetFileMetadata readFooter(ParquetDataSource parquetDataSource Slice tailSlice = wrappedBuffer(buffer); Slice magic = tailSlice.slice(tailSlice.length() - MAGIC.length(), MAGIC.length()); - if (!MAGIC.equals(magic)) { - throw new ParquetCorruptionException(format("Not valid Parquet file: %s expected magic number: %s got: %s", parquetDataSource.getId(), Arrays.toString(MAGIC.getBytes()), Arrays.toString(magic.getBytes()))); + if (!MAGIC.equals(magic) && !EMAGIC.equals(magic)) { + throw new ParquetCorruptionException(format("Not valid Parquet file: %s expected magic number: %s or %s, but got: %s", parquetDataSource.getId(), Arrays.toString(MAGIC.getBytes()), Arrays.toString(EMAGIC.getBytes()), Arrays.toString(magic.getBytes()))); } + boolean encryptedFooterMode = EMAGIC.equals(magic); int metadataLength = tailSlice.getInt(tailSlice.length() - POST_SCRIPT_SIZE); int completeFooterSize = metadataLength + POST_SCRIPT_SIZE; @@ -112,9 +131,52 @@ public static ParquetFileMetadata readFooter(ParquetDataSource parquetDataSource tailSlice = wrappedBuffer(footerBuffer, 0, footerBuffer.length); } - FileMetaData fileMetaData = readFileMetaData(tailSlice.slice(tailSlice.length() - completeFooterSize, metadataLength).getInput()); + return readParquetMetadata(tailSlice.slice(tailSlice.length() - completeFooterSize, metadataLength).getInput(), metadataLength, modificationTime, fileDecryptor, encryptedFooterMode, parquetDataSource.getId()); + } + + private static ParquetFileMetadata readParquetMetadata(BasicSliceInput input, int metadataLength, long modificationTime, Optional<InternalFileDecryptor> fileDecryptor, boolean encryptedFooterMode, ParquetDataSourceId id) + throws IOException + { + checkArgument(!encryptedFooterMode || fileDecryptor.isPresent(), "fileDecryptor must be present when the file has an encrypted footer"); + Decryptor footerDecryptor = null; + // additional authenticated data for the AES cipher + byte[] additionalAuthenticationData = null; + + if (encryptedFooterMode) { + FileCryptoMetaData fileCryptoMetaData = readFileCryptoMetaData(input); + fileDecryptor.get().setFileCryptoMetaData(fileCryptoMetaData.getEncryption_algorithm(), true, fileCryptoMetaData.getKey_metadata()); + footerDecryptor = fileDecryptor.get().fetchFooterDecryptor(); + additionalAuthenticationData = AesCipher.createFooterAAD(fileDecryptor.get().getFileAAD()); + } + + FileMetaData fileMetaData = readFileMetaData(input, footerDecryptor, additionalAuthenticationData); + return convertToParquetMetadata(input, fileMetaData, metadataLength, modificationTime, fileDecryptor, encryptedFooterMode, id); + } + + private static ParquetFileMetadata convertToParquetMetadata(BasicSliceInput input, FileMetaData fileMetaData, int metadataLength, long modificationTime, Optional<InternalFileDecryptor> fileDecryptor, boolean encryptedFooter, ParquetDataSourceId id) + throws IOException + { List<SchemaElement> schema = fileMetaData.getSchema(); - validateParquet(!schema.isEmpty(), "Empty Parquet schema in file: %s", parquetDataSource.getId());
+ validateParquet(!schema.isEmpty(), "Empty Parquet schema in file: %s", id); + + // A fileDecryptor was attached by the reader, but the file may have a plaintext footer or may not be encrypted at all. + if (!encryptedFooter && fileDecryptor.isPresent()) { + if (!fileMetaData.isSetEncryption_algorithm()) { // Plaintext file + fileDecryptor.get().setPlaintextFile(); + // Reject plaintext files when the decryption properties do not allow them (the file may have been left unencrypted by mistake) + if (!fileDecryptor.get().plaintextFilesAllowed()) { + throw new ParquetCryptoRuntimeException("Applying decryptor on plaintext file"); + } + } + else { // Encrypted file with plaintext footer + // Register the file crypto metadata; plaintext columns remain readable even when column keys are unavailable + fileDecryptor.get().setFileCryptoMetaData(fileMetaData.getEncryption_algorithm(), false, + fileMetaData.getFooter_signing_key_metadata()); + if (fileDecryptor.get().checkFooterIntegrity()) { + verifyFooterIntegrity(input, fileDecryptor.get(), metadataLength); + } + } + } MessageType messageType = readParquetSchema(schema); List<BlockMetaData> blocks = new ArrayList<>(); @@ -127,31 +189,55 @@ public static ParquetFileMetadata readFooter(ParquetDataSource parquetDataSource List<ColumnChunk> columns = rowGroup.getColumns(); validateParquet(!columns.isEmpty(), "No columns in row group: %s", rowGroup); String filePath = columns.get(0).getFile_path(); + int columnOrdinal = -1; for (ColumnChunk columnChunk : columns) { + columnOrdinal++; validateParquet( (filePath == null && columnChunk.getFile_path() == null) || (filePath != null && filePath.equals(columnChunk.getFile_path())), "all column chunks of the same row group must be in the same file"); + ColumnMetaData metaData = columnChunk.meta_data; - String[] path = metaData.path_in_schema.stream() - .map(value -> value.toLowerCase(Locale.ENGLISH)) - .toArray(String[]::new); - ColumnPath columnPath = ColumnPath.get(path); - PrimitiveType primitiveType = messageType.getType(columnPath.toArray()).asPrimitiveType(); - PrimitiveTypeName primitiveTypeName = primitiveType.getPrimitiveTypeName(); - - ColumnChunkMetaData column = ColumnChunkMetaData.get( - columnPath, - primitiveType, - CompressionCodecName.fromParquet(metaData.codec), - PARQUET_METADATA_CONVERTER.convertEncodingStats(metaData.encoding_stats), - readEncodings(metaData.encodings), - readStats(metaData.statistics, primitiveTypeName), - metaData.data_page_offset, - metaData.dictionary_page_offset, - metaData.num_values, - metaData.total_compressed_size, - metaData.total_uncompressed_size); + ColumnCryptoMetaData cryptoMetaData = columnChunk.getCrypto_metadata(); + ColumnPath columnPath = null; + + if (null == cryptoMetaData) { // Plaintext column + columnPath = getPath(metaData); + if (fileDecryptor.isPresent() && !fileDecryptor.get().plaintextFile()) { + // Mark this column as plaintext in the decryptor of an encrypted file + fileDecryptor.get().setColumnCryptoMetadata(columnPath, false, false, (byte[]) null, columnOrdinal); + } + } + else { // Encrypted column + if (cryptoMetaData.isSetENCRYPTION_WITH_FOOTER_KEY()) { // Column encrypted with footer key + if (!encryptedFooter) { + throw new ParquetCryptoRuntimeException("Column encrypted with footer key in file with plaintext footer"); + } + if (null == metaData) { + throw new ParquetCryptoRuntimeException("ColumnMetaData not set in Encryption with Footer key"); + } + if (!fileDecryptor.isPresent()) { + throw new ParquetCryptoRuntimeException("Column encrypted with footer key: No keys available"); + } + columnPath = getPath(metaData);
+ fileDecryptor.get().setColumnCryptoMetadata(columnPath, true, true, (byte[]) null, columnOrdinal); + } + else { // Column encrypted with column key + try { + // TODO: We decrypt column metadata before filter/projection pruning, which can send unnecessary traffic to the KMS; so far this has not been a problem in production. + // parquet-mr uses lazy decryption instead, but that requires changing ColumnChunkMetaData. We will improve this later. + metaData = decryptMetadata(rowGroup, cryptoMetaData, columnChunk, fileDecryptor.get(), columnOrdinal); + columnPath = getPath(metaData); + } + catch (KeyAccessDeniedException e) { + // The column key is inaccessible, so the column stays hidden; take the path from the crypto metadata, since decryption failed before columnPath was set + List<String> hiddenPath = cryptoMetaData.getENCRYPTION_WITH_COLUMN_KEY().getPath_in_schema(); + ColumnChunkMetaData column = new HiddenColumnChunkMetaData(ColumnPath.get(hiddenPath.toArray(new String[0])), filePath); + blockMetaData.addColumn(column); + continue; + } + } + } + ColumnChunkMetaData column = buildColumnChunkMetaData(metaData, columnPath, messageType.getType(columnPath.toArray()).asPrimitiveType()); column.setColumnIndexReference(toColumnIndexReference(columnChunk)); column.setOffsetIndexReference(toOffsetIndexReference(columnChunk)); blockMetaData.addColumn(column); @@ -172,6 +258,72 @@ public static ParquetFileMetadata readFooter(ParquetDataSource parquetDataSource return new ParquetFileMetadata(parquetMetadata, toIntExact(metadataLength), modificationTime); } + private static ColumnMetaData decryptMetadata(RowGroup rowGroup, ColumnCryptoMetaData cryptoMetaData, ColumnChunk columnChunk, InternalFileDecryptor fileDecryptor, int columnOrdinal) + { + EncryptionWithColumnKey columnKeyStruct = cryptoMetaData.getENCRYPTION_WITH_COLUMN_KEY(); + List<String> pathList = columnKeyStruct.getPath_in_schema(); + byte[] columnKeyMetadata = columnKeyStruct.getKey_metadata(); + ColumnPath columnPath = ColumnPath.get(pathList.toArray(new String[pathList.size()])); + byte[] encryptedMetadataBuffer = columnChunk.getEncrypted_column_metadata(); + + // Decrypt the ColumnMetaData + InternalColumnDecryptionSetup columnDecryptionSetup = fileDecryptor.setColumnCryptoMetadata(columnPath, true, false, columnKeyMetadata, columnOrdinal); + ByteArrayInputStream tempInputStream = new ByteArrayInputStream(encryptedMetadataBuffer); + byte[] columnMetaDataAAD = AesCipher.createModuleAAD(fileDecryptor.getFileAAD(), ModuleType.ColumnMetaData, rowGroup.ordinal, columnOrdinal, -1); + try { + return Util.readColumnMetaData(tempInputStream, columnDecryptionSetup.getMetaDataDecryptor(), columnMetaDataAAD); + } + catch (IOException e) { + throw new ParquetCryptoRuntimeException(columnPath + ". Failed to decrypt column metadata", e);
+ } + } + + public static ColumnChunkMetaData buildColumnChunkMetaData(ColumnMetaData metaData, ColumnPath columnPath, PrimitiveType type) + { + return ColumnChunkMetaData.get( + columnPath, + type, + CompressionCodecName.fromParquet(metaData.codec), + PARQUET_METADATA_CONVERTER.convertEncodingStats(metaData.encoding_stats), + readEncodings(metaData.encodings), + readStats(metaData.statistics, type.getPrimitiveTypeName()), + metaData.data_page_offset, + metaData.dictionary_page_offset, + metaData.num_values, + metaData.total_compressed_size, + metaData.total_uncompressed_size); + } + + private static ColumnPath getPath(ColumnMetaData metaData) + { + String[] path = metaData.path_in_schema.stream() + .map(value -> value.toLowerCase(Locale.ENGLISH)) + .toArray(String[]::new); + return ColumnPath.get(path); + } + + private static void verifyFooterIntegrity(BasicSliceInput from, InternalFileDecryptor fileDecryptor, int combinedFooterLength) + { + byte[] nonce = new byte[NONCE_LENGTH]; + from.read(nonce); + byte[] gcmTag = new byte[GCM_TAG_LENGTH]; + from.read(gcmTag); + + AesGcmEncryptor footerSigner = fileDecryptor.createSignedFooterEncryptor(); + int footerSignatureLength = NONCE_LENGTH + GCM_TAG_LENGTH; + byte[] serializedFooter = new byte[combinedFooterLength - footerSignatureLength]; + from.setPosition(0); + from.read(serializedFooter, 0, serializedFooter.length); + + byte[] signedFooterAuthenticationData = AesCipher.createFooterAAD(fileDecryptor.getFileAAD()); + byte[] encryptedFooterBytes = footerSigner.encrypt(false, serializedFooter, nonce, signedFooterAuthenticationData); + byte[] calculatedTag = new byte[GCM_TAG_LENGTH]; + System.arraycopy(encryptedFooterBytes, encryptedFooterBytes.length - GCM_TAG_LENGTH, calculatedTag, 0, GCM_TAG_LENGTH); + if (!Arrays.equals(gcmTag, calculatedTag)) { + throw new TagVerificationException("Signature mismatch in plaintext footer"); + } + } + private static MessageType readParquetSchema(List<SchemaElement> schema) { Iterator<SchemaElement> schemaIterator = schema.iterator(); @@ -317,10 +469,11 @@ public ParquetFileMetadata getParquetMetadata( ParquetDataSource parquetDataSource, long fileSize, boolean cacheable, - long modificationTime) + long modificationTime, + Optional<InternalFileDecryptor> fileDecryptor) throws IOException { - return readFooter(parquetDataSource, fileSize, modificationTime); + return readFooter(parquetDataSource, fileSize, modificationTime, fileDecryptor); } private static IndexReference toColumnIndexReference(ColumnChunk columnChunk) @@ -338,4 +491,16 @@ private static IndexReference toOffsetIndexReference(ColumnChunk columnChunk) } return null; } + + public static Optional<Integer> findFirstNonHiddenColumnId(BlockMetaData block) + { + List<ColumnChunkMetaData> columns = block.getColumns(); + for (int i = 0; i < columns.size(); i++) { + if (!HiddenColumnChunkMetaData.isHiddenColumn(columns.get(i))) { + return Optional.of(i); + } + } + // All columns are hidden (encrypted with keys that the current user cannot access) + return Optional.empty(); + } } diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/cache/ParquetMetadataSource.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/cache/ParquetMetadataSource.java index 6b83b80da88c0..85d10dd3a6c9a 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/cache/ParquetMetadataSource.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/cache/ParquetMetadataSource.java @@ -14,8 +14,10 @@ package com.facebook.presto.parquet.cache;
import com.facebook.presto.parquet.ParquetDataSource; +import org.apache.parquet.crypto.InternalFileDecryptor; import java.io.IOException; +import java.util.Optional; public interface ParquetMetadataSource { @@ -23,6 +25,7 @@ ParquetFileMetadata getParquetMetadata( ParquetDataSource parquetDataSource, long fileSize, boolean cacheable, - long modificationTime) + long modificationTime, + Optional<InternalFileDecryptor> fileDecryptor) throws IOException; } diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/predicate/PredicateUtils.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/predicate/PredicateUtils.java index c199db2521b19..8e1d9bce1a270 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/predicate/PredicateUtils.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/predicate/PredicateUtils.java @@ -30,6 +30,7 @@ import org.apache.parquet.column.Encoding; import org.apache.parquet.column.EncodingStats; import org.apache.parquet.column.statistics.Statistics; +import org.apache.parquet.crypto.HiddenColumnChunkMetaData; import org.apache.parquet.format.DictionaryPageHeader; import org.apache.parquet.format.PageHeader; import org.apache.parquet.format.PageType; @@ -105,10 +106,10 @@ private static Map<RichColumnDescriptor, Statistics<?>> getStatistics(BlockMetaData { ImmutableMap.Builder<RichColumnDescriptor, Statistics<?>> statistics = ImmutableMap.builder(); for (ColumnChunkMetaData columnMetaData : blockMetadata.getColumns()) { - Statistics<?> columnStatistics = columnMetaData.getStatistics(); - if (columnStatistics != null) { + if (!HiddenColumnChunkMetaData.isHiddenColumn(columnMetaData)) { + Statistics<?> columnStatistics = columnMetaData.getStatistics(); RichColumnDescriptor descriptor = descriptorsByPath.get(Arrays.asList(columnMetaData.getPath().toArray())); - if (descriptor != null) { + if (columnStatistics != null && descriptor != null) { statistics.put(descriptor, columnStatistics); } } @@ -119,9 +120,9 @@ private static Map<RichColumnDescriptor, Statistics<?>> getStatistics(BlockMetaData private static boolean dictionaryPredicatesMatch(Predicate parquetPredicate, BlockMetaData blockMetadata, ParquetDataSource dataSource, Map<List<String>, RichColumnDescriptor> descriptorsByPath, TupleDomain<ColumnDescriptor> parquetTupleDomain) { for (ColumnChunkMetaData columnMetaData : blockMetadata.getColumns()) { - RichColumnDescriptor descriptor = descriptorsByPath.get(Arrays.asList(columnMetaData.getPath().toArray())); - if (descriptor != null) { - if (isOnlyDictionaryEncodingPages(columnMetaData) && isColumnPredicate(descriptor, parquetTupleDomain)) { + if (!HiddenColumnChunkMetaData.isHiddenColumn(columnMetaData)) { + RichColumnDescriptor descriptor = descriptorsByPath.get(Arrays.asList(columnMetaData.getPath().toArray())); + if (descriptor != null && isOnlyDictionaryEncodingPages(columnMetaData) && isColumnPredicate(descriptor, parquetTupleDomain)) { byte[] buffer = new byte[toIntExact(columnMetaData.getTotalSize())]; dataSource.readFully(columnMetaData.getStartingPos(), buffer); // Early abort, predicate already filters block so no more dictionaries need be read diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/reader/PageReader.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/reader/PageReader.java index c0a0b2e51ede2..d7fe4e9580918 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/reader/PageReader.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/reader/PageReader.java @@ -18,23 +18,32 @@ import com.facebook.presto.parquet.DataPageV2; import com.facebook.presto.parquet.DictionaryPage; import io.airlift.slice.Slice;
+import org.apache.parquet.crypto.AesCipher; +import org.apache.parquet.crypto.ModuleCipherFactory; +import org.apache.parquet.format.BlockCipher; import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.internal.column.columnindex.OffsetIndex; import java.io.IOException; import java.util.LinkedList; +import java.util.Optional; import static com.facebook.presto.parquet.ParquetCompressionUtils.decompress; +import static io.airlift.slice.Slices.wrappedBuffer; import static java.lang.Math.toIntExact; public class PageReader { - private final CompressionCodecName codec; private final long valueCount; private final LinkedList<DataPage> compressedPages; private final DictionaryPage compressedDictionaryPage; private final OffsetIndex offsetIndex; + private final Optional<BlockCipher.Decryptor> blockDecryptor; private int pageIndex; + private byte[] dataPageAdditionalAuthenticationData; + private byte[] dictionaryPageAdditionalAuthenticationData; + + protected final CompressionCodecName codec; /** * @param compressedPages This parameter will be mutated destructively as {@link DataPage} entries are removed as part of {@link #readPage()}. The caller @@ -45,13 +54,17 @@ public PageReader(CompressionCodecName codec, DictionaryPage compressedDictionaryPage) throws IOException { - this(codec, compressedPages, compressedDictionaryPage, null); + this(codec, compressedPages, compressedDictionaryPage, null, Optional.empty(), null, -1, -1); } public PageReader(CompressionCodecName codec, LinkedList<DataPage> compressedPages, DictionaryPage compressedDictionaryPage, - OffsetIndex offsetIndex) + OffsetIndex offsetIndex, + Optional<BlockCipher.Decryptor> blockDecryptor, + byte[] fileAdditionalAuthenticationData, + int rowGroupOrdinal, + int columnOrdinal) { this.codec = codec; this.compressedPages = compressedPages; @@ -63,6 +76,11 @@ public PageReader(CompressionCodecName codec, this.valueCount = count; this.offsetIndex = offsetIndex; this.pageIndex = 0; + this.blockDecryptor = blockDecryptor; + if (blockDecryptor.isPresent()) { + dataPageAdditionalAuthenticationData = AesCipher.createModuleAAD(fileAdditionalAuthenticationData, ModuleCipherFactory.ModuleType.DataPage, rowGroupOrdinal, columnOrdinal, 0); + dictionaryPageAdditionalAuthenticationData = AesCipher.createModuleAAD(fileAdditionalAuthenticationData, ModuleCipherFactory.ModuleType.DictionaryPage, rowGroupOrdinal, columnOrdinal, -1); + } } public long getTotalValueCount() @@ -75,13 +93,17 @@ public DataPage readPage() if (compressedPages.isEmpty()) { return null; } - DataPage compressedPage = compressedPages.removeFirst(); + if (blockDecryptor.isPresent()) { + AesCipher.quickUpdatePageAAD(dataPageAdditionalAuthenticationData, pageIndex); + } + DataPage compressedPage = compressedPages.remove(0); try { + Slice slice = decryptSliceIfNeeded(compressedPage.getSlice(), dataPageAdditionalAuthenticationData); long firstRowIndex = getFirstRowIndex(pageIndex, offsetIndex); pageIndex = pageIndex + 1; if (compressedPage instanceof DataPageV1) { DataPageV1 dataPageV1 = (DataPageV1) compressedPage; - Slice slice = decompress(codec, dataPageV1.getSlice(), dataPageV1.getUncompressedSize()); + slice = decompress(codec, slice, dataPageV1.getUncompressedSize()); return new DataPageV1( slice, dataPageV1.getValueCount(), @@ -100,7 +122,7 @@ public DataPage readPage() int uncompressedSize = toIntExact(dataPageV2.getUncompressedSize() - dataPageV2.getDefinitionLevels().length() - dataPageV2.getRepetitionLevels().length()); - Slice slice = decompress(codec, dataPageV2.getSlice(), uncompressedSize);
+ slice = decompress(codec, slice, uncompressedSize); return new DataPageV2( dataPageV2.getRowCount(), dataPageV2.getNullCount(), @@ -126,8 +148,9 @@ public DictionaryPage readDictionaryPage() return null; } try { + Slice slice = decryptSliceIfNeeded(compressedDictionaryPage.getSlice(), dictionaryPageAdditionalAuthenticationData); return new DictionaryPage( - decompress(codec, compressedDictionaryPage.getSlice(), compressedDictionaryPage.getUncompressedSize()), + decompress(codec, slice, compressedDictionaryPage.getUncompressedSize()), compressedDictionaryPage.getDictionarySize(), compressedDictionaryPage.getEncoding()); } @@ -140,4 +163,15 @@ public static long getFirstRowIndex(int pageIndex, OffsetIndex offsetIndex) { return offsetIndex == null ? -1 : offsetIndex.getFirstRowIndex(pageIndex); } + + // Decrypt the slice if a block decryptor is present; additionalAuthenticationData is the AAD for the AES cipher + private Slice decryptSliceIfNeeded(Slice slice, byte[] additionalAuthenticationData) + throws IOException + { + if (!blockDecryptor.isPresent()) { + return slice; + } + byte[] plainText = blockDecryptor.get().decrypt(slice.getBytes(), additionalAuthenticationData); + return wrappedBuffer(plainText); + } } diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/reader/ParquetColumnChunk.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/reader/ParquetColumnChunk.java index 3c6b7079c536b..ff9a57ede04e3 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/reader/ParquetColumnChunk.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/reader/ParquetColumnChunk.java @@ -22,17 +22,27 @@ import io.airlift.slice.Slice; import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.column.Encoding; +import org.apache.parquet.column.EncodingStats; +import org.apache.parquet.crypto.AesCipher; +import org.apache.parquet.crypto.InternalColumnDecryptionSetup; +import org.apache.parquet.crypto.InternalFileDecryptor; +import org.apache.parquet.crypto.ModuleCipherFactory.ModuleType; +import org.apache.parquet.format.BlockCipher; import org.apache.parquet.format.DataPageHeader; import org.apache.parquet.format.DataPageHeaderV2; import org.apache.parquet.format.DictionaryPageHeader; import org.apache.parquet.format.PageHeader; import org.apache.parquet.format.Util; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.ColumnPath; import org.apache.parquet.internal.column.columnindex.OffsetIndex; import java.io.IOException; import java.nio.ByteBuffer; import java.util.LinkedList; import java.util.List; +import java.util.Optional; +import java.util.Set; import static com.facebook.presto.parquet.ParquetTypeUtils.getParquetEncoding; import static io.airlift.slice.Slices.wrappedBuffer; @@ -68,21 +78,43 @@ public ColumnChunkDescriptor getDescriptor() { return descriptor; } - protected PageHeader readPageHeader() + protected PageHeader readPageHeader(BlockCipher.Decryptor headerBlockDecryptor, byte[] pageHeaderAdditionalAuthenticationData) throws IOException { - return Util.readPageHeader(stream); + return Util.readPageHeader(stream, headerBlockDecryptor, pageHeaderAdditionalAuthenticationData); } - public PageReader readAllPages() + public PageReader readAllPages(Optional<InternalFileDecryptor> fileDecryptor, int rowGroupOrdinal, int columnOrdinal) throws IOException { LinkedList<DataPage> pages = new LinkedList<>(); DictionaryPage dictionaryPage = null; long valueCount = 0; int dataPageCount = 0; + int pageOrdinal = 0; + byte[] dataPageHeaderAdditionalAuthenticationData = null;
+ BlockCipher.Decryptor headerBlockDecryptor = null; + InternalColumnDecryptionSetup columnDecryptionSetup = null; + if (fileDecryptor.isPresent()) { + ColumnPath columnPath = ColumnPath.get(descriptor.getColumnDescriptor().getPath()); + columnDecryptionSetup = fileDecryptor.get().getColumnSetup(columnPath); + headerBlockDecryptor = columnDecryptionSetup.getMetaDataDecryptor(); + if (headerBlockDecryptor != null) { + dataPageHeaderAdditionalAuthenticationData = AesCipher.createModuleAAD(fileDecryptor.get().getFileAAD(), ModuleType.DataPageHeader, rowGroupOrdinal, columnOrdinal, pageOrdinal); + } + } while (hasMorePages(valueCount, dataPageCount)) { - PageHeader pageHeader = readPageHeader(); + byte[] pageHeaderAdditionalAuthenticationData = dataPageHeaderAdditionalAuthenticationData; + if (headerBlockDecryptor != null) { + // Important: this also verifies file integrity (it makes sure the dictionary page has not been removed) + if (dictionaryPage == null && hasDictionaryPage(descriptor.getColumnChunkMetaData())) { + pageHeaderAdditionalAuthenticationData = AesCipher.createModuleAAD(fileDecryptor.get().getFileAAD(), ModuleType.DictionaryPageHeader, rowGroupOrdinal, columnOrdinal, -1); + } + else { + AesCipher.quickUpdatePageAAD(dataPageHeaderAdditionalAuthenticationData, pageOrdinal); + } + } + PageHeader pageHeader = readPageHeader(headerBlockDecryptor, pageHeaderAdditionalAuthenticationData); int uncompressedPageSize = pageHeader.getUncompressed_page_size(); int compressedPageSize = pageHeader.getCompressed_page_size(); long firstRowIndex = -1; @@ -97,18 +129,30 @@ public PageReader readAllPages() firstRowIndex = PageReader.getFirstRowIndex(dataPageCount, offsetIndex); valueCount += readDataPageV1(pageHeader, uncompressedPageSize, compressedPageSize, firstRowIndex, pages); dataPageCount = dataPageCount + 1; + pageOrdinal = pageOrdinal + 1; break; case DATA_PAGE_V2: firstRowIndex = PageReader.getFirstRowIndex(dataPageCount, offsetIndex); valueCount += readDataPageV2(pageHeader, uncompressedPageSize, compressedPageSize, firstRowIndex, pages); dataPageCount = dataPageCount + 1; + pageOrdinal = pageOrdinal + 1; break; default: stream.skipFully(compressedPageSize); break; } } - return new PageReader(descriptor.getColumnChunkMetaData().getCodec(), pages, dictionaryPage, offsetIndex); + byte[] fileAdditionalAuthenticationData = (fileDecryptor.isPresent()) ? fileDecryptor.get().getFileAAD() : null; + Optional<BlockCipher.Decryptor> dataDecryptor = getDataDecryptor(columnDecryptionSetup); + return new PageReader(descriptor.getColumnChunkMetaData().getCodec(), pages, dictionaryPage, offsetIndex, dataDecryptor, fileAdditionalAuthenticationData, rowGroupOrdinal, columnOrdinal); + } + + private Optional<BlockCipher.Decryptor> getDataDecryptor(InternalColumnDecryptionSetup columnDecryptionSetup) + { + if (columnDecryptionSetup == null || columnDecryptionSetup.getDataDecryptor() == null) { + return Optional.empty(); + } + return Optional.of(columnDecryptionSetup.getDataDecryptor()); } private Slice getSlice(int size) throws IOException @@ -186,4 +230,15 @@ private boolean hasMorePages(long valuesCount, int pagesCount) return offsetIndex == null ?
valuesCount < descriptor.getColumnChunkMetaData().getValueCount() : pagesCount < offsetIndex.getPageCount(); } + + private boolean hasDictionaryPage(ColumnChunkMetaData columnChunkMetaData) + { + EncodingStats stats = columnChunkMetaData.getEncodingStats(); + if (stats != null) { + return stats.hasDictionaryPages() && stats.hasDictionaryEncodedPages(); + } + + Set<Encoding> encodings = columnChunkMetaData.getEncodings(); + return encodings.contains(Encoding.PLAIN_DICTIONARY) || encodings.contains(Encoding.RLE_DICTIONARY); + } } diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/reader/ParquetReader.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/reader/ParquetReader.java index 84c06a659e585..c7aae919aada6 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/reader/ParquetReader.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/reader/ParquetReader.java @@ -43,6 +43,8 @@ import it.unimi.dsi.fastutil.ints.IntArrayList; import it.unimi.dsi.fastutil.ints.IntList; import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.crypto.HiddenColumnChunkMetaData; +import org.apache.parquet.crypto.InternalFileDecryptor; import org.apache.parquet.filter2.compat.FilterCompat; import org.apache.parquet.filter2.predicate.FilterPredicate; import org.apache.parquet.hadoop.metadata.BlockMetaData; @@ -89,18 +91,16 @@ public class ParquetReader private static final int MAX_VECTOR_LENGTH = 1024; private static final int INITIAL_BATCH_SIZE = 1; private static final int BATCH_SIZE_GROWTH_FACTOR = 2; - + private final Optional<InternalFileDecryptor> fileDecryptor; private final List<BlockMetaData> blocks; private final Optional<List<Long>> firstRowsOfBlocks; private final List columns; - private final ParquetDataSource dataSource; private final AggregatedMemoryContext systemMemoryContext; private final boolean batchReadEnabled; private final boolean enableVerification; private final FilterPredicate filter; private int currentBlock; - private BlockMetaData currentBlockMetadata; private long currentPosition; private long currentGroupRowCount; @@ -126,10 +126,13 @@ public class ParquetReader private final List blockRowRanges; private final Map paths = new HashMap<>(); + protected final ParquetDataSource dataSource; + protected BlockMetaData currentBlockMetadata; + private final boolean columnIndexFilterEnabled; - public ParquetReader(MessageColumnIO - messageColumnIO, + public ParquetReader( + MessageColumnIO messageColumnIO, List<BlockMetaData> blocks, Optional<List<Long>> firstRowsOfBlocks, ParquetDataSource dataSource, @@ -139,7 +142,8 @@ public ParquetReader(MessageColumnIO boolean enableVerification, Predicate parquetPredicate, List blockIndexStores, - boolean columnIndexFilterEnabled) + boolean columnIndexFilterEnabled, + Optional<InternalFileDecryptor> fileDecryptor) { this.blocks = blocks; this.firstRowsOfBlocks = requireNonNull(firstRowsOfBlocks, "firstRowsOfBlocks is null"); @@ -172,6 +176,8 @@ public ParquetReader(MessageColumnIO } this.currentBlock = -1; this.columnIndexFilterEnabled = columnIndexFilterEnabled; + requireNonNull(fileDecryptor, "fileDecryptor is null"); + this.fileDecryptor = fileDecryptor; } @Override @@ -395,7 +401,7 @@ protected PageReader createPageReader(List buffers, int bufferSize, { ColumnChunkDescriptor descriptor = new ColumnChunkDescriptor(columnDescriptor, metadata, bufferSize); ParquetColumnChunk columnChunk = new ParquetColumnChunk(descriptor, buffers, offsetIndex); - return columnChunk.readAllPages(); + return createPageReaderInternal(columnDescriptor, columnChunk); } protected PageReader
createPageReader(byte[] buffer, int bufferSize, ColumnChunkMetaData metadata, ColumnDescriptor columnDescriptor) @@ -403,7 +409,24 @@ protected PageReader createPageReader(byte[] buffer, int bufferSize, ColumnChunk { ColumnChunkDescriptor descriptor = new ColumnChunkDescriptor(columnDescriptor, metadata, bufferSize); ParquetColumnChunk columnChunk = new ParquetColumnChunk(descriptor, buffer, 0); - return columnChunk.readAllPages(); + return createPageReaderInternal(columnDescriptor, columnChunk); + } + + private PageReader createPageReaderInternal(ColumnDescriptor columnDescriptor, ParquetColumnChunk columnChunk) + throws IOException + { + if (!isEncryptedColumn(fileDecryptor, columnDescriptor)) { + return columnChunk.readAllPages(Optional.empty(), -1, -1); + } + + int columnOrdinal = fileDecryptor.get().getColumnSetup(ColumnPath.get(columnChunk.getDescriptor().getColumnDescriptor().getPath())).getOrdinal(); + return columnChunk.readAllPages(fileDecryptor, currentBlock, columnOrdinal); + } + + private boolean isEncryptedColumn(Optional fileDecryptor, ColumnDescriptor columnDescriptor) + { + ColumnPath columnPath = ColumnPath.get(columnDescriptor.getPath()); + return fileDecryptor.isPresent() && !fileDecryptor.get().plaintextFile() && fileDecryptor.get().getColumnSetup(columnPath).isEncrypted(); } protected byte[] allocateBlock(long length) @@ -418,7 +441,7 @@ private ColumnChunkMetaData getColumnChunkMetaData(ColumnDescriptor columnDescri throws IOException { for (ColumnChunkMetaData metadata : currentBlockMetadata.getColumns()) { - if (metadata.getPath().equals(ColumnPath.get(columnDescriptor.getPath()))) { + if (!HiddenColumnChunkMetaData.isHiddenColumn(metadata) && metadata.getPath().equals(ColumnPath.get(columnDescriptor.getPath()))) { return metadata; } } diff --git a/presto-parquet/src/main/java/org/apache/parquet/crypto/HiddenColumnChunkMetaData.java b/presto-parquet/src/main/java/org/apache/parquet/crypto/HiddenColumnChunkMetaData.java new file mode 100644 index 0000000000000..1a0a936ffe822 --- /dev/null +++ b/presto-parquet/src/main/java/org/apache/parquet/crypto/HiddenColumnChunkMetaData.java @@ -0,0 +1,75 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.parquet.crypto; + +import org.apache.parquet.column.statistics.Statistics; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.ColumnPath; + +import static java.util.Objects.requireNonNull; + +public class HiddenColumnChunkMetaData + extends ColumnChunkMetaData +{ + private final ColumnPath path; + private final String filePath; + + public HiddenColumnChunkMetaData(ColumnPath path, String filePath) + { + super(null, null); + this.path = requireNonNull(path, "path should not be null"); + this.filePath = requireNonNull(filePath, "filePath should not be null"); + } + + @Override + public long getFirstDataPageOffset() + { + throw new HiddenColumnException(path.toArray(), filePath); + } + + @Override + public long getDictionaryPageOffset() + { + throw new HiddenColumnException(path.toArray(), filePath); + } + + @Override + public long getValueCount() + { + throw new HiddenColumnException(path.toArray(), this.filePath); + } + + @Override + public long getTotalUncompressedSize() + { + throw new HiddenColumnException(path.toArray(), filePath); + } + + @Override + public long getTotalSize() + { + throw new HiddenColumnException(path.toArray(), filePath); + } + + @Override + public Statistics getStatistics() + { + throw new HiddenColumnException(path.toArray(), filePath); + } + + public static boolean isHiddenColumn(ColumnChunkMetaData column) + { + return column instanceof HiddenColumnChunkMetaData; + } +} diff --git a/presto-parquet/src/main/java/org/apache/parquet/crypto/HiddenColumnException.java b/presto-parquet/src/main/java/org/apache/parquet/crypto/HiddenColumnException.java new file mode 100644 index 0000000000000..c2de5b4108d10 --- /dev/null +++ b/presto-parquet/src/main/java/org/apache/parquet/crypto/HiddenColumnException.java @@ -0,0 +1,29 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.parquet.crypto; + +import org.apache.parquet.ParquetRuntimeException; + +import java.util.Arrays; + +public class HiddenColumnException + extends ParquetRuntimeException +{ + private static final long serialVersionUID = 1L; + + public HiddenColumnException(String[] columnPath, String filePath) + { + super(String.format("User does not have access to the encryption key for encrypted column = %s for file: %s", Arrays.toString(columnPath), filePath)); + } +} diff --git a/presto-parquet/src/test/java/com/facebook/presto/parquet/BenchmarkParquetReader.java b/presto-parquet/src/test/java/com/facebook/presto/parquet/BenchmarkParquetReader.java index 5f5538b6dd835..cb5eb382c1839 100644 --- a/presto-parquet/src/test/java/com/facebook/presto/parquet/BenchmarkParquetReader.java +++ b/presto-parquet/src/test/java/com/facebook/presto/parquet/BenchmarkParquetReader.java @@ -273,13 +273,13 @@ ParquetReader createRecordReader() throws IOException { FileParquetDataSource dataSource = new FileParquetDataSource(file); - ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, file.length()).getParquetMetadata(); + ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, file.length(), Optional.empty()).getParquetMetadata(); MessageType schema = parquetMetadata.getFileMetaData().getSchema(); MessageColumnIO messageColumnIO = getColumnIO(schema, schema); this.field = ColumnIOConverter.constructField(getType(), messageColumnIO.getChild(0)).get(); - return new ParquetReader(messageColumnIO, parquetMetadata.getBlocks(), Optional.empty(), dataSource, newSimpleAggregatedMemoryContext(), new DataSize(16, MEGABYTE), enableOptimizedReader, enableVerification, null, null, false); + return new ParquetReader(messageColumnIO, parquetMetadata.getBlocks(), Optional.empty(), dataSource, newSimpleAggregatedMemoryContext(), new DataSize(16, MEGABYTE), enableOptimizedReader, enableVerification, null, null, false, Optional.empty()); } protected boolean getNullability() diff --git a/presto-parquet/src/test/java/com/facebook/presto/parquet/reader/EncryptDecryptUtil.java b/presto-parquet/src/test/java/com/facebook/presto/parquet/reader/EncryptDecryptUtil.java new file mode 100644 index 0000000000000..4441b6a962728 --- /dev/null +++ b/presto-parquet/src/test/java/com/facebook/presto/parquet/reader/EncryptDecryptUtil.java @@ -0,0 +1,97 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.parquet.reader; + +import org.apache.parquet.crypto.ColumnEncryptionProperties; +import org.apache.parquet.crypto.DecryptionKeyRetriever; +import org.apache.parquet.crypto.FileDecryptionProperties; +import org.apache.parquet.crypto.FileEncryptionProperties; +import org.apache.parquet.crypto.ParquetCipher; +import org.apache.parquet.hadoop.metadata.ColumnPath; + +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class EncryptDecryptUtil +{ + private EncryptDecryptUtil() + { + } + + private static class DecryptionKeyRetrieverMock + implements DecryptionKeyRetriever + { + private final Map<String, byte[]> keyMap = new HashMap<>(); + + public DecryptionKeyRetrieverMock putKey(String keyId, byte[] keyBytes) + { + keyMap.put(keyId, keyBytes); + return this; + } + + @Override + public byte[] getKey(byte[] keyMetaData) + { + String keyId = new String(keyMetaData, StandardCharsets.UTF_8); + return keyMap.get(keyId); + } + } + + private static final String FOOTER_KEY_METADATA = "footkey"; + private static final String COL_KEY_METADATA = "col"; + private static final byte[] FOOTER_KEY = {0x01, 0x02, 0x03, 0x4, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, + 0x0b, 0x0c, 0x0d, 0x0e, 0x0f, 0x10}; + private static final byte[] FOOTER_KEY_METADATA_BYTES = FOOTER_KEY_METADATA.getBytes(StandardCharsets.UTF_8); + private static final byte[] COL_KEY = {0x02, 0x03, 0x4, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, + 0x0c, 0x0d, 0x0e, 0x0f, 0x10, 0x11}; + private static final byte[] COL_KEY_METADATA_BYTES = COL_KEY_METADATA.getBytes(StandardCharsets.UTF_8); + + public static FileDecryptionProperties getFileDecryptionProperties() + { + DecryptionKeyRetrieverMock keyRetriever = new DecryptionKeyRetrieverMock(); + keyRetriever.putKey(FOOTER_KEY_METADATA, FOOTER_KEY); + keyRetriever.putKey(COL_KEY_METADATA, COL_KEY); + return FileDecryptionProperties.builder().withPlaintextFilesAllowed().withKeyRetriever(keyRetriever).build(); + } + + public static FileEncryptionProperties getFileEncryptionProperties(List<String> encryptColumns, ParquetCipher cipher, boolean encryptFooter) + { + if (encryptColumns.isEmpty()) { + return null; + } + + Map<ColumnPath, ColumnEncryptionProperties> columnPropertyMap = new HashMap<>(); + for (String encryptColumn : encryptColumns) { + ColumnPath columnPath = ColumnPath.fromDotString(encryptColumn); + ColumnEncryptionProperties columnEncryptionProperties = ColumnEncryptionProperties.builder(columnPath) + .withKey(COL_KEY) + .withKeyMetaData(COL_KEY_METADATA_BYTES) + .build(); + columnPropertyMap.put(columnPath, columnEncryptionProperties); + } + + FileEncryptionProperties.Builder encryptionPropertiesBuilder = + FileEncryptionProperties.builder(FOOTER_KEY) + .withFooterKeyMetadata(FOOTER_KEY_METADATA_BYTES) + .withAlgorithm(cipher) + .withEncryptedColumns(columnPropertyMap); + + if (!encryptFooter) { + encryptionPropertiesBuilder.withPlaintextFooter(); + } + + return encryptionPropertiesBuilder.build(); + } +} diff --git a/presto-parquet/src/test/java/com/facebook/presto/parquet/reader/EncryptionTestFile.java b/presto-parquet/src/test/java/com/facebook/presto/parquet/reader/EncryptionTestFile.java new file mode 100644 index 0000000000000..21a39033f9f4d --- /dev/null +++ b/presto-parquet/src/test/java/com/facebook/presto/parquet/reader/EncryptionTestFile.java @@ -0,0 +1,54 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.parquet.reader; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.example.data.simple.SimpleGroup; + +import java.io.IOException; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Strings.isNullOrEmpty; + +public class EncryptionTestFile +{ + private final String fileName; + private final SimpleGroup[] fileContent; + + public EncryptionTestFile(String fileName, SimpleGroup[] fileContent) + { + checkArgument(!isNullOrEmpty(fileName), "file name cannot be null or empty"); + this.fileName = fileName; + checkArgument(fileContent != null && fileContent.length > 0, "file content cannot be null or empty"); + this.fileContent = fileContent; + } + + public String getFileName() + { + return this.fileName; + } + + public SimpleGroup[] getFileContent() + { + return fileContent; + } + + public long getFileSize() + throws IOException + { + Path path = new Path(fileName); + return path.getFileSystem(new Configuration()).getFileStatus(path).getLen(); + } +} diff --git a/presto-parquet/src/test/java/com/facebook/presto/parquet/reader/EncryptionTestFileBuilder.java b/presto-parquet/src/test/java/com/facebook/presto/parquet/reader/EncryptionTestFileBuilder.java new file mode 100644 index 0000000000000..acdc637d13ad1 --- /dev/null +++ b/presto-parquet/src/test/java/com/facebook/presto/parquet/reader/EncryptionTestFileBuilder.java @@ -0,0 +1,199 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.parquet.reader; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.column.ParquetProperties; +import org.apache.parquet.crypto.FileEncryptionProperties; +import org.apache.parquet.crypto.ParquetCipher; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.simple.SimpleGroup; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.example.ExampleParquetWriter; +import org.apache.parquet.hadoop.example.GroupWriteSupport; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; + +import java.io.IOException; +import java.nio.file.Files; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; + +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64; + +public class EncryptionTestFileBuilder +{ + private MessageType schema; + private Configuration conf; + private Map<String, String> extraMeta = new HashMap<>(); + private int numRecord = 100000; + private ParquetProperties.WriterVersion writerVersion = ParquetProperties.WriterVersion.PARQUET_1_0; + private int pageSize = ParquetProperties.DEFAULT_PAGE_SIZE; + private String codec = "ZSTD"; + private String[] encryptColumns = {}; + private ParquetCipher cipher = ParquetCipher.AES_GCM_V1; + private boolean footerEncryption; + + public EncryptionTestFileBuilder(Configuration conf, MessageType schema) + { + this.conf = conf; + this.schema = schema; + conf.set(GroupWriteSupport.PARQUET_EXAMPLE_SCHEMA, schema.toString()); + } + + public EncryptionTestFileBuilder withNumRecord(int numRecord) + { + this.numRecord = numRecord; + return this; + } + + public EncryptionTestFileBuilder withEncryptionAlgorithm(ParquetCipher cipher) + { + this.cipher = cipher; + return this; + } + + public EncryptionTestFileBuilder withExtraMeta(Map<String, String> extraMeta) + { + this.extraMeta = extraMeta; + return this; + } + + public EncryptionTestFileBuilder withWriterVersion(ParquetProperties.WriterVersion writerVersion) + { + this.writerVersion = writerVersion; + return this; + } + + public EncryptionTestFileBuilder withPageSize(int pageSize) + { + this.pageSize = pageSize; + return this; + } + + public EncryptionTestFileBuilder withCodec(String codec) + { + this.codec = codec; + return this; + } + + public EncryptionTestFileBuilder withEncryptColumns(String[] encryptColumns) + { + this.encryptColumns = encryptColumns; + return this; + } + + public EncryptionTestFileBuilder withFooterEncryption() + { + this.footerEncryption = true; + return this; + } + + public EncryptionTestFile build() + throws IOException + { + String fileName = createTempFile("test"); + SimpleGroup[] fileContent = createFileContent(schema); + FileEncryptionProperties encryptionProperties = EncryptDecryptUtil.getFileEncryptionProperties(Arrays.asList(encryptColumns), cipher, footerEncryption); + ExampleParquetWriter.Builder builder = ExampleParquetWriter.builder(new Path(fileName)) + .withConf(conf) + .withWriterVersion(writerVersion) + .withExtraMetaData(extraMeta) + .withValidation(true) + .withPageSize(pageSize) +
.withEncryption(encryptionProperties) + .withCompressionCodec(CompressionCodecName.valueOf(codec)); + try (ParquetWriter<Group> writer = builder.build()) { + for (int i = 0; i < fileContent.length; i++) { + writer.write(fileContent[i]); + } + } + return new EncryptionTestFile(fileName, fileContent); + } + + private SimpleGroup[] createFileContent(MessageType schema) + { + SimpleGroup[] simpleGroups = new SimpleGroup[numRecord]; + for (int i = 0; i < simpleGroups.length; i++) { + SimpleGroup g = new SimpleGroup(schema); + for (Type type : schema.getFields()) { + addValueToSimpleGroup(g, type); + } + simpleGroups[i] = g; + } + return simpleGroups; + } + + private void addValueToSimpleGroup(Group g, Type type) + { + if (type.isPrimitive()) { + PrimitiveType primitiveType = (PrimitiveType) type; + if (primitiveType.getPrimitiveTypeName().equals(INT32)) { + g.add(type.getName(), getInt()); + } + else if (primitiveType.getPrimitiveTypeName().equals(INT64)) { + g.add(type.getName(), getLong()); + } + else if (primitiveType.getPrimitiveTypeName().equals(BINARY)) { + g.add(type.getName(), getString()); + } + // Only INT32, INT64 and BINARY are supported for now; more types can be added later + } + else { + GroupType groupType = (GroupType) type; + Group parentGroup = g.addGroup(groupType.getName()); + for (Type field : groupType.getFields()) { + addValueToSimpleGroup(parentGroup, field); + } + } + } + + private static int getInt() + { + return ThreadLocalRandom.current().nextInt(10000); + } + + private static long getLong() + { + return ThreadLocalRandom.current().nextLong(100000); + } + + private static String getString() + { + char[] chars = {'a', 'b', 'c', 'd', 'e', 'f', 'g', 'x', 'z', 'y'}; + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < 100; i++) { + sb.append(chars[ThreadLocalRandom.current().nextInt(chars.length)]); + } + return sb.toString(); + } + + public static String createTempFile(String prefix) + { + try { + return Files.createTempDirectory(prefix).toAbsolutePath().toString() + "/test.parquet"; + } + catch (IOException e) { + throw new AssertionError("Unable to create temporary file", e); + } + } +} diff --git a/presto-parquet/src/test/java/com/facebook/presto/parquet/reader/MockInputStreamTail.java b/presto-parquet/src/test/java/com/facebook/presto/parquet/reader/MockInputStreamTail.java new file mode 100644 index 0000000000000..fd33385a1f359 --- /dev/null +++ b/presto-parquet/src/test/java/com/facebook/presto/parquet/reader/MockInputStreamTail.java @@ -0,0 +1,113 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package com.facebook.presto.parquet.reader; + +import io.airlift.slice.Slice; +import io.airlift.slice.Slices; +import org.apache.hadoop.fs.FSDataInputStream; + +import java.io.IOException; + +import static com.google.common.base.Preconditions.checkArgument; +import static java.lang.Math.max; +import static java.lang.Math.min; +import static java.lang.Math.toIntExact; +import static java.lang.String.format; +import static java.util.Objects.requireNonNull; + +public final class MockInputStreamTail +{ + public static final int MAX_SUPPORTED_PADDING_BYTES = 64; + private static final int MAXIMUM_READ_LENGTH = Integer.MAX_VALUE - (MAX_SUPPORTED_PADDING_BYTES + 1); + + private final Slice tailSlice; + private final long fileSize; + + private MockInputStreamTail(long fileSize, Slice tailSlice) + { + this.tailSlice = requireNonNull(tailSlice, "tailSlice is null"); + this.fileSize = fileSize; + checkArgument(fileSize >= 0, "fileSize is negative: %s", fileSize); + checkArgument(tailSlice.length() <= fileSize, "length (%s) is greater than fileSize (%s)", tailSlice.length(), fileSize); + } + + public static MockInputStreamTail readTail(String path, long paddedFileSize, FSDataInputStream inputStream, int length) + throws IOException + { + checkArgument(length >= 0, "length is negative: %s", length); + checkArgument(length <= MAXIMUM_READ_LENGTH, "length (%s) exceeds maximum (%s)", length, MAXIMUM_READ_LENGTH); + long readSize = min(paddedFileSize, (length + MAX_SUPPORTED_PADDING_BYTES)); + long position = paddedFileSize - readSize; + // Actual read will be 1 byte larger to ensure we encounter an EOF where expected + byte[] buffer = new byte[toIntExact(readSize + 1)]; + int bytesRead = 0; + long startPos = inputStream.getPos(); + try { + inputStream.seek(position); + while (bytesRead < buffer.length) { + int n = inputStream.read(buffer, bytesRead, buffer.length - bytesRead); + if (n < 0) { + break; + } + bytesRead += n; + } + } + finally { + inputStream.seek(startPos); + } + if (bytesRead > readSize) { + throw rejectInvalidFileSize(path, paddedFileSize); + } + return new MockInputStreamTail(position + bytesRead, Slices.wrappedBuffer(buffer, max(0, bytesRead - length), min(bytesRead, length))); + } + + public static long readTailForFileSize(String path, long paddedFileSize, FSDataInputStream inputStream) + throws IOException + { + long position = max(paddedFileSize - MAX_SUPPORTED_PADDING_BYTES, 0); + long maxEOFAt = paddedFileSize + 1; + long startPos = inputStream.getPos(); + try { + inputStream.seek(position); + int c; + while (position < maxEOFAt) { + c = inputStream.read(); + if (c < 0) { + return position; + } + position++; + } + throw rejectInvalidFileSize(path, paddedFileSize); + } + finally { + inputStream.seek(startPos); + } + } + + private static IOException rejectInvalidFileSize(String path, long reportedSize) + throws IOException + { + throw new IOException(format("Incorrect file size (%s) for file (end of stream not reached): %s", reportedSize, path)); + } + + public long getFileSize() + { + return fileSize; + } + + public Slice getTailSlice() + { + return tailSlice; + } +} diff --git a/presto-parquet/src/test/java/com/facebook/presto/parquet/reader/MockParquetDataSource.java b/presto-parquet/src/test/java/com/facebook/presto/parquet/reader/MockParquetDataSource.java new file mode 100644 index 0000000000000..95377f113cdad --- /dev/null +++ b/presto-parquet/src/test/java/com/facebook/presto/parquet/reader/MockParquetDataSource.java @@ -0,0 +1,109 @@ +/* + * Licensed under the Apache 
License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.parquet.reader; + +import com.facebook.presto.common.NotSupportedException; +import com.facebook.presto.parquet.ParquetDataSource; +import com.facebook.presto.parquet.ParquetDataSourceId; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.internal.column.columnindex.ColumnIndex; +import org.apache.parquet.internal.column.columnindex.OffsetIndex; + +import java.io.IOException; +import java.util.Optional; + +import static java.util.Objects.requireNonNull; + +public class MockParquetDataSource + implements ParquetDataSource +{ + private final ParquetDataSourceId id; + private final long estimatedSize; + private final FSDataInputStream inputStream; + private long readTimeNanos; + private long readBytes; + + public MockParquetDataSource( + ParquetDataSourceId id, + long estimatedSize, + FSDataInputStream inputStream) + { + this.id = requireNonNull(id, "id is null"); + this.estimatedSize = estimatedSize; + this.inputStream = requireNonNull(inputStream, "inputStream is null"); + } + + @Override + public ParquetDataSourceId getId() + { + return id; + } + + @Override + public final long getReadBytes() + { + return readBytes; + } + + @Override + public long getReadTimeNanos() + { + return readTimeNanos; + } + + @Override + public void close() + throws IOException + { + inputStream.close(); + } + + @Override + public final void readFully(long position, byte[] buffer) + { + readFully(position, buffer, 0, buffer.length); + } + + @Override + public void readFully(long position, byte[] buffer, int bufferOffset, int bufferLength) + { + readBytes += bufferLength; + + long start = System.nanoTime(); + try { + inputStream.readFully(position, buffer, bufferOffset, bufferLength); + } + catch (Exception e) { + throw new RuntimeException("Error reading from " + id + " at position " + position, e); + } + long currentReadTimeNanos = System.nanoTime() - start; + + readTimeNanos += currentReadTimeNanos; + } + + @Override + public Optional<ColumnIndex> readColumnIndex(ColumnChunkMetaData column) + throws IOException + { + throw new NotSupportedException("Not supported"); + } + + @Override + public Optional<OffsetIndex> readOffsetIndex(ColumnChunkMetaData column) + throws IOException + { + throw new NotSupportedException("Not supported"); + } +} diff --git a/presto-parquet/src/test/java/com/facebook/presto/parquet/reader/TestEncryption.java b/presto-parquet/src/test/java/com/facebook/presto/parquet/reader/TestEncryption.java new file mode 100644 index 0000000000000..74eff9b9f07a9 --- /dev/null +++ b/presto-parquet/src/test/java/com/facebook/presto/parquet/reader/TestEncryption.java @@ -0,0 +1,351 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.parquet.reader; + +import com.facebook.presto.common.block.Block; +import com.facebook.presto.common.type.ArrayType; +import com.facebook.presto.common.type.MapType; +import com.facebook.presto.common.type.RowType; +import com.facebook.presto.common.type.Type; +import com.facebook.presto.parquet.Field; +import com.facebook.presto.parquet.GroupField; +import com.facebook.presto.parquet.ParquetDataSource; +import com.facebook.presto.parquet.ParquetDataSourceId; +import com.facebook.presto.parquet.PrimitiveField; +import com.facebook.presto.parquet.RichColumnDescriptor; +import com.facebook.presto.parquet.cache.MetadataReader; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.airlift.units.DataSize; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.crypto.FileDecryptionProperties; +import org.apache.parquet.crypto.InternalFileDecryptor; +import org.apache.parquet.crypto.ParquetCipher; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.FileMetaData; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.io.ColumnIO; +import org.apache.parquet.io.GroupColumnIO; +import org.apache.parquet.io.MessageColumnIO; +import org.apache.parquet.io.PrimitiveColumnIO; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Optional; + +import static com.facebook.presto.common.type.BigintType.BIGINT; +import static com.facebook.presto.common.type.VarcharType.VARCHAR; +import static com.facebook.presto.parquet.ParquetTypeUtils.getArrayElementColumn; +import static com.facebook.presto.parquet.ParquetTypeUtils.getColumnIO; +import static com.facebook.presto.parquet.ParquetTypeUtils.getMapKeyValueColumn; +import static com.facebook.presto.parquet.ParquetTypeUtils.lookupColumnByName; +import static org.apache.parquet.io.ColumnIOUtil.columnDefinitionLevel; +import static org.apache.parquet.io.ColumnIOUtil.columnRepetitionLevel; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64; +import static org.apache.parquet.schema.Type.Repetition.OPTIONAL; +import static org.apache.parquet.schema.Type.Repetition.REQUIRED; +import static org.testng.Assert.assertEquals; + +public class TestEncryption +{ + private final Configuration conf = new Configuration(false); + + @Test + public void testBasicDecryption() + throws IOException + { + MessageType schema = createSchema(); + String[] encryptColumns = {"name", "gender"}; + Map<String, String> extraMetadata = ImmutableMap.of("key1", "value1", "key2", "value2"); + EncryptionTestFile
inputFile = new EncryptionTestFileBuilder(conf, schema) + .withEncryptColumns(encryptColumns) + .withNumRecord(10000) + .withCodec("GZIP") + .withExtraMeta(extraMetadata) + .withPageSize(1000) + .withFooterEncryption() + .build(); + decryptAndValidate(inputFile); + } + + @Test + public void testAllColumnsDecryption() + throws IOException + { + MessageType schema = createSchema(); + String[] encryptColumns = {"id", "name", "gender"}; + EncryptionTestFile inputFile = new EncryptionTestFileBuilder(conf, schema) + .withEncryptColumns(encryptColumns) + .withNumRecord(10000) + .withCodec("GZIP") + .withPageSize(1000) + .withFooterEncryption() + .build(); + decryptAndValidate(inputFile); + } + + @Test + public void testNoColumnsDecryption() + throws IOException + { + MessageType schema = createSchema(); + String[] encryptColumns = {}; + EncryptionTestFile inputFile = new EncryptionTestFileBuilder(conf, schema) + .withEncryptColumns(encryptColumns) + .withNumRecord(10000) + .withCodec("GZIP") + .withPageSize(1000) + .withFooterEncryption() + .build(); + decryptAndValidate(inputFile); + } + + @Test + public void testOneRecord() + throws IOException + { + MessageType schema = createSchema(); + String[] encryptColumns = {"name", "gender"}; + EncryptionTestFile inputFile = new EncryptionTestFileBuilder(conf, schema) + .withEncryptColumns(encryptColumns) + .withNumRecord(1) + .withCodec("GZIP") + .withPageSize(1000) + .withFooterEncryption() + .build(); + decryptAndValidate(inputFile); + } + + @Test + public void testMillionRows() + throws IOException + { + MessageType schema = createSchema(); + String[] encryptColumns = {"name", "gender"}; + EncryptionTestFile inputFile = new EncryptionTestFileBuilder(conf, schema) + .withEncryptColumns(encryptColumns) + .withNumRecord(1000000) + .withCodec("GZIP") + .withPageSize(1000) + .withFooterEncryption() + .build(); + decryptAndValidate(inputFile); + } + + @Test + public void testPlainTextFooter() + throws IOException + { + MessageType schema = createSchema(); + String[] encryptColumns = {"name", "gender"}; + EncryptionTestFile inputFile = new EncryptionTestFileBuilder(conf, schema) + .withEncryptColumns(encryptColumns) + .withNumRecord(10000) + .withCodec("SNAPPY") + .withPageSize(1000) + .build(); + decryptAndValidate(inputFile); + } + + @Test + public void testLargePageSize() + throws IOException + { + MessageType schema = createSchema(); + String[] encryptColumns = {"name", "gender"}; + EncryptionTestFile inputFile = new EncryptionTestFileBuilder(conf, schema) + .withEncryptColumns(encryptColumns) + .withNumRecord(100000) + .withCodec("GZIP") + .withPageSize(100000) + .withFooterEncryption() + .build(); + decryptAndValidate(inputFile); + } + + @Test + public void testAesGcmCtr() + throws IOException + { + MessageType schema = createSchema(); + String[] encryptColumns = {"name", "gender"}; + EncryptionTestFile inputFile = new EncryptionTestFileBuilder(conf, schema) + .withEncryptColumns(encryptColumns) + .withNumRecord(100000) + .withCodec("GZIP") + .withPageSize(1000) + .withEncryptionAlgorithm(ParquetCipher.AES_GCM_CTR_V1) + .build(); + decryptAndValidate(inputFile); + } + + private MessageType createSchema() + { + return new MessageType("schema", + new PrimitiveType(OPTIONAL, INT64, "id"), + new PrimitiveType(REQUIRED, BINARY, "name"), + new PrimitiveType(OPTIONAL, BINARY, "gender")); + } + + private void decryptAndValidate(EncryptionTestFile inputFile) + throws IOException + { + Path path = new Path(inputFile.getFileName()); + FileSystem fileSystem =
path.getFileSystem(conf); + FSDataInputStream inputStream = fileSystem.open(path); + long fileSize = fileSystem.getFileStatus(path).getLen(); + Optional fileDecryptor = createFileDecryptor(); + ParquetDataSource dataSource = new MockParquetDataSource(new ParquetDataSourceId(path.toString()), fileSize, inputStream); + ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, inputFile.getFileSize(), fileDecryptor).getParquetMetadata(); + FileMetaData fileMetaData = parquetMetadata.getFileMetaData(); + MessageType fileSchema = fileMetaData.getSchema(); + MessageColumnIO messageColumn = getColumnIO(fileSchema, fileSchema); + ParquetReader parquetReader = createParquetReader(parquetMetadata, messageColumn, dataSource, fileDecryptor); + validateFile(parquetReader, messageColumn, inputFile); + } + + private Optional createFileDecryptor() + { + FileDecryptionProperties fileDecryptionProperties = EncryptDecryptUtil.getFileDecryptionProperties(); + if (fileDecryptionProperties != null) { + return Optional.of(new InternalFileDecryptor(fileDecryptionProperties)); + } + return Optional.empty(); + } + + private ParquetReader createParquetReader(ParquetMetadata parquetMetadata, + MessageColumnIO messageColumn, + ParquetDataSource dataSource, + Optional fileDecryptor) + { + ImmutableList.Builder blocks = ImmutableList.builder(); + ImmutableList.Builder blockStarts = ImmutableList.builder(); + + long nextStart = 0; + for (BlockMetaData block : parquetMetadata.getBlocks()) { + blocks.add(block); + blockStarts.add(nextStart); + nextStart += block.getRowCount(); + } + + return new ParquetReader( + messageColumn, + blocks.build(), + Optional.empty(), + dataSource, + com.facebook.presto.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext(), + new DataSize(100000, DataSize.Unit.BYTE), + false, + false, + null, + null, + false, + fileDecryptor); + } + + private void validateFile(ParquetReader parquetReader, MessageColumnIO messageColumn, EncryptionTestFile inputFile) + throws IOException + { + int rowIndex = 0; + int batchSize = parquetReader.nextBatch(); + while (batchSize > 0) { + validateColumn("id", BIGINT, rowIndex, parquetReader, messageColumn, inputFile); + validateColumn("name", VARCHAR, rowIndex, parquetReader, messageColumn, inputFile); + validateColumn("gender", VARCHAR, rowIndex, parquetReader, messageColumn, inputFile); + rowIndex += batchSize; + batchSize = parquetReader.nextBatch(); + } + } + + private void validateColumn(String name, Type type, int rowIndex, ParquetReader parquetReader, MessageColumnIO messageColumn, EncryptionTestFile inputFile) + throws IOException + { + Block block = parquetReader.readBlock(constructField(type, lookupColumnByName(messageColumn, name)).orElse(null)); + for (int i = 0; i < block.getPositionCount(); i++) { + if (type.equals(BIGINT)) { + assertEquals(inputFile.getFileContent()[rowIndex++].getLong(name, 0), block.getLong(i)); + } + else if (type.equals(INT32)) { + assertEquals(inputFile.getFileContent()[rowIndex++].getInteger(name, 0), block.getInt(i)); + } + else if (type.equals(VARCHAR)) { + assertEquals(inputFile.getFileContent()[rowIndex++].getString(name, 0), block.getSlice(i, 0, block.getSliceLength(i)).toStringUtf8()); + } + } + } + + private Optional constructField(Type type, ColumnIO columnIO) + { + if (columnIO == null) { + return Optional.empty(); + } + boolean required = columnIO.getType().getRepetition() != OPTIONAL; + int repetitionLevel = columnRepetitionLevel(columnIO); + int definitionLevel = 
columnDefinitionLevel(columnIO); + if (type instanceof RowType) { + RowType rowType = (RowType) type; + GroupColumnIO groupColumnIO = (GroupColumnIO) columnIO; + ImmutableList.Builder<Optional<Field>> fieldsBuilder = ImmutableList.builder(); + List<RowType.Field> fields = rowType.getFields(); + boolean structHasParameters = false; + for (int i = 0; i < fields.size(); i++) { + RowType.Field rowField = fields.get(i); + String name = rowField.getName().get().toLowerCase(Locale.ENGLISH); + Optional<Field> field = constructField(rowField.getType(), lookupColumnByName(groupColumnIO, name)); + structHasParameters |= field.isPresent(); + fieldsBuilder.add(field); + } + if (structHasParameters) { + return Optional.of(new GroupField(type, repetitionLevel, definitionLevel, required, fieldsBuilder.build())); + } + return Optional.empty(); + } + if (type instanceof MapType) { + MapType mapType = (MapType) type; + GroupColumnIO groupColumnIO = (GroupColumnIO) columnIO; + GroupColumnIO keyValueColumnIO = getMapKeyValueColumn(groupColumnIO); + if (keyValueColumnIO.getChildrenCount() != 2) { + return Optional.empty(); + } + Optional<Field> keyField = constructField(mapType.getKeyType(), keyValueColumnIO.getChild(0)); + Optional<Field> valueField = constructField(mapType.getValueType(), keyValueColumnIO.getChild(1)); + return Optional.of(new GroupField(type, repetitionLevel, definitionLevel, required, ImmutableList.of(keyField, valueField))); + } + if (type instanceof ArrayType) { + ArrayType arrayType = (ArrayType) type; + GroupColumnIO groupColumnIO = (GroupColumnIO) columnIO; + if (groupColumnIO.getChildrenCount() != 1) { + return Optional.empty(); + } + Optional<Field> field = constructField(arrayType.getElementType(), getArrayElementColumn(groupColumnIO.getChild(0))); + return Optional.of(new GroupField(type, repetitionLevel, definitionLevel, required, ImmutableList.of(field))); + } + PrimitiveColumnIO primitiveColumnIO = (PrimitiveColumnIO) columnIO; + RichColumnDescriptor column = new RichColumnDescriptor(primitiveColumnIO.getColumnDescriptor(), columnIO.getType().asPrimitiveType()); + return Optional.of(new PrimitiveField(type, repetitionLevel, definitionLevel, required, column, primitiveColumnIO.getId())); + } +} diff --git a/presto-parquet/src/test/java/com/facebook/presto/parquet/reader/TestHiddenColumnChunkMetaData.java b/presto-parquet/src/test/java/com/facebook/presto/parquet/reader/TestHiddenColumnChunkMetaData.java new file mode 100644 index 0000000000000..51ec7149442f0 --- /dev/null +++ b/presto-parquet/src/test/java/com/facebook/presto/parquet/reader/TestHiddenColumnChunkMetaData.java @@ -0,0 +1,66 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package com.facebook.presto.parquet.reader; + +import org.apache.parquet.column.Encoding; +import org.apache.parquet.crypto.HiddenColumnChunkMetaData; +import org.apache.parquet.crypto.HiddenColumnException; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.ColumnPath; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.testng.annotations.Test; + +import java.util.Collections; +import java.util.Set; + +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + +public class TestHiddenColumnChunkMetaData +{ + @Test + public void testIsHiddenColumn() + { + ColumnChunkMetaData column = new HiddenColumnChunkMetaData(ColumnPath.fromDotString("a.b.c"), + "hdfs:/foo/bar/a.parquet"); + assertTrue(HiddenColumnChunkMetaData.isHiddenColumn(column)); + } + + @Test + public void testIsNotHiddenColumn() + { + Set encodingSet = Collections.singleton(Encoding.RLE); + ColumnChunkMetaData column = ColumnChunkMetaData.get(ColumnPath.fromDotString("a.b.c"), BINARY, + CompressionCodecName.GZIP, encodingSet, -1, -1, -1, -1, -1); + assertFalse(HiddenColumnChunkMetaData.isHiddenColumn(column)); + } + + @Test(expectedExceptions = HiddenColumnException.class) + public void testHiddenColumnException() + { + ColumnChunkMetaData column = new HiddenColumnChunkMetaData(ColumnPath.fromDotString("a.b.c"), + "hdfs:/foo/bar/a.parquet"); + column.getStatistics(); + } + + @Test + public void testNoHiddenColumnException() + { + Set encodingSet = Collections.singleton(Encoding.RLE); + ColumnChunkMetaData column = ColumnChunkMetaData.get(ColumnPath.fromDotString("a.b.c"), BINARY, + CompressionCodecName.GZIP, encodingSet, -1, -1, -1, -1, -1); + column.getStatistics(); + } +}
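--

Usage sketch (not part of the patch itself): the decryption-aware read path introduced above can be wired up roughly as follows. This is a minimal sketch that only strings together the APIs added in this change; `dataSource` (a ParquetDataSource), `fileSize` and `myKeyRetriever` (a DecryptionKeyRetriever implementation, analogous to the DecryptionKeyRetrieverMock in EncryptDecryptUtil) are assumed to be supplied by the caller.

    // Build decryption properties around a key retriever; withPlaintextFilesAllowed()
    // lets the same code path read unencrypted files as well.
    FileDecryptionProperties decryptionProperties = FileDecryptionProperties.builder()
            .withKeyRetriever(myKeyRetriever)
            .withPlaintextFilesAllowed()
            .build();
    Optional<InternalFileDecryptor> fileDecryptor =
            Optional.of(new InternalFileDecryptor(decryptionProperties));

    // The footer is decrypted (or, for plaintext footers, verified) while it is parsed.
    // Column chunks whose keys are unavailable surface as HiddenColumnChunkMetaData and
    // throw HiddenColumnException only if they are actually accessed.
    ParquetMetadata parquetMetadata =
            MetadataReader.readFooter(dataSource, fileSize, fileDecryptor).getParquetMetadata();
    MessageType schema = parquetMetadata.getFileMetaData().getSchema();
    MessageColumnIO messageColumnIO = getColumnIO(schema, schema);

    // ParquetReader now takes the file decryptor as its last argument; page headers,
    // dictionary pages and data pages of encrypted columns are decrypted using the
    // per-module AAD derived from the row group, column and page ordinals.
    ParquetReader reader = new ParquetReader(
            messageColumnIO,
            parquetMetadata.getBlocks(),
            Optional.empty(),
            dataSource,
            newSimpleAggregatedMemoryContext(),
            new DataSize(16, MEGABYTE),
            false,
            false,
            null,
            null,
            false,
            fileDecryptor);

Passing Optional.empty() for the decryptor preserves the old plaintext behavior, which is how the existing benchmark and non-encrypted connector paths keep working unchanged.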