Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,21 @@

package org.apache.iceberg;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import org.apache.iceberg.encryption.EncryptionAlgorithm;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.encryption.EnvelopeConfig;
import org.apache.iceberg.encryption.EnvelopeEncryptionManager;
import org.apache.iceberg.encryption.EnvelopeEncryptionSpec;
import org.apache.iceberg.encryption.EnvelopeKeyManager;
import org.apache.iceberg.encryption.KmsClient;
import org.apache.iceberg.encryption.PlaintextEncryptionManager;
import org.apache.iceberg.encryption.TableEnvelopeKeyManager;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.io.FileIO;
Expand Down Expand Up @@ -315,6 +325,98 @@ private String newTableMetadataFilePath(TableMetadata meta, int newVersion) {
return metadataFileLocation(meta, String.format("%05d-%s%s", newVersion, UUID.randomUUID(), fileExtension));
}

/**
 * Builds the {@link EncryptionManager} for a table from its metadata properties.
 *
 * <p>Resolution rules (driven by {@link TableProperties#ENCRYPTION_MANAGER_TYPE} and
 * {@link TableProperties#ENCRYPTION_TABLE_KEY}):
 * <ul>
 *   <li>no manager type and no table key: plaintext (unencrypted) table</li>
 *   <li>no manager type but a table key: defaults to single-envelope encryption</li>
 *   <li>plaintext / legacy manager types must not carry a table key</li>
 *   <li>any other manager type requires a table key</li>
 * </ul>
 *
 * @param tableMetadata current table metadata; its properties select and configure the manager
 * @param extraKmsProperties additional KMS client properties supplied by the engine/environment
 *                           (merged over the table's own KMS properties)
 * @return the encryption manager to use for this table
 * @throws UnsupportedOperationException for an unknown or not-yet-implemented manager type
 *         (including the legacy type, which currently has no loader)
 */
@SuppressWarnings("checkstyle:CyclomaticComplexity")
public static EncryptionManager createEncryptionManager(TableMetadata tableMetadata,
                                                        Map<String, String> extraKmsProperties) {
  Map<String, String> tableProperties = tableMetadata.properties();

  String keyManagerType = PropertyUtil.propertyAsString(tableProperties,
      TableProperties.ENCRYPTION_MANAGER_TYPE, null);
  String tableKeyId = PropertyUtil.propertyAsString(tableProperties,
      TableProperties.ENCRYPTION_TABLE_KEY, null);

  boolean encryptedTable = false;

  if (null == keyManagerType) {
    if (null != tableKeyId) {
      encryptedTable = true;
      // A table key without an explicit manager type implies standard envelope encryption.
      keyManagerType = TableProperties.ENCRYPTION_MANAGER_TYPE_SINGLE_ENVELOPE;
    }
  } else {
    if (keyManagerType.equals(TableProperties.ENCRYPTION_MANAGER_TYPE_PLAINTEXT)) {
      Preconditions.checkArgument(tableKeyId == null,
          "Table encryption key set for unencrypted table");
    } else if (keyManagerType.equals(TableProperties.ENCRYPTION_MANAGER_TYPE_LEGACY)) {
      Preconditions.checkArgument(tableKeyId == null,
          "Table encryption key set for table encrypted with legacy encryption manager");
      encryptedTable = true;
    } else {
      Preconditions.checkArgument(tableKeyId != null,
          "Table encryption key is not set for encrypted table. " +
              "Key manager type: " + keyManagerType);
      encryptedTable = true;
    }
  }

  if (!encryptedTable) {
    return new PlaintextEncryptionManager();
  }

  if (keyManagerType.equals(TableProperties.ENCRYPTION_MANAGER_TYPE_LEGACY)) {
    // TODO load custom EncryptionManager. Needed?
    // Intentionally falls through to the UnsupportedOperationException below until implemented.
  } else if (keyManagerType.equals(TableProperties.ENCRYPTION_MANAGER_TYPE_SINGLE_ENVELOPE)) {
    Schema tableSchema = tableMetadata.schema();

    boolean pushdown = PropertyUtil.propertyAsBoolean(tableProperties,
        TableProperties.ENCRYPTION_PUSHDOWN_ENABLED, TableProperties.ENCRYPTION_PUSHDOWN_ENABLED_DEFAULT);

    String dataEncryptionAlgorithm = PropertyUtil.propertyAsString(tableProperties,
        TableProperties.ENCRYPTION_DATA_ALGORITHM, TableProperties.ENCRYPTION_DATA_ALGORITHM_DEFAULT);

    EnvelopeConfig dataFileConfig = EnvelopeConfig.builderFor(tableSchema)
        .singleWrap(tableKeyId)
        .useAlgorithm(EncryptionAlgorithm.valueOf(dataEncryptionAlgorithm))
        .build();

    String kmsClientImpl = PropertyUtil.propertyAsString(tableProperties,
        TableProperties.ENCRYPTION_KMS_CLIENT_IMPL, null);

    // Merge custom KMS configuration: table properties first, then engine-supplied
    // properties so the latter can override table-level settings.
    Map<String, String> kmsProperties = new HashMap<>();
    copyKmsClientProperties(tableProperties, kmsProperties);
    copyKmsClientProperties(extraKmsProperties, kmsProperties);

    KmsClient kmsClient = TableEnvelopeKeyManager.loadKmsClient(kmsClientImpl, kmsProperties);

    // When the KMS cannot generate data keys, they are generated locally and need a length.
    int dataKeyLength = -1;
    if (!kmsClient.supportsKeyGeneration()) {
      dataKeyLength = PropertyUtil.propertyAsInt(tableProperties,
          TableProperties.ENCRYPTION_DEK_LENGTH, TableProperties.ENCRYPTION_DEK_LENGTH_DEFAULT);
    }

    EnvelopeKeyManager keyManager = new TableEnvelopeKeyManager(kmsClient, dataFileConfig, pushdown,
        tableSchema, dataKeyLength);

    // TODO add manifest/list encr specs. Post-MVP(?)
    EnvelopeEncryptionSpec spec = EnvelopeEncryptionSpec.builderFor(tableSchema)
        .addDataFileConfig(dataFileConfig).build();

    return new EnvelopeEncryptionManager(pushdown, spec, keyManager);
  }

  throw new UnsupportedOperationException("Unsupported encryption manager type " + keyManagerType);
}

/**
 * Copies entries whose key contains the KMS client marker from {@code source} into {@code target},
 * overwriting existing entries.
 */
// TODO replace the substring match with a well-defined property prefix
private static void copyKmsClientProperties(Map<String, String> source, Map<String, String> target) {
  for (Map.Entry<String, String> property : source.entrySet()) {
    if (property.getKey().contains("kms.client")) {
      target.put(property.getKey(), property.getValue());
    }
  }
}

private static int parseVersion(String metadataLocation) {
int versionStart = metadataLocation.lastIndexOf('/') + 1; // if '/' isn't found, this will be 0
int versionEnd = metadataLocation.indexOf('-', versionStart);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.regex.Matcher;
Expand All @@ -32,6 +34,7 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.LocationProviders;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
Expand Down Expand Up @@ -67,6 +70,7 @@ public class HadoopTableOperations implements TableOperations {
private volatile TableMetadata currentMetadata = null;
private volatile Integer version = null;
private volatile boolean shouldRefresh = true;
private volatile EncryptionManager encryptionManager = null;

protected HadoopTableOperations(Path location, FileIO fileIO, Configuration conf) {
this.conf = conf;
Expand All @@ -82,6 +86,23 @@ public TableMetadata current() {
return currentMetadata;
}

@Override
public EncryptionManager encryption() {
  // Lazily build the encryption manager with double-checked locking; the field is
  // volatile, so the published value is safely visible to other threads and the
  // manager is created at most once.
  if (null == encryptionManager) {
    synchronized (this) {
      if (null == encryptionManager) {
        // Collect KMS client properties from the Hadoop config to pass alongside
        // the table's own properties.
        Map<String, String> extraKmsProperties = new HashMap<>();
        for (Map.Entry<String, String> property : conf) {
          if (property.getKey().contains("kms.client")) { // TODO
            extraKmsProperties.put(property.getKey(), property.getValue());
          }
        }
        encryptionManager = BaseMetastoreTableOperations.createEncryptionManager(current(), extraKmsProperties);
      }
    }
  }

  return encryptionManager;
}

private synchronized Pair<Integer, TableMetadata> versionAndMetadata() {
return Pair.of(version, currentMetadata);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ public interface FileAppenderFactory<T> {
*/
FileAppender<T> newAppender(OutputFile outputFile, FileFormat fileFormat);

/**
 * Create a new {@link FileAppender} for an encrypted output file.
 *
 * <p>The default implementation ignores any native file-encryption parameters and
 * delegates to {@link #newAppender(OutputFile, FileFormat)} on the encrypting stream.
 * Implementations that support native (file-format-level) encryption should override
 * this method to pass the encryption parameters through to the writer.
 *
 * @param outputFile an encrypted output file to write to
 * @param fileFormat the file format of the appender
 * @return a newly created {@link FileAppender}
 */
default FileAppender<T> newAppender(EncryptedOutputFile outputFile, FileFormat fileFormat) {
  return newAppender(outputFile.encryptingOutputFile(), fileFormat);
}

/**
* Create a new {@link DataWriter}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.iceberg.deletes.EqualityDeleteWriter;
import org.apache.iceberg.deletes.PositionDeleteWriter;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.encryption.NativeFileCryptoParameters;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.FileAppenderFactory;
import org.apache.iceberg.io.OutputFile;
Expand Down Expand Up @@ -85,6 +86,22 @@ public GenericAppenderFactory setAll(Map<String, String> properties) {

@Override
public FileAppender<Record> newAppender(OutputFile outputFile, FileFormat fileFormat) {
  // A plain output file carries no native file-encryption parameters.
  NativeFileCryptoParameters noNativeEncryption = null;
  return newAppender(outputFile, noNativeEncryption, fileFormat);
}

@Override
public FileAppender<Record> newAppender(EncryptedOutputFile outputFile, FileFormat fileFormat) {
  // Forward native file-encryption parameters only when the output file asks for them.
  NativeFileCryptoParameters nativeEncryption =
      outputFile.useNativeEncryption() ? outputFile.nativeEncryptionParameters() : null;

  return newAppender(outputFile.encryptingOutputFile(), nativeEncryption, fileFormat);
}

private FileAppender<Record> newAppender(OutputFile outputFile,
NativeFileCryptoParameters nativeEncryption,
FileFormat fileFormat) {
MetricsConfig metricsConfig = MetricsConfig.fromProperties(config);
try {
switch (fileFormat) {
Expand All @@ -104,6 +121,7 @@ public FileAppender<Record> newAppender(OutputFile outputFile, FileFormat fileFo
.setAll(config)
.metricsConfig(metricsConfig)
.overwrite()
.encryption(nativeEncryption)
.build();

case ORC:
Expand All @@ -127,7 +145,7 @@ public FileAppender<Record> newAppender(OutputFile outputFile, FileFormat fileFo
public org.apache.iceberg.io.DataWriter<Record> newDataWriter(EncryptedOutputFile file, FileFormat format,
                                                              StructLike partition) {
  // Build the appender from the encrypted file so native encryption settings are honored.
  FileAppender<Record> appender = newAppender(file, format);
  String location = file.encryptingOutputFile().location();
  return new org.apache.iceberg.io.DataWriter<>(
      appender, format, location, spec, partition, file.keyMetadata());
}

Expand All @@ -139,6 +157,11 @@ public EqualityDeleteWriter<Record> newEqDeleteWriter(EncryptedOutputFile file,
Preconditions.checkNotNull(eqDeleteRowSchema,
"Equality delete row schema shouldn't be null when creating equality-delete writer");

NativeFileCryptoParameters nativeEncryption = null;
if (file.useNativeEncryption()) {
nativeEncryption = file.nativeEncryptionParameters();
}

MetricsConfig metricsConfig = MetricsConfig.fromProperties(config);
try {
switch (format) {
Expand All @@ -165,6 +188,7 @@ public EqualityDeleteWriter<Record> newEqDeleteWriter(EncryptedOutputFile file,
.withSpec(spec)
.withKeyMetadata(file.keyMetadata())
.equalityFieldIds(equalityFieldIds)
.encryption(nativeEncryption)
.buildEqualityWriter();

default:
Expand All @@ -180,6 +204,10 @@ public EqualityDeleteWriter<Record> newEqDeleteWriter(EncryptedOutputFile file,
public PositionDeleteWriter<Record> newPosDeleteWriter(EncryptedOutputFile file, FileFormat format,
StructLike partition) {
MetricsConfig metricsConfig = MetricsConfig.fromProperties(config);
NativeFileCryptoParameters nativeEncryption = null;
if (file.useNativeEncryption()) {
nativeEncryption = file.nativeEncryptionParameters();
}
try {
switch (format) {
case AVRO:
Expand All @@ -203,6 +231,7 @@ public PositionDeleteWriter<Record> newPosDeleteWriter(EncryptedOutputFile file,
.rowSchema(posDeleteRowSchema)
.withSpec(spec)
.withKeyMetadata(file.keyMetadata())
.encryption(nativeEncryption)
.buildPositionWriter();

default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.iceberg.deletes.EqualityDeleteWriter;
import org.apache.iceberg.deletes.PositionDeleteWriter;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.encryption.NativeFileCryptoParameters;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.data.FlinkAvroWriter;
import org.apache.iceberg.flink.data.FlinkOrcWriter;
Expand Down Expand Up @@ -94,6 +95,22 @@ private RowType lazyPosDeleteFlinkSchema() {

@Override
public FileAppender<RowData> newAppender(OutputFile outputFile, FileFormat format) {
  // A plain output file carries no native file-encryption parameters.
  NativeFileCryptoParameters noNativeEncryption = null;
  return newAppender(outputFile, noNativeEncryption, format);
}

@Override
public FileAppender<RowData> newAppender(EncryptedOutputFile outputFile, FileFormat format) {
  // Forward native file-encryption parameters only when the output file asks for them.
  NativeFileCryptoParameters nativeEncryption =
      outputFile.useNativeEncryption() ? outputFile.nativeEncryptionParameters() : null;

  return newAppender(outputFile.encryptingOutputFile(), nativeEncryption, format);
}

private FileAppender<RowData> newAppender(OutputFile outputFile,
NativeFileCryptoParameters nativeEncryption,
FileFormat format) {
MetricsConfig metricsConfig = MetricsConfig.fromProperties(props);
try {
switch (format) {
Expand Down Expand Up @@ -122,6 +139,7 @@ public FileAppender<RowData> newAppender(OutputFile outputFile, FileFormat forma
.metricsConfig(metricsConfig)
.schema(schema)
.overwrite()
.encryption(nativeEncryption)
.build();

default:
Expand All @@ -135,7 +153,7 @@ public FileAppender<RowData> newAppender(OutputFile outputFile, FileFormat forma
@Override
public DataWriter<RowData> newDataWriter(EncryptedOutputFile file, FileFormat format, StructLike partition) {
  // Build the appender from the encrypted file so native encryption settings are honored.
  FileAppender<RowData> appender = newAppender(file, format);
  String location = file.encryptingOutputFile().location();
  return new DataWriter<>(appender, format, location, spec, partition, file.keyMetadata());
}

Expand All @@ -148,6 +166,10 @@ public EqualityDeleteWriter<RowData> newEqDeleteWriter(EncryptedOutputFile outpu
"Equality delete row schema shouldn't be null when creating equality-delete writer");

MetricsConfig metricsConfig = MetricsConfig.fromProperties(props);
NativeFileCryptoParameters nativeEncryption = null;
if (outputFile.useNativeEncryption()) {
nativeEncryption = outputFile.nativeEncryptionParameters();
}
try {
switch (format) {
case AVRO:
Expand All @@ -173,6 +195,7 @@ public EqualityDeleteWriter<RowData> newEqDeleteWriter(EncryptedOutputFile outpu
.withSpec(spec)
.withKeyMetadata(outputFile.keyMetadata())
.equalityFieldIds(equalityFieldIds)
.encryption(nativeEncryption)
.buildEqualityWriter();

default:
Expand All @@ -188,6 +211,10 @@ public EqualityDeleteWriter<RowData> newEqDeleteWriter(EncryptedOutputFile outpu
public PositionDeleteWriter<RowData> newPosDeleteWriter(EncryptedOutputFile outputFile, FileFormat format,
StructLike partition) {
MetricsConfig metricsConfig = MetricsConfig.fromProperties(props);
NativeFileCryptoParameters nativeEncryption = null;
if (outputFile.useNativeEncryption()) {
nativeEncryption = outputFile.nativeEncryptionParameters();
}
try {
switch (format) {
case AVRO:
Expand All @@ -213,6 +240,7 @@ public PositionDeleteWriter<RowData> newPosDeleteWriter(EncryptedOutputFile outp
.withSpec(spec)
.withKeyMetadata(outputFile.keyMetadata())
.transformPaths(path -> StringData.fromString(path.toString()))
.encryption(nativeEncryption)
.buildPositionWriter();

default:
Expand Down
Loading