
Commit 72c944a

wangyum, sunchao, and turboFei authored and committed
[CARMEL-5873] Upgrade Parquet to 1.12.2 (#901)
* [CARMEL-5873] Upgrade Parquet to 1.12.2 (#896)

* [SPARK-36726] Upgrade Parquet to 1.12.1

### What changes were proposed in this pull request?

Upgrade Apache Parquet to 1.12.1

### Why are the changes needed?

Parquet 1.12.1 contains the following bug fixes:
- PARQUET-2064: Make Range publicly accessible in RowRanges
- PARQUET-2022: ZstdDecompressorStream should close `zstdInputStream`
- PARQUET-2052: Integer overflow when writing huge binary using dictionary encoding
- PARQUET-1633: Fix integer overflow
- PARQUET-2054: Fix TCP leaking when calling ParquetFileWriter.appendFile
- PARQUET-2072: Do not determine both min/max for binary stats
- PARQUET-2073: Fix estimate of remaining row count in ColumnWriteStoreBase
- PARQUET-2078: Failed to read parquet file after writing with the same

In particular, PARQUET-2078 is a blocker for the upcoming Apache Spark 3.2.0 release.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests plus a new test for the issue in SPARK-36696

Closes #33969 from sunchao/upgrade-parquet-12.1.

Authored-by: Chao Sun <[email protected]>
Signed-off-by: DB Tsai <[email protected]>
(cherry picked from commit a927b08)

* [SPARK-34542][BUILD] Upgrade Parquet to 1.12.0

### What changes were proposed in this pull request?

Parquet 1.12.0 new features:
- PARQUET-41: Add bloom filters to parquet statistics
- PARQUET-1373: Encryption key management tools
- PARQUET-1396: Example of using EncryptionPropertiesFactory and DecryptionPropertiesFactory
- PARQUET-1622: Add BYTE_STREAM_SPLIT encoding
- PARQUET-1784: Column-wise configuration
- PARQUET-1817: Crypto Properties Factory
- PARQUET-1854: Properties-Driven Interface to Parquet Encryption

Parquet 1.12.0 release notes: https://github.com/apache/parquet-mr/blob/apache-parquet-1.12.0/CHANGES.md

### Why are the changes needed?

- Bloom filters to improve filter performance
- ZSTD enhancement

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing unit tests.

Closes #31649 from wangyum/SPARK-34542.

Lead-authored-by: Yuming Wang <[email protected]>
Co-authored-by: Yuming Wang <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit cbffc12)
Co-authored-by: Chao Sun <[email protected]>

* [HADP-44647] Parquet file based KMS client for encryption keys (#897)

* [HADP-44647] Parquet file based KMS client for encryption keys (#82)

Create and write a Parquet-encrypted table:

```sql
set spark.sql.parquet.encryption.key.file=/path/to/key/file;

create table parquet_encryption(a int, b int, c int)
using parquet
options (
  'parquet.encryption.column.keys' 'columnKey1: a, b; columnKey2: c',
  'parquet.encryption.footer.key' 'footerKey');
```

Read a Parquet-encrypted table:

```sql
set spark.sql.parquet.encryption.key.file=/path/to/key/file;

select ... from parquet_encryption ...
```

A follow-up PR will add a default footerKey.

* [HADP-44647][FOLLOWUP] Reuse the KMS instance for the same key file (#84)

* Fix

Co-authored-by: fwang12 <[email protected]>
Co-authored-by: Chao Sun <[email protected]>
Co-authored-by: fwang12 <[email protected]>
1 parent 5035f66 commit 72c944a
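The key file referenced by `spark.sql.parquet.encryption.key.file` is, per the `parseKeyList` logic in the new `ParquetEncryptionFileBasedKMS` below, a plain-text file with one `keyName: <Base64 master key>` entry per line. A minimal sketch for producing such a file (class name, file name, and the 16-byte key size are illustrative, not part of this commit):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;

public class KeyFileGenerator {
    private static final SecureRandom RANDOM = new SecureRandom();

    // One random master key, Base64-encoded; 16 bytes corresponds to AES-128.
    static String newKey(int numBytes) {
        byte[] bytes = new byte[numBytes];
        RANDOM.nextBytes(bytes);
        return Base64.getEncoder().encodeToString(bytes);
    }

    // Write "keyName: base64Key" entries, the line format parseKeyList expects.
    static void writeKeyFile(Path path, List<String> keyNames) throws IOException {
        List<String> lines = new ArrayList<>();
        for (String name : keyNames) {
            lines.add(name + ": " + newKey(16));
        }
        Files.write(path, lines);
    }

    public static void main(String[] args) throws IOException {
        Path out = Files.createTempFile("parquet_keys", ".txt");
        writeKeyFile(out, List.of("footerKey", "columnKey1", "columnKey2"));
        System.out.println("Wrote key file: " + out);
    }
}
```

The key names written here are the ones the table options above reference (`footerKey`, `columnKey1`, `columnKey2`).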

File tree: 15 files changed (+372, -30 lines)


dev/deps/spark-deps-hadoop-2.7-hive-1.2 (11 additions, 8 deletions)

```diff
@@ -61,7 +61,7 @@ curator-recipes/2.7.1//curator-recipes-2.7.1.jar
 datanucleus-api-jdo/3.2.6//datanucleus-api-jdo-3.2.6.jar
 datanucleus-core/3.2.10//datanucleus-core-3.2.10.jar
 datanucleus-rdbms/3.2.9//datanucleus-rdbms-3.2.9.jar
-delta-core_2.12/0.8.0.carmel0.1-SNAPSHOT//delta-core_2.12-0.8.0.carmel0.1-SNAPSHOT.jar
+delta-core_2.12/0.9.0.carmel0.2-SNAPSHOT//delta-core_2.12-0.9.0.carmel0.2-SNAPSHOT.jar
 derby/10.12.1.1//derby-10.12.1.1.jar
 flatbuffers-java/1.9.0//flatbuffers-java-1.9.0.jar
 generex/1.0.2//generex-1.0.2.jar
@@ -148,7 +148,9 @@ jta/1.1//jta-1.1.jar
 jul-to-slf4j/1.7.30//jul-to-slf4j-1.7.30.jar
 kafka-clients/1.1.1//kafka-clients-1.1.1.jar
 kafka-schema-registry-client/3.0.0//kafka-schema-registry-client-3.0.0.jar
+kryo-serializers/0.43//kryo-serializers-0.43.jar
 kryo-shaded/4.0.2//kryo-shaded-4.0.2.jar
+kryo/4.0.2//kryo-4.0.2.jar
 kubernetes-client/4.9.2//kubernetes-client-4.9.2.jar
 kubernetes-model-common/4.9.2//kubernetes-model-common-4.9.2.jar
 kubernetes-model/4.9.2//kubernetes-model-4.9.2.jar
@@ -167,7 +169,7 @@ metrics-jmx/4.1.1//metrics-jmx-4.1.1.jar
 metrics-json/4.1.1//metrics-json-4.1.1.jar
 metrics-jvm/4.1.1//metrics-jvm-4.1.1.jar
 minlog/1.3.0//minlog-1.3.0.jar
-mysql-connector-java/5.1.38//mysql-connector-java-5.1.38.jar
+mysql-connector-java/8.0.27//mysql-connector-java-8.0.27.jar
 netty-all/4.1.47.Final//netty-all-4.1.47.Final.jar
 objenesis/2.5.1//objenesis-2.5.1.jar
 okhttp/3.12.6//okhttp-3.12.6.jar
@@ -180,16 +182,17 @@ org.abego.treelayout.core/1.0.3//org.abego.treelayout.core-1.0.3.jar
 oro/2.0.8//oro-2.0.8.jar
 osgi-resource-locator/1.0.3//osgi-resource-locator-1.0.3.jar
 paranamer/2.8//paranamer-2.8.jar
-parquet-column/1.11.1-ebay-carmel1//parquet-column-1.11.1-ebay-carmel1.jar
-parquet-common/1.11.1-ebay-carmel1//parquet-common-1.11.1-ebay-carmel1.jar
-parquet-encoding/1.11.1-ebay-carmel1//parquet-encoding-1.11.1-ebay-carmel1.jar
-parquet-format-structures/1.11.1-ebay-carmel1//parquet-format-structures-1.11.1-ebay-carmel1.jar
+parquet-column/1.12.2.0.1.0//parquet-column-1.12.2.0.1.0.jar
+parquet-common/1.12.2.0.1.0//parquet-common-1.12.2.0.1.0.jar
+parquet-encoding/1.12.2.0.1.0//parquet-encoding-1.12.2.0.1.0.jar
+parquet-format-structures/1.12.2.0.1.0//parquet-format-structures-1.12.2.0.1.0.jar
 parquet-hadoop-bundle/1.6.0//parquet-hadoop-bundle-1.6.0.jar
-parquet-hadoop/1.11.1-ebay-carmel1//parquet-hadoop-1.11.1-ebay-carmel1.jar
-parquet-jackson/1.11.1-ebay-carmel1//parquet-jackson-1.11.1-ebay-carmel1.jar
+parquet-hadoop/1.12.2.0.1.0//parquet-hadoop-1.12.2.0.1.0.jar
+parquet-jackson/1.12.2.0.1.0//parquet-jackson-1.12.2.0.1.0.jar
 protobuf-java/2.5.0//protobuf-java-2.5.0.jar
 py4j/0.10.9//py4j-0.10.9.jar
 pyrolite/4.30//pyrolite-4.30.jar
+reflectasm/1.11.3//reflectasm-1.11.3.jar
 rheos-client_JDK-1.8/2.0.2//rheos-client_JDK-1.8-2.0.2.jar
 rheos-common_JDK-1.8/2.0.2//rheos-common_JDK-1.8-2.0.2.jar
 rheos-kafka-security_JDK-1.8/2.0.2//rheos-kafka-security_JDK-1.8-2.0.2.jar
```

dev/deps/spark-deps-hadoop-2.7-hive-2.3 (11 additions, 8 deletions)

```diff
@@ -60,7 +60,7 @@ curator-recipes/2.7.1//curator-recipes-2.7.1.jar
 datanucleus-api-jdo/4.2.4//datanucleus-api-jdo-4.2.4.jar
 datanucleus-core/4.1.17//datanucleus-core-4.1.17.jar
 datanucleus-rdbms/4.1.19//datanucleus-rdbms-4.1.19.jar
-delta-core_2.12/0.8.0.carmel0.1-SNAPSHOT//delta-core_2.12-0.8.0.carmel0.1-SNAPSHOT.jar
+delta-core_2.12/0.9.0.carmel0.2-SNAPSHOT//delta-core_2.12-0.9.0.carmel0.2-SNAPSHOT.jar
 derby/10.12.1.1//derby-10.12.1.1.jar
 dropwizard-metrics-hadoop-metrics2-reporter/0.1.2//dropwizard-metrics-hadoop-metrics2-reporter-0.1.2.jar
 flatbuffers-java/1.9.0//flatbuffers-java-1.9.0.jar
@@ -164,7 +164,9 @@ jta/1.1//jta-1.1.jar
 jul-to-slf4j/1.7.30//jul-to-slf4j-1.7.30.jar
 kafka-clients/1.1.1//kafka-clients-1.1.1.jar
 kafka-schema-registry-client/3.0.0//kafka-schema-registry-client-3.0.0.jar
+kryo-serializers/0.43//kryo-serializers-0.43.jar
 kryo-shaded/4.0.2//kryo-shaded-4.0.2.jar
+kryo/4.0.2//kryo-4.0.2.jar
 kubernetes-client/4.9.2//kubernetes-client-4.9.2.jar
 kubernetes-model-common/4.9.2//kubernetes-model-common-4.9.2.jar
 kubernetes-model/4.9.2//kubernetes-model-4.9.2.jar
@@ -183,7 +185,7 @@ metrics-jmx/4.1.1//metrics-jmx-4.1.1.jar
 metrics-json/4.1.1//metrics-json-4.1.1.jar
 metrics-jvm/4.1.1//metrics-jvm-4.1.1.jar
 minlog/1.3.0//minlog-1.3.0.jar
-mysql-connector-java/5.1.38//mysql-connector-java-5.1.38.jar
+mysql-connector-java/8.0.27//mysql-connector-java-8.0.27.jar
 netty-all/4.1.47.Final//netty-all-4.1.47.Final.jar
 objenesis/2.5.1//objenesis-2.5.1.jar
 okhttp/3.12.6//okhttp-3.12.6.jar
@@ -196,15 +198,16 @@ org.abego.treelayout.core/1.0.3//org.abego.treelayout.core-1.0.3.jar
 oro/2.0.8//oro-2.0.8.jar
 osgi-resource-locator/1.0.3//osgi-resource-locator-1.0.3.jar
 paranamer/2.8//paranamer-2.8.jar
-parquet-column/1.11.1-ebay-carmel1//parquet-column-1.11.1-ebay-carmel1.jar
-parquet-common/1.11.1-ebay-carmel1//parquet-common-1.11.1-ebay-carmel1.jar
-parquet-encoding/1.11.1-ebay-carmel1//parquet-encoding-1.11.1-ebay-carmel1.jar
-parquet-format-structures/1.11.1-ebay-carmel1//parquet-format-structures-1.11.1-ebay-carmel1.jar
-parquet-hadoop/1.11.1-ebay-carmel1//parquet-hadoop-1.11.1-ebay-carmel1.jar
-parquet-jackson/1.11.1-ebay-carmel1//parquet-jackson-1.11.1-ebay-carmel1.jar
+parquet-column/1.12.2.0.1.0//parquet-column-1.12.2.0.1.0.jar
+parquet-common/1.12.2.0.1.0//parquet-common-1.12.2.0.1.0.jar
+parquet-encoding/1.12.2.0.1.0//parquet-encoding-1.12.2.0.1.0.jar
+parquet-format-structures/1.12.2.0.1.0//parquet-format-structures-1.12.2.0.1.0.jar
+parquet-hadoop/1.12.2.0.1.0//parquet-hadoop-1.12.2.0.1.0.jar
+parquet-jackson/1.12.2.0.1.0//parquet-jackson-1.12.2.0.1.0.jar
 protobuf-java/2.5.0//protobuf-java-2.5.0.jar
 py4j/0.10.9//py4j-0.10.9.jar
 pyrolite/4.30//pyrolite-4.30.jar
+reflectasm/1.11.3//reflectasm-1.11.3.jar
 rheos-client_JDK-1.8/2.0.2//rheos-client_JDK-1.8-2.0.2.jar
 rheos-common_JDK-1.8/2.0.2//rheos-common_JDK-1.8-2.0.2.jar
 rheos-kafka-security_JDK-1.8/2.0.2//rheos-kafka-security_JDK-1.8-2.0.2.jar
```

dev/deps/spark-deps-hadoop-3.2-hive-2.3 (11 additions, 8 deletions)

```diff
@@ -56,7 +56,7 @@ curator-recipes/2.13.0//curator-recipes-2.13.0.jar
 datanucleus-api-jdo/4.2.4//datanucleus-api-jdo-4.2.4.jar
 datanucleus-core/4.1.17//datanucleus-core-4.1.17.jar
 datanucleus-rdbms/4.1.19//datanucleus-rdbms-4.1.19.jar
-delta-core_2.12/0.8.0.carmel0.1-SNAPSHOT//delta-core_2.12-0.8.0.carmel0.1-SNAPSHOT.jar
+delta-core_2.12/0.9.0.carmel0.2-SNAPSHOT//delta-core_2.12-0.9.0.carmel0.2-SNAPSHOT.jar
 derby/10.12.1.1//derby-10.12.1.1.jar
 dnsjava/2.1.7//dnsjava-2.1.7.jar
 dropwizard-metrics-hadoop-metrics2-reporter/0.1.2//dropwizard-metrics-hadoop-metrics2-reporter-0.1.2.jar
@@ -174,7 +174,9 @@ kerby-config/1.0.1//kerby-config-1.0.1.jar
 kerby-pkix/1.0.1//kerby-pkix-1.0.1.jar
 kerby-util/1.0.1//kerby-util-1.0.1.jar
 kerby-xdr/1.0.1//kerby-xdr-1.0.1.jar
+kryo-serializers/0.43//kryo-serializers-0.43.jar
 kryo-shaded/4.0.2//kryo-shaded-4.0.2.jar
+kryo/4.0.2//kryo-4.0.2.jar
 kubernetes-client/4.9.2//kubernetes-client-4.9.2.jar
 kubernetes-model-common/4.9.2//kubernetes-model-common-4.9.2.jar
 kubernetes-model/4.9.2//kubernetes-model-4.9.2.jar
@@ -193,7 +195,7 @@ metrics-jmx/4.1.1//metrics-jmx-4.1.1.jar
 metrics-json/4.1.1//metrics-json-4.1.1.jar
 metrics-jvm/4.1.1//metrics-jvm-4.1.1.jar
 minlog/1.3.0//minlog-1.3.0.jar
-mysql-connector-java/5.1.38//mysql-connector-java-5.1.38.jar
+mysql-connector-java/8.0.27//mysql-connector-java-8.0.27.jar
 netty-all/4.1.47.Final//netty-all-4.1.47.Final.jar
 nimbus-jose-jwt/4.41.1//nimbus-jose-jwt-4.41.1.jar
 objenesis/2.5.1//objenesis-2.5.1.jar
@@ -208,16 +210,17 @@ org.abego.treelayout.core/1.0.3//org.abego.treelayout.core-1.0.3.jar
 oro/2.0.8//oro-2.0.8.jar
 osgi-resource-locator/1.0.3//osgi-resource-locator-1.0.3.jar
 paranamer/2.8//paranamer-2.8.jar
-parquet-column/1.11.1-ebay-carmel1//parquet-column-1.11.1-ebay-carmel1.jar
-parquet-common/1.11.1-ebay-carmel1//parquet-common-1.11.1-ebay-carmel1.jar
-parquet-encoding/1.11.1-ebay-carmel1//parquet-encoding-1.11.1-ebay-carmel1.jar
-parquet-format-structures/1.11.1-ebay-carmel1//parquet-format-structures-1.11.1-ebay-carmel1.jar
-parquet-hadoop/1.11.1-ebay-carmel1//parquet-hadoop-1.11.1-ebay-carmel1.jar
-parquet-jackson/1.11.1-ebay-carmel1//parquet-jackson-1.11.1-ebay-carmel1.jar
+parquet-column/1.12.2.0.1.0//parquet-column-1.12.2.0.1.0.jar
+parquet-common/1.12.2.0.1.0//parquet-common-1.12.2.0.1.0.jar
+parquet-encoding/1.12.2.0.1.0//parquet-encoding-1.12.2.0.1.0.jar
+parquet-format-structures/1.12.2.0.1.0//parquet-format-structures-1.12.2.0.1.0.jar
+parquet-hadoop/1.12.2.0.1.0//parquet-hadoop-1.12.2.0.1.0.jar
+parquet-jackson/1.12.2.0.1.0//parquet-jackson-1.12.2.0.1.0.jar
 protobuf-java/2.5.0//protobuf-java-2.5.0.jar
 py4j/0.10.9//py4j-0.10.9.jar
 pyrolite/4.30//pyrolite-4.30.jar
 re2j/1.1//re2j-1.1.jar
+reflectasm/1.11.3//reflectasm-1.11.3.jar
 rheos-client_JDK-1.8/2.0.2//rheos-client_JDK-1.8-2.0.2.jar
 rheos-common_JDK-1.8/2.0.2//rheos-common_JDK-1.8-2.0.2.jar
 rheos-kafka-security_JDK-1.8/2.0.2//rheos-kafka-security_JDK-1.8-2.0.2.jar
```

pom.xml (2 additions, 2 deletions)

```diff
@@ -149,7 +149,7 @@
     <!-- note that this should be compatible with Kafka brokers version 0.10 and up -->
     <kafka.version>2.4.1</kafka.version>
     <derby.version>10.12.1.1</derby.version>
-    <parquet.version>1.11.1-ebay-carmel1</parquet.version>
+    <parquet.version>1.12.2.0.1.0</parquet.version>
     <orc.version>1.5.10</orc.version>
     <orc.classifier></orc.classifier>
     <hive.parquet.group>com.twitter</hive.parquet.group>
@@ -1987,7 +1987,7 @@
             <groupId>${hive.group}</groupId>
             <artifactId>hive-service-rpc</artifactId>
           </exclusion>
-          <!-- parquet-hadoop-bundle:1.8.1 conflict with 1.10.1 -->
+          <!-- parquet-hadoop-bundle:1.8.1 conflict with 1.12.0 -->
           <exclusion>
             <groupId>org.apache.parquet</groupId>
             <artifactId>parquet-hadoop-bundle</artifactId>
```

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala (6 additions, 0 deletions)

```diff
@@ -891,6 +891,12 @@ object SQLConf {
     .stringConf
     .createWithDefault("org.apache.parquet.hadoop.ParquetOutputCommitter")

+  val PARQUET_ENCRYPTION_KEY_FILE = buildConf("spark.sql.parquet.encryption.key.file")
+    .doc("The key file for parquet encryption.")
+    .internal()
+    .stringConf
+    .createOptional
+
   val PARQUET_VECTORIZED_READER_ENABLED =
     buildConf("spark.sql.parquet.enableVectorizedReader")
       .doc("Enables vectorized parquet decoding.")
```

sql/core/pom.xml (5 additions, 0 deletions)

```diff
@@ -168,6 +168,11 @@
       <artifactId>selenium-htmlunit-driver</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.codehaus.jackson</groupId>
+      <artifactId>jackson-mapper-asl</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
   <build>
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
```
ParquetEncryptionFileBasedKMS.scala (new file, 129 additions, 0 deletions)

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.datasources.parquet

import java.io.File
import java.nio.charset.StandardCharsets
import java.util.{Base64, UUID}
import java.util.concurrent.ConcurrentHashMap

import scala.io.Source
import scala.util.control.NonFatal

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.crypto.{EncryptionPropertiesFactory, ParquetCryptoRuntimeException}
import org.apache.parquet.crypto.keytools.{KeyToolkit, KmsClient, PropertiesDrivenCryptoFactory}

import org.apache.spark.SparkFiles
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession

class ParquetEncryptionFileBasedKMS extends KmsClient {
  import ParquetEncryptionFileBasedKMS.KEY_FILE_PROPERTY_NAME

  var masterKeyMap: Map[String, Array[Byte]] = _

  override def initialize(
      configuration: Configuration,
      kmsInstanceID: String,
      kmsInstanceURL: String,
      accessToken: String): Unit = {
    val keyFile = configuration.get(KEY_FILE_PROPERTY_NAME)
    if (null == keyFile || keyFile.isEmpty) {
      throw new ParquetCryptoRuntimeException("No encryption key file")
    }

    val localizedKeyFile = {
      try {
        val keyFileName = new Path(keyFile).getName
        new File(SparkFiles.get(keyFileName))
      } catch {
        case e: Throwable =>
          throw new ParquetCryptoRuntimeException(
            s"Failed to get the localized encryption key file $keyFile", e)
      }
    }
    masterKeyMap = parseKeyList(localizedKeyFile)
  }

  def parseKeyList(keyFile: File): Map[String, Array[Byte]] = {
    val lines = Source.fromFile(keyFile).getLines().filter(_.trim.nonEmpty)
    lines.map { line =>
      val parts = line.split(":")
      val keyName = parts.head.trim
      if (parts.length != 2) {
        throw new IllegalArgumentException(s"Key '$keyName' is not formatted correctly")
      }
      val key = parts.last.trim
      try {
        val keyBytes = Base64.getDecoder.decode(key)
        keyName -> keyBytes
      } catch {
        case e: Throwable =>
          throw new IllegalArgumentException(s"Could not decode key '$keyName'!", e)
      }
    }.toMap
  }

  override def wrapKey(keyBytes: Array[Byte], masterKeyIdentifier: String): String = {
    val masterKey = getMasterKey(masterKeyIdentifier)
    val AAD = masterKeyIdentifier.getBytes(StandardCharsets.UTF_8)
    KeyToolkit.encryptKeyLocally(keyBytes, masterKey, AAD)
  }

  override def unwrapKey(wrappedKey: String, masterKeyIdentifier: String): Array[Byte] = {
    val masterKey = getMasterKey(masterKeyIdentifier)
    val AAD = masterKeyIdentifier.getBytes(StandardCharsets.UTF_8)
    KeyToolkit.decryptKeyLocally(wrappedKey, masterKey, AAD)
  }

  private def getMasterKey(masterKeyIdentifier: String): Array[Byte] = {
    masterKeyMap.get(masterKeyIdentifier).getOrElse(
      throw new ParquetCryptoRuntimeException(s"Key not found: $masterKeyIdentifier"))
  }
}

object ParquetEncryptionFileBasedKMS extends Logging {
  val KEY_FILE_PROPERTY_NAME = "parquet.encryption.key.file"

  // Assume that, if the key file path is the same, its content has not changed.
  private[parquet] val keyFileKmsInstanceIdMap = new ConcurrentHashMap[String, String]()

  def getEncryptionConf(sparkSession: SparkSession, keyFile: String): Map[String, String] = {
    try {
      val kmsInstanceId = keyFileKmsInstanceIdMap.computeIfAbsent(
        keyFile,
        keyFile => {
          sparkSession.sparkContext.addFile(keyFile)
          UUID.randomUUID().toString
        }
      )
      Map(KEY_FILE_PROPERTY_NAME -> keyFile,
        KeyToolkit.KMS_CLIENT_CLASS_PROPERTY_NAME ->
          classOf[ParquetEncryptionFileBasedKMS].getName,
        EncryptionPropertiesFactory.CRYPTO_FACTORY_CLASS_PROPERTY_NAME ->
          classOf[PropertiesDrivenCryptoFactory].getName,
        KeyToolkit.KMS_INSTANCE_ID_PROPERTY_NAME -> kmsInstanceId)
    } catch {
      case NonFatal(e) =>
        logError("Failed when getting parquet encryption conf", e)
        Map.empty[String, String]
    }
  }
}
```
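The `wrapKey`/`unwrapKey` methods above delegate to parquet-mr's `KeyToolkit.encryptKeyLocally`/`decryptKeyLocally`, which encrypt the data key under the master key with AES-GCM, binding the master key identifier in as AAD. A rough, self-contained illustration of that wrap/unwrap contract (this is an assumed sketch, not parquet-mr's actual wire format, which adds its own encoding details):

```java
import javax.crypto.Cipher;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;
import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;
import java.util.Base64;

public class LocalKeyWrap {
    private static final int NONCE_LEN = 12;   // standard GCM nonce length
    private static final int TAG_BITS = 128;   // full-length authentication tag
    private static final SecureRandom RANDOM = new SecureRandom();

    // Wrap keyBytes under masterKey with AES-GCM; the master key identifier
    // is bound in as AAD, so unwrapping with a different identifier fails.
    static String wrap(byte[] keyBytes, byte[] masterKey, String masterKeyId) throws Exception {
        byte[] nonce = new byte[NONCE_LEN];
        RANDOM.nextBytes(nonce);
        Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
        cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(masterKey, "AES"),
            new GCMParameterSpec(TAG_BITS, nonce));
        cipher.updateAAD(masterKeyId.getBytes(StandardCharsets.UTF_8));
        byte[] ciphertext = cipher.doFinal(keyBytes);
        // Prepend the nonce so unwrap can recover it, then Base64-encode.
        byte[] out = new byte[nonce.length + ciphertext.length];
        System.arraycopy(nonce, 0, out, 0, nonce.length);
        System.arraycopy(ciphertext, 0, out, nonce.length, ciphertext.length);
        return Base64.getEncoder().encodeToString(out);
    }

    static byte[] unwrap(String wrapped, byte[] masterKey, String masterKeyId) throws Exception {
        byte[] in = Base64.getDecoder().decode(wrapped);
        Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
        cipher.init(Cipher.DECRYPT_MODE, new SecretKeySpec(masterKey, "AES"),
            new GCMParameterSpec(TAG_BITS, in, 0, NONCE_LEN));
        cipher.updateAAD(masterKeyId.getBytes(StandardCharsets.UTF_8));
        return cipher.doFinal(in, NONCE_LEN, in.length - NONCE_LEN);
    }
}
```

Because the wrapped key carries a GCM tag authenticated against the AAD, a mismatched master key identifier (or a tampered wrapped key) raises an authentication error on unwrap rather than silently yielding a wrong key.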
