From 81f87d933a04496c25c77b713ecbea22b29dd201 Mon Sep 17 00:00:00 2001 From: mccheah Date: Thu, 28 Feb 2019 14:42:33 -0800 Subject: [PATCH 01/15] Integrate encryption into datasource * add write encryption codepath * add reader side * remove unnecessary field * add log warn * fix check * try a single iterator * new reader * addressing comments * remove unused struct --- .../main/java/com/netflix/iceberg/Table.java | 5 +- .../iceberg/BaseMetastoreTableOperations.java | 7 +++ .../java/com/netflix/iceberg/BaseTable.java | 6 +++ .../com/netflix/iceberg/BaseTransaction.java | 11 ++++ .../java/com/netflix/iceberg/DataFiles.java | 20 +++++++ .../com/netflix/iceberg/TableOperations.java | 5 +- .../encryption/BaseEncryptionKeyMetadata.java | 2 +- .../PlaintextEncryptionManager.java | 44 ++++++++++++++++ .../iceberg/hadoop/HadoopTableOperations.java | 7 +++ .../netflix/iceberg/LocalTableOperations.java | 7 +++ .../java/com/netflix/iceberg/TestTables.java | 7 +++ .../netflix/iceberg/spark/source/Reader.java | 52 +++++++++++++++---- .../netflix/iceberg/spark/source/Writer.java | 45 ++++++++++------ .../iceberg/spark/source/TestTables.java | 7 +++ 14 files changed, 191 insertions(+), 34 deletions(-) create mode 100644 core/src/main/java/com/netflix/iceberg/encryption/PlaintextEncryptionManager.java diff --git a/api/src/main/java/com/netflix/iceberg/Table.java b/api/src/main/java/com/netflix/iceberg/Table.java index ccf3ab43ebe9..e0ce18958187 100644 --- a/api/src/main/java/com/netflix/iceberg/Table.java +++ b/api/src/main/java/com/netflix/iceberg/Table.java @@ -191,10 +191,7 @@ default AppendFiles newFastAppend() { * @return an {@link com.netflix.iceberg.encryption.EncryptionManager} to encrypt and decrypt * data files. */ - default EncryptionManager encryption() { - // TODO coming soon - throw new UnsupportedOperationException("Encryption is a work in progress."); - } + EncryptionManager encryption(); /** * @return a {@link LocationProvider} to provide locations for new data files diff --git a/core/src/main/java/com/netflix/iceberg/BaseMetastoreTableOperations.java b/core/src/main/java/com/netflix/iceberg/BaseMetastoreTableOperations.java index 4cda7f8f53fc..bf5925f50129 100644 --- a/core/src/main/java/com/netflix/iceberg/BaseMetastoreTableOperations.java +++ b/core/src/main/java/com/netflix/iceberg/BaseMetastoreTableOperations.java @@ -20,6 +20,8 @@ package com.netflix.iceberg; import com.google.common.base.Objects; +import com.netflix.iceberg.encryption.EncryptionManager; +import com.netflix.iceberg.encryption.PlaintextEncryptionManager; import com.netflix.iceberg.exceptions.RuntimeIOException; import com.netflix.iceberg.hadoop.HadoopFileIO; import com.netflix.iceberg.io.FileIO; @@ -137,6 +139,11 @@ public FileIO io() { return fileIo; } + @Override + public EncryptionManager encryption() { + return new PlaintextEncryptionManager(); + } + @Override public LocationProvider locationProvider() { return LocationProviders.locationsFor(current().location(), current().properties()); diff --git a/core/src/main/java/com/netflix/iceberg/BaseTable.java b/core/src/main/java/com/netflix/iceberg/BaseTable.java index 36b1832891af..16c7655a0e8d 100644 --- a/core/src/main/java/com/netflix/iceberg/BaseTable.java +++ b/core/src/main/java/com/netflix/iceberg/BaseTable.java @@ -19,6 +19,7 @@ package com.netflix.iceberg; +import com.netflix.iceberg.encryption.EncryptionManager; import com.netflix.iceberg.io.FileIO; import com.netflix.iceberg.io.LocationProvider; import java.util.Map; @@ -147,6 +148,11 @@ public FileIO io() { return operations().io(); } + @Override + public EncryptionManager encryption() { + return operations().encryption(); + } + @Override public LocationProvider locationProvider() { return operations().locationProvider(); diff --git a/core/src/main/java/com/netflix/iceberg/BaseTransaction.java b/core/src/main/java/com/netflix/iceberg/BaseTransaction.java index 2180a91a931c..5403f75a2345 100644 --- a/core/src/main/java/com/netflix/iceberg/BaseTransaction.java +++ b/core/src/main/java/com/netflix/iceberg/BaseTransaction.java @@ -22,6 +22,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import com.netflix.iceberg.encryption.EncryptionManager; import com.netflix.iceberg.exceptions.CommitFailedException; import com.netflix.iceberg.io.FileIO; import com.netflix.iceberg.io.LocationProvider; @@ -283,6 +284,11 @@ public FileIO io() { return ops.io(); } + @Override + public EncryptionManager encryption() { + return ops.encryption(); + } + @Override public String metadataFileLocation(String fileName) { return ops.metadataFileLocation(fileName); @@ -399,6 +405,11 @@ public FileIO io() { return transactionOps.io(); } + @Override + public EncryptionManager encryption() { + return transactionOps.encryption(); + } + @Override public LocationProvider locationProvider() { return transactionOps.locationProvider(); diff --git a/core/src/main/java/com/netflix/iceberg/DataFiles.java b/core/src/main/java/com/netflix/iceberg/DataFiles.java index 86925227b541..35f6a31749ba 100644 --- a/core/src/main/java/com/netflix/iceberg/DataFiles.java +++ b/core/src/main/java/com/netflix/iceberg/DataFiles.java @@ -136,6 +136,18 @@ public static DataFile fromInputFile(InputFile file, PartitionData partition, Me location, format, partition, file.getLength(), DEFAULT_BLOCK_SIZE, metrics); } + public static DataFile fromInputFile(InputFile file, PartitionData partition, Metrics metrics, + EncryptionKeyMetadata keyMetadata) { + if (file instanceof HadoopInputFile) { + return fromStat(((HadoopInputFile) file).getStat(), partition, metrics, keyMetadata); + } + + String location = file.location(); + FileFormat format = FileFormat.fromFileName(location); + return new GenericDataFile( + location, format, partition, file.getLength(), DEFAULT_BLOCK_SIZE, metrics, keyMetadata.keyMetadata()); + } + public static DataFile fromStat(FileStatus stat, PartitionData partition, Metrics metrics) { String location = stat.getPath().toString(); FileFormat format = FileFormat.fromFileName(location); @@ -143,6 +155,14 @@ public static DataFile fromStat(FileStatus stat, PartitionData partition, Metric location, format, partition, stat.getLen(), stat.getBlockSize(), metrics); } + public static DataFile fromStat(FileStatus stat, PartitionData partition, Metrics metrics, + EncryptionKeyMetadata keyMetadata) { + String location = stat.getPath().toString(); + FileFormat format = FileFormat.fromFileName(location); + return new GenericDataFile( + location, format, partition, stat.getLen(), stat.getBlockSize(), metrics, keyMetadata.keyMetadata()); + } + public static DataFile fromParquetInputFile(InputFile file, PartitionData partition, Metrics metrics) { diff --git a/core/src/main/java/com/netflix/iceberg/TableOperations.java b/core/src/main/java/com/netflix/iceberg/TableOperations.java index e32c9a1a2460..779579f3d435 100644 --- a/core/src/main/java/com/netflix/iceberg/TableOperations.java +++ b/core/src/main/java/com/netflix/iceberg/TableOperations.java @@ -66,10 +66,7 @@ public interface TableOperations { * @return a {@link com.netflix.iceberg.encryption.EncryptionManager} to encrypt and decrypt * data files. */ - default EncryptionManager encryption() { - // TODO coming soon - throw new UnsupportedOperationException("Encryption is a work in progress."); - } + EncryptionManager encryption(); /** * Given the name of a metadata file, obtain the full path of that file using an appropriate base diff --git a/core/src/main/java/com/netflix/iceberg/encryption/BaseEncryptionKeyMetadata.java b/core/src/main/java/com/netflix/iceberg/encryption/BaseEncryptionKeyMetadata.java index e24749a80f34..c9a7126e92d1 100644 --- a/core/src/main/java/com/netflix/iceberg/encryption/BaseEncryptionKeyMetadata.java +++ b/core/src/main/java/com/netflix/iceberg/encryption/BaseEncryptionKeyMetadata.java @@ -38,6 +38,6 @@ public ByteBuffer keyMetadata() { @Override public EncryptionKeyMetadata copy() { return new BaseEncryptionKeyMetadata( - ByteBuffers.copy(keyMetadata)); + keyMetadata == null ? null : ByteBuffers.copy(keyMetadata)); } } diff --git a/core/src/main/java/com/netflix/iceberg/encryption/PlaintextEncryptionManager.java b/core/src/main/java/com/netflix/iceberg/encryption/PlaintextEncryptionManager.java new file mode 100644 index 000000000000..642bb01aab0c --- /dev/null +++ b/core/src/main/java/com/netflix/iceberg/encryption/PlaintextEncryptionManager.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.netflix.iceberg.encryption; + +import com.netflix.iceberg.io.InputFile; +import com.netflix.iceberg.io.OutputFile; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; + +public class PlaintextEncryptionManager implements EncryptionManager { + private static final Logger LOG = LoggerFactory.getLogger(PlaintextEncryptionManager.class); + + @Override + public InputFile decrypt(EncryptedInputFile encrypted) { + if (encrypted.keyMetadata().keyMetadata() != null) { + LOG.warn("File encryption key metadata is present, but currently using PlaintextEncryptionManager."); + } + return encrypted.encryptedInputFile(); + } + + @Override + public EncryptedOutputFile encrypt(OutputFile rawOutput) { + return EncryptedFiles.encryptedOutput(rawOutput, (ByteBuffer) null); + } +} diff --git a/core/src/main/java/com/netflix/iceberg/hadoop/HadoopTableOperations.java b/core/src/main/java/com/netflix/iceberg/hadoop/HadoopTableOperations.java index 1f784a7cabca..27a2797e5b79 100644 --- a/core/src/main/java/com/netflix/iceberg/hadoop/HadoopTableOperations.java +++ b/core/src/main/java/com/netflix/iceberg/hadoop/HadoopTableOperations.java @@ -21,6 +21,8 @@ import com.google.common.base.Preconditions; import com.netflix.iceberg.LocationProviders; +import com.netflix.iceberg.encryption.EncryptionManager; +import com.netflix.iceberg.encryption.PlaintextEncryptionManager; import com.netflix.iceberg.io.FileIO; import com.netflix.iceberg.TableMetadata; import com.netflix.iceberg.TableMetadataParser; @@ -160,6 +162,11 @@ public FileIO io() { return defaultFileIo; } + @Override + public EncryptionManager encryption() { + return new PlaintextEncryptionManager(); + } + @Override public LocationProvider locationProvider() { return LocationProviders.locationsFor(current().location(), current().properties()); diff --git a/core/src/test/java/com/netflix/iceberg/LocalTableOperations.java b/core/src/test/java/com/netflix/iceberg/LocalTableOperations.java index e60bbaaa7269..6eb08e9c7fcd 100644 --- a/core/src/test/java/com/netflix/iceberg/LocalTableOperations.java +++ b/core/src/test/java/com/netflix/iceberg/LocalTableOperations.java @@ -20,6 +20,8 @@ package com.netflix.iceberg; import com.google.common.collect.Maps; +import com.netflix.iceberg.encryption.EncryptionManager; +import com.netflix.iceberg.encryption.PlaintextEncryptionManager; import com.netflix.iceberg.exceptions.RuntimeIOException; import com.netflix.iceberg.io.FileIO; import java.util.Map; @@ -59,6 +61,11 @@ public FileIO io() { return io; } + @Override + public EncryptionManager encryption() { + return new PlaintextEncryptionManager(); + } + @Override public String metadataFileLocation(String fileName) { return createdMetadataFilePaths.computeIfAbsent(fileName, name -> { diff --git a/core/src/test/java/com/netflix/iceberg/TestTables.java b/core/src/test/java/com/netflix/iceberg/TestTables.java index a0048e384bed..29a88df5aed6 100644 --- a/core/src/test/java/com/netflix/iceberg/TestTables.java +++ b/core/src/test/java/com/netflix/iceberg/TestTables.java @@ -22,6 +22,8 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; +import com.netflix.iceberg.encryption.EncryptionManager; +import com.netflix.iceberg.encryption.PlaintextEncryptionManager; import com.netflix.iceberg.exceptions.AlreadyExistsException; import com.netflix.iceberg.exceptions.CommitFailedException; import com.netflix.iceberg.exceptions.RuntimeIOException; @@ -181,6 +183,11 @@ public FileIO io() { return new LocalFileIO(); } + @Override + public EncryptionManager encryption() { + return new PlaintextEncryptionManager(); + } + @Override public LocationProvider locationProvider() { Preconditions.checkNotNull(current, diff --git a/spark/src/main/java/com/netflix/iceberg/spark/source/Reader.java b/spark/src/main/java/com/netflix/iceberg/spark/source/Reader.java index 33b95c1730b3..c4550b2654d5 100644 --- a/spark/src/main/java/com/netflix/iceberg/spark/source/Reader.java +++ b/spark/src/main/java/com/netflix/iceberg/spark/source/Reader.java @@ -19,9 +19,14 @@ package com.netflix.iceberg.spark.source; +import com.google.common.base.Functions; +import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.netflix.iceberg.CombinedScanTask; import com.netflix.iceberg.DataFile; +import com.netflix.iceberg.encryption.EncryptedFiles; +import com.netflix.iceberg.encryption.EncryptedInputFile; +import com.netflix.iceberg.encryption.EncryptionManager; import com.netflix.iceberg.io.FileIO; import com.netflix.iceberg.FileScanTask; import com.netflix.iceberg.PartitionField; @@ -66,14 +71,20 @@ import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; import org.apache.spark.unsafe.types.UTF8String; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.Closeable; import java.io.IOException; import java.io.Serializable; import java.nio.ByteBuffer; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; import static com.google.common.collect.Iterators.transform; import static com.netflix.iceberg.spark.SparkSchemaUtil.convert; @@ -83,11 +94,13 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics { + private static final Logger LOG = LoggerFactory.getLogger(Reader.class); private static final Filter[] NO_FILTERS = new Filter[0]; private final Table table; private final FileIO fileIo; + private final EncryptionManager encryptionManager; private StructType requestedSchema = null; private List filterExpressions = null; private Filter[] pushedFilters = NO_FILTERS; @@ -101,6 +114,7 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD this.table = table; this.schema = table.schema(); this.fileIo = table.io(); + this.encryptionManager = table.encryption(); } private Schema lazySchema() { @@ -133,7 +147,7 @@ public List> planInputPartitions() { List> readTasks = Lists.newArrayList(); for (CombinedScanTask task : tasks()) { - readTasks.add(new ReadTask(task, tableSchemaString, expectedSchemaString, fileIo)); + readTasks.add(new ReadTask(task, tableSchemaString, expectedSchemaString, fileIo, encryptionManager)); } return readTasks; @@ -227,21 +241,24 @@ private static class ReadTask implements InputPartition, Serializab private final String tableSchemaString; private final String expectedSchemaString; private final FileIO fileIo; + private final EncryptionManager encryptionManager; private transient Schema tableSchema = null; private transient Schema expectedSchema = null; private ReadTask( - CombinedScanTask task, String tableSchemaString, String expectedSchemaString, FileIO fileIo) { + CombinedScanTask task, String tableSchemaString, String expectedSchemaString, FileIO fileIo, + EncryptionManager encryptionManager) { this.task = task; this.tableSchemaString = tableSchemaString; this.expectedSchemaString = expectedSchemaString; this.fileIo = fileIo; + this.encryptionManager = encryptionManager; } @Override public InputPartitionReader createPartitionReader() { - return new TaskDataReader(task, lazyTableSchema(), lazyExpectedSchema(), fileIo); + return new TaskDataReader(task, lazyTableSchema(), lazyExpectedSchema(), fileIo, encryptionManager); } private Schema lazyTableSchema() { @@ -265,21 +282,35 @@ private static class TaskDataReader implements InputPartitionReader .impl(UnsafeProjection.class, InternalRow.class) .build(); - private final Iterator tasks; private final Schema tableSchema; private final Schema expectedSchema; private final FileIO fileIo; + private final Iterator tasks; + private final Map inputFiles; + private Iterator currentIterator = null; private Closeable currentCloseable = null; private InternalRow current = null; - public TaskDataReader(CombinedScanTask task, Schema tableSchema, Schema expectedSchema, FileIO fileIo) { + public TaskDataReader(CombinedScanTask task, Schema tableSchema, Schema expectedSchema, FileIO fileIo, + EncryptionManager encryptionManager) { this.fileIo = fileIo; this.tasks = task.files().iterator(); this.tableSchema = tableSchema; this.expectedSchema = expectedSchema; // open last because the schemas and fileIo must be set + Iterable inputFiles = encryptionManager.decrypt(() -> task.files().stream() + .map(fileScanTask -> + EncryptedFiles.encryptedInput( + this.fileIo.newInputFile(fileScanTask.file().path().toString()), + fileScanTask.file().keyMetadata())) + .iterator()); + this.inputFiles = StreamSupport.stream(inputFiles.spliterator(), false) + .collect(Collectors.toMap( + inputFile -> inputFile.location(), + Function.identity())); + this.currentIterator = open(tasks.next()); } @@ -385,8 +416,9 @@ private static UnsafeProjection projection(Schema finalSchema, Schema readSchema } private Iterator open(FileScanTask task, Schema readSchema) { - InputFile location = fileIo.newInputFile(task.file().path().toString()); CloseableIterable iter; + InputFile location = inputFiles.get(task.file().path().toString()); + Preconditions.checkNotNull(location, "Could not find InputFile associated with FileScanTask"); switch (task.file().format()) { case PARQUET: iter = newParquetIterable(location, task, readSchema); @@ -406,10 +438,10 @@ private Iterator open(FileScanTask task, Schema readSchema) { return iter.iterator(); } - private CloseableIterable newAvroIterable(InputFile location, - FileScanTask task, - Schema readSchema) { - return Avro.read(location) + private CloseableIterable newAvroIterable(InputFile inputFile, + FileScanTask task, + Schema readSchema) { + return Avro.read(inputFile) .reuseContainers() .project(readSchema) .split(task.start(), task.length()) diff --git a/spark/src/main/java/com/netflix/iceberg/spark/source/Writer.java b/spark/src/main/java/com/netflix/iceberg/spark/source/Writer.java index a1a296807ef9..fc49297e29fb 100644 --- a/spark/src/main/java/com/netflix/iceberg/spark/source/Writer.java +++ b/spark/src/main/java/com/netflix/iceberg/spark/source/Writer.java @@ -28,6 +28,9 @@ import com.netflix.iceberg.DataFile; import com.netflix.iceberg.DataFiles; import com.netflix.iceberg.FileFormat; +import com.netflix.iceberg.encryption.EncryptedOutputFile; +import com.netflix.iceberg.encryption.EncryptionKeyMetadata; +import com.netflix.iceberg.encryption.EncryptionManager; import com.netflix.iceberg.io.FileIO; import com.netflix.iceberg.Metrics; import com.netflix.iceberg.PartitionSpec; @@ -77,17 +80,19 @@ class Writer implements DataSourceWriter { private final Table table; private final FileFormat format; private final FileIO fileIo; + private final EncryptionManager encryptionManager; Writer(Table table, FileFormat format) { this.table = table; this.format = format; this.fileIo = table.io(); + this.encryptionManager = table.encryption(); } @Override public DataWriterFactory createWriterFactory() { return new WriterFactory( - table.spec(), format, table.locationProvider(), table.properties(), fileIo); + table.spec(), format, table.locationProvider(), table.properties(), fileIo, encryptionManager); } @Override @@ -174,14 +179,16 @@ private static class WriterFactory implements DataWriterFactory { private final Map properties; private final String uuid = UUID.randomUUID().toString(); private final FileIO fileIo; + private final EncryptionManager encryptionManager; WriterFactory(PartitionSpec spec, FileFormat format, LocationProvider locations, - Map properties, FileIO fileIo) { + Map properties, FileIO fileIo, EncryptionManager encryptionManager) { this.spec = spec; this.format = format; this.locations = locations; this.properties = properties; this.fileIo = fileIo; + this.encryptionManager = encryptionManager; } @Override @@ -190,10 +197,13 @@ public DataWriter createDataWriter(int partitionId, long taskId, lo AppenderFactory factory = new SparkAppenderFactory(); if (spec.fields().isEmpty()) { return new UnpartitionedWriter( - fileIo.newOutputFile(locations.newDataLocation(filename)), format, factory, fileIo); + fileIo.newOutputFile(locations.newDataLocation(filename)), format, factory, fileIo, encryptionManager); } else { - Function newOutputFileForKey = - key -> fileIo.newOutputFile(locations.newDataLocation(spec, key, filename)); + Function newOutputFileForKey = + key -> { + OutputFile rawOutputFile = fileIo.newOutputFile(locations.newDataLocation(spec, key, filename)); + return encryptionManager.encrypt(rawOutputFile); + }; return new PartitionedWriter(spec, format, factory, newOutputFileForKey, fileIo); } } @@ -242,15 +252,19 @@ private static class UnpartitionedWriter implements DataWriter, Clo private final OutputFile file; private FileAppender appender = null; private Metrics metrics = null; + private final EncryptionKeyMetadata keyMetadata; UnpartitionedWriter( OutputFile file, FileFormat format, AppenderFactory factory, - FileIO fileIo) { - this.file = file; + FileIO fileIo, + EncryptionManager encryptionManager) { this.fileIo = fileIo; - this.appender = factory.newAppender(file, format); + EncryptedOutputFile encryptedOutput = encryptionManager.encrypt(file); + this.file = encryptedOutput.encryptingOutputFile(); + this.keyMetadata = encryptedOutput.keyMetadata(); + this.appender = factory.newAppender(this.file, format); } @Override @@ -269,7 +283,7 @@ public WriterCommitMessage commit() throws IOException { return new TaskCommit(); } - DataFile dataFile = DataFiles.fromInputFile(file.toInputFile(), null, metrics); + DataFile dataFile = DataFiles.fromInputFile(file.toInputFile(), null, metrics, keyMetadata); return new TaskCommit(dataFile); } @@ -298,19 +312,19 @@ private static class PartitionedWriter implements DataWriter { private final PartitionSpec spec; private final FileFormat format; private final AppenderFactory factory; - private final Function newOutputFileForKey; + private final Function newOutputFileForKey; private final PartitionKey key; private final FileIO fileIo; private PartitionKey currentKey = null; private FileAppender currentAppender = null; - private OutputFile currentFile = null; + private EncryptedOutputFile currentFile = null; PartitionedWriter( PartitionSpec spec, FileFormat format, AppenderFactory factory, - Function newOutputFileForKey, + Function newOutputFileForKey, FileIO fileIo) { this.spec = spec; this.format = format; @@ -336,7 +350,7 @@ public void write(InternalRow row) throws IOException { this.currentKey = key.copy(); this.currentFile = newOutputFileForKey.apply(currentKey); - this.currentAppender = factory.newAppender(currentFile, format); + this.currentAppender = factory.newAppender(currentFile.encryptingOutputFile(), format); } currentAppender.add(row); @@ -359,7 +373,7 @@ public void abort() throws IOException { if (currentAppender != null) { currentAppender.close(); this.currentAppender = null; - fileIo.deleteFile(currentFile); + fileIo.deleteFile(currentFile.encryptingOutputFile()); } } @@ -371,7 +385,8 @@ private void closeCurrent() throws IOException { this.currentAppender = null; DataFile dataFile = DataFiles.builder(spec) - .withInputFile(currentFile.toInputFile()) + .withInputFile(currentFile.encryptingOutputFile().toInputFile()) + .withEncryptionKeyMetadata(currentFile.keyMetadata()) .withPartition(currentKey) .withMetrics(metrics) .build(); diff --git a/spark/src/test/java/com/netflix/iceberg/spark/source/TestTables.java b/spark/src/test/java/com/netflix/iceberg/spark/source/TestTables.java index 5a121a7cc1b4..3f303d90b093 100644 --- a/spark/src/test/java/com/netflix/iceberg/spark/source/TestTables.java +++ b/spark/src/test/java/com/netflix/iceberg/spark/source/TestTables.java @@ -22,6 +22,8 @@ import com.google.common.collect.Maps; import com.netflix.iceberg.BaseTable; import com.netflix.iceberg.LocationProviders; +import com.netflix.iceberg.encryption.EncryptionManager; +import com.netflix.iceberg.encryption.PlaintextEncryptionManager; import com.netflix.iceberg.io.FileIO; import com.netflix.iceberg.Files; import com.netflix.iceberg.PartitionSpec; @@ -163,6 +165,11 @@ public FileIO io() { return new LocalFileIO(); } + @Override + public EncryptionManager encryption() { + return new PlaintextEncryptionManager(); + } + @Override public LocationProvider locationProvider() { Preconditions.checkNotNull(current, From f8ee9cd4ec1136717f163388c9d2a91daf91c2f6 Mon Sep 17 00:00:00 2001 From: mcheah Date: Fri, 1 Mar 2019 17:45:54 -0800 Subject: [PATCH 02/15] Begin upgrading Spark --- build.gradle | 15 +++-- .../com/netflix/iceberg/RemoveSnapshots.java | 3 +- gradle/wrapper/gradle-wrapper.properties | 2 +- .../iceberg/pig/IcebergPigInputFormat.java | 2 +- .../spark/source/IcebergScanBuilder.java | 54 ++++++++++++++++++ .../iceberg/spark/source/IcebergSource.java | 15 ++++- .../spark/source/IcebergSparkTable.java | 57 +++++++++++++++++++ 7 files changed, 138 insertions(+), 10 deletions(-) create mode 100644 spark/src/main/java/com/netflix/iceberg/spark/source/IcebergScanBuilder.java create mode 100644 spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSparkTable.java diff --git a/build.gradle b/build.gradle index f6be71179d2f..9b8f1371e849 100644 --- a/build.gradle +++ b/build.gradle @@ -18,7 +18,11 @@ */ buildscript { - repositories { jcenter() } + repositories { + jcenter() + maven { url "http://palantir.bintray.com/releases" } + } + dependencies { classpath 'com.github.jengelman.gradle.plugins:shadow:2.0.0' classpath 'com.netflix.nebula:gradle-aggregate-javadocs-plugin:2.2.+' @@ -51,6 +55,7 @@ subprojects { apply plugin: 'nebula.maven-base-publish' repositories { + maven { url "http://palantir.bintray.com/releases" } mavenCentral() mavenLocal() } @@ -66,16 +71,16 @@ subprojects { } ext { - hadoopVersion = '2.7.3' + hadoopVersion = '3.2.0-palantir.8' avroVersion = '1.8.2' orcVersion = '1.4.2' - parquetVersion = '1.10.0' + parquetVersion = '1.12.0-palantir.4' hiveVersion = '3.1.0' - jacksonVersion = '2.6.7' + jacksonVersion = '2.7.3' scalaVersion = '2.11' - sparkVersion = '2.4.0' + sparkVersion = '3.0.0-palantir.18' } sourceCompatibility = '1.8' diff --git a/core/src/main/java/com/netflix/iceberg/RemoveSnapshots.java b/core/src/main/java/com/netflix/iceberg/RemoveSnapshots.java index 4784dd1f63db..0d1e5461775f 100644 --- a/core/src/main/java/com/netflix/iceberg/RemoveSnapshots.java +++ b/core/src/main/java/com/netflix/iceberg/RemoveSnapshots.java @@ -26,7 +26,6 @@ import com.netflix.iceberg.exceptions.RuntimeIOException; import com.netflix.iceberg.util.Tasks; import com.netflix.iceberg.util.ThreadPools; -import io.netty.util.internal.ConcurrentSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; @@ -155,7 +154,7 @@ public void commit() { } } - Set filesToDelete = new ConcurrentSet<>(); + Set filesToDelete = Sets.newConcurrentHashSet(); Tasks.foreach(allManifests) .noRetry().suppressFailureWhenFinished() .executeWith(ThreadPools.getWorkerPool()) diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 976f7d27fb1f..510cee01eee1 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -21,4 +21,4 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-4.4-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-5.2.1-bin.zip diff --git a/pig/src/main/java/com/netflix/iceberg/pig/IcebergPigInputFormat.java b/pig/src/main/java/com/netflix/iceberg/pig/IcebergPigInputFormat.java index 25cf1f99a999..68232daa0c86 100644 --- a/pig/src/main/java/com/netflix/iceberg/pig/IcebergPigInputFormat.java +++ b/pig/src/main/java/com/netflix/iceberg/pig/IcebergPigInputFormat.java @@ -36,7 +36,7 @@ import com.netflix.iceberg.types.Type; import com.netflix.iceberg.types.TypeUtil; import com.netflix.iceberg.types.Types; -import org.apache.commons.lang.SerializationUtils; +import org.apache.commons.lang3.SerializationUtils; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; diff --git a/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergScanBuilder.java b/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergScanBuilder.java new file mode 100644 index 000000000000..70ade9db3704 --- /dev/null +++ b/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergScanBuilder.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.netflix.iceberg.spark.source; + +import com.netflix.iceberg.Table; +import org.apache.spark.sql.sources.Filter; +import org.apache.spark.sql.sources.v2.reader.*; +import org.apache.spark.sql.types.StructType; + +public class IcebergScanBuilder + implements ScanBuilder, SupportsPushDownFilters, SupportsPushDownRequiredColumns { + + private final Table table; + private Filter[] filters = new Filter[0]; + private StructType requiredSchema; + + @Override + public Scan build() { + return new Rea; + } + + @Override + public Filter[] pushFilters(Filter[] filters) { + this.filters = filters; + return filters; + } + + @Override + public Filter[] pushedFilters() { + return filters; + } + + @Override + public void pruneColumns(StructType requiredSchema) { + this.requiredSchema = requiredSchema; + } +} diff --git a/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java b/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java index 1991d29cbbe5..e6861d7bad2b 100644 --- a/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java +++ b/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java @@ -33,6 +33,7 @@ import org.apache.spark.sql.sources.v2.DataSourceV2; import org.apache.spark.sql.sources.v2.DataSourceOptions; import org.apache.spark.sql.sources.v2.ReadSupport; +import org.apache.spark.sql.sources.v2.TableProvider; import org.apache.spark.sql.sources.v2.WriteSupport; import org.apache.spark.sql.sources.v2.reader.DataSourceReader; import org.apache.spark.sql.sources.v2.writer.DataSourceWriter; @@ -45,7 +46,9 @@ import static com.netflix.iceberg.TableProperties.DEFAULT_FILE_FORMAT; import static com.netflix.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; -public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, DataSourceRegister { +public class IcebergSource implements + TableProvider, + DataSourceRegister { private SparkSession lazySpark = null; private Configuration lazyConf = null; @@ -136,4 +139,14 @@ private static void mergeIcebergHadoopConfs( .filter(key -> key.startsWith("iceberg.hadoop")) .forEach(key -> baseConf.set(key.replaceFirst("iceberg.hadoop", ""), options.get(key))); } + + @Override + public org.apache.spark.sql.sources.v2.Table getTable(DataSourceOptions options) { + return null; + } + + @Override + public org.apache.spark.sql.sources.v2.Table getTable(DataSourceOptions options, StructType schema) { + return null; + } } diff --git a/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSparkTable.java b/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSparkTable.java new file mode 100644 index 000000000000..0c548cb4bae3 --- /dev/null +++ b/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSparkTable.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.netflix.iceberg.spark.source; + +import com.netflix.iceberg.Table; +import org.apache.spark.sql.sources.v2.DataSourceOptions; +import org.apache.spark.sql.sources.v2.SupportsBatchRead; +import org.apache.spark.sql.sources.v2.SupportsBatchWrite; +import org.apache.spark.sql.sources.v2.reader.Scan; +import org.apache.spark.sql.sources.v2.reader.ScanBuilder; +import org.apache.spark.sql.sources.v2.writer.WriteBuilder; +import org.apache.spark.sql.types.StructType; + +public class IcebergSparkTable implements SupportsBatchRead, SupportsBatchWrite { + + private final Table table; + + public IcebergSparkTable(Table table) { + this.table = table; + } + + @Override + public ScanBuilder newScanBuilder(DataSourceOptions options) { + } + + @Override + public WriteBuilder newWriteBuilder(DataSourceOptions options) { + return null; + } + + @Override + public String name() { + return null; + } + + @Override + public StructType schema() { + return null; + } +} From 0cbf39b102c5161964916561e8fe84ebd72cd6c9 Mon Sep 17 00:00:00 2001 From: Yifei Huang Date: Thu, 7 Mar 2019 16:12:19 -0800 Subject: [PATCH 03/15] Revert "Begin upgrading Spark" This reverts commit f8ee9cd4ec1136717f163388c9d2a91daf91c2f6. --- build.gradle | 15 ++--- .../com/netflix/iceberg/RemoveSnapshots.java | 3 +- gradle/wrapper/gradle-wrapper.properties | 2 +- .../iceberg/pig/IcebergPigInputFormat.java | 2 +- .../spark/source/IcebergScanBuilder.java | 54 ------------------ .../iceberg/spark/source/IcebergSource.java | 15 +---- .../spark/source/IcebergSparkTable.java | 57 ------------------- 7 files changed, 10 insertions(+), 138 deletions(-) delete mode 100644 spark/src/main/java/com/netflix/iceberg/spark/source/IcebergScanBuilder.java delete mode 100644 spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSparkTable.java diff --git a/build.gradle b/build.gradle index 9b8f1371e849..f6be71179d2f 100644 --- a/build.gradle +++ b/build.gradle @@ -18,11 +18,7 @@ */ buildscript { - repositories { - jcenter() - maven { url "http://palantir.bintray.com/releases" } - } - + repositories { jcenter() } dependencies { classpath 'com.github.jengelman.gradle.plugins:shadow:2.0.0' classpath 'com.netflix.nebula:gradle-aggregate-javadocs-plugin:2.2.+' @@ -55,7 +51,6 @@ subprojects { apply plugin: 'nebula.maven-base-publish' repositories { - maven { url "http://palantir.bintray.com/releases" } mavenCentral() mavenLocal() } @@ -71,16 +66,16 @@ subprojects { } ext { - hadoopVersion = '3.2.0-palantir.8' + hadoopVersion = '2.7.3' avroVersion = '1.8.2' orcVersion = '1.4.2' - parquetVersion = '1.12.0-palantir.4' + parquetVersion = '1.10.0' hiveVersion = '3.1.0' - jacksonVersion = '2.7.3' + jacksonVersion = '2.6.7' scalaVersion = '2.11' - sparkVersion = '3.0.0-palantir.18' + sparkVersion = '2.4.0' } sourceCompatibility = '1.8' diff --git a/core/src/main/java/com/netflix/iceberg/RemoveSnapshots.java b/core/src/main/java/com/netflix/iceberg/RemoveSnapshots.java index 0d1e5461775f..4784dd1f63db 100644 --- a/core/src/main/java/com/netflix/iceberg/RemoveSnapshots.java +++ b/core/src/main/java/com/netflix/iceberg/RemoveSnapshots.java @@ -26,6 +26,7 @@ import com.netflix.iceberg.exceptions.RuntimeIOException; import com.netflix.iceberg.util.Tasks; import com.netflix.iceberg.util.ThreadPools; +import io.netty.util.internal.ConcurrentSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; @@ -154,7 +155,7 @@ public void commit() { } } - Set filesToDelete = Sets.newConcurrentHashSet(); + Set filesToDelete = new ConcurrentSet<>(); Tasks.foreach(allManifests) .noRetry().suppressFailureWhenFinished() .executeWith(ThreadPools.getWorkerPool()) diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 510cee01eee1..976f7d27fb1f 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -21,4 +21,4 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-5.2.1-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-4.4-bin.zip diff --git a/pig/src/main/java/com/netflix/iceberg/pig/IcebergPigInputFormat.java b/pig/src/main/java/com/netflix/iceberg/pig/IcebergPigInputFormat.java index 68232daa0c86..25cf1f99a999 100644 --- a/pig/src/main/java/com/netflix/iceberg/pig/IcebergPigInputFormat.java +++ b/pig/src/main/java/com/netflix/iceberg/pig/IcebergPigInputFormat.java @@ -36,7 +36,7 @@ import com.netflix.iceberg.types.Type; import com.netflix.iceberg.types.TypeUtil; import com.netflix.iceberg.types.Types; -import org.apache.commons.lang3.SerializationUtils; +import org.apache.commons.lang.SerializationUtils; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; diff --git a/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergScanBuilder.java b/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergScanBuilder.java deleted file mode 100644 index 70ade9db3704..000000000000 --- a/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergScanBuilder.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package com.netflix.iceberg.spark.source; - -import com.netflix.iceberg.Table; -import org.apache.spark.sql.sources.Filter; -import org.apache.spark.sql.sources.v2.reader.*; -import org.apache.spark.sql.types.StructType; - -public class IcebergScanBuilder - implements ScanBuilder, SupportsPushDownFilters, SupportsPushDownRequiredColumns { - - private final Table table; - private Filter[] filters = new Filter[0]; - private StructType requiredSchema; - - @Override - public Scan build() { - return new Rea; - } - - @Override - public Filter[] pushFilters(Filter[] filters) { - this.filters = filters; - return filters; - } - - @Override - public Filter[] pushedFilters() { - return filters; - } - - @Override - public void pruneColumns(StructType requiredSchema) { - this.requiredSchema = requiredSchema; - } -} diff --git a/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java b/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java index e6861d7bad2b..1991d29cbbe5 100644 --- a/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java +++ b/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java @@ -33,7 +33,6 @@ import org.apache.spark.sql.sources.v2.DataSourceV2; import org.apache.spark.sql.sources.v2.DataSourceOptions; import org.apache.spark.sql.sources.v2.ReadSupport; -import org.apache.spark.sql.sources.v2.TableProvider; import org.apache.spark.sql.sources.v2.WriteSupport; import org.apache.spark.sql.sources.v2.reader.DataSourceReader; import org.apache.spark.sql.sources.v2.writer.DataSourceWriter; @@ -46,9 +45,7 @@ import static com.netflix.iceberg.TableProperties.DEFAULT_FILE_FORMAT; import static com.netflix.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; -public class IcebergSource implements - TableProvider, - DataSourceRegister { +public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, DataSourceRegister { private SparkSession lazySpark = null; private Configuration lazyConf = null; @@ -139,14 +136,4 @@ private static void mergeIcebergHadoopConfs( .filter(key -> key.startsWith("iceberg.hadoop")) .forEach(key -> baseConf.set(key.replaceFirst("iceberg.hadoop", ""), options.get(key))); } - - @Override - public org.apache.spark.sql.sources.v2.Table getTable(DataSourceOptions options) { - return null; - } - - @Override - public org.apache.spark.sql.sources.v2.Table getTable(DataSourceOptions options, StructType schema) { - return null; - } } diff --git a/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSparkTable.java b/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSparkTable.java deleted file mode 100644 index 0c548cb4bae3..000000000000 --- a/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSparkTable.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package com.netflix.iceberg.spark.source; - -import com.netflix.iceberg.Table; -import org.apache.spark.sql.sources.v2.DataSourceOptions; -import org.apache.spark.sql.sources.v2.SupportsBatchRead; -import org.apache.spark.sql.sources.v2.SupportsBatchWrite; -import org.apache.spark.sql.sources.v2.reader.Scan; -import org.apache.spark.sql.sources.v2.reader.ScanBuilder; -import org.apache.spark.sql.sources.v2.writer.WriteBuilder; -import org.apache.spark.sql.types.StructType; - -public class IcebergSparkTable implements SupportsBatchRead, SupportsBatchWrite { - - private final Table table; - - public IcebergSparkTable(Table table) { - this.table = table; - } - - @Override - public ScanBuilder newScanBuilder(DataSourceOptions options) { - } - - @Override - public WriteBuilder newWriteBuilder(DataSourceOptions options) { - return null; - } - - @Override - public String name() { - return null; - } - - @Override - public StructType schema() { - return null; - } -} From 019b796b7290d44d11b98274fe734b2b8147f94b Mon Sep 17 00:00:00 2001 From: Yifei Huang Date: Thu, 7 Mar 2019 16:16:57 -0800 Subject: [PATCH 04/15] Revert "Revert "Begin upgrading Spark"" This reverts commit 0cbf39b102c5161964916561e8fe84ebd72cd6c9. --- build.gradle | 15 +++-- .../com/netflix/iceberg/RemoveSnapshots.java | 3 +- gradle/wrapper/gradle-wrapper.properties | 2 +- .../iceberg/pig/IcebergPigInputFormat.java | 2 +- .../spark/source/IcebergScanBuilder.java | 54 ++++++++++++++++++ .../iceberg/spark/source/IcebergSource.java | 15 ++++- .../spark/source/IcebergSparkTable.java | 57 +++++++++++++++++++ 7 files changed, 138 insertions(+), 10 deletions(-) create mode 100644 spark/src/main/java/com/netflix/iceberg/spark/source/IcebergScanBuilder.java create mode 100644 spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSparkTable.java diff --git a/build.gradle b/build.gradle index f6be71179d2f..9b8f1371e849 100644 --- a/build.gradle +++ b/build.gradle @@ -18,7 +18,11 @@ */ buildscript { - repositories { jcenter() } + repositories { + jcenter() + maven { url "http://palantir.bintray.com/releases" } + } + dependencies { classpath 'com.github.jengelman.gradle.plugins:shadow:2.0.0' classpath 'com.netflix.nebula:gradle-aggregate-javadocs-plugin:2.2.+' @@ -51,6 +55,7 @@ subprojects { apply plugin: 'nebula.maven-base-publish' repositories { + maven { url "http://palantir.bintray.com/releases" } mavenCentral() mavenLocal() } @@ -66,16 +71,16 @@ subprojects { } ext { - hadoopVersion = '2.7.3' + hadoopVersion = '3.2.0-palantir.8' avroVersion = '1.8.2' orcVersion = '1.4.2' - parquetVersion = '1.10.0' + parquetVersion = '1.12.0-palantir.4' hiveVersion = '3.1.0' - jacksonVersion = '2.6.7' + jacksonVersion = '2.7.3' scalaVersion = '2.11' - sparkVersion = '2.4.0' + sparkVersion = '3.0.0-palantir.18' } sourceCompatibility = '1.8' diff --git a/core/src/main/java/com/netflix/iceberg/RemoveSnapshots.java b/core/src/main/java/com/netflix/iceberg/RemoveSnapshots.java index 5c16e49146d0..7b4f57e3ca69 100644 --- a/core/src/main/java/com/netflix/iceberg/RemoveSnapshots.java +++ b/core/src/main/java/com/netflix/iceberg/RemoveSnapshots.java @@ -26,7 +26,6 @@ import com.netflix.iceberg.exceptions.RuntimeIOException; import com.netflix.iceberg.util.Tasks; import com.netflix.iceberg.util.ThreadPools; -import io.netty.util.internal.ConcurrentSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; @@ -162,7 +161,7 @@ public void commit() { } } - Set filesToDelete = new ConcurrentSet<>(); + Set filesToDelete = Sets.newConcurrentHashSet(); Tasks.foreach(allManifests) .noRetry().suppressFailureWhenFinished() .executeWith(ThreadPools.getWorkerPool()) diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 976f7d27fb1f..510cee01eee1 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -21,4 +21,4 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-4.4-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-5.2.1-bin.zip diff --git a/pig/src/main/java/com/netflix/iceberg/pig/IcebergPigInputFormat.java b/pig/src/main/java/com/netflix/iceberg/pig/IcebergPigInputFormat.java index 25cf1f99a999..68232daa0c86 100644 --- a/pig/src/main/java/com/netflix/iceberg/pig/IcebergPigInputFormat.java +++ b/pig/src/main/java/com/netflix/iceberg/pig/IcebergPigInputFormat.java @@ -36,7 +36,7 @@ import com.netflix.iceberg.types.Type; import com.netflix.iceberg.types.TypeUtil; import com.netflix.iceberg.types.Types; -import org.apache.commons.lang.SerializationUtils; +import org.apache.commons.lang3.SerializationUtils; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; diff --git a/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergScanBuilder.java b/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergScanBuilder.java new file mode 100644 index 000000000000..70ade9db3704 --- /dev/null +++ b/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergScanBuilder.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.netflix.iceberg.spark.source; + +import com.netflix.iceberg.Table; +import org.apache.spark.sql.sources.Filter; +import org.apache.spark.sql.sources.v2.reader.*; +import org.apache.spark.sql.types.StructType; + +public class IcebergScanBuilder + implements ScanBuilder, SupportsPushDownFilters, SupportsPushDownRequiredColumns { + + private final Table table; + private Filter[] filters = new Filter[0]; + private StructType requiredSchema; + + @Override + public Scan build() { + return new Rea; + } + + @Override + public Filter[] pushFilters(Filter[] filters) { + this.filters = filters; + return filters; + } + + @Override + public Filter[] pushedFilters() { + return filters; + } + + @Override + public void pruneColumns(StructType requiredSchema) { + this.requiredSchema = requiredSchema; + } +} diff --git a/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java b/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java index 1991d29cbbe5..e6861d7bad2b 100644 --- a/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java +++ b/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java @@ -33,6 +33,7 @@ import org.apache.spark.sql.sources.v2.DataSourceV2; import org.apache.spark.sql.sources.v2.DataSourceOptions; import org.apache.spark.sql.sources.v2.ReadSupport; +import org.apache.spark.sql.sources.v2.TableProvider; import org.apache.spark.sql.sources.v2.WriteSupport; import org.apache.spark.sql.sources.v2.reader.DataSourceReader; import org.apache.spark.sql.sources.v2.writer.DataSourceWriter; @@ -45,7 +46,9 @@ import static com.netflix.iceberg.TableProperties.DEFAULT_FILE_FORMAT; import static com.netflix.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; -public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, DataSourceRegister { +public class IcebergSource implements + TableProvider, + DataSourceRegister { private SparkSession lazySpark = null; private Configuration lazyConf = null; @@ -136,4 +139,14 @@ private static void mergeIcebergHadoopConfs( .filter(key -> key.startsWith("iceberg.hadoop")) .forEach(key -> baseConf.set(key.replaceFirst("iceberg.hadoop", ""), options.get(key))); } + + @Override + public org.apache.spark.sql.sources.v2.Table getTable(DataSourceOptions options) { + return null; + } + + @Override + public org.apache.spark.sql.sources.v2.Table getTable(DataSourceOptions options, StructType schema) { + return null; + } } diff --git a/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSparkTable.java b/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSparkTable.java new file mode 100644 index 000000000000..0c548cb4bae3 --- /dev/null +++ b/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSparkTable.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.netflix.iceberg.spark.source; + +import com.netflix.iceberg.Table; +import org.apache.spark.sql.sources.v2.DataSourceOptions; +import org.apache.spark.sql.sources.v2.SupportsBatchRead; +import org.apache.spark.sql.sources.v2.SupportsBatchWrite; +import org.apache.spark.sql.sources.v2.reader.Scan; +import org.apache.spark.sql.sources.v2.reader.ScanBuilder; +import org.apache.spark.sql.sources.v2.writer.WriteBuilder; +import org.apache.spark.sql.types.StructType; + +public class IcebergSparkTable implements SupportsBatchRead, SupportsBatchWrite { + + private final Table table; + + public IcebergSparkTable(Table table) { + this.table = table; + } + + @Override + public ScanBuilder newScanBuilder(DataSourceOptions options) { + } + + @Override + public WriteBuilder newWriteBuilder(DataSourceOptions options) { + return null; + } + + @Override + public String name() { + return null; + } + + @Override + public StructType schema() { + return null; + } +} From 9b2f3e01924bf33cbed57317a08e0b061b865d54 Mon Sep 17 00:00:00 2001 From: Yifei Huang Date: Thu, 7 Mar 2019 17:59:33 -0800 Subject: [PATCH 05/15] writer implementation migrated --- .../iceberg/spark/source/IcebergSource.java | 109 ++++++++++-------- .../spark/source/IcebergSparkTable.java | 50 +++++++- .../netflix/iceberg/spark/source/Writer.java | 17 ++- 3 files changed, 117 insertions(+), 59 deletions(-) diff --git a/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java b/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java index e6861d7bad2b..c66c22782f15 100644 --- a/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java +++ b/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java @@ -20,32 +20,22 @@ package com.netflix.iceberg.spark.source; import com.google.common.base.Preconditions; -import com.netflix.iceberg.FileFormat; import com.netflix.iceberg.Schema; import com.netflix.iceberg.Table; import com.netflix.iceberg.hadoop.HadoopTables; import com.netflix.iceberg.spark.SparkSchemaUtil; import com.netflix.iceberg.types.CheckCompatibility; import org.apache.hadoop.conf.Configuration; -import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.sources.DataSourceRegister; -import org.apache.spark.sql.sources.v2.DataSourceV2; import org.apache.spark.sql.sources.v2.DataSourceOptions; -import org.apache.spark.sql.sources.v2.ReadSupport; import org.apache.spark.sql.sources.v2.TableProvider; -import org.apache.spark.sql.sources.v2.WriteSupport; -import org.apache.spark.sql.sources.v2.reader.DataSourceReader; -import org.apache.spark.sql.sources.v2.writer.DataSourceWriter; import org.apache.spark.sql.types.StructType; + import java.util.List; -import java.util.Locale; import java.util.Map; import java.util.Optional; -import static com.netflix.iceberg.TableProperties.DEFAULT_FILE_FORMAT; -import static com.netflix.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; - public class IcebergSource implements TableProvider, DataSourceRegister { @@ -58,44 +48,44 @@ public String shortName() { return "iceberg"; } - @Override - public DataSourceReader createReader(DataSourceOptions options) { - Configuration conf = new Configuration(lazyBaseConf()); - Table table = getTableAndResolveHadoopConfiguration(options, conf); - return new Reader(table); - } - - @Override - public Optional createWriter(String jobId, StructType dfStruct, SaveMode mode, - DataSourceOptions options) { - Preconditions.checkArgument(mode == SaveMode.Append, "Save mode %s is not supported", mode); - Configuration conf = new Configuration(lazyBaseConf()); - Table table = getTableAndResolveHadoopConfiguration(options, conf); - - Schema dfSchema = SparkSchemaUtil.convert(table.schema(), dfStruct); - List errors = CheckCompatibility.writeCompatibilityErrors(table.schema(), dfSchema); - if (!errors.isEmpty()) { - StringBuilder sb = new StringBuilder(); - sb.append("Cannot write incompatible dataframe to table with schema:\n") - .append(table.schema()).append("\nProblems:"); - for (String error : errors) { - sb.append("\n* ").append(error); - } - throw new IllegalArgumentException(sb.toString()); - } - - Optional formatOption = options.get("iceberg.write.format"); - FileFormat format; - if (formatOption.isPresent()) { - format = FileFormat.valueOf(formatOption.get().toUpperCase(Locale.ENGLISH)); - } else { - format = FileFormat.valueOf(table.properties() - .getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT) - .toUpperCase(Locale.ENGLISH)); - } - - return Optional.of(new Writer(table, format)); - } +// @Override +// public DataSourceReader createReader(DataSourceOptions options) { +// Configuration conf = new Configuration(lazyBaseConf()); +// Table table = getTableAndResolveHadoopConfiguration(options, conf); +// return new Reader(table); +// } +// +// @Override +// public Optional createWriter(String jobId, StructType dfStruct, SaveMode mode, +// DataSourceOptions options) { +// Preconditions.checkArgument(mode == SaveMode.Append, "Save mode %s is not supported", mode); +// Configuration conf = new Configuration(lazyBaseConf()); +// Table table = getTableAndResolveHadoopConfiguration(options, conf); +// +// Schema dfSchema = SparkSchemaUtil.convert(table.schema(), dfStruct); +// List errors = CheckCompatibility.writeCompatibilityErrors(table.schema(), dfSchema); +// if (!errors.isEmpty()) { +// StringBuilder sb = new StringBuilder(); +// sb.append("Cannot write incompatible dataframe to table with schema:\n") +// .append(table.schema()).append("\nProblems:"); +// for (String error : errors) { +// sb.append("\n* ").append(error); +// } +// throw new IllegalArgumentException(sb.toString()); +// } +// +// Optional formatOption = options.get("iceberg.write.format"); +// FileFormat format; +// if (formatOption.isPresent()) { +// format = FileFormat.valueOf(formatOption.get().toUpperCase(Locale.ENGLISH)); +// } else { +// format = FileFormat.valueOf(table.properties() +// .getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT) +// .toUpperCase(Locale.ENGLISH)); +// } +// +// return Optional.of(new Writer(table, format)); +// } protected Table findTable(DataSourceOptions options, Configuration conf) { Optional location = options.get("path"); @@ -142,11 +132,28 @@ private static void mergeIcebergHadoopConfs( @Override public org.apache.spark.sql.sources.v2.Table getTable(DataSourceOptions options) { - return null; + Configuration conf = new Configuration(lazyBaseConf()); + Table table = getTableAndResolveHadoopConfiguration(options, conf); + return new IcebergSparkTable(table); } @Override public org.apache.spark.sql.sources.v2.Table getTable(DataSourceOptions options, StructType schema) { - return null; + Configuration conf = new Configuration(lazyBaseConf()); + Table table = getTableAndResolveHadoopConfiguration(options, conf); + + Schema dfSchema = SparkSchemaUtil.convert(table.schema(), schema); + List errors = CheckCompatibility.writeCompatibilityErrors(table.schema(), dfSchema); + if (!errors.isEmpty()) { + StringBuilder sb = new StringBuilder(); + sb.append("Cannot write incompatible dataframe to table with schema:\n") + .append(table.schema()).append("\nProblems:"); + for (String error : errors) { + sb.append("\n* ").append(error); + } + throw new IllegalArgumentException(sb.toString()); + } + + return new IcebergSparkTable(table); } } diff --git a/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSparkTable.java b/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSparkTable.java index 0c548cb4bae3..0d3e66a55048 100644 --- a/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSparkTable.java +++ b/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSparkTable.java @@ -19,15 +19,22 @@ package com.netflix.iceberg.spark.source; +import com.netflix.iceberg.FileFormat; import com.netflix.iceberg.Table; import org.apache.spark.sql.sources.v2.DataSourceOptions; import org.apache.spark.sql.sources.v2.SupportsBatchRead; import org.apache.spark.sql.sources.v2.SupportsBatchWrite; -import org.apache.spark.sql.sources.v2.reader.Scan; import org.apache.spark.sql.sources.v2.reader.ScanBuilder; +import org.apache.spark.sql.sources.v2.writer.BatchWrite; import org.apache.spark.sql.sources.v2.writer.WriteBuilder; import org.apache.spark.sql.types.StructType; +import java.util.Locale; +import java.util.Optional; + +import static com.netflix.iceberg.TableProperties.DEFAULT_FILE_FORMAT; +import static com.netflix.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; + public class IcebergSparkTable implements SupportsBatchRead, SupportsBatchWrite { private final Table table; @@ -38,11 +45,19 @@ public IcebergSparkTable(Table table) { @Override public ScanBuilder newScanBuilder(DataSourceOptions options) { + + } @Override public WriteBuilder newWriteBuilder(DataSourceOptions options) { - return null; + IcebergWriterBuilder writerBuilder = new IcebergWriterBuilder(table); + + Optional formatOption = options.get("iceberg.write.format"); + if (formatOption.isPresent()) { + writerBuilder = writerBuilder.setFileFormat(formatOption.get()); + } + return writerBuilder; } @Override @@ -54,4 +69,35 @@ public String name() { public StructType schema() { return null; } + + + private static class IcebergWriterBuilder implements WriteBuilder { + private final Table table; + private final Optional fileFormat; + + public IcebergWriterBuilder(Table table) { + this.table = table; + this.fileFormat = Optional.empty(); + } + + public IcebergWriterBuilder(Table table, FileFormat fileFormat) { + this.table = table; + this.fileFormat = Optional.of(fileFormat); + } + + public IcebergWriterBuilder setFileFormat(String format) { + return new IcebergWriterBuilder(this.table, + FileFormat.valueOf(format.toUpperCase(Locale.ENGLISH))); + } + + @Override + public BatchWrite buildForBatch() { + fileFormat.ifPresent(fileFormat1 -> new Writer(table, fileFormat1)); + return new Writer(table, + FileFormat.valueOf(table.properties() + .getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT) + .toUpperCase(Locale.ENGLISH))); + } + + } } diff --git a/spark/src/main/java/com/netflix/iceberg/spark/source/Writer.java b/spark/src/main/java/com/netflix/iceberg/spark/source/Writer.java index e8c03624e1d9..be98c28c07bf 100644 --- a/spark/src/main/java/com/netflix/iceberg/spark/source/Writer.java +++ b/spark/src/main/java/com/netflix/iceberg/spark/source/Writer.java @@ -45,14 +45,13 @@ import com.netflix.iceberg.util.Tasks; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport; -import org.apache.spark.sql.sources.v2.writer.DataSourceWriter; +import org.apache.spark.sql.sources.v2.writer.BatchWrite; import org.apache.spark.sql.sources.v2.writer.DataWriter; import org.apache.spark.sql.sources.v2.writer.DataWriterFactory; import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.Closeable; -import java.io.File; import java.io.IOException; import java.util.Arrays; import java.util.List; @@ -74,7 +73,7 @@ import static com.netflix.iceberg.spark.SparkSchemaUtil.convert; // TODO: parameterize DataSourceWriter with subclass of WriterCommitMessage -class Writer implements DataSourceWriter { +class Writer implements BatchWrite { private static final Logger LOG = LoggerFactory.getLogger(Writer.class); private final Table table; @@ -90,11 +89,17 @@ class Writer implements DataSourceWriter { } @Override - public DataWriterFactory createWriterFactory() { + public DataWriterFactory createBatchWriterFactory() { return new WriterFactory( table.spec(), format, table.locationProvider(), table.properties(), fileIo, encryptionManager); } +// @Override +// public DataWriterFactory createWriterFactory() { +// return new WriterFactory( +// table.spec(), format, table.locationProvider(), table.properties(), fileIo, encryptionManager); +// } + @Override public void commit(WriterCommitMessage[] messages) { AppendFiles append = table.newAppend(); @@ -172,7 +177,7 @@ DataFile[] files() { } } - private static class WriterFactory implements DataWriterFactory { + private static class WriterFactory implements DataWriterFactory { private final PartitionSpec spec; private final FileFormat format; private final LocationProvider locations; @@ -192,7 +197,7 @@ private static class WriterFactory implements DataWriterFactory { } @Override - public DataWriter createDataWriter(int partitionId, long taskId, long epochId) { + public DataWriter createWriter(int partitionId, long taskId) { String filename = format.addExtension(String.format("%05d-%d-%s", partitionId, taskId, uuid)); AppenderFactory factory = new SparkAppenderFactory(); if (spec.fields().isEmpty()) { From cd76f66f8d03d61ca1f6212fc11c236820c9bc66 Mon Sep 17 00:00:00 2001 From: Yifei Huang Date: Thu, 7 Mar 2019 18:58:59 -0800 Subject: [PATCH 06/15] read side --- .../iceberg/spark/source/IcebergSource.java | 39 ------ .../spark/source/IcebergSparkTable.java | 62 ++++++++- .../netflix/iceberg/spark/source/Reader.java | 118 ++++++------------ 3 files changed, 94 insertions(+), 125 deletions(-) diff --git a/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java b/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java index c66c22782f15..1eb8a18c6020 100644 --- a/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java +++ b/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java @@ -48,45 +48,6 @@ public String shortName() { return "iceberg"; } -// @Override -// public DataSourceReader createReader(DataSourceOptions options) { -// Configuration conf = new Configuration(lazyBaseConf()); -// Table table = getTableAndResolveHadoopConfiguration(options, conf); -// return new Reader(table); -// } -// -// @Override -// public Optional createWriter(String jobId, StructType dfStruct, SaveMode mode, -// DataSourceOptions options) { -// Preconditions.checkArgument(mode == SaveMode.Append, "Save mode %s is not supported", mode); -// Configuration conf = new Configuration(lazyBaseConf()); -// Table table = getTableAndResolveHadoopConfiguration(options, conf); -// -// Schema dfSchema = SparkSchemaUtil.convert(table.schema(), dfStruct); -// List errors = CheckCompatibility.writeCompatibilityErrors(table.schema(), dfSchema); -// if (!errors.isEmpty()) { -// StringBuilder sb = new StringBuilder(); -// sb.append("Cannot write incompatible dataframe to table with schema:\n") -// .append(table.schema()).append("\nProblems:"); -// for (String error : errors) { -// sb.append("\n* ").append(error); -// } -// throw new IllegalArgumentException(sb.toString()); -// } -// -// Optional formatOption = options.get("iceberg.write.format"); -// FileFormat format; -// if (formatOption.isPresent()) { -// format = FileFormat.valueOf(formatOption.get().toUpperCase(Locale.ENGLISH)); -// } else { -// format = FileFormat.valueOf(table.properties() -// .getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT) -// .toUpperCase(Locale.ENGLISH)); -// } -// -// return Optional.of(new Writer(table, format)); -// } - protected Table findTable(DataSourceOptions options, Configuration conf) { Optional location = options.get("path"); Preconditions.checkArgument(location.isPresent(), diff --git a/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSparkTable.java b/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSparkTable.java index 0d3e66a55048..b8da9b4f14a5 100644 --- a/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSparkTable.java +++ b/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSparkTable.java @@ -19,16 +19,21 @@ package com.netflix.iceberg.spark.source; +import com.google.common.collect.Lists; import com.netflix.iceberg.FileFormat; import com.netflix.iceberg.Table; +import com.netflix.iceberg.expressions.Expression; +import com.netflix.iceberg.spark.SparkFilters; +import org.apache.spark.sql.sources.Filter; import org.apache.spark.sql.sources.v2.DataSourceOptions; import org.apache.spark.sql.sources.v2.SupportsBatchRead; import org.apache.spark.sql.sources.v2.SupportsBatchWrite; -import org.apache.spark.sql.sources.v2.reader.ScanBuilder; +import org.apache.spark.sql.sources.v2.reader.*; import org.apache.spark.sql.sources.v2.writer.BatchWrite; import org.apache.spark.sql.sources.v2.writer.WriteBuilder; import org.apache.spark.sql.types.StructType; +import java.util.List; import java.util.Locale; import java.util.Optional; @@ -45,8 +50,7 @@ public IcebergSparkTable(Table table) { @Override public ScanBuilder newScanBuilder(DataSourceOptions options) { - - + return new IcebergReaderBuilder(table); } @Override @@ -100,4 +104,56 @@ public BatchWrite buildForBatch() { } } + + private static class IcebergReaderBuilder implements ScanBuilder, + SupportsPushDownFilters, + SupportsPushDownRequiredColumns { + + private static final Filter[] NO_FILTERS = new Filter[0]; + + private final Table table; + private Filter[] pushedFilters = NO_FILTERS; + private List filterExpressions = Lists.newArrayList(); + private StructType requestedSchema = null; + + public IcebergReaderBuilder(Table table) { + this.table = table; + } + + @Override + public Filter[] pushFilters(Filter[] filters) { + List expressions = Lists.newArrayListWithExpectedSize(filters.length); + List pushed = Lists.newArrayListWithExpectedSize(filters.length); + + for (Filter filter : filters) { + Expression expr = SparkFilters.convert(filter); + if (expr != null) { + expressions.add(expr); + pushed.add(filter); + } + } + + this.filterExpressions = expressions; + this.pushedFilters = pushed.toArray(new Filter[0]); + + // Spark doesn't support residuals per task, so return all filters + // to get Spark to handle record-level filtering + return filters; + } + + @Override + public Filter[] pushedFilters() { + return pushedFilters; + } + + @Override + public void pruneColumns(StructType requiredSchema) { + this.requestedSchema = requiredSchema; + } + + @Override + public Scan build() { + return new Reader(table, filterExpressions, requestedSchema); + } + } } diff --git a/spark/src/main/java/com/netflix/iceberg/spark/source/Reader.java b/spark/src/main/java/com/netflix/iceberg/spark/source/Reader.java index 56cfdf2e8e27..a4b17f72d94b 100644 --- a/spark/src/main/java/com/netflix/iceberg/spark/source/Reader.java +++ b/spark/src/main/java/com/netflix/iceberg/spark/source/Reader.java @@ -43,7 +43,6 @@ import com.netflix.iceberg.io.CloseableIterable; import com.netflix.iceberg.io.InputFile; import com.netflix.iceberg.parquet.Parquet; -import com.netflix.iceberg.spark.SparkFilters; import com.netflix.iceberg.spark.SparkSchemaUtil; import com.netflix.iceberg.spark.data.SparkAvroReader; import com.netflix.iceberg.spark.data.SparkParquetReaders; @@ -55,14 +54,7 @@ import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; import org.apache.spark.sql.catalyst.expressions.JoinedRow; import org.apache.spark.sql.catalyst.expressions.UnsafeProjection; -import org.apache.spark.sql.sources.Filter; -import org.apache.spark.sql.sources.v2.reader.DataSourceReader; -import org.apache.spark.sql.sources.v2.reader.InputPartition; -import org.apache.spark.sql.sources.v2.reader.InputPartitionReader; -import org.apache.spark.sql.sources.v2.reader.Statistics; -import org.apache.spark.sql.sources.v2.reader.SupportsPushDownFilters; -import org.apache.spark.sql.sources.v2.reader.SupportsPushDownRequiredColumns; -import org.apache.spark.sql.sources.v2.reader.SupportsReportStatistics; +import org.apache.spark.sql.sources.v2.reader.*; import org.apache.spark.sql.types.BinaryType; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.Decimal; @@ -75,15 +67,12 @@ import org.slf4j.LoggerFactory; import java.io.Closeable; import java.io.IOException; -import java.io.Serializable; import java.nio.ByteBuffer; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import java.util.function.Function; -import java.util.stream.Collectors; -import java.util.stream.StreamSupport; import static com.google.common.collect.Iterators.transform; import static com.netflix.iceberg.spark.SparkSchemaUtil.convert; @@ -91,29 +80,27 @@ import static scala.collection.JavaConverters.asScalaBufferConverter; import static scala.collection.JavaConverters.seqAsJavaListConverter; -class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushDownRequiredColumns, - SupportsReportStatistics { +class Reader implements Scan, Batch, SupportsReportStatistics { private static final Logger LOG = LoggerFactory.getLogger(Reader.class); - private static final Filter[] NO_FILTERS = new Filter[0]; - private final Table table; private final FileIO fileIo; private final EncryptionManager encryptionManager; - private StructType requestedSchema = null; - private List filterExpressions = null; - private Filter[] pushedFilters = NO_FILTERS; + private final StructType requestedSchema; + private final List filterExpressions; // lazy variables private Schema schema = null; private StructType type = null; // cached because Spark accesses it multiple times private List tasks = null; // lazy cache of tasks - Reader(Table table) { + Reader(Table table, List filteredExpressions, StructType requestedSchema) { this.table = table; this.schema = table.schema(); this.fileIo = table.io(); this.encryptionManager = table.encryption(); + this.filterExpressions = filteredExpressions; + this.requestedSchema = requestedSchema; } private Schema lazySchema() { @@ -140,57 +127,21 @@ public StructType readSchema() { } @Override - public List> planInputPartitions() { - String tableSchemaString = SchemaParser.toJson(table.schema()); - String expectedSchemaString = SchemaParser.toJson(lazySchema()); - - List> readTasks = Lists.newArrayList(); - for (CombinedScanTask task : tasks()) { - readTasks.add(new ReadTask(task, tableSchemaString, expectedSchemaString, fileIo, encryptionManager)); - } - - return readTasks; + public InputPartition[] planInputPartitions() { + return tasks().stream().map(ReadTask::new).toArray(ReadTask[]::new); } @Override - public Filter[] pushFilters(Filter[] filters) { - this.tasks = null; // invalidate cached tasks, if present - - List expressions = Lists.newArrayListWithExpectedSize(filters.length); - List pushed = Lists.newArrayListWithExpectedSize(filters.length); - - for (Filter filter : filters) { - Expression expr = SparkFilters.convert(filter); - if (expr != null) { - expressions.add(expr); - pushed.add(filter); - } - } - - this.filterExpressions = expressions; - this.pushedFilters = pushed.toArray(new Filter[0]); - - // invalidate the schema that will be projected - this.schema = null; - this.type = null; - - // Spark doesn't support residuals per task, so return all filters - // to get Spark to handle record-level filtering - return filters; - } + public PartitionReaderFactory createReaderFactory() { + String tableSchemaString = SchemaParser.toJson(table.schema()); + String expectedSchemaString = SchemaParser.toJson(lazySchema()); - @Override - public Filter[] pushedFilters() { - return pushedFilters; + return new TaskDataReaderFactory(tableSchemaString, expectedSchemaString, fileIo, encryptionManager); } @Override - public void pruneColumns(StructType requestedSchema) { - this.requestedSchema = requestedSchema; - - // invalidate the schema that will be projected - this.schema = null; - this.type = null; + public Batch toBatch() { + return this; } @Override @@ -235,8 +186,19 @@ public String toString() { table, lazySchema().asStruct(), filterExpressions); } - private static class ReadTask implements InputPartition, Serializable { + private static class ReadTask implements InputPartition { private final CombinedScanTask task; + + public ReadTask(CombinedScanTask task) { + this.task = task; + } + + public CombinedScanTask getTask() { + return task; + } + } + + private static class TaskDataReaderFactory implements PartitionReaderFactory { private final String tableSchemaString; private final String expectedSchemaString; private final FileIO fileIo; @@ -245,10 +207,9 @@ private static class ReadTask implements InputPartition, Serializab private transient Schema tableSchema = null; private transient Schema expectedSchema = null; - private ReadTask( - CombinedScanTask task, String tableSchemaString, String expectedSchemaString, FileIO fileIo, + private TaskDataReaderFactory( + String tableSchemaString, String expectedSchemaString, FileIO fileIo, EncryptionManager encryptionManager) { - this.task = task; this.tableSchemaString = tableSchemaString; this.expectedSchemaString = expectedSchemaString; this.fileIo = fileIo; @@ -256,8 +217,11 @@ private ReadTask( } @Override - public InputPartitionReader createPartitionReader() { - return new TaskDataReader(task, lazyTableSchema(), lazyExpectedSchema(), fileIo, encryptionManager); + public PartitionReader createReader(InputPartition inputPartition) { + Preconditions.checkArgument(inputPartition instanceof ReadTask, + "InputPartition not an instance of ReadTask"); + ReadTask task = (ReadTask) inputPartition; + return new TaskDataReader(task.getTask(), lazyTableSchema(), lazyExpectedSchema(), fileIo, encryptionManager); } private Schema lazyTableSchema() { @@ -275,7 +239,7 @@ private Schema lazyExpectedSchema() { } } - private static class TaskDataReader implements InputPartitionReader { + private static class TaskDataReader implements PartitionReader { // for some reason, the apply method can't be called from Java without reflection private static final DynMethods.UnboundMethod APPLY_PROJECTION = DynMethods.builder("apply") .impl(UnsafeProjection.class, InternalRow.class) @@ -287,7 +251,6 @@ private static class TaskDataReader implements InputPartitionReader private final Map inputFiles; private final Iterator tasks; - private final Map inputFiles; private Iterator currentIterator = null; private Closeable currentCloseable = null; @@ -308,17 +271,6 @@ public TaskDataReader(CombinedScanTask task, Schema tableSchema, Schema expected decryptedFiles.forEach(decrypted -> inputFileBuilder.put(decrypted.location(), decrypted)); this.inputFiles = inputFileBuilder.build(); // open last because the schemas and fileIo must be set - Iterable inputFiles = encryptionManager.decrypt(() -> task.files().stream() - .map(fileScanTask -> - EncryptedFiles.encryptedInput( - this.fileIo.newInputFile(fileScanTask.file().path().toString()), - fileScanTask.file().keyMetadata())) - .iterator()); - this.inputFiles = StreamSupport.stream(inputFiles.spliterator(), false) - .collect(Collectors.toMap( - inputFile -> inputFile.location(), - Function.identity())); - this.currentIterator = open(tasks.next()); } From f3bfada447db0b62bdd4cf6839fd99e2aa9c70a7 Mon Sep 17 00:00:00 2001 From: Yifei Huang Date: Thu, 7 Mar 2019 19:08:09 -0800 Subject: [PATCH 07/15] simplify writer builder --- .../spark/source/IcebergSparkTable.java | 22 ++++++------------- 1 file changed, 7 insertions(+), 15 deletions(-) diff --git a/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSparkTable.java b/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSparkTable.java index b8da9b4f14a5..c5a0ac8a761b 100644 --- a/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSparkTable.java +++ b/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSparkTable.java @@ -77,30 +77,22 @@ public StructType schema() { private static class IcebergWriterBuilder implements WriteBuilder { private final Table table; - private final Optional fileFormat; + private FileFormat fileFormat; public IcebergWriterBuilder(Table table) { this.table = table; - this.fileFormat = Optional.empty(); + this.fileFormat = FileFormat.valueOf(table.properties() + .getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT) + .toUpperCase(Locale.ENGLISH)); } - public IcebergWriterBuilder(Table table, FileFormat fileFormat) { - this.table = table; - this.fileFormat = Optional.of(fileFormat); - } - - public IcebergWriterBuilder setFileFormat(String format) { - return new IcebergWriterBuilder(this.table, - FileFormat.valueOf(format.toUpperCase(Locale.ENGLISH))); + public void setFileFormat(String format) { + fileFormat = FileFormat.valueOf(format.toUpperCase(Locale.ENGLISH)); } @Override public BatchWrite buildForBatch() { - fileFormat.ifPresent(fileFormat1 -> new Writer(table, fileFormat1)); - return new Writer(table, - FileFormat.valueOf(table.properties() - .getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT) - .toUpperCase(Locale.ENGLISH))); + return new Writer(table, fileFormat); } } From a4865367f370838a6582a668223e25be4d36aa19 Mon Sep 17 00:00:00 2001 From: Yifei Huang Date: Thu, 7 Mar 2019 19:08:54 -0800 Subject: [PATCH 08/15] oops --- .../com/netflix/iceberg/spark/source/IcebergSparkTable.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSparkTable.java b/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSparkTable.java index c5a0ac8a761b..2e7ca4634cd6 100644 --- a/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSparkTable.java +++ b/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSparkTable.java @@ -58,9 +58,7 @@ public WriteBuilder newWriteBuilder(DataSourceOptions options) { IcebergWriterBuilder writerBuilder = new IcebergWriterBuilder(table); Optional formatOption = options.get("iceberg.write.format"); - if (formatOption.isPresent()) { - writerBuilder = writerBuilder.setFileFormat(formatOption.get()); - } + formatOption.ifPresent(writerBuilder::setFileFormat); return writerBuilder; } From e74ed5ea7f0a1e5288e8fcac71cea75a7cfef577 Mon Sep 17 00:00:00 2001 From: Yifei Huang Date: Fri, 8 Mar 2019 14:46:53 -0800 Subject: [PATCH 09/15] welp everything works except parquet avro correctness --- .../iceberg/parquet/ParquetWriter.java | 5 +- .../spark/source/IcebergScanBuilder.java | 54 ------ .../iceberg/spark/source/IcebergSource.java | 17 +- .../spark/source/IcebergSparkTable.java | 172 +++++++++--------- .../spark/data/TestSparkDateTimes.java | 9 +- .../spark/source/TestFilteredScan.java | 99 +++++----- 6 files changed, 150 insertions(+), 206 deletions(-) delete mode 100644 spark/src/main/java/com/netflix/iceberg/spark/source/IcebergScanBuilder.java diff --git a/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetWriter.java b/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetWriter.java index ee7ee8ff5e00..de66ba969af3 100644 --- a/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetWriter.java +++ b/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetWriter.java @@ -52,7 +52,8 @@ class ParquetWriter implements FileAppender, Closeable { .hiddenImpl("org.apache.parquet.hadoop.ColumnChunkPageWriteStore", CodecFactory.BytesCompressor.class, MessageType.class, - ByteBufferAllocator.class) + ByteBufferAllocator.class, + int.class) .build(); private static final DynMethods.UnboundMethod flushToWriter = DynMethods @@ -159,7 +160,7 @@ private void startRowGroup() { this.recordCount = 0; PageWriteStore pageStore = pageStoreCtor.newInstance( - compressor, parquetSchema, props.getAllocator()); + compressor, parquetSchema, props.getAllocator(), 0); this.flushPageStoreToWriter = flushToWriter.bind(pageStore); this.writeStore = props.newColumnWriteStore(parquetSchema, pageStore); diff --git a/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergScanBuilder.java b/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergScanBuilder.java deleted file mode 100644 index 70ade9db3704..000000000000 --- a/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergScanBuilder.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package com.netflix.iceberg.spark.source; - -import com.netflix.iceberg.Table; -import org.apache.spark.sql.sources.Filter; -import org.apache.spark.sql.sources.v2.reader.*; -import org.apache.spark.sql.types.StructType; - -public class IcebergScanBuilder - implements ScanBuilder, SupportsPushDownFilters, SupportsPushDownRequiredColumns { - - private final Table table; - private Filter[] filters = new Filter[0]; - private StructType requiredSchema; - - @Override - public Scan build() { - return new Rea; - } - - @Override - public Filter[] pushFilters(Filter[] filters) { - this.filters = filters; - return filters; - } - - @Override - public Filter[] pushedFilters() { - return filters; - } - - @Override - public void pruneColumns(StructType requiredSchema) { - this.requiredSchema = requiredSchema; - } -} diff --git a/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java b/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java index 1eb8a18c6020..e2e85c9e5fd0 100644 --- a/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java +++ b/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java @@ -100,21 +100,6 @@ public org.apache.spark.sql.sources.v2.Table getTable(DataSourceOptions options) @Override public org.apache.spark.sql.sources.v2.Table getTable(DataSourceOptions options, StructType schema) { - Configuration conf = new Configuration(lazyBaseConf()); - Table table = getTableAndResolveHadoopConfiguration(options, conf); - - Schema dfSchema = SparkSchemaUtil.convert(table.schema(), schema); - List errors = CheckCompatibility.writeCompatibilityErrors(table.schema(), dfSchema); - if (!errors.isEmpty()) { - StringBuilder sb = new StringBuilder(); - sb.append("Cannot write incompatible dataframe to table with schema:\n") - .append(table.schema()).append("\nProblems:"); - for (String error : errors) { - sb.append("\n* ").append(error); - } - throw new IllegalArgumentException(sb.toString()); - } - - return new IcebergSparkTable(table); + throw new UnsupportedOperationException("Schema should never be passed into an iceberg table"); } } diff --git a/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSparkTable.java b/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSparkTable.java index 2e7ca4634cd6..ea2126c9584a 100644 --- a/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSparkTable.java +++ b/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSparkTable.java @@ -19,17 +19,21 @@ package com.netflix.iceberg.spark.source; +import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.netflix.iceberg.FileFormat; import com.netflix.iceberg.Table; import com.netflix.iceberg.expressions.Expression; import com.netflix.iceberg.spark.SparkFilters; +import com.netflix.iceberg.spark.SparkSchemaUtil; +import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.sources.Filter; import org.apache.spark.sql.sources.v2.DataSourceOptions; import org.apache.spark.sql.sources.v2.SupportsBatchRead; import org.apache.spark.sql.sources.v2.SupportsBatchWrite; import org.apache.spark.sql.sources.v2.reader.*; import org.apache.spark.sql.sources.v2.writer.BatchWrite; +import org.apache.spark.sql.sources.v2.writer.SupportsSaveMode; import org.apache.spark.sql.sources.v2.writer.WriteBuilder; import org.apache.spark.sql.types.StructType; @@ -40,110 +44,116 @@ import static com.netflix.iceberg.TableProperties.DEFAULT_FILE_FORMAT; import static com.netflix.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; -public class IcebergSparkTable implements SupportsBatchRead, SupportsBatchWrite { +public class IcebergSparkTable implements SupportsBatchRead, SupportsBatchWrite { + private final Table table; + + public IcebergSparkTable(Table table) { + this.table = table; + } + + @Override + public ScanBuilder newScanBuilder(DataSourceOptions options) { + return new IcebergReaderBuilder(table); + } + + @Override + public WriteBuilder newWriteBuilder(DataSourceOptions options) { + IcebergWriterBuilder writerBuilder = new IcebergWriterBuilder(table); + + Optional formatOption = options.get("iceberg.write.format"); + formatOption.ifPresent(writerBuilder::setFileFormat); + return writerBuilder; + } + + @Override + public String name() { + return table.location(); + } + + @Override + public StructType schema() { + return SparkSchemaUtil.convert(table.schema()); + } + + + private static class IcebergWriterBuilder implements WriteBuilder, + SupportsSaveMode { private final Table table; + private FileFormat fileFormat; - public IcebergSparkTable(Table table) { - this.table = table; + public IcebergWriterBuilder(Table table) { + this.table = table; + this.fileFormat = FileFormat.valueOf(table.properties() + .getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT) + .toUpperCase(Locale.ENGLISH)); } - @Override - public ScanBuilder newScanBuilder(DataSourceOptions options) { - return new IcebergReaderBuilder(table); + public void setFileFormat(String format) { + fileFormat = FileFormat.valueOf(format.toUpperCase(Locale.ENGLISH)); } @Override - public WriteBuilder newWriteBuilder(DataSourceOptions options) { - IcebergWriterBuilder writerBuilder = new IcebergWriterBuilder(table); - - Optional formatOption = options.get("iceberg.write.format"); - formatOption.ifPresent(writerBuilder::setFileFormat); - return writerBuilder; + public BatchWrite buildForBatch() { + return new Writer(table, fileFormat); } @Override - public String name() { - return null; + public WriteBuilder mode(SaveMode mode) { + Preconditions.checkArgument(mode == SaveMode.Append, "Save mode %s is not supported", mode); + return this; } + } - @Override - public StructType schema() { - return null; - } + private static class IcebergReaderBuilder implements ScanBuilder, + SupportsPushDownFilters, + SupportsPushDownRequiredColumns { + private static final Filter[] NO_FILTERS = new Filter[0]; - private static class IcebergWriterBuilder implements WriteBuilder { - private final Table table; - private FileFormat fileFormat; + private final Table table; + private Filter[] pushedFilters = NO_FILTERS; + private List filterExpressions = Lists.newArrayList(); + private StructType requestedSchema = null; - public IcebergWriterBuilder(Table table) { - this.table = table; - this.fileFormat = FileFormat.valueOf(table.properties() - .getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT) - .toUpperCase(Locale.ENGLISH)); - } + public IcebergReaderBuilder(Table table) { + this.table = table; + } - public void setFileFormat(String format) { - fileFormat = FileFormat.valueOf(format.toUpperCase(Locale.ENGLISH)); + @Override + public Filter[] pushFilters(Filter[] filters) { + List expressions = Lists.newArrayListWithExpectedSize(filters.length); + List pushed = Lists.newArrayListWithExpectedSize(filters.length); + + for (Filter filter : filters) { + Expression expr = SparkFilters.convert(filter); + if (expr != null) { + expressions.add(expr); + pushed.add(filter); } + } - @Override - public BatchWrite buildForBatch() { - return new Writer(table, fileFormat); - } + this.filterExpressions = expressions; + this.pushedFilters = pushed.toArray(new Filter[0]); + // Spark doesn't support residuals per task, so return all filters + // to get Spark to handle record-level filtering + return filters; } - private static class IcebergReaderBuilder implements ScanBuilder, - SupportsPushDownFilters, - SupportsPushDownRequiredColumns { - - private static final Filter[] NO_FILTERS = new Filter[0]; - - private final Table table; - private Filter[] pushedFilters = NO_FILTERS; - private List filterExpressions = Lists.newArrayList(); - private StructType requestedSchema = null; - - public IcebergReaderBuilder(Table table) { - this.table = table; - } - - @Override - public Filter[] pushFilters(Filter[] filters) { - List expressions = Lists.newArrayListWithExpectedSize(filters.length); - List pushed = Lists.newArrayListWithExpectedSize(filters.length); - - for (Filter filter : filters) { - Expression expr = SparkFilters.convert(filter); - if (expr != null) { - expressions.add(expr); - pushed.add(filter); - } - } - - this.filterExpressions = expressions; - this.pushedFilters = pushed.toArray(new Filter[0]); - - // Spark doesn't support residuals per task, so return all filters - // to get Spark to handle record-level filtering - return filters; - } - - @Override - public Filter[] pushedFilters() { - return pushedFilters; - } + @Override + public Filter[] pushedFilters() { + return pushedFilters; + } - @Override - public void pruneColumns(StructType requiredSchema) { - this.requestedSchema = requiredSchema; - } + @Override + public void pruneColumns(StructType requiredSchema) { + this.requestedSchema = requiredSchema; + } - @Override - public Scan build() { - return new Reader(table, filterExpressions, requestedSchema); - } + @Override + public Scan build() { + return new Reader(table, filterExpressions, requestedSchema); } + } } diff --git a/spark/src/test/java/com/netflix/iceberg/spark/data/TestSparkDateTimes.java b/spark/src/test/java/com/netflix/iceberg/spark/data/TestSparkDateTimes.java index 72a004488e13..73a0becb9b6a 100644 --- a/spark/src/test/java/com/netflix/iceberg/spark/data/TestSparkDateTimes.java +++ b/spark/src/test/java/com/netflix/iceberg/spark/data/TestSparkDateTimes.java @@ -22,8 +22,13 @@ import com.netflix.iceberg.expressions.Literal; import com.netflix.iceberg.types.Types; import org.apache.spark.sql.catalyst.util.DateTimeUtils; +import org.apache.spark.sql.catalyst.util.Iso8601TimestampFormatter; +import org.apache.spark.sql.catalyst.util.TimestampFormatter; +import org.apache.spark.sql.catalyst.util.TimestampFormatter$; import org.junit.Assert; import org.junit.Test; + +import java.text.SimpleDateFormat; import java.util.TimeZone; public class TestSparkDateTimes { @@ -65,7 +70,9 @@ public void testSparkTimestamp() { public void checkSparkTimestamp(String timestampString, String sparkRepr) { Literal ts = Literal.of(timestampString).to(Types.TimestampType.withZone()); - String sparkTimestamp = DateTimeUtils.timestampToString(ts.value()); + String sparkTimestamp = DateTimeUtils.timestampToString( + TimestampFormatter$.MODULE$.apply(DateTimeUtils.defaultTimeZone()), + ts.value()); System.err.println(timestampString + ": " + ts.value()); Assert.assertEquals("Should be the same timestamp (" + ts.value() + ")", sparkRepr, sparkTimestamp); diff --git a/spark/src/test/java/com/netflix/iceberg/spark/source/TestFilteredScan.java b/spark/src/test/java/com/netflix/iceberg/spark/source/TestFilteredScan.java index e0c3fa17dcf3..b0c330cb746e 100644 --- a/spark/src/test/java/com/netflix/iceberg/spark/source/TestFilteredScan.java +++ b/spark/src/test/java/com/netflix/iceberg/spark/source/TestFilteredScan.java @@ -53,10 +53,9 @@ import org.apache.spark.sql.sources.GreaterThan; import org.apache.spark.sql.sources.LessThan; import org.apache.spark.sql.sources.v2.DataSourceOptions; -import org.apache.spark.sql.sources.v2.reader.DataSourceReader; -import org.apache.spark.sql.sources.v2.reader.InputPartition; -import org.apache.spark.sql.sources.v2.reader.InputPartitionReader; -import org.apache.spark.sql.sources.v2.reader.SupportsPushDownFilters; +import org.apache.spark.sql.sources.v2.SupportsBatchRead; +import org.apache.spark.sql.sources.v2.TableProvider; +import org.apache.spark.sql.sources.v2.reader.*; import org.apache.spark.sql.types.IntegerType$; import org.junit.AfterClass; import org.junit.Assert; @@ -67,6 +66,8 @@ import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; + +import javax.annotation.processing.SupportedOptions; import java.io.File; import java.io.IOException; import java.sql.Timestamp; @@ -205,12 +206,9 @@ public void testUnpartitionedIDFilters() { IcebergSource source = new IcebergSource(); for (int i = 0; i < 10; i += 1) { - DataSourceReader reader = source.createReader(options); - - pushFilters(reader, EqualTo.apply("id", i)); - - List> tasks = reader.planInputPartitions(); - Assert.assertEquals("Should only create one task for a small file", 1, tasks.size()); + Batch reader = getBatchReader(source, options, EqualTo.apply("id", i)); + InputPartition[] tasks = reader.planInputPartitions(); + Assert.assertEquals("Should only create one task for a small file", 1, tasks.length); // validate row filtering assertEqualsSafe(SCHEMA.asStruct(), expected(i), @@ -226,12 +224,10 @@ public void testUnpartitionedTimestampFilter() { IcebergSource source = new IcebergSource(); - DataSourceReader reader = source.createReader(options); - - pushFilters(reader, LessThan.apply("ts", "2017-12-22T00:00:00+00:00")); + Batch reader = getBatchReader(source, options, LessThan.apply("ts", "2017-12-22T00:00:00+00:00")); - List> tasks = reader.planInputPartitions(); - Assert.assertEquals("Should only create one task for a small file", 1, tasks.size()); + InputPartition[] tasks = reader.planInputPartitions(); + Assert.assertEquals("Should only create one task for a small file", 1, tasks.length); assertEqualsSafe(SCHEMA.asStruct(), expected(5,6,7,8,9), read(unpartitioned.toString(), "ts < cast('2017-12-22 00:00:00+00:00' as timestamp)")); @@ -246,19 +242,16 @@ public void testBucketPartitionedIDFilters() { ); IcebergSource source = new IcebergSource(); - DataSourceReader unfiltered = source.createReader(options); + Batch unfiltered = getBatchReader(source, options); Assert.assertEquals("Unfiltered table should created 4 read tasks", - 4, unfiltered.planInputPartitions().size()); + 4, unfiltered.planInputPartitions().length); for (int i = 0; i < 10; i += 1) { - DataSourceReader reader = source.createReader(options); - - pushFilters(reader, EqualTo.apply("id", i)); - - List> tasks = reader.planInputPartitions(); + Batch reader = getBatchReader(source, options, EqualTo.apply("id", i)); + InputPartition[] tasks = reader.planInputPartitions(); // validate predicate push-down - Assert.assertEquals("Should create one task for a single bucket", 1, tasks.size()); + Assert.assertEquals("Should create one task for a single bucket", 1, tasks.length); // validate row filtering assertEqualsSafe(SCHEMA.asStruct(), expected(i), read(location.toString(), "id = " + i)); @@ -274,31 +267,28 @@ public void testDayPartitionedTimestampFilters() { ); IcebergSource source = new IcebergSource(); - DataSourceReader unfiltered = source.createReader(options); + Batch unfiltered = getBatchReader(source, options); Assert.assertEquals("Unfiltered table should created 2 read tasks", - 2, unfiltered.planInputPartitions().size()); + 2, unfiltered.planInputPartitions().length); { - DataSourceReader reader = source.createReader(options); - - pushFilters(reader, LessThan.apply("ts", "2017-12-22T00:00:00+00:00")); + Batch reader = getBatchReader(source, options, LessThan.apply("ts", "2017-12-22T00:00:00+00:00")); - List> tasks = reader.planInputPartitions(); - Assert.assertEquals("Should create one task for 2017-12-21", 1, tasks.size()); + InputPartition[] tasks = reader.planInputPartitions(); + Assert.assertEquals("Should create one task for 2017-12-21", 1, tasks.length); assertEqualsSafe(SCHEMA.asStruct(), expected(5, 6, 7, 8, 9), read(location.toString(), "ts < cast('2017-12-22 00:00:00+00:00' as timestamp)")); } { - DataSourceReader reader = source.createReader(options); + Batch reader = getBatchReader(source, options, + And.apply( + GreaterThan.apply("ts", "2017-12-22T06:00:00+00:00"), + LessThan.apply("ts", "2017-12-22T08:00:00+00:00"))); - pushFilters(reader, And.apply( - GreaterThan.apply("ts", "2017-12-22T06:00:00+00:00"), - LessThan.apply("ts", "2017-12-22T08:00:00+00:00"))); - - List> tasks = reader.planInputPartitions(); - Assert.assertEquals("Should create one task for 2017-12-22", 1, tasks.size()); + InputPartition[] tasks = reader.planInputPartitions(); + Assert.assertEquals("Should create one task for 2017-12-22", 1, tasks.length); assertEqualsSafe(SCHEMA.asStruct(), expected(1, 2), read(location.toString(), "ts > cast('2017-12-22 06:00:00+00:00' as timestamp) and " + @@ -315,31 +305,28 @@ public void testHourPartitionedTimestampFilters() { ); IcebergSource source = new IcebergSource(); - DataSourceReader unfiltered = source.createReader(options); + Batch unfiltered = getBatchReader(source, options); Assert.assertEquals("Unfiltered table should created 9 read tasks", - 9, unfiltered.planInputPartitions().size()); + 9, unfiltered.planInputPartitions().length); { - DataSourceReader reader = source.createReader(options); - - pushFilters(reader, LessThan.apply("ts", "2017-12-22T00:00:00+00:00")); + Batch reader = getBatchReader(source, options, LessThan.apply("ts", "2017-12-22T00:00:00+00:00")); - List> tasks = reader.planInputPartitions(); - Assert.assertEquals("Should create 4 tasks for 2017-12-21: 15, 17, 21, 22", 4, tasks.size()); + InputPartition[] tasks = reader.planInputPartitions(); + Assert.assertEquals("Should create 4 tasks for 2017-12-21: 15, 17, 21, 22", 4, tasks.length); assertEqualsSafe(SCHEMA.asStruct(), expected(8, 9, 7, 6, 5), read(location.toString(), "ts < cast('2017-12-22 00:00:00+00:00' as timestamp)")); } { - DataSourceReader reader = source.createReader(options); - - pushFilters(reader, And.apply( - GreaterThan.apply("ts", "2017-12-22T06:00:00+00:00"), - LessThan.apply("ts", "2017-12-22T08:00:00+00:00"))); + Batch reader = getBatchReader(source, options, + And.apply( + GreaterThan.apply("ts", "2017-12-22T06:00:00+00:00"), + LessThan.apply("ts", "2017-12-22T08:00:00+00:00"))); - List> tasks = reader.planInputPartitions(); - Assert.assertEquals("Should create 2 tasks for 2017-12-22: 6, 7", 2, tasks.size()); + InputPartition[] tasks = reader.planInputPartitions(); + Assert.assertEquals("Should create 2 tasks for 2017-12-22: 6, 7", 2, tasks.length); assertEqualsSafe(SCHEMA.asStruct(), expected(2, 1), read(location.toString(), "ts > cast('2017-12-22 06:00:00+00:00' as timestamp) and " + @@ -379,6 +366,14 @@ public void testFilterByNonProjectedColumn() { } } + private static Batch getBatchReader(TableProvider source, DataSourceOptions options, Filter... filters) { + SupportsBatchRead table = (SupportsBatchRead) source.getTable(options); + ScanBuilder scanBuilder = table.newScanBuilder(options); + pushFilters(scanBuilder, filters); + + return scanBuilder.build().toBatch(); + } + private static Record projectFlat(Schema projection, Record record) { org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(projection, "test"); Record result = new Record(avroSchema); @@ -418,7 +413,7 @@ private List expected(int... ordinals) { return expected; } - private void pushFilters(DataSourceReader reader, Filter... filters) { + private static void pushFilters(ScanBuilder reader, Filter... filters) { Assert.assertTrue(reader instanceof SupportsPushDownFilters); SupportsPushDownFilters filterable = (SupportsPushDownFilters) reader; filterable.pushFilters(filters); From 4a821e4ec1d7ed2c6eb0f39148ef08bc34eac944 Mon Sep 17 00:00:00 2001 From: Yifei Huang Date: Mon, 11 Mar 2019 11:56:20 -0700 Subject: [PATCH 10/15] delete vestigial encryption code --- .../com/netflix/iceberg/BaseMetastoreTableOperations.java | 7 ------- .../iceberg/encryption/BaseEncryptionKeyMetadata.java | 2 +- .../com/netflix/iceberg/hadoop/HadoopTableOperations.java | 7 ------- .../java/com/netflix/iceberg/spark/source/TestTables.java | 7 ------- 4 files changed, 1 insertion(+), 22 deletions(-) diff --git a/core/src/main/java/com/netflix/iceberg/BaseMetastoreTableOperations.java b/core/src/main/java/com/netflix/iceberg/BaseMetastoreTableOperations.java index bf5925f50129..4cda7f8f53fc 100644 --- a/core/src/main/java/com/netflix/iceberg/BaseMetastoreTableOperations.java +++ b/core/src/main/java/com/netflix/iceberg/BaseMetastoreTableOperations.java @@ -20,8 +20,6 @@ package com.netflix.iceberg; import com.google.common.base.Objects; -import com.netflix.iceberg.encryption.EncryptionManager; -import com.netflix.iceberg.encryption.PlaintextEncryptionManager; import com.netflix.iceberg.exceptions.RuntimeIOException; import com.netflix.iceberg.hadoop.HadoopFileIO; import com.netflix.iceberg.io.FileIO; @@ -139,11 +137,6 @@ public FileIO io() { return fileIo; } - @Override - public EncryptionManager encryption() { - return new PlaintextEncryptionManager(); - } - @Override public LocationProvider locationProvider() { return LocationProviders.locationsFor(current().location(), current().properties()); diff --git a/core/src/main/java/com/netflix/iceberg/encryption/BaseEncryptionKeyMetadata.java b/core/src/main/java/com/netflix/iceberg/encryption/BaseEncryptionKeyMetadata.java index d0e5d6ff004e..00e581c39c55 100644 --- a/core/src/main/java/com/netflix/iceberg/encryption/BaseEncryptionKeyMetadata.java +++ b/core/src/main/java/com/netflix/iceberg/encryption/BaseEncryptionKeyMetadata.java @@ -53,6 +53,6 @@ public ByteBuffer buffer() { @Override public EncryptionKeyMetadata copy() { return new BaseEncryptionKeyMetadata( - keyMetadata == null ? null : ByteBuffers.copy(keyMetadata)); + ByteBuffers.copy(keyMetadata)); } } diff --git a/core/src/main/java/com/netflix/iceberg/hadoop/HadoopTableOperations.java b/core/src/main/java/com/netflix/iceberg/hadoop/HadoopTableOperations.java index 27a2797e5b79..1f784a7cabca 100644 --- a/core/src/main/java/com/netflix/iceberg/hadoop/HadoopTableOperations.java +++ b/core/src/main/java/com/netflix/iceberg/hadoop/HadoopTableOperations.java @@ -21,8 +21,6 @@ import com.google.common.base.Preconditions; import com.netflix.iceberg.LocationProviders; -import com.netflix.iceberg.encryption.EncryptionManager; -import com.netflix.iceberg.encryption.PlaintextEncryptionManager; import com.netflix.iceberg.io.FileIO; import com.netflix.iceberg.TableMetadata; import com.netflix.iceberg.TableMetadataParser; @@ -162,11 +160,6 @@ public FileIO io() { return defaultFileIo; } - @Override - public EncryptionManager encryption() { - return new PlaintextEncryptionManager(); - } - @Override public LocationProvider locationProvider() { return LocationProviders.locationsFor(current().location(), current().properties()); diff --git a/spark/src/test/java/com/netflix/iceberg/spark/source/TestTables.java b/spark/src/test/java/com/netflix/iceberg/spark/source/TestTables.java index 3f303d90b093..5a121a7cc1b4 100644 --- a/spark/src/test/java/com/netflix/iceberg/spark/source/TestTables.java +++ b/spark/src/test/java/com/netflix/iceberg/spark/source/TestTables.java @@ -22,8 +22,6 @@ import com.google.common.collect.Maps; import com.netflix.iceberg.BaseTable; import com.netflix.iceberg.LocationProviders; -import com.netflix.iceberg.encryption.EncryptionManager; -import com.netflix.iceberg.encryption.PlaintextEncryptionManager; import com.netflix.iceberg.io.FileIO; import com.netflix.iceberg.Files; import com.netflix.iceberg.PartitionSpec; @@ -165,11 +163,6 @@ public FileIO io() { return new LocalFileIO(); } - @Override - public EncryptionManager encryption() { - return new PlaintextEncryptionManager(); - } - @Override public LocationProvider locationProvider() { Preconditions.checkNotNull(current, From f4018dad6b00c4d4b149a64b2768577783cca885 Mon Sep 17 00:00:00 2001 From: Yifei Huang Date: Mon, 11 Mar 2019 16:17:20 -0700 Subject: [PATCH 11/15] change seed --- .../com/netflix/iceberg/spark/data/TestParquetAvroReader.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/spark/src/test/java/com/netflix/iceberg/spark/data/TestParquetAvroReader.java b/spark/src/test/java/com/netflix/iceberg/spark/data/TestParquetAvroReader.java index 34cb03b055bd..cdffea84f03b 100644 --- a/spark/src/test/java/com/netflix/iceberg/spark/data/TestParquetAvroReader.java +++ b/spark/src/test/java/com/netflix/iceberg/spark/data/TestParquetAvroReader.java @@ -184,7 +184,8 @@ public void testWithOldReadPath() throws IOException { @Test public void testCorrectness() throws IOException { - Iterable records = RandomData.generate(COMPLEX_SCHEMA, 250_000, 34139); + // TODO (yifeih): Change the seed back to 34139 after merging https://github.com/apache/parquet-mr/pull/620 + Iterable records = RandomData.generate(COMPLEX_SCHEMA, 250_000, 34138); File testFile = temp.newFile(); Assert.assertTrue("Delete should succeed", testFile.delete()); From f727e6e50f7bff21d8a9632415436d095e309b36 Mon Sep 17 00:00:00 2001 From: Yifei Huang Date: Mon, 11 Mar 2019 16:25:30 -0700 Subject: [PATCH 12/15] try again --- .../java/com/netflix/iceberg/LocalTableOperations.java | 7 ------- core/src/test/java/com/netflix/iceberg/TestTables.java | 7 ------- 2 files changed, 14 deletions(-) diff --git a/core/src/test/java/com/netflix/iceberg/LocalTableOperations.java b/core/src/test/java/com/netflix/iceberg/LocalTableOperations.java index 6eb08e9c7fcd..e60bbaaa7269 100644 --- a/core/src/test/java/com/netflix/iceberg/LocalTableOperations.java +++ b/core/src/test/java/com/netflix/iceberg/LocalTableOperations.java @@ -20,8 +20,6 @@ package com.netflix.iceberg; import com.google.common.collect.Maps; -import com.netflix.iceberg.encryption.EncryptionManager; -import com.netflix.iceberg.encryption.PlaintextEncryptionManager; import com.netflix.iceberg.exceptions.RuntimeIOException; import com.netflix.iceberg.io.FileIO; import java.util.Map; @@ -61,11 +59,6 @@ public FileIO io() { return io; } - @Override - public EncryptionManager encryption() { - return new PlaintextEncryptionManager(); - } - @Override public String metadataFileLocation(String fileName) { return createdMetadataFilePaths.computeIfAbsent(fileName, name -> { diff --git a/core/src/test/java/com/netflix/iceberg/TestTables.java b/core/src/test/java/com/netflix/iceberg/TestTables.java index 29a88df5aed6..a0048e384bed 100644 --- a/core/src/test/java/com/netflix/iceberg/TestTables.java +++ b/core/src/test/java/com/netflix/iceberg/TestTables.java @@ -22,8 +22,6 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; -import com.netflix.iceberg.encryption.EncryptionManager; -import com.netflix.iceberg.encryption.PlaintextEncryptionManager; import com.netflix.iceberg.exceptions.AlreadyExistsException; import com.netflix.iceberg.exceptions.CommitFailedException; import com.netflix.iceberg.exceptions.RuntimeIOException; @@ -183,11 +181,6 @@ public FileIO io() { return new LocalFileIO(); } - @Override - public EncryptionManager encryption() { - return new PlaintextEncryptionManager(); - } - @Override public LocationProvider locationProvider() { Preconditions.checkNotNull(current, From eb08e1cca8ed6f7e07da373e2e808bb28493ed30 Mon Sep 17 00:00:00 2001 From: Yifei Huang Date: Mon, 11 Mar 2019 16:39:58 -0700 Subject: [PATCH 13/15] some cleanups --- .../java/com/netflix/iceberg/parquet/ParquetWriter.java | 2 +- .../main/java/com/netflix/iceberg/spark/source/Writer.java | 7 ------- 2 files changed, 1 insertion(+), 8 deletions(-) diff --git a/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetWriter.java b/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetWriter.java index de66ba969af3..3a4636fdfbc3 100644 --- a/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetWriter.java +++ b/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetWriter.java @@ -160,7 +160,7 @@ private void startRowGroup() { this.recordCount = 0; PageWriteStore pageStore = pageStoreCtor.newInstance( - compressor, parquetSchema, props.getAllocator(), 0); + compressor, parquetSchema, props.getAllocator(), props.getColumnIndexTruncateLength()); this.flushPageStoreToWriter = flushToWriter.bind(pageStore); this.writeStore = props.newColumnWriteStore(parquetSchema, pageStore); diff --git a/spark/src/main/java/com/netflix/iceberg/spark/source/Writer.java b/spark/src/main/java/com/netflix/iceberg/spark/source/Writer.java index be98c28c07bf..8e38d15ad524 100644 --- a/spark/src/main/java/com/netflix/iceberg/spark/source/Writer.java +++ b/spark/src/main/java/com/netflix/iceberg/spark/source/Writer.java @@ -72,7 +72,6 @@ import static com.netflix.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT; import static com.netflix.iceberg.spark.SparkSchemaUtil.convert; -// TODO: parameterize DataSourceWriter with subclass of WriterCommitMessage class Writer implements BatchWrite { private static final Logger LOG = LoggerFactory.getLogger(Writer.class); @@ -94,12 +93,6 @@ public DataWriterFactory createBatchWriterFactory() { table.spec(), format, table.locationProvider(), table.properties(), fileIo, encryptionManager); } -// @Override -// public DataWriterFactory createWriterFactory() { -// return new WriterFactory( -// table.spec(), format, table.locationProvider(), table.properties(), fileIo, encryptionManager); -// } - @Override public void commit(WriterCommitMessage[] messages) { AppendFiles append = table.newAppend(); From d04c3e4f4271ece4b03f206a28fc47faded07c47 Mon Sep 17 00:00:00 2001 From: Yifei Huang Date: Mon, 11 Mar 2019 17:31:06 -0700 Subject: [PATCH 14/15] address comments and eliminate some diffs --- .../iceberg/spark/source/IcebergSource.java | 8 +------ .../spark/source/IcebergSparkTable.java | 24 +++++++++---------- .../netflix/iceberg/spark/source/Reader.java | 21 +++++++++------- .../spark/source/TestFilteredScan.java | 7 +++--- 4 files changed, 29 insertions(+), 31 deletions(-) diff --git a/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java b/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java index e2e85c9e5fd0..29626d3cf62c 100644 --- a/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java +++ b/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java @@ -20,11 +20,8 @@ package com.netflix.iceberg.spark.source; import com.google.common.base.Preconditions; -import com.netflix.iceberg.Schema; import com.netflix.iceberg.Table; import com.netflix.iceberg.hadoop.HadoopTables; -import com.netflix.iceberg.spark.SparkSchemaUtil; -import com.netflix.iceberg.types.CheckCompatibility; import org.apache.hadoop.conf.Configuration; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.sources.DataSourceRegister; @@ -32,13 +29,10 @@ import org.apache.spark.sql.sources.v2.TableProvider; import org.apache.spark.sql.types.StructType; -import java.util.List; import java.util.Map; import java.util.Optional; -public class IcebergSource implements - TableProvider, - DataSourceRegister { +public class IcebergSource implements TableProvider, DataSourceRegister { private SparkSession lazySpark = null; private Configuration lazyConf = null; diff --git a/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSparkTable.java b/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSparkTable.java index ea2126c9584a..fb07f99971d5 100644 --- a/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSparkTable.java +++ b/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSparkTable.java @@ -59,11 +59,15 @@ public ScanBuilder newScanBuilder(DataSourceOptions options) { @Override public WriteBuilder newWriteBuilder(DataSourceOptions options) { - IcebergWriterBuilder writerBuilder = new IcebergWriterBuilder(table); - Optional formatOption = options.get("iceberg.write.format"); - formatOption.ifPresent(writerBuilder::setFileFormat); - return writerBuilder; + if (formatOption.isPresent()) { + return new IcebergWriterBuilder(table, FileFormat.valueOf(formatOption.get().toUpperCase(Locale.ENGLISH))); + } + return new IcebergWriterBuilder( + table, + FileFormat.valueOf(table.properties() + .getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT) + .toUpperCase(Locale.ENGLISH))); } @Override @@ -80,17 +84,11 @@ public StructType schema() { private static class IcebergWriterBuilder implements WriteBuilder, SupportsSaveMode { private final Table table; - private FileFormat fileFormat; + private final FileFormat fileFormat; - public IcebergWriterBuilder(Table table) { + public IcebergWriterBuilder(Table table, FileFormat fileFormat) { this.table = table; - this.fileFormat = FileFormat.valueOf(table.properties() - .getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT) - .toUpperCase(Locale.ENGLISH)); - } - - public void setFileFormat(String format) { - fileFormat = FileFormat.valueOf(format.toUpperCase(Locale.ENGLISH)); + this.fileFormat = fileFormat; } @Override diff --git a/spark/src/main/java/com/netflix/iceberg/spark/source/Reader.java b/spark/src/main/java/com/netflix/iceberg/spark/source/Reader.java index a4b17f72d94b..1bd77703e299 100644 --- a/spark/src/main/java/com/netflix/iceberg/spark/source/Reader.java +++ b/spark/src/main/java/com/netflix/iceberg/spark/source/Reader.java @@ -54,7 +54,13 @@ import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; import org.apache.spark.sql.catalyst.expressions.JoinedRow; import org.apache.spark.sql.catalyst.expressions.UnsafeProjection; -import org.apache.spark.sql.sources.v2.reader.*; +import org.apache.spark.sql.sources.v2.reader.Batch; +import org.apache.spark.sql.sources.v2.reader.InputPartition; +import org.apache.spark.sql.sources.v2.reader.PartitionReader; +import org.apache.spark.sql.sources.v2.reader.PartitionReaderFactory; +import org.apache.spark.sql.sources.v2.reader.Scan; +import org.apache.spark.sql.sources.v2.reader.Statistics; +import org.apache.spark.sql.sources.v2.reader.SupportsReportStatistics; import org.apache.spark.sql.types.BinaryType; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.Decimal; @@ -245,13 +251,12 @@ private static class TaskDataReader implements PartitionReader { .impl(UnsafeProjection.class, InternalRow.class) .build(); + private final Iterator tasks; private final Schema tableSchema; private final Schema expectedSchema; private final FileIO fileIo; private final Map inputFiles; - private final Iterator tasks; - private Iterator currentIterator = null; private Closeable currentCloseable = null; private InternalRow current = null; @@ -376,9 +381,9 @@ private static UnsafeProjection projection(Schema finalSchema, Schema readSchema } private Iterator open(FileScanTask task, Schema readSchema) { - CloseableIterable iter; InputFile location = inputFiles.get(task.file().path().toString()); Preconditions.checkNotNull(location, "Could not find InputFile associated with FileScanTask"); + CloseableIterable iter; switch (task.file().format()) { case PARQUET: iter = newParquetIterable(location, task, readSchema); @@ -398,10 +403,10 @@ private Iterator open(FileScanTask task, Schema readSchema) { return iter.iterator(); } - private CloseableIterable newAvroIterable(InputFile inputFile, - FileScanTask task, - Schema readSchema) { - return Avro.read(inputFile) + private CloseableIterable newAvroIterable(InputFile location, + FileScanTask task, + Schema readSchema) { + return Avro.read(location) .reuseContainers() .project(readSchema) .split(task.start(), task.length()) diff --git a/spark/src/test/java/com/netflix/iceberg/spark/source/TestFilteredScan.java b/spark/src/test/java/com/netflix/iceberg/spark/source/TestFilteredScan.java index b0c330cb746e..42819a540e8a 100644 --- a/spark/src/test/java/com/netflix/iceberg/spark/source/TestFilteredScan.java +++ b/spark/src/test/java/com/netflix/iceberg/spark/source/TestFilteredScan.java @@ -55,7 +55,10 @@ import org.apache.spark.sql.sources.v2.DataSourceOptions; import org.apache.spark.sql.sources.v2.SupportsBatchRead; import org.apache.spark.sql.sources.v2.TableProvider; -import org.apache.spark.sql.sources.v2.reader.*; +import org.apache.spark.sql.sources.v2.reader.Batch; +import org.apache.spark.sql.sources.v2.reader.InputPartition; +import org.apache.spark.sql.sources.v2.reader.ScanBuilder; +import org.apache.spark.sql.sources.v2.reader.SupportsPushDownFilters; import org.apache.spark.sql.types.IntegerType$; import org.junit.AfterClass; import org.junit.Assert; @@ -66,8 +69,6 @@ import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; - -import javax.annotation.processing.SupportedOptions; import java.io.File; import java.io.IOException; import java.sql.Timestamp; From 4b327ea38a83def8231d5286c4833fea3d9932f1 Mon Sep 17 00:00:00 2001 From: Yifei Huang Date: Mon, 11 Mar 2019 17:34:23 -0700 Subject: [PATCH 15/15] delete some more stuff --- .../java/com/netflix/iceberg/spark/source/IcebergSource.java | 1 - .../com/netflix/iceberg/spark/data/TestSparkDateTimes.java | 4 ---- 2 files changed, 5 deletions(-) diff --git a/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java b/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java index 29626d3cf62c..b84bda96ac74 100644 --- a/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java +++ b/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java @@ -28,7 +28,6 @@ import org.apache.spark.sql.sources.v2.DataSourceOptions; import org.apache.spark.sql.sources.v2.TableProvider; import org.apache.spark.sql.types.StructType; - import java.util.Map; import java.util.Optional; diff --git a/spark/src/test/java/com/netflix/iceberg/spark/data/TestSparkDateTimes.java b/spark/src/test/java/com/netflix/iceberg/spark/data/TestSparkDateTimes.java index 73a0becb9b6a..cc9ec427bcce 100644 --- a/spark/src/test/java/com/netflix/iceberg/spark/data/TestSparkDateTimes.java +++ b/spark/src/test/java/com/netflix/iceberg/spark/data/TestSparkDateTimes.java @@ -22,13 +22,9 @@ import com.netflix.iceberg.expressions.Literal; import com.netflix.iceberg.types.Types; import org.apache.spark.sql.catalyst.util.DateTimeUtils; -import org.apache.spark.sql.catalyst.util.Iso8601TimestampFormatter; -import org.apache.spark.sql.catalyst.util.TimestampFormatter; import org.apache.spark.sql.catalyst.util.TimestampFormatter$; import org.junit.Assert; import org.junit.Test; - -import java.text.SimpleDateFormat; import java.util.TimeZone; public class TestSparkDateTimes {