From 5359aaf9b73b3eacf52f0242542648a8b3ce23d1 Mon Sep 17 00:00:00 2001
From: bkahloon
Date: Sat, 27 Feb 2021 19:29:31 -0800
Subject: [PATCH 01/10] spark: Add support for reading timestamp without
 timezone from Parquet

---
 .../spark/PruneColumnsWithoutReordering.java  |   5 -
 .../apache/iceberg/spark/TypeToSparkType.java |   7 +-
 .../iceberg/spark/data/GenericsHelpers.java   |  11 +-
 .../source/TestTimestampWithoutZone.java      | 227 ++++++++++++++++++
 .../apache/iceberg/spark/source/Reader.java   |  23 +-
 .../source/TestTimestampWithoutZone24.java    |  26 ++
 .../iceberg/spark/source/SparkBatchScan.java  |  24 +-
 .../source/TestTimestampWithoutZone3.java     |  26 ++
 8 files changed, 334 insertions(+), 15 deletions(-)
 create mode 100644 spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java
 create mode 100644 spark2/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone24.java
 create mode 100644 spark3/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone3.java

diff --git a/spark/src/main/java/org/apache/iceberg/spark/PruneColumnsWithoutReordering.java b/spark/src/main/java/org/apache/iceberg/spark/PruneColumnsWithoutReordering.java
index 653c27313ec3..c6984e2fe8cd 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/PruneColumnsWithoutReordering.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/PruneColumnsWithoutReordering.java
@@ -202,11 +202,6 @@ public Type primitive(Type.PrimitiveType primitive) {
             "Cannot project decimal with incompatible precision: %s < %s",
             requestedDecimal.precision(), decimal.precision());
         break;
-      case TIMESTAMP:
-        Types.TimestampType timestamp = (Types.TimestampType) primitive;
-        Preconditions.checkArgument(timestamp.shouldAdjustToUTC(),
-            "Cannot project timestamp (without time zone) as timestamptz (with time zone)");
-        break;
       default:
     }

diff --git a/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java b/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java
index b9e9dfade792..6a8be60eb078 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java
@@ -104,12 +104,7 @@ public DataType primitive(Type.PrimitiveType primitive) {
         throw new UnsupportedOperationException(
             "Spark does not support time fields");
       case TIMESTAMP:
-        Types.TimestampType timestamp = (Types.TimestampType) primitive;
-        if (timestamp.shouldAdjustToUTC()) {
-          return TimestampType$.MODULE$;
-        }
-        throw new UnsupportedOperationException(
-            "Spark does not support timestamp without time zone fields");
+        return TimestampType$.MODULE$;
       case STRING:
         return StringType$.MODULE$;
       case UUID:

diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/GenericsHelpers.java b/spark/src/test/java/org/apache/iceberg/spark/data/GenericsHelpers.java
index 0c4598a209e8..3112c9304bbc 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/GenericsHelpers.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/GenericsHelpers.java
@@ -24,6 +24,7 @@
 import java.sql.Timestamp;
 import java.time.Instant;
 import java.time.LocalDate;
+import java.time.LocalDateTime;
 import java.time.OffsetDateTime;
 import java.time.ZoneOffset;
 import java.time.temporal.ChronoUnit;
@@ -122,13 +123,19 @@ private static void assertEqualsSafe(Type type, Object expected, Object actual)
         Assert.assertEquals("ISO-8601 date should be equal", expected.toString(), actual.toString());
         break;
       case TIMESTAMP:
-        Assert.assertTrue("Should expect an OffsetDateTime", expected instanceof OffsetDateTime);
         Assert.assertTrue("Should be a Timestamp", actual instanceof Timestamp);
         Timestamp ts = (Timestamp) actual;
         // milliseconds from nanos has already been added by getTime
         OffsetDateTime actualTs = EPOCH.plusNanos(
             (ts.getTime() * 1_000_000) + (ts.getNanos() % 1_000_000));
-        Assert.assertEquals("Timestamp should be equal", expected, actualTs);
+        Types.TimestampType timestampType = (Types.TimestampType) type;
+        if (timestampType.shouldAdjustToUTC()) {
+          Assert.assertTrue("Should expect an OffsetDateTime", expected instanceof OffsetDateTime);
+          Assert.assertEquals("Timestamp should be equal", expected, actualTs);
+        } else {
+          Assert.assertTrue("Should expect a LocalDateTime", expected instanceof LocalDateTime);
+          Assert.assertEquals("Timestamp should be equal", expected, actualTs.toLocalDateTime());
+        }
         break;
       case STRING:
         Assert.assertTrue("Should be a String", actual instanceof String);

diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java
new file mode 100644
index 000000000000..30daa9b2c30f
--- /dev/null
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.LocalDateTime;
+import java.util.List;
+import java.util.Locale;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.data.GenericsHelpers;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.iceberg.Files.localOutput;
+
+@RunWith(Parameterized.class)
+public abstract class TestTimestampWithoutZone {
+  private static final Configuration CONF = new Configuration();
+  private static final HadoopTables TABLES = new HadoopTables(CONF);
+
+  private static final Schema SCHEMA = new Schema(
+      Types.NestedField.required(1, "id", Types.LongType.get()),
+      Types.NestedField.optional(2, "ts", Types.TimestampType.withoutZone()),
+      Types.NestedField.optional(3, "data", Types.StringType.get())
+  );
+
+  private static SparkSession spark = null;
+
+  @BeforeClass
+  public static void startSpark() {
+    TestTimestampWithoutZone.spark = SparkSession.builder().master("local[2]").getOrCreate();
+  }
+
+  @AfterClass
+  public static void stopSpark() {
+    SparkSession currentSpark = TestTimestampWithoutZone.spark;
+    TestTimestampWithoutZone.spark = null;
+    currentSpark.stop();
+  }
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  private final String format;
+  private final boolean vectorized;
+
+  @Parameterized.Parameters(name = "format = {0}, vectorized = {1}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+        { "parquet", false },
+        { "parquet", true },
+        { "avro", false },
+        { "orc", false },
+        { "orc", true }
+    };
+  }
+
+  public TestTimestampWithoutZone(String format, boolean vectorized) {
+    this.format = format;
+    this.vectorized = vectorized;
+  }
+
+  private File parent = null;
+  private File unpartitioned = null;
+  private List<Record> records = null;
+
+  @Before
+  public void writeUnpartitionedTable() throws IOException {
+    this.parent = temp.newFolder("TestTimestampWithoutZone");
+    this.unpartitioned = new File(parent, "unpartitioned");
+    File dataFolder = new File(unpartitioned, "data");
+    Assert.assertTrue("Mkdir should succeed", dataFolder.mkdirs());
+
+    Table table = TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), unpartitioned.toString());
+    Schema tableSchema = table.schema(); // use the table schema because ids are reassigned
+
+    FileFormat fileFormat = FileFormat.valueOf(format.toUpperCase(Locale.ENGLISH));
+
+    File testFile = new File(dataFolder, fileFormat.addExtension(UUID.randomUUID().toString()));
+
+    // create records using the table's schema
+    this.records = testRecords(tableSchema);
+
+    try (FileAppender<Record> writer = new GenericAppenderFactory(tableSchema).newAppender(
+        localOutput(testFile), fileFormat)) {
+      writer.addAll(records);
+    }
+
+    DataFile file = DataFiles.builder(PartitionSpec.unpartitioned())
+        .withRecordCount(records.size())
+        .withFileSizeInBytes(testFile.length())
+        .withPath(testFile.toString())
+        .build();
+
+    table.newAppend().appendFile(file).commit();
+  }
+
+  @Test
+  public void testUnpartitionedTimestampWithoutZone() {
+    assertEqualsSafe(SCHEMA.asStruct(), records, read(unpartitioned.toString(), vectorized));
+  }
+
+  @Test
+  public void testUnpartitionedTimestampWithoutZoneProjection() {
+    Schema projection = SCHEMA.select("id", "ts");
+    assertEqualsSafe(projection.asStruct(),
+        records.stream().map(r -> projectFlat(projection, r)).collect(Collectors.toList()),
+        read(unpartitioned.toString(), vectorized, "id", "ts"));
+  }
+
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
+  @Test
+  public void testUnpartitionedTimestampWithoutZoneError() {
+    exception.expect(IllegalArgumentException.class);
+    exception.expectMessage("Spark does not support timestamp without time zone fields");
+
+    spark.read().format("iceberg")
+        .option("vectorization-enabled", String.valueOf(vectorized))
+        .option("read-timestamp-without-zone", "false")
+        .load(unpartitioned.toString())
+        .collectAsList();
+  }
+
+  private static Record projectFlat(Schema projection, Record record) {
+    Record result = GenericRecord.create(projection);
+    List<Types.NestedField> fields = projection.asStruct().fields();
+    for (int i = 0; i < fields.size(); i += 1) {
+      Types.NestedField field = fields.get(i);
+      result.set(i, record.getField(field.name()));
+    }
+    return result;
+  }
+
+  public static void assertEqualsSafe(Types.StructType struct,
+                                      List<Record> expected, List<Row> actual) {
+    Assert.assertEquals("Number of results should match expected", expected.size(), actual.size());
+    for (int i = 0; i < expected.size(); i += 1) {
+      GenericsHelpers.assertEqualsSafe(struct, expected.get(i), actual.get(i));
+    }
+  }
+
+  private List<Record> testRecords(Schema schema) {
+    return Lists.newArrayList(
+        record(schema, 0L, parseToLocal("2017-12-22T09:20:44.294658"), "junction"),
+        record(schema, 1L, parseToLocal("2017-12-22T07:15:34.582910"), "alligator"),
+        record(schema, 2L, parseToLocal("2017-12-22T06:02:09.243857"), "forrest"),
+        record(schema, 3L, parseToLocal("2017-12-22T03:10:11.134509"), "clapping"),
+        record(schema, 4L, parseToLocal("2017-12-22T00:34:00.184671"), "brush"),
+        record(schema, 5L, parseToLocal("2017-12-21T22:20:08.935889"), "trap"),
+        record(schema, 6L, parseToLocal("2017-12-21T21:55:30.589712"), "element"),
+        record(schema, 7L, parseToLocal("2017-12-21T17:31:14.532797"), "limited"),
+        record(schema, 8L, parseToLocal("2017-12-21T15:21:51.237521"), "global"),
+        record(schema, 9L, parseToLocal("2017-12-21T15:02:15.230570"), "goldfish")
+    );
+  }
+
+  private static List<Row> read(String table, boolean vectorized) {
+    return read(table, vectorized, "*");
+  }
+
+  private static List<Row> read(String table, boolean vectorized, String select0, String... selectN) {
+    Dataset<Row> dataset = spark.read().format("iceberg")
+        .option("vectorization-enabled", String.valueOf(vectorized))
+        .option("read-timestamp-without-zone", "true")
+        .load(table)
+        .select(select0, selectN);
+    return dataset.collectAsList();
+  }
+
+  private static LocalDateTime parseToLocal(String timestamp) {
+    return LocalDateTime.parse(timestamp);
+  }
+
+  private static Record record(Schema schema, Object... values) {
+    Record rec = GenericRecord.create(schema);
+    for (int i = 0; i < values.length; i += 1) {
+      rec.set(i, values[i]);
+    }
+    return rec;
+  }
+}

diff --git a/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java b/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
index 4b0723e7ee30..b346dc17521c 100644
--- a/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
+++ b/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
@@ -51,6 +51,9 @@
 import org.apache.iceberg.spark.SparkFilters;
 import org.apache.iceberg.spark.SparkReadOptions;
 import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.TableScanUtil;
 import org.apache.spark.broadcast.Broadcast;
@@ -97,6 +100,7 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
   private final boolean localityPreferred;
   private final boolean batchReadsEnabled;
   private final int batchSize;
+  private final boolean readTimestampWithoutZone;

   // lazy variables
   private Schema schema = null;
@@ -170,6 +174,13 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
     this.batchSize = options.get(SparkReadOptions.VECTORIZATION_BATCH_SIZE).map(Integer::parseInt).orElseGet(() ->
         PropertyUtil.propertyAsInt(table.properties(),
             TableProperties.PARQUET_BATCH_SIZE, TableProperties.PARQUET_BATCH_SIZE_DEFAULT));
+    // Allow reading timestamp without time zone as timestamp with time zone. Generally, this is not safe as timestamp
+    // without time zone is supposed to represent wall clock time semantics, i.e. no matter the reader/writer timezone
+    // 3PM should always be read as 3PM, but timestamp with time zone represents instant semantics, i.e the timestamp
+    // is adjusted so that the corresponding time in the reader timezone is displayed.
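+    // For example, 15:00 written by a producer in UTC+2 is read back as 15:00 everywhere under wall clock
+    // semantics, whereas the same instant stored with time zone (13:00 UTC) displays as 05:00 in a UTC-8 session.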
+ // When set to false (default), we throw an exception at runtime + // "Spark does not support timestamp without time zone fields" if reading timestamp without time zone fields + this.readTimestampWithoutZone = options.get("read-timestamp-without-zone").map(Boolean::parseBoolean).orElse(false); } private Schema lazySchema() { @@ -193,6 +204,8 @@ private Expression filterExpression() { private StructType lazyType() { if (type == null) { + Preconditions.checkArgument(readTimestampWithoutZone || !hasTimestampWithoutZone(lazySchema()), + "Spark does not support timestamp without time zone fields"); this.type = SparkSchemaUtil.convert(lazySchema()); } return type; @@ -338,12 +351,20 @@ public boolean enableBatchRead() { boolean hasNoDeleteFiles = tasks().stream().noneMatch(TableScanUtil::hasDeletes); + boolean hasTimestampWithoutZone = hasTimestampWithoutZone(lazySchema()); + this.readUsingBatch = batchReadsEnabled && hasNoDeleteFiles && (allOrcFileScanTasks || - (allParquetFileScanTasks && atLeastOneColumn && onlyPrimitives)); + (allParquetFileScanTasks && atLeastOneColumn && onlyPrimitives && !hasTimestampWithoutZone)); } return readUsingBatch; } + private static boolean hasTimestampWithoutZone(Schema schema) { + return TypeUtil.find(schema, t -> + t.typeId().equals(Type.TypeID.TIMESTAMP) && !((Types.TimestampType) t).shouldAdjustToUTC() + ) != null; + } + private static void mergeIcebergHadoopConfs( Configuration baseConf, Map options) { options.keySet().stream() diff --git a/spark2/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone24.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone24.java new file mode 100644 index 000000000000..84c7d1aeb733 --- /dev/null +++ b/spark2/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone24.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.spark.source; + +public class TestTimestampWithoutZone24 extends TestTimestampWithoutZone { + public TestTimestampWithoutZone24(String format, boolean vectorized) { + super(format, vectorized); + } +} diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java index efe94459410c..526c3af467b6 100644 --- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java +++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java @@ -37,8 +37,12 @@ import org.apache.iceberg.hadoop.HadoopInputFile; import org.apache.iceberg.hadoop.Util; import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.spark.Spark3Util; import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.types.Types; import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.TableScanUtil; import org.apache.spark.broadcast.Broadcast; @@ -68,6 +72,7 @@ abstract class SparkBatchScan implements Scan, Batch, SupportsReportStatistics { private final Broadcast encryptionManager; private final boolean batchReadsEnabled; private final int batchSize; + private final boolean readTimestampWithoutZone; // lazy variables private StructType readSchema = null; @@ -84,6 +89,13 @@ abstract class SparkBatchScan implements Scan, Batch, SupportsReportStatistics { this.localityPreferred = Spark3Util.isLocalityEnabled(io.value(), table.location(), options); this.batchReadsEnabled = Spark3Util.isVectorizationEnabled(table.properties(), options); this.batchSize = Spark3Util.batchSize(table.properties(), options); + // Allow reading timestamp without time zone as timestamp with time zone. Generally, this is not safe as timestamp + // without time zone is supposed to represent wall clock time semantics, i.e. no matter the reader/writer timezone + // 3PM should always be read as 3PM, but timestamp with time zone represents instant semantics, i.e the timestamp + // is adjusted so that the corresponding time in the reader timezone is displayed. + // When set to false (default), we throw an exception at runtime + // "Spark does not support timestamp without time zone fields" if reading timestamp without time zone fields + this.readTimestampWithoutZone = options.getBoolean("read-timestamp-without-zone", false); } protected Table table() { @@ -112,6 +124,8 @@ public Batch toBatch() { @Override public StructType readSchema() { if (readSchema == null) { + Preconditions.checkArgument(readTimestampWithoutZone || !hasTimestampWithoutZone(expectedSchema), + "Spark does not support timestamp without time zone fields"); this.readSchema = SparkSchemaUtil.convert(expectedSchema); } return readSchema; @@ -156,12 +170,20 @@ public PartitionReaderFactory createReaderFactory() { boolean hasNoDeleteFiles = tasks().stream().noneMatch(TableScanUtil::hasDeletes); + boolean hasTimestampWithoutZone = hasTimestampWithoutZone(expectedSchema); + boolean readUsingBatch = batchReadsEnabled && hasNoDeleteFiles && (allOrcFileScanTasks || - (allParquetFileScanTasks && atLeastOneColumn && onlyPrimitives)); + (allParquetFileScanTasks && atLeastOneColumn && onlyPrimitives && !hasTimestampWithoutZone)); return new ReaderFactory(readUsingBatch ? 
batchSize : 0); } + private static boolean hasTimestampWithoutZone(Schema schema) { + return TypeUtil.find(schema, t -> + t.typeId().equals(Type.TypeID.TIMESTAMP) && !((Types.TimestampType) t).shouldAdjustToUTC() + ) != null; + } + @Override public Statistics estimateStatistics() { // its a fresh table, no data diff --git a/spark3/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone3.java b/spark3/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone3.java new file mode 100644 index 000000000000..4216aec02789 --- /dev/null +++ b/spark3/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone3.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark.source; + +public class TestTimestampWithoutZone3 extends TestTimestampWithoutZone { + public TestTimestampWithoutZone3(String format, boolean vectorized) { + super(format, vectorized); + } +} From 4444d587c85b287ec2251d63da2173fe46f5ac48 Mon Sep 17 00:00:00 2001 From: bkahloon Date: Sat, 27 Feb 2021 20:53:53 -0800 Subject: [PATCH 02/10] spark: Remove ORC vectorized test for reading timestamp without timezone --- .../apache/iceberg/spark/source/TestTimestampWithoutZone.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java index 30daa9b2c30f..5b663638529c 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java +++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java @@ -93,9 +93,7 @@ public static Object[][] parameters() { return new Object[][] { { "parquet", false }, { "parquet", true }, - { "avro", false }, - { "orc", false }, - { "orc", true } + { "avro", false } }; } From 022dde6d4e56c373194a78a52e3f61df8d582026 Mon Sep 17 00:00:00 2001 From: bkahloon Date: Wed, 3 Mar 2021 21:06:24 -0800 Subject: [PATCH 03/10] Spark: address PR comments --- .../org/apache/iceberg/spark/SparkUtil.java | 25 ++++++++++++++++++ .../apache/iceberg/spark/TypeToSparkType.java | 7 +++++ .../source/TestTimestampWithoutZone.java | 26 ++++++++----------- .../apache/iceberg/spark/source/Reader.java | 15 ++++------- .../iceberg/spark/source/SparkBatchScan.java | 15 ++++------- 5 files changed, 53 insertions(+), 35 deletions(-) diff --git a/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java b/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java index 3537d94ed3cf..24faedc03c66 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java +++ b/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java @@ -25,16 +25,30 @@ import 
java.util.stream.Collectors;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.hadoop.HadoopFileIO;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.transforms.Transform;
 import org.apache.iceberg.transforms.UnknownTransform;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.Pair;
 import org.apache.spark.util.SerializableConfiguration;

 public class SparkUtil {
+
+  public static final String HANDLE_TIMESTAMP_WITHOUT_TIMEZONE_FLAG = "spark-handle-timestamp-without-timezone";
+  public static final String TIMESTAMP_WITHOUT_TIMEZONE_ERROR = String.format("Cannot handle timestamp without" +
+      " timezone fields in Spark. Spark does not natively support this type, but if you would like to handle all" +
+      " timestamps as timestamp with timezone, set '%s' to true. This will not change the underlying values stored" +
+      " but will change their displayed values in Spark. For more information please see" +
+      " https://docs.databricks.com/spark/latest/dataframes-datasets/dates-timestamps.html#ansi-sql-and" +
+      "-spark-sql-timestamps",
+      HANDLE_TIMESTAMP_WITHOUT_TIMEZONE_FLAG);
+
   private SparkUtil() {
   }

@@ -100,4 +114,15 @@ public static Pair catalogAndIdentifier(List nameParts,
       }
     }
   }
+
+  /**
+   * Responsible for checking if the table schema has a timestamp without timezone column
+   * @param schema table schema to check if it contains a timestamp without timezone column
+   * @return boolean indicating if the schema passed in has a timestamp field without a timezone
+   */
+  public static boolean hasTimestampWithoutZone(Schema schema) {
+    return TypeUtil.find(schema, t ->
+        t.typeId().equals(Type.TypeID.TIMESTAMP) && !((Types.TimestampType) t).shouldAdjustToUTC()
+    ) != null;
+  }
 }

diff --git a/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java b/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java
index 6a8be60eb078..0ac9eee2525b 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java
@@ -104,6 +104,13 @@ public DataType primitive(Type.PrimitiveType primitive) {
         throw new UnsupportedOperationException(
             "Spark does not support time fields");
       case TIMESTAMP:
+//      Types.TimestampType timestamp = (Types.TimestampType) primitive;
+//      boolean readTimestampWithoutZone = options.get(SparkUtil.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE_FLAG)
+//          .map(Boolean::parseBoolean).orElse(false);
+//      if (timestamp.shouldAdjustToUTC() || readTimestampWithoutZone) {
+//        return TimestampType$.MODULE$;
+//      }
+//      throw new UnsupportedOperationException(SparkUtil.TIMESTAMP_WITHOUT_TIMEZONE_ERROR);
         return TimestampType$.MODULE$;
       case STRING:
         return StringType$.MODULE$;

diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java
index 5b663638529c..b2e95fd184f9 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java
@@ -27,18 +27,14 @@
 import java.util.UUID;
 import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
-import
org.apache.iceberg.DataFile; -import org.apache.iceberg.DataFiles; -import org.apache.iceberg.FileFormat; -import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.Schema; -import org.apache.iceberg.Table; +import org.apache.iceberg.*; import org.apache.iceberg.data.GenericAppenderFactory; import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.Record; import org.apache.iceberg.hadoop.HadoopTables; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.SparkUtil; import org.apache.iceberg.spark.data.GenericsHelpers; import org.apache.iceberg.types.Types; import org.apache.spark.sql.Dataset; @@ -155,14 +151,14 @@ public void testUnpartitionedTimestampWithoutZoneProjection() { @Test public void testUnpartitionedTimestampWithoutZoneError() { - exception.expect(IllegalArgumentException.class); - exception.expectMessage("Spark does not support timestamp without time zone fields"); - - spark.read().format("iceberg") - .option("vectorization-enabled", String.valueOf(vectorized)) - .option("read-timestamp-without-zone", "false") - .load(unpartitioned.toString()) - .collectAsList(); + AssertHelpers.assertThrows(String.format("Read operation performed on a timestamp without timezone field while " + + "'%s' set to false should throw exception", SparkUtil.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE_FLAG), + IllegalArgumentException.class, SparkUtil.TIMESTAMP_WITHOUT_TIMEZONE_ERROR, + () -> spark.read().format("iceberg") + .option("vectorization-enabled", String.valueOf(vectorized)) + .option(SparkUtil.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE_FLAG, "false") + .load(unpartitioned.toString()) + .collectAsList()); } private static Record projectFlat(Schema projection, Record record) { @@ -205,7 +201,7 @@ private static List read(String table, boolean vectorized) { private static List read(String table, boolean vectorized, String select0, String... selectN) { Dataset dataset = spark.read().format("iceberg") .option("vectorization-enabled", String.valueOf(vectorized)) - .option("read-timestamp-without-zone", "true") + .option(SparkUtil.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE_FLAG, "true") .load(table) .select(select0, selectN); return dataset.collectAsList(); diff --git a/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java b/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java index b346dc17521c..bd780d3ec517 100644 --- a/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java +++ b/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java @@ -51,6 +51,7 @@ import org.apache.iceberg.spark.SparkFilters; import org.apache.iceberg.spark.SparkReadOptions; import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.spark.SparkUtil; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.types.Types; @@ -180,7 +181,7 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus // is adjusted so that the corresponding time in the reader timezone is displayed. 
// When set to false (default), we throw an exception at runtime // "Spark does not support timestamp without time zone fields" if reading timestamp without time zone fields - this.readTimestampWithoutZone = options.get("read-timestamp-without-zone").map(Boolean::parseBoolean).orElse(false); + this.readTimestampWithoutZone = options.get(SparkUtil.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE_FLAG).map(Boolean::parseBoolean).orElse(false); } private Schema lazySchema() { @@ -204,8 +205,8 @@ private Expression filterExpression() { private StructType lazyType() { if (type == null) { - Preconditions.checkArgument(readTimestampWithoutZone || !hasTimestampWithoutZone(lazySchema()), - "Spark does not support timestamp without time zone fields"); + Preconditions.checkArgument(readTimestampWithoutZone || !SparkUtil.hasTimestampWithoutZone(lazySchema()), + SparkUtil.TIMESTAMP_WITHOUT_TIMEZONE_ERROR); this.type = SparkSchemaUtil.convert(lazySchema()); } return type; @@ -351,7 +352,7 @@ public boolean enableBatchRead() { boolean hasNoDeleteFiles = tasks().stream().noneMatch(TableScanUtil::hasDeletes); - boolean hasTimestampWithoutZone = hasTimestampWithoutZone(lazySchema()); + boolean hasTimestampWithoutZone = SparkUtil.hasTimestampWithoutZone(lazySchema()); this.readUsingBatch = batchReadsEnabled && hasNoDeleteFiles && (allOrcFileScanTasks || (allParquetFileScanTasks && atLeastOneColumn && onlyPrimitives && !hasTimestampWithoutZone)); @@ -359,12 +360,6 @@ public boolean enableBatchRead() { return readUsingBatch; } - private static boolean hasTimestampWithoutZone(Schema schema) { - return TypeUtil.find(schema, t -> - t.typeId().equals(Type.TypeID.TIMESTAMP) && !((Types.TimestampType) t).shouldAdjustToUTC() - ) != null; - } - private static void mergeIcebergHadoopConfs( Configuration baseConf, Map options) { options.keySet().stream() diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java index 526c3af467b6..3e4af8051c78 100644 --- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java +++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java @@ -40,6 +40,7 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.spark.Spark3Util; import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.spark.SparkUtil; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.types.Types; @@ -95,7 +96,7 @@ abstract class SparkBatchScan implements Scan, Batch, SupportsReportStatistics { // is adjusted so that the corresponding time in the reader timezone is displayed. 
// When set to false (default), we throw an exception at runtime // "Spark does not support timestamp without time zone fields" if reading timestamp without time zone fields - this.readTimestampWithoutZone = options.getBoolean("read-timestamp-without-zone", false); + this.readTimestampWithoutZone = options.getBoolean(SparkUtil.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE_FLAG, false); } protected Table table() { @@ -124,8 +125,8 @@ public Batch toBatch() { @Override public StructType readSchema() { if (readSchema == null) { - Preconditions.checkArgument(readTimestampWithoutZone || !hasTimestampWithoutZone(expectedSchema), - "Spark does not support timestamp without time zone fields"); + Preconditions.checkArgument(readTimestampWithoutZone || !SparkUtil.hasTimestampWithoutZone(expectedSchema), + SparkUtil.TIMESTAMP_WITHOUT_TIMEZONE_ERROR); this.readSchema = SparkSchemaUtil.convert(expectedSchema); } return readSchema; @@ -170,7 +171,7 @@ public PartitionReaderFactory createReaderFactory() { boolean hasNoDeleteFiles = tasks().stream().noneMatch(TableScanUtil::hasDeletes); - boolean hasTimestampWithoutZone = hasTimestampWithoutZone(expectedSchema); + boolean hasTimestampWithoutZone = SparkUtil.hasTimestampWithoutZone(expectedSchema); boolean readUsingBatch = batchReadsEnabled && hasNoDeleteFiles && (allOrcFileScanTasks || (allParquetFileScanTasks && atLeastOneColumn && onlyPrimitives && !hasTimestampWithoutZone)); @@ -178,12 +179,6 @@ public PartitionReaderFactory createReaderFactory() { return new ReaderFactory(readUsingBatch ? batchSize : 0); } - private static boolean hasTimestampWithoutZone(Schema schema) { - return TypeUtil.find(schema, t -> - t.typeId().equals(Type.TypeID.TIMESTAMP) && !((Types.TimestampType) t).shouldAdjustToUTC() - ) != null; - } - @Override public Statistics estimateStatistics() { // its a fresh table, no data From c914154a9293ee69cc1dd48d371c9c1379664f3f Mon Sep 17 00:00:00 2001 From: bkahloon Date: Fri, 5 Mar 2021 21:53:22 -0800 Subject: [PATCH 04/10] Spark: fix build failure due to import of all iceberg packages --- .../iceberg/spark/source/TestTimestampWithoutZone.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java index b2e95fd184f9..42d6afd4e89c 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java +++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java @@ -27,12 +27,18 @@ import java.util.UUID; import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; -import org.apache.iceberg.*; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; import org.apache.iceberg.data.GenericAppenderFactory; import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.Record; import org.apache.iceberg.hadoop.HadoopTables; import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.AssertHelpers; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.spark.SparkUtil; import org.apache.iceberg.spark.data.GenericsHelpers; From a875e7a1b2bc2d0c1ee470e53b7d901bf03fadeb Mon Sep 17 00:00:00 2001 From: bkahloon Date: Sat, 6 Mar 2021 22:41:52 -0800 Subject: [PATCH 05/10] Spark: remove 
unused imports and try to fix package import ordering

---
 .../iceberg/spark/source/TestTimestampWithoutZone.java      | 2 +-
 .../main/java/org/apache/iceberg/spark/source/Reader.java   | 6 ------
 .../org/apache/iceberg/spark/source/SparkBatchScan.java     | 3 ---
 3 files changed, 1 insertion(+), 10 deletions(-)

diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java
index 42d6afd4e89c..efad51242423 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java
@@ -27,6 +27,7 @@
 import java.util.UUID;
 import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.FileFormat;
@@ -38,7 +39,6 @@
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.io.FileAppender;
-import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.SparkUtil;
 import org.apache.iceberg.spark.data.GenericsHelpers;

diff --git a/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java b/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
index 21ce7862d77e..757555390a0b 100644
--- a/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
+++ b/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
@@ -45,16 +45,10 @@
 import org.apache.iceberg.hadoop.Util;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.SparkFilters;
 import org.apache.iceberg.spark.SparkReadOptions;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.SparkUtil;
-import org.apache.iceberg.types.Type;
-import org.apache.iceberg.types.TypeUtil;
-import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.TableScanUtil;
 import org.apache.spark.broadcast.Broadcast;

diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
index 3c42c2781357..b89cb0f920ee 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
@@ -42,9 +42,6 @@
 import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.SparkUtil;
-import org.apache.iceberg.types.Type;
-import org.apache.iceberg.types.TypeUtil;
-import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.TableScanUtil;
 import org.apache.spark.broadcast.Broadcast;

From 3be2739b90dd87df130c298193ee00aba93a776c Mon Sep 17 00:00:00 2001
From: bkahloon
Date: Sun, 7 Mar 2021 20:11:33 -0800
Subject: [PATCH 06/10] Spark: fix missing imports

---
 .../src/main/java/org/apache/iceberg/spark/source/Reader.java | 3 +++
 1 file changed, 3 insertions(+)

diff --git
a/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java b/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java index 757555390a0b..ef6b9383bc81 100644 --- a/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java +++ b/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java @@ -45,6 +45,9 @@ import org.apache.iceberg.hadoop.Util; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.spark.SparkFilters; import org.apache.iceberg.spark.SparkReadOptions; import org.apache.iceberg.spark.SparkSchemaUtil; From 82194f674aa322bc4d89f1d3ebc4b18abca03cf3 Mon Sep 17 00:00:00 2001 From: bkahloon Date: Sun, 7 Mar 2021 20:38:27 -0800 Subject: [PATCH 07/10] Spark: fix code formatting issue --- .../apache/iceberg/spark/source/TestTimestampWithoutZone.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java index efad51242423..bc3cac3a3c7d 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java +++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java @@ -159,8 +159,7 @@ public void testUnpartitionedTimestampWithoutZoneProjection() { public void testUnpartitionedTimestampWithoutZoneError() { AssertHelpers.assertThrows(String.format("Read operation performed on a timestamp without timezone field while " + "'%s' set to false should throw exception", SparkUtil.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE_FLAG), - IllegalArgumentException.class, SparkUtil.TIMESTAMP_WITHOUT_TIMEZONE_ERROR, - () -> spark.read().format("iceberg") + IllegalArgumentException.class, SparkUtil.TIMESTAMP_WITHOUT_TIMEZONE_ERROR, () -> spark.read().format("iceberg") .option("vectorization-enabled", String.valueOf(vectorized)) .option(SparkUtil.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE_FLAG, "false") .load(unpartitioned.toString()) From e09b137fd1cf999c2e80e1aa223c6457646037cc Mon Sep 17 00:00:00 2001 From: bkahloon Date: Sun, 7 Mar 2021 20:43:55 -0800 Subject: [PATCH 08/10] Spark: fix code formatting issue --- .../iceberg/spark/source/TestTimestampWithoutZone.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java index bc3cac3a3c7d..78b14d21a2c6 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java +++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java @@ -160,10 +160,10 @@ public void testUnpartitionedTimestampWithoutZoneError() { AssertHelpers.assertThrows(String.format("Read operation performed on a timestamp without timezone field while " + "'%s' set to false should throw exception", SparkUtil.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE_FLAG), IllegalArgumentException.class, SparkUtil.TIMESTAMP_WITHOUT_TIMEZONE_ERROR, () -> spark.read().format("iceberg") - .option("vectorization-enabled", String.valueOf(vectorized)) - .option(SparkUtil.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE_FLAG, "false") - .load(unpartitioned.toString()) - .collectAsList()); + 
.option("vectorization-enabled", String.valueOf(vectorized)) + .option(SparkUtil.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE_FLAG, "false") + .load(unpartitioned.toString()) + .collectAsList()); } private static Record projectFlat(Schema projection, Record record) { From 687a1a42447c4c0ff5bd2b717b2fe5dc731e89cb Mon Sep 17 00:00:00 2001 From: bkahloon Date: Mon, 8 Mar 2021 21:54:12 -0800 Subject: [PATCH 09/10] Spark: Fix formatting error of long line --- .../apache/iceberg/spark/source/TestTimestampWithoutZone.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java index 78b14d21a2c6..cbdc4dc4d209 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java +++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java @@ -159,7 +159,8 @@ public void testUnpartitionedTimestampWithoutZoneProjection() { public void testUnpartitionedTimestampWithoutZoneError() { AssertHelpers.assertThrows(String.format("Read operation performed on a timestamp without timezone field while " + "'%s' set to false should throw exception", SparkUtil.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE_FLAG), - IllegalArgumentException.class, SparkUtil.TIMESTAMP_WITHOUT_TIMEZONE_ERROR, () -> spark.read().format("iceberg") + IllegalArgumentException.class, SparkUtil.TIMESTAMP_WITHOUT_TIMEZONE_ERROR, () -> spark.read() + .format("iceberg") .option("vectorization-enabled", String.valueOf(vectorized)) .option(SparkUtil.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE_FLAG, "false") .load(unpartitioned.toString()) From 79744a996ff4d139014e9c9db5a9561498ff37e7 Mon Sep 17 00:00:00 2001 From: bkahloon Date: Mon, 8 Mar 2021 22:02:58 -0800 Subject: [PATCH 10/10] Spark: Fix formatting error of long line --- .../src/main/java/org/apache/iceberg/spark/source/Reader.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java b/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java index ef6b9383bc81..0f62b14d9476 100644 --- a/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java +++ b/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java @@ -170,7 +170,8 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus // is adjusted so that the corresponding time in the reader timezone is displayed. // When set to false (default), we throw an exception at runtime // "Spark does not support timestamp without time zone fields" if reading timestamp without time zone fields - this.readTimestampWithoutZone = options.get(SparkUtil.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE_FLAG).map(Boolean::parseBoolean).orElse(false); + this.readTimestampWithoutZone = options.get(SparkUtil.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE_FLAG) + .map(Boolean::parseBoolean).orElse(false); } private Schema lazySchema() {