diff --git a/hudi-spark-datasource/hudi-spark3/pom.xml b/hudi-spark-datasource/hudi-spark3/pom.xml
index ca09d8359f96b..26504b134cba4 100644
--- a/hudi-spark-datasource/hudi-spark3/pom.xml
+++ b/hudi-spark-datasource/hudi-spark3/pom.xml
@@ -161,6 +161,12 @@
       <optional>true</optional>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.parquet</groupId>
+      <artifactId>parquet-hadoop</artifactId>
+      <version>1.12.0</version>
+    </dependency>
+
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
diff --git a/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/crypto/kms/InMemoryKMS.java b/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/crypto/kms/InMemoryKMS.java
new file mode 100644
index 0000000000000..fd83e3151c4bb
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/crypto/kms/InMemoryKMS.java
@@ -0,0 +1,103 @@
+package org.apache.hudi.spark3.crypto.kms;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.parquet.crypto.KeyAccessDeniedException;
+import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
+import org.apache.parquet.crypto.keytools.KeyToolkit;
+import org.apache.parquet.crypto.keytools.KmsClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This is a mock class, built for testing only. Don't use it as an example of a KmsClient implementation
+ * (org.apache.parquet.crypto.keytools.samples.VaultClient is the sample implementation).
+ */
+public class InMemoryKMS implements KmsClient {
+  private static final Logger LOG = LoggerFactory.getLogger(InMemoryKMS.class);
+
+  private static final ConfigProperty<String> KEY_LIST_PROPERTY_NAME = ConfigProperty
+      .key("hoodie.parquet.encryption.key.list")
+      .noDefaultValue()
+      .withDocumentation("Hudi Parquet encryption key list, e.g. keyName_1:value_1, keyName_2:value_2");
+
+  private static Map<String, byte[]> masterKeyMap;
+
+  @Override
+  public synchronized void initialize(Configuration configuration, String kmsInstanceID, String kmsInstanceURL, String accessToken) {
+    // Parse master keys
+    String[] masterKeys = configuration.getTrimmedStrings(KEY_LIST_PROPERTY_NAME.key());
+    if (null == masterKeys || masterKeys.length == 0) {
+      throw new ParquetCryptoRuntimeException("No encryption key list");
+    }
+    masterKeyMap = parseKeyList(masterKeys);
+  }
+
+  private static Map<String, byte[]> parseKeyList(String[] masterKeys) {
+
+    Map<String, byte[]> keyMap = new HashMap<>();
+
+    int nKeys = masterKeys.length;
+    for (int i = 0; i < nKeys; i++) {
+      String[] parts = masterKeys[i].split(":");
+      String keyName = parts[0].trim();
+      if (parts.length != 2) {
+        throw new IllegalArgumentException("Key '" + keyName + "' is not formatted correctly");
+      }
+      String key = parts[1].trim();
+      try {
+        byte[] keyBytes = Base64.getDecoder().decode(key);
+        keyMap.put(keyName, keyBytes);
+      } catch (IllegalArgumentException e) {
+        LOG.warn("Could not decode key '" + keyName + "'!");
+        throw e;
+      }
+    }
+    return keyMap;
+  }
+
+  @Override
+  public synchronized String wrapKey(byte[] keyBytes, String masterKeyIdentifier)
+      throws KeyAccessDeniedException, UnsupportedOperationException {
+
+    byte[] masterKey = masterKeyMap.get(masterKeyIdentifier);
+    if (null == masterKey) {
+      throw new ParquetCryptoRuntimeException("Key not found: " + masterKeyIdentifier);
+    }
+    byte[] add = masterKeyIdentifier.getBytes(StandardCharsets.UTF_8);
+    return KeyToolkit.encryptKeyLocally(keyBytes, masterKey, add);
+  }
+
+  @Override
+  public synchronized byte[] unwrapKey(String wrappedKey, String masterKeyIdentifier)
+      throws KeyAccessDeniedException, UnsupportedOperationException {
+    byte[] masterKey = masterKeyMap.get(masterKeyIdentifier);
+    if (null == masterKey) {
+      throw new ParquetCryptoRuntimeException("Key not found: " + masterKeyIdentifier);
+    }
+    byte[] add = masterKeyIdentifier.getBytes(StandardCharsets.UTF_8);
+    return KeyToolkit.decryptKeyLocally(wrappedKey, masterKey, add);
+  }
+}
\ No newline at end of file
diff --git a/hudi-spark-datasource/hudi-spark3/src/test/java/org/apache/hudi/spark3/crypto/TestParquetEncryption.java b/hudi-spark-datasource/hudi-spark3/src/test/java/org/apache/hudi/spark3/crypto/TestParquetEncryption.java
new file mode 100644
index 0000000000000..01c0b4138a0db
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark3/src/test/java/org/apache/hudi/spark3/crypto/TestParquetEncryption.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.spark3.crypto;
+
+import org.apache.hudi.spark3.crypto.kms.InMemoryKMS;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+import org.apache.parquet.crypto.keytools.PropertiesDrivenCryptoFactory;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+
+import static org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings;
+
+public class TestParquetEncryption extends HoodieClientTestBase {
+
+  private SparkSession spark;
+
+  private HashMap<String, String> commonOpts = new HashMap<>();
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    commonOpts.put("hoodie.insert.shuffle.parallelism", "4");
+    commonOpts.put("hoodie.upsert.shuffle.parallelism", "4");
+    commonOpts.put("hoodie.bulkinsert.shuffle.parallelism", "4");
+    commonOpts.put("hoodie.delete.shuffle.parallelism", "4");
+    commonOpts.put("hoodie.datasource.write.recordkey.field", "_row_key");
+    commonOpts.put("hoodie.datasource.write.partitionpath.field", "partition");
+    commonOpts.put("hoodie.datasource.write.precombine.field", "timestamp");
+    commonOpts.put("hoodie.table.name", "hoodie_test");
+
+    initPath();
+    initSparkContexts();
+    spark = sqlContext.sparkSession();
+    initTestDataGenerator();
+    initFileSystem();
+  }
+
+  @AfterEach
+  public void tearDown() throws IOException {
+    cleanupSparkContexts();
+    cleanupTestDataGenerator();
+    cleanupFileSystem();
+  }
+
+  @Test
+  public void testEncryption() {
+
+    JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
+    jsc.hadoopConfiguration().set("parquet.crypto.factory.class", PropertiesDrivenCryptoFactory.class.getName());
+    jsc.hadoopConfiguration().set("parquet.encryption.kms.client.class", InMemoryKMS.class.getName());
+    jsc.hadoopConfiguration().set("parquet.encryption.footer.key", "k1");
+    jsc.hadoopConfiguration().set("parquet.encryption.column.keys", "k2:rider,_row_key");
+    jsc.hadoopConfiguration().set("hoodie.parquet.encryption.key.list", "k1:AAECAwQFBgcICQoLDA0ODw==, k2:AAECAAECAAECAAECAAECAA==");
+
+    List<String> records1 = recordsToStrings(dataGen.generateInserts("000", 100));
+    Dataset<Row> inputDF1 = spark.read().json(jsc.parallelize(records1, 2));
+
+    inputDF1.write().format("org.apache.hudi")
+        .options(commonOpts)
+        .mode(SaveMode.Overwrite)
+        .save(basePath);
+
+
+    // 1. No footer key and no column key available: reads should fail
+    jsc.hadoopConfiguration().clear();
+    Assertions.assertThrows(Exception.class, () -> spark.read().format("org.apache.hudi").load(basePath).count());
+    Assertions.assertThrows(Exception.class, () -> spark.read().format("org.apache.hudi").load(basePath).select("rider").show(1));
+
+    // 2. Footer key and column key available again via the KMS: reads succeed
+    jsc.hadoopConfiguration().set("parquet.crypto.factory.class", PropertiesDrivenCryptoFactory.class.getName());
+    jsc.hadoopConfiguration().set("parquet.encryption.kms.client.class", InMemoryKMS.class.getName());
+    jsc.hadoopConfiguration().set("hoodie.parquet.encryption.key.list", "k1:AAECAwQFBgcICQoLDA0ODw==, k2:AAECAAECAAECAAECAAECAA==");
+    Assertions.assertEquals(100, spark.read().format("org.apache.hudi").load(basePath).count());
+    Assertions.assertDoesNotThrow(() -> spark.read().format("org.apache.hudi").load(basePath).select("rider").show(1));
+  }
+}
\ No newline at end of file
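
A note on the key material used above: hoodie.parquet.encryption.key.list expects base64-encoded master keys, and the two values in the test each decode to 16 bytes (AES-128). A minimal sketch for generating such an entry with standard JDK APIs; the class name and the key name "k1" are illustrative, not part of this change:

    import java.security.SecureRandom;
    import java.util.Base64;

    public class GenerateMasterKey {
      public static void main(String[] args) {
        // Generate a random 128-bit master key and print it in the
        // "keyName:base64Value" form expected by hoodie.parquet.encryption.key.list.
        byte[] masterKey = new byte[16];
        new SecureRandom().nextBytes(masterKey);
        System.out.println("k1:" + Base64.getEncoder().encodeToString(masterKey));
      }
    }

Keys produced this way are suitable for the test-only InMemoryKMS; as its javadoc says, a real deployment should back parquet.encryption.kms.client.class with an actual KMS.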