diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 668701be0ae98..f126ee35efcca 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -489,6 +489,7 @@ object SparkParallelTestGrouping { "org.apache.spark.sql.catalyst.expressions.HashExpressionsSuite", "org.apache.spark.sql.catalyst.expressions.CastSuite", "org.apache.spark.sql.catalyst.expressions.MathExpressionsSuite", + "org.apache.spark.sql.execution.datasources.orc.OrcEncryptionSuite", "org.apache.spark.sql.hive.HiveExternalCatalogSuite", "org.apache.spark.sql.hive.StatisticsSuite", "org.apache.spark.sql.hive.client.VersionsSuite", diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/execution/datasources/orc/FakeKeyProvider.java b/sql/core/src/test/java/test/org/apache/spark/sql/execution/datasources/orc/FakeKeyProvider.java new file mode 100644 index 0000000000000..c48543802eb33 --- /dev/null +++ b/sql/core/src/test/java/test/org/apache/spark/sql/execution/datasources/orc/FakeKeyProvider.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package test.org.apache.spark.sql.execution.datasources.orc;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.key.KeyProvider;
+import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
+import org.apache.hadoop.crypto.key.KeyProviderFactory;
+import org.apache.hadoop.crypto.key.kms.KMSClientProvider;
+
+/**
+ * A Hadoop KeyProvider that lets us test the interaction
+ * with the Hadoop code. All keys live in in-memory maps owned by the instance.
+ *
+ * https://github.com/apache/orc/blob/rel/release-1.6.6/java/tools/src/test/org/apache/orc/impl/FakeKeyProvider.java
+ *
+ * This file intentionally stays close to the original file except
+ * (1) package name, (2) import order, (3) a few indentation changes,
+ * (4) explicit handling of unknown key names in
+ * {@link #getKeyVersions(String)} and {@link #rollNewVersion(String, byte[])}.
+ */
+public class FakeKeyProvider extends KeyProvider {
+  // map from key name to metadata
+  private final Map<String, TestMetadata> keyMetadata = new HashMap<>();
+  // map from key version name to material
+  private final Map<String, KeyVersion> keyVersions = new HashMap<>();
+
+  public FakeKeyProvider(Configuration conf) {
+    super(conf);
+  }
+
+  /**
+   * Looks up a key version by its version name.
+   *
+   * @param name the version name, as produced by {@code buildVersionName}
+   * @return the stored key version, or {@code null} if none is stored under that name
+   */
+  @Override
+  public KeyVersion getKeyVersion(String name) {
+    return keyVersions.get(name);
+  }
+
+  /**
+   * @return a new list holding the names of all keys created so far
+   */
+  @Override
+  public List<String> getKeys() {
+    return new ArrayList<>(keyMetadata.keySet());
+  }
+
+  /**
+   * Collects the stored material for every version of the named key.
+   *
+   * @param name the key name
+   * @return the stored versions in ascending version order; empty if the key is unknown
+   */
+  @Override
+  public List<KeyVersion> getKeyVersions(String name) {
+    List<KeyVersion> result = new ArrayList<>();
+    Metadata meta = getMetadata(name);
+    if (meta == null) {
+      // Unknown key: nothing to report (previously this dereferenced null).
+      return result;
+    }
+    for (int v = 0; v < meta.getVersions(); ++v) {
+      KeyVersion material = keyVersions.get(buildVersionName(name, v));
+      if (material != null) {
+        result.add(material);
+      }
+    }
+    return result;
+  }
+
+  /**
+   * @param name the key name
+   * @return the metadata registered for the key, or {@code null} if the key is unknown
+   */
+  @Override
+  public Metadata getMetadata(String name) {
+    return keyMetadata.get(name);
+  }
+
+  /**
+   * Registers a key whose metadata records one version and stores the given
+   * material under version number 0. Any metadata already registered under
+   * the same name is replaced.
+   *
+   * @param name the key name
+   * @param bytes the key material
+   * @param options supplies the cipher and bit length recorded in the metadata
+   * @return the newly stored key version
+   */
+  @Override
+  public KeyVersion createKey(String name, byte[] bytes, Options options) {
+    String versionName = buildVersionName(name, 0);
+    keyMetadata.put(name, new TestMetadata(options.getCipher(),
+        options.getBitLength(), 1));
+    KeyVersion result = new KMSClientProvider.KMSKeyVersion(name, versionName, bytes);
+    keyVersions.put(versionName, result);
+    return result;
+  }
+
+  /**
+   * Deleting keys is not supported by this test provider.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public void deleteKey(String name) {
+    throw new UnsupportedOperationException("Can't delete keys");
+  }
+
+  /**
+   * Adds a new version with the given material to an existing key.
+   *
+   * @param name the name of a key previously passed to {@code createKey}
+   * @param bytes the material for the new version
+   * @return the newly stored key version
+   * @throws IllegalArgumentException if no key with that name exists
+   */
+  @Override
+  public KeyVersion rollNewVersion(String name, byte[] bytes) {
+    TestMetadata key = keyMetadata.get(name);
+    if (key == null) {
+      // Fail with a clear message instead of a bare NullPointerException.
+      throw new IllegalArgumentException("Key " + name + " not found");
+    }
+    // addVersion() supplies the number used to build the new version name.
+    String versionName = buildVersionName(name, key.addVersion());
+    KeyVersion result = new KMSClientProvider.KMSKeyVersion(name, versionName, bytes);
+    keyVersions.put(versionName, result);
+    return result;
+  }
+
+  @Override
+  public void flush() {
+    // Nothing to flush: every key lives in the in-memory maps above.
+  }
+
+  /**
+   * Metadata with no description, attributes or creation date (all passed as
+   * {@code null}) whose version counter can be advanced by the enclosing provider.
+   */
+  static class TestMetadata extends KeyProvider.Metadata {
+
+    TestMetadata(String cipher, int bitLength, int versions) {
+      super(cipher, bitLength, null, null, null, versions);
+    }
+
+    // NOTE(review): presumably re-declared to widen the visibility of the
+    // inherited method - confirm against KeyProvider.Metadata.
+    @Override
+    public int addVersion() {
+      return super.addVersion();
+    }
+  }
+
+  /**
+   * Factory that serves a pre-populated {@link FakeKeyProvider} for URIs
+   * whose scheme is {@code test}.
+   */
+  public static class Factory extends KeyProviderFactory {
+
+    /**
+     * @param uri the key provider URI
+     * @param conf the configuration handed to the provider and its key options
+     * @return a crypto-extension wrapper around a provider holding the keys
+     *         "pii" (two versions) and "secret" (one version) when the URI
+     *         scheme is {@code test}; {@code null} for any other scheme
+     */
+    @Override
+    public KeyProvider createProvider(URI uri, Configuration conf) throws IOException {
+      if ("test".equals(uri.getScheme())) {
+        KeyProvider provider = new FakeKeyProvider(conf);
+        // populate a couple keys into the provider
+        byte[] piiKey = new byte[]{0,1,2,3,4,5,6,7,8,9,0xa,0xb,0xc,0xd,0xe,0xf};
+        KeyProvider.Options aes128 = new KeyProvider.Options(conf);
+        provider.createKey("pii", piiKey, aes128);
+        byte[] piiKey2 = new byte[]{0x10,0x11,0x12,0x13,0x14,0x15,0x16,0x17,
+            0x18,0x19,0x1a,0x1b,0x1c,0x1d,0x1e,0x1f};
+        provider.rollNewVersion("pii", piiKey2);
+        byte[] secretKey = new byte[]{0x20,0x21,0x22,0x23,0x24,0x25,0x26,0x27,
+            0x28,0x29,0x2a,0x2b,0x2c,0x2d,0x2e,0x2f};
+        provider.createKey("secret", secretKey, aes128);
+        return KeyProviderCryptoExtension.createKeyProviderCryptoExtension(provider);
+      }
+      return null;
+    }
+  }
+}
diff --git a/sql/core/src/test/resources/META-INF/services/org.apache.hadoop.crypto.key.KeyProviderFactory
b/sql/core/src/test/resources/META-INF/services/org.apache.hadoop.crypto.key.KeyProviderFactory new file mode 100644 index 0000000000000..f436622b5fb42 --- /dev/null +++ b/sql/core/src/test/resources/META-INF/services/org.apache.hadoop.crypto.key.KeyProviderFactory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +test.org.apache.spark.sql.execution.datasources.orc.FakeKeyProvider$Factory diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcEncryptionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcEncryptionSuite.scala new file mode 100644 index 0000000000000..fac3cef5801dd --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcEncryptionSuite.scala @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.SharedSparkSession
+
+/**
+ * Round-trip tests for ORC column encryption.
+ *
+ * Each test writes one row with the `ssn` and `email` columns encrypted under the
+ * `pii` key, using the "hadoop" key provider with provider path `test:///`. It then
+ * reads the same location twice: once with that configuration, expecting the original
+ * values, and once with the "memory" key provider, expecting the masked row.
+ */
+class OrcEncryptionSuite extends OrcTest with SharedSparkSession {
+  import testImplicits._
+
+  // The single row written by both tests.
+  val originalData = Seq(("123456789", "dongjoon@apache.org", "Dongjoon Hyun"))
+  // The row expected when reading with the "memory" key provider:
+  // `ssn` is nullified and `email` is replaced by its masked value.
+  val rowDataWithoutKey =
+    Row(null, "841626795E7D351555B835A002E3BF10669DE9B81C95A3D59E10865AC37EA7C3", "Dongjoon Hyun")
+
+  test("Write and read an encrypted file") {
+    val expected = originalData.toDF("ssn", "email", "name")
+
+    // Session confs used to write and read with the "hadoop" key provider.
+    val hadoopProviderConf = Seq(
+      "hadoop.security.key.provider.path" -> "test:///",
+      "orc.key.provider" -> "hadoop",
+      "orc.encrypt" -> "pii:ssn,email",
+      "orc.mask" -> "nullify:ssn;sha256:email")
+    // Session confs used to read with the "memory" key provider.
+    val memoryProviderConf = Seq(
+      "orc.key.provider" -> "memory",
+      "orc.encrypt" -> "pii:ssn,email",
+      "orc.mask" -> "nullify:ssn;sha256:email")
+
+    withTempPath { tempDir =>
+      val location = tempDir.getAbsolutePath
+
+      withSQLConf(hadoopProviderConf: _*) {
+        expected.write.mode("overwrite").orc(location)
+        checkAnswer(spark.read.orc(location), expected)
+      }
+
+      withSQLConf(memoryProviderConf: _*) {
+        checkAnswer(spark.read.orc(location), rowDataWithoutKey)
+      }
+    }
+  }
+
+  test("Write and read an encrypted table") {
+    val expected = originalData.toDF("ssn", "email", "name")
+
+    withTempPath { tempDir =>
+      val path = tempDir.getAbsolutePath
+
+      // Table definition that uses the "hadoop" key provider.
+      val createEncrypted =
+        s"""
+          |CREATE TABLE encrypted (
+          | ssn STRING,
+          | email STRING,
+          | name STRING
+          |)
+          |USING ORC
+          |LOCATION "$path"
+          |OPTIONS (
+          | hadoop.security.key.provider.path "test:///",
+          | orc.key.provider "hadoop",
+          | orc.encrypt "pii:ssn,email",
+          | orc.mask "nullify:ssn;sha256:email"
+          |)
+          |""".stripMargin
+      // Table over the same location that uses the "memory" key provider.
+      val createNormal =
+        s"""
+          |CREATE TABLE normal (
+          | ssn STRING,
+          | email STRING,
+          | name STRING
+          |)
+          |USING ORC
+          |LOCATION "$path"
+          |OPTIONS (
+          | orc.key.provider "memory",
+          | orc.encrypt "pii:ssn,email",
+          | orc.mask "nullify:ssn;sha256:email"
+          |)
+          |""".stripMargin
+
+      withTable("encrypted") {
+        sql(createEncrypted)
+        sql("INSERT INTO encrypted VALUES('123456789', 'dongjoon@apache.org', 'Dongjoon Hyun')")
+        checkAnswer(sql("SELECT * FROM encrypted"), expected)
+      }
+      withTable("normal") {
+        sql(createNormal)
+        checkAnswer(sql("SELECT * FROM normal"), rowDataWithoutKey)
+      }
+    }
+  }
+}