diff --git a/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java b/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java new file mode 100644 index 0000000000..e5d2652bcb --- /dev/null +++ b/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.data; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Map; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.MetricsConfig; +import org.apache.iceberg.Schema; +import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.data.avro.DataWriter; +import org.apache.iceberg.data.orc.GenericOrcWriter; +import org.apache.iceberg.data.parquet.GenericParquetWriter; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.io.FileAppenderFactory; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.orc.ORC; +import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; + +/** + * Factory to create a new {@link FileAppender} to write {@link Record}s. 
+ */ +public class GenericAppenderFactory implements FileAppenderFactory { + + private final Schema schema; + private final Map config = Maps.newHashMap(); + + public GenericAppenderFactory(Schema schema) { + this.schema = schema; + } + + public GenericAppenderFactory set(String property, String value) { + config.put(property, value); + return this; + } + + public GenericAppenderFactory setAll(Map properties) { + config.putAll(properties); + return this; + } + + @Override + public FileAppender newAppender(OutputFile outputFile, FileFormat fileFormat) { + MetricsConfig metricsConfig = MetricsConfig.fromProperties(config); + try { + switch (fileFormat) { + case AVRO: + return Avro.write(outputFile) + .schema(schema) + .createWriterFunc(DataWriter::create) + .setAll(config) + .overwrite() + .build(); + + case PARQUET: + return Parquet.write(outputFile) + .schema(schema) + .createWriterFunc(GenericParquetWriter::buildWriter) + .setAll(config) + .metricsConfig(metricsConfig) + .overwrite() + .build(); + + case ORC: + return ORC.write(outputFile) + .schema(schema) + .createWriterFunc(GenericOrcWriter::buildWriter) + .setAll(config) + .overwrite() + .build(); + + default: + throw new UnsupportedOperationException("Cannot write format: " + fileFormat); + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } +} diff --git a/data/src/test/java/org/apache/iceberg/TestSplitScan.java b/data/src/test/java/org/apache/iceberg/TestSplitScan.java index b0d870a3e6..a1c6410252 100644 --- a/data/src/test/java/org/apache/iceberg/TestSplitScan.java +++ b/data/src/test/java/org/apache/iceberg/TestSplitScan.java @@ -24,15 +24,12 @@ import java.util.List; import java.util.Locale; import org.apache.hadoop.conf.Configuration; -import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.data.GenericAppenderFactory; import org.apache.iceberg.data.IcebergGenerics; import org.apache.iceberg.data.RandomGenericData; import org.apache.iceberg.data.Record; -import org.apache.iceberg.data.avro.DataWriter; -import org.apache.iceberg.data.parquet.GenericParquetWriter; import org.apache.iceberg.hadoop.HadoopTables; import org.apache.iceberg.io.FileAppender; -import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Types; import org.junit.Assert; @@ -122,28 +119,10 @@ private File writeToFile(List records, FileFormat fileFormat) throws IOE File file = temp.newFile(); Assert.assertTrue(file.delete()); - switch (fileFormat) { - case AVRO: - try (FileAppender appender = Avro.write(Files.localOutput(file)) - .schema(SCHEMA) - .createWriterFunc(DataWriter::create) - .named(fileFormat.name()) - .build()) { - appender.addAll(records); - } - break; - case PARQUET: - try (FileAppender appender = Parquet.write(Files.localOutput(file)) - .schema(SCHEMA) - .createWriterFunc(GenericParquetWriter::buildWriter) - .named(fileFormat.name()) - .set(TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, String.valueOf(SPLIT_SIZE)) - .build()) { - appender.addAll(records); - } - break; - default: - throw new UnsupportedOperationException("Cannot write format: " + fileFormat); + GenericAppenderFactory factory = new GenericAppenderFactory(SCHEMA).set( + TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, String.valueOf(SPLIT_SIZE)); + try (FileAppender appender = factory.newAppender(Files.localOutput(file), fileFormat)) { + appender.addAll(records); } return file; } diff --git a/data/src/test/java/org/apache/iceberg/data/GenericAppenderHelper.java 
b/data/src/test/java/org/apache/iceberg/data/GenericAppenderHelper.java new file mode 100644 index 0000000000..c32be08263 --- /dev/null +++ b/data/src/test/java/org/apache/iceberg/data/GenericAppenderHelper.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.data; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Files; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.junit.Assert; +import org.junit.rules.TemporaryFolder; + +/** + * Helper for appending {@link DataFile} to a table or appending {@link Record}s to a table. + */ +public class GenericAppenderHelper { + + private final Table table; + private final FileFormat fileFormat; + private final TemporaryFolder tmp; + + public GenericAppenderHelper(Table table, FileFormat fileFormat, TemporaryFolder tmp) { + this.table = table; + this.fileFormat = fileFormat; + this.tmp = tmp; + } + + public void appendToTable(DataFile... 
dataFiles) { + Preconditions.checkNotNull(table, "table not set"); + + AppendFiles append = table.newAppend(); + + for (DataFile dataFile : dataFiles) { + append = append.appendFile(dataFile); + } + + append.commit(); + } + + public void appendToTable(List records) throws IOException { + appendToTable(null, records); + } + + public void appendToTable(StructLike partition, List records) throws IOException { + appendToTable(writeFile(partition, records)); + } + + public DataFile writeFile(StructLike partition, List records) throws IOException { + Preconditions.checkNotNull(table, "table not set"); + File file = tmp.newFile(); + Assert.assertTrue(file.delete()); + return appendToLocalFile(table, file, fileFormat, partition, records); + } + + private static DataFile appendToLocalFile( + Table table, File file, FileFormat format, StructLike partition, List records) + throws IOException { + FileAppender appender = new GenericAppenderFactory(table.schema()).newAppender( + Files.localOutput(file), format); + try (FileAppender fileAppender = appender) { + fileAppender.addAll(records); + } + + return DataFiles.builder(table.spec()) + .withRecordCount(records.size()) + .withFileSizeInBytes(file.length()) + .withPath(file.toURI().toString()) + .withMetrics(appender.metrics()) + .withFormat(format) + .withPartition(partition) + .build(); + } +} diff --git a/data/src/test/java/org/apache/iceberg/data/TestLocalScan.java b/data/src/test/java/org/apache/iceberg/data/TestLocalScan.java index 5acd2a32ea..b3a5323c6a 100644 --- a/data/src/test/java/org/apache/iceberg/data/TestLocalScan.java +++ b/data/src/test/java/org/apache/iceberg/data/TestLocalScan.java @@ -39,16 +39,10 @@ import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; import org.apache.iceberg.Tables; -import org.apache.iceberg.avro.Avro; -import org.apache.iceberg.data.avro.DataWriter; -import org.apache.iceberg.data.orc.GenericOrcWriter; -import org.apache.iceberg.data.parquet.GenericParquetWriter; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.hadoop.HadoopInputFile; import org.apache.iceberg.hadoop.HadoopTables; import org.apache.iceberg.io.FileAppender; -import org.apache.iceberg.orc.ORC; -import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; @@ -458,59 +452,17 @@ private DataFile writeFile(String location, String filename, Schema schema, List Path path = new Path(location, filename); FileFormat fileFormat = FileFormat.fromFileName(filename); Preconditions.checkNotNull(fileFormat, "Cannot determine format for file: %s", filename); - switch (fileFormat) { - case AVRO: - FileAppender avroAppender = Avro.write(fromPath(path, CONF)) - .schema(schema) - .createWriterFunc(DataWriter::create) - .named(fileFormat.name()) - .build(); - try { - avroAppender.addAll(records); - } finally { - avroAppender.close(); - } - - return DataFiles.builder(PartitionSpec.unpartitioned()) - .withInputFile(HadoopInputFile.fromPath(path, CONF)) - .withMetrics(avroAppender.metrics()) - .build(); - - case PARQUET: - FileAppender parquetAppender = Parquet.write(fromPath(path, CONF)) - .schema(schema) - .createWriterFunc(GenericParquetWriter::buildWriter) - .build(); - try { - parquetAppender.addAll(records); - } finally { - parquetAppender.close(); - } - - return 
DataFiles.builder(PartitionSpec.unpartitioned()) - .withInputFile(HadoopInputFile.fromPath(path, CONF)) - .withMetrics(parquetAppender.metrics()) - .build(); - - case ORC: - FileAppender orcAppender = ORC.write(fromPath(path, CONF)) - .schema(schema) - .createWriterFunc(GenericOrcWriter::buildWriter) - .build(); - try { - orcAppender.addAll(records); - } finally { - orcAppender.close(); - } - - return DataFiles.builder(PartitionSpec.unpartitioned()) - .withInputFile(HadoopInputFile.fromPath(path, CONF)) - .withMetrics(orcAppender.metrics()) - .build(); - - default: - throw new UnsupportedOperationException("Cannot write format: " + fileFormat); + + FileAppender fileAppender = new GenericAppenderFactory(schema).newAppender( + fromPath(path, CONF), fileFormat); + try (FileAppender appender = fileAppender) { + appender.addAll(records); } + + return DataFiles.builder(PartitionSpec.unpartitioned()) + .withInputFile(HadoopInputFile.fromPath(path, CONF)) + .withMetrics(fileAppender.metrics()) + .build(); } @Test diff --git a/spark/src/main/java/org/apache/iceberg/spark/PruneColumnsWithoutReordering.java b/spark/src/main/java/org/apache/iceberg/spark/PruneColumnsWithoutReordering.java index 653c27313e..c6984e2fe8 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/PruneColumnsWithoutReordering.java +++ b/spark/src/main/java/org/apache/iceberg/spark/PruneColumnsWithoutReordering.java @@ -202,11 +202,6 @@ public Type primitive(Type.PrimitiveType primitive) { "Cannot project decimal with incompatible precision: %s < %s", requestedDecimal.precision(), decimal.precision()); break; - case TIMESTAMP: - Types.TimestampType timestamp = (Types.TimestampType) primitive; - Preconditions.checkArgument(timestamp.shouldAdjustToUTC(), - "Cannot project timestamp (without time zone) as timestamptz (with time zone)"); - break; default: } diff --git a/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java b/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java index b9e9dfade7..6a8be60eb0 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java +++ b/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java @@ -104,12 +104,7 @@ public DataType primitive(Type.PrimitiveType primitive) { throw new UnsupportedOperationException( "Spark does not support time fields"); case TIMESTAMP: - Types.TimestampType timestamp = (Types.TimestampType) primitive; - if (timestamp.shouldAdjustToUTC()) { - return TimestampType$.MODULE$; - } - throw new UnsupportedOperationException( - "Spark does not support timestamp without time zone fields"); + return TimestampType$.MODULE$; case STRING: return StringType$.MODULE$; case UUID: diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java index 8a45aabf5f..32a04886eb 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java +++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java @@ -104,6 +104,7 @@ public OrcValueReader primitive(Type.PrimitiveType iPrimitive, TypeDescriptio return OrcValueReaders.floats(); case DOUBLE: return OrcValueReaders.doubles(); + case TIMESTAMP: case TIMESTAMP_INSTANT: return SparkOrcValueReaders.timestampTzs(); case DECIMAL: diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java index 160e4dbd77..81fc631ae7 100644 --- 
a/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java +++ b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java @@ -127,6 +127,7 @@ public Converter primitive(Type.PrimitiveType iPrimitive, TypeDescription primit case DOUBLE: primitiveValueReader = OrcValueReaders.doubles(); break; + case TIMESTAMP: case TIMESTAMP_INSTANT: primitiveValueReader = SparkOrcValueReaders.timestampTzs(); break; diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/GenericsHelpers.java b/spark/src/test/java/org/apache/iceberg/spark/data/GenericsHelpers.java index 0c4598a209..3112c9304b 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/data/GenericsHelpers.java +++ b/spark/src/test/java/org/apache/iceberg/spark/data/GenericsHelpers.java @@ -24,6 +24,7 @@ import java.sql.Timestamp; import java.time.Instant; import java.time.LocalDate; +import java.time.LocalDateTime; import java.time.OffsetDateTime; import java.time.ZoneOffset; import java.time.temporal.ChronoUnit; @@ -122,13 +123,19 @@ private static void assertEqualsSafe(Type type, Object expected, Object actual) Assert.assertEquals("ISO-8601 date should be equal", expected.toString(), actual.toString()); break; case TIMESTAMP: - Assert.assertTrue("Should expect an OffsetDateTime", expected instanceof OffsetDateTime); Assert.assertTrue("Should be a Timestamp", actual instanceof Timestamp); Timestamp ts = (Timestamp) actual; // milliseconds from nanos has already been added by getTime OffsetDateTime actualTs = EPOCH.plusNanos( (ts.getTime() * 1_000_000) + (ts.getNanos() % 1_000_000)); - Assert.assertEquals("Timestamp should be equal", expected, actualTs); + Types.TimestampType timestampType = (Types.TimestampType) type; + if (timestampType.shouldAdjustToUTC()) { + Assert.assertTrue("Should expect an OffsetDateTime", expected instanceof OffsetDateTime); + Assert.assertEquals("Timestamp should be equal", expected, actualTs); + } else { + Assert.assertTrue("Should expect an LocalDateTime", expected instanceof LocalDateTime); + Assert.assertEquals("Timestamp should be equal", expected, actualTs.toLocalDateTime()); + } break; case STRING: Assert.assertTrue("Should be a String", actual instanceof String); diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java index ac64fa952c..d88daef1c2 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java +++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java @@ -31,14 +31,9 @@ import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; -import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.data.GenericAppenderFactory; import org.apache.iceberg.data.Record; -import org.apache.iceberg.data.avro.DataWriter; -import org.apache.iceberg.data.orc.GenericOrcWriter; -import org.apache.iceberg.data.parquet.GenericParquetWriter; import org.apache.iceberg.io.FileAppender; -import org.apache.iceberg.orc.ORC; -import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.spark.SparkValueConverter; @@ -111,33 +106,9 @@ protected Record writeAndRead(String desc, Schema writeSchema, Schema readSchema // When tables are created, the column ids are reassigned. 
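      // (Illustrative note, not in the original patch: because the ids are reassigned, the writers
      // below are created with table.schema(), whose field ids match the table, rather than writeSchema.)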
Schema tableSchema = table.schema(); - switch (format) { - case AVRO: - try (FileAppender writer = Avro.write(localOutput(testFile)) - .createWriterFunc(DataWriter::create) - .schema(tableSchema) - .build()) { - writer.add(record); - } - break; - - case PARQUET: - try (FileAppender writer = Parquet.write(localOutput(testFile)) - .createWriterFunc(GenericParquetWriter::buildWriter) - .schema(tableSchema) - .build()) { - writer.add(record); - } - break; - - case ORC: - try (FileAppender writer = ORC.write(localOutput(testFile)) - .createWriterFunc(GenericOrcWriter::buildWriter) - .schema(tableSchema) - .build()) { - writer.add(record); - } - break; + try (FileAppender writer = new GenericAppenderFactory(tableSchema).newAppender( + localOutput(testFile), format)) { + writer.add(record); } DataFile file = DataFiles.builder(PartitionSpec.unpartitioned()) diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java new file mode 100644 index 0000000000..30daa9b2c3 --- /dev/null +++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.spark.source; + +import java.io.File; +import java.io.IOException; +import java.time.LocalDateTime; +import java.util.List; +import java.util.Locale; +import java.util.UUID; +import java.util.stream.Collectors; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.data.GenericAppenderFactory; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.hadoop.HadoopTables; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.data.GenericsHelpers; +import org.apache.iceberg.types.Types; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.apache.iceberg.Files.localOutput; + +@RunWith(Parameterized.class) +public abstract class TestTimestampWithoutZone { + private static final Configuration CONF = new Configuration(); + private static final HadoopTables TABLES = new HadoopTables(CONF); + + private static final Schema SCHEMA = new Schema( + Types.NestedField.required(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "ts", Types.TimestampType.withoutZone()), + Types.NestedField.optional(3, "data", Types.StringType.get()) + ); + + private static SparkSession spark = null; + + @BeforeClass + public static void startSpark() { + TestTimestampWithoutZone.spark = SparkSession.builder().master("local[2]").getOrCreate(); + } + + @AfterClass + public static void stopSpark() { + SparkSession currentSpark = TestTimestampWithoutZone.spark; + TestTimestampWithoutZone.spark = null; + currentSpark.stop(); + } + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + + private final String format; + private final boolean vectorized; + + @Parameterized.Parameters(name = "format = {0}, vectorized = {1}") + public static Object[][] parameters() { + return new Object[][] { + { "parquet", false }, + { "parquet", true }, + { "avro", false }, + { "orc", false }, + { "orc", true } + }; + } + + public TestTimestampWithoutZone(String format, boolean vectorized) { + this.format = format; + this.vectorized = vectorized; + } + + private File parent = null; + private File unpartitioned = null; + private List records = null; + + @Before + public void writeUnpartitionedTable() throws IOException { + this.parent = temp.newFolder("TestTimestampWithoutZone"); + this.unpartitioned = new File(parent, "unpartitioned"); + File dataFolder = new File(unpartitioned, "data"); + Assert.assertTrue("Mkdir should succeed", dataFolder.mkdirs()); + + Table table = TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), unpartitioned.toString()); + Schema tableSchema = table.schema(); // use the table schema because ids are reassigned + + FileFormat fileFormat = FileFormat.valueOf(format.toUpperCase(Locale.ENGLISH)); + + File testFile = new File(dataFolder, 
fileFormat.addExtension(UUID.randomUUID().toString())); + + // create records using the table's schema + this.records = testRecords(tableSchema); + + try (FileAppender writer = new GenericAppenderFactory(tableSchema).newAppender( + localOutput(testFile), fileFormat)) { + writer.addAll(records); + } + + DataFile file = DataFiles.builder(PartitionSpec.unpartitioned()) + .withRecordCount(records.size()) + .withFileSizeInBytes(testFile.length()) + .withPath(testFile.toString()) + .build(); + + table.newAppend().appendFile(file).commit(); + } + + @Test + public void testUnpartitionedTimestampWithoutZone() { + assertEqualsSafe(SCHEMA.asStruct(), records, read(unpartitioned.toString(), vectorized)); + } + + @Test + public void testUnpartitionedTimestampWithoutZoneProjection() { + Schema projection = SCHEMA.select("id", "ts"); + assertEqualsSafe(projection.asStruct(), + records.stream().map(r -> projectFlat(projection, r)).collect(Collectors.toList()), + read(unpartitioned.toString(), vectorized, "id", "ts")); + } + + @Rule + public ExpectedException exception = ExpectedException.none(); + + @Test + public void testUnpartitionedTimestampWithoutZoneError() { + exception.expect(IllegalArgumentException.class); + exception.expectMessage("Spark does not support timestamp without time zone fields"); + + spark.read().format("iceberg") + .option("vectorization-enabled", String.valueOf(vectorized)) + .option("read-timestamp-without-zone", "false") + .load(unpartitioned.toString()) + .collectAsList(); + } + + private static Record projectFlat(Schema projection, Record record) { + Record result = GenericRecord.create(projection); + List fields = projection.asStruct().fields(); + for (int i = 0; i < fields.size(); i += 1) { + Types.NestedField field = fields.get(i); + result.set(i, record.getField(field.name())); + } + return result; + } + + public static void assertEqualsSafe(Types.StructType struct, + List expected, List actual) { + Assert.assertEquals("Number of results should match expected", expected.size(), actual.size()); + for (int i = 0; i < expected.size(); i += 1) { + GenericsHelpers.assertEqualsSafe(struct, expected.get(i), actual.get(i)); + } + } + + private List testRecords(Schema schema) { + return Lists.newArrayList( + record(schema, 0L, parseToLocal("2017-12-22T09:20:44.294658"), "junction"), + record(schema, 1L, parseToLocal("2017-12-22T07:15:34.582910"), "alligator"), + record(schema, 2L, parseToLocal("2017-12-22T06:02:09.243857"), "forrest"), + record(schema, 3L, parseToLocal("2017-12-22T03:10:11.134509"), "clapping"), + record(schema, 4L, parseToLocal("2017-12-22T00:34:00.184671"), "brush"), + record(schema, 5L, parseToLocal("2017-12-21T22:20:08.935889"), "trap"), + record(schema, 6L, parseToLocal("2017-12-21T21:55:30.589712"), "element"), + record(schema, 7L, parseToLocal("2017-12-21T17:31:14.532797"), "limited"), + record(schema, 8L, parseToLocal("2017-12-21T15:21:51.237521"), "global"), + record(schema, 9L, parseToLocal("2017-12-21T15:02:15.230570"), "goldfish") + ); + } + + private static List read(String table, boolean vectorized) { + return read(table, vectorized, "*"); + } + + private static List read(String table, boolean vectorized, String select0, String... 
selectN) { + Dataset dataset = spark.read().format("iceberg") + .option("vectorization-enabled", String.valueOf(vectorized)) + .option("read-timestamp-without-zone", "true") + .load(table) + .select(select0, selectN); + return dataset.collectAsList(); + } + + private static LocalDateTime parseToLocal(String timestamp) { + return LocalDateTime.parse(timestamp); + } + + private static Record record(Schema schema, Object... values) { + Record rec = GenericRecord.create(schema); + for (int i = 0; i < values.length; i += 1) { + rec.set(i, values[i]); + } + return rec; + } +} diff --git a/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java b/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java index 6d5b0ab8d4..73a7fed334 100644 --- a/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java +++ b/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java @@ -52,6 +52,9 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.spark.SparkFilters; import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.types.Types; import org.apache.iceberg.util.PropertyUtil; import org.apache.spark.broadcast.Broadcast; import org.apache.spark.sql.SparkSession; @@ -98,6 +101,7 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus private final boolean localityPreferred; private final boolean batchReadsEnabled; private final int batchSize; + private final boolean readTimestampWithoutZone; // lazy variables private Schema schema = null; @@ -164,6 +168,15 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus this.batchSize = options.get("batch-size").map(Integer::parseInt).orElse( PropertyUtil.propertyAsInt(table.properties(), TableProperties.PARQUET_BATCH_SIZE, TableProperties.PARQUET_BATCH_SIZE_DEFAULT)); + // Allow reading timestamp without time zone as timestamp with time zone. Generally, this is not safe as timestamp + // without time zone is supposed to represent wall clock time semantics, i.e. no matter the reader/writer timezone + // 3PM should always be read as 3PM, but timestamp with time zone represents instant semantics, i.e the timestamp + // is adjusted so that the corresponding time in the reader timezone is displayed. However, at LinkedIn, all readers + // and writers are in the UTC timezone as our production machines are set to UTC. So, timestamp with/without time + // zone is the same. 
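+    // For example (illustrative, mirroring the new TestTimestampWithoutZone test):
+    //   spark.read().format("iceberg").option("read-timestamp-without-zone", "true").load(location)
+    // maps Iceberg timestamp-without-zone columns to Spark's TimestampType instead of failing the scan.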
+ // When set to false (default), we throw an exception at runtime + // "Spark does not support timestamp without time zone fields" if reading timestamp without time zone fields + this.readTimestampWithoutZone = options.get("read-timestamp-without-zone").map(Boolean::parseBoolean).orElse(false); } private Schema lazySchema() { @@ -187,6 +200,8 @@ private Expression filterExpression() { private StructType lazyType() { if (type == null) { + Preconditions.checkArgument(readTimestampWithoutZone || !hasTimestampWithoutZone(lazySchema()), + "Spark does not support timestamp without time zone fields"); this.type = SparkSchemaUtil.convert(lazySchema()); } return type; @@ -344,12 +359,20 @@ public boolean enableBatchRead() { boolean onlyPrimitives = lazySchema().columns().stream().allMatch(c -> c.type().isPrimitiveType()); + boolean hasTimestampWithoutZone = hasTimestampWithoutZone(lazySchema()); + this.readUsingBatch = batchReadsEnabled && ((allOrcFileScanTasks && hasNoRowFilters) || - (allParquetFileScanTasks && atLeastOneColumn && onlyPrimitives)); + (allParquetFileScanTasks && atLeastOneColumn && onlyPrimitives && !hasTimestampWithoutZone)); } return readUsingBatch; } + private static boolean hasTimestampWithoutZone(Schema schema) { + return TypeUtil.find(schema, t -> + t.typeId().equals(Type.TypeID.TIMESTAMP) && !((Types.TimestampType) t).shouldAdjustToUTC() + ) != null; + } + private static void mergeIcebergHadoopConfs( Configuration baseConf, Map options) { options.keySet().stream() diff --git a/spark2/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java index 0d45179b31..7fc7a98fd0 100644 --- a/spark2/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java +++ b/spark2/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java @@ -33,17 +33,12 @@ import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; -import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.data.GenericAppenderFactory; import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.Record; -import org.apache.iceberg.data.avro.DataWriter; -import org.apache.iceberg.data.orc.GenericOrcWriter; -import org.apache.iceberg.data.parquet.GenericParquetWriter; import org.apache.iceberg.expressions.Literal; import org.apache.iceberg.hadoop.HadoopTables; import org.apache.iceberg.io.FileAppender; -import org.apache.iceberg.orc.ORC; -import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.spark.data.GenericsHelpers; @@ -191,33 +186,9 @@ public void writeUnpartitionedTable() throws IOException { // create records using the table's schema this.records = testRecords(tableSchema); - switch (fileFormat) { - case AVRO: - try (FileAppender writer = Avro.write(localOutput(testFile)) - .createWriterFunc(DataWriter::create) - .schema(tableSchema) - .build()) { - writer.addAll(records); - } - break; - - case PARQUET: - try (FileAppender writer = Parquet.write(localOutput(testFile)) - .createWriterFunc(GenericParquetWriter::buildWriter) - .schema(tableSchema) - .build()) { - writer.addAll(records); - } - break; - - case ORC: - try (FileAppender writer = ORC.write(localOutput(testFile)) - .createWriterFunc(GenericOrcWriter::buildWriter) - .schema(tableSchema) - .build()) { - writer.addAll(records); - } - break; 
+ try (FileAppender writer = new GenericAppenderFactory(tableSchema).newAppender( + localOutput(testFile), fileFormat)) { + writer.addAll(records); } DataFile file = DataFiles.builder(PartitionSpec.unpartitioned()) diff --git a/spark2/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone24.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone24.java new file mode 100644 index 0000000000..84c7d1aeb7 --- /dev/null +++ b/spark2/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone24.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark.source; + +public class TestTimestampWithoutZone24 extends TestTimestampWithoutZone { + public TestTimestampWithoutZone24(String format, boolean vectorized) { + super(format, vectorized); + } +} diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java index 97c5fead2e..357ade3ea3 100644 --- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java +++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java @@ -45,9 +45,13 @@ import org.apache.iceberg.mapping.NameMapping; import org.apache.iceberg.mapping.NameMappingParser; import org.apache.iceberg.orc.OrcRowFilterUtils; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.spark.Spark3Util; import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.types.Types; import org.apache.iceberg.util.PropertyUtil; import org.apache.spark.broadcast.Broadcast; import org.apache.spark.sql.catalyst.InternalRow; @@ -83,8 +87,10 @@ class SparkBatchScan implements Scan, Batch, SupportsReportStatistics { private final Broadcast encryptionManager; private final boolean batchReadsEnabled; private final int batchSize; + private final boolean readTimestampWithoutZone; // lazy variables + private StructType readSchema = null; private List tasks = null; // lazy cache of tasks SparkBatchScan(Table table, Broadcast io, Broadcast encryption, boolean caseSensitive, @@ -122,6 +128,15 @@ class SparkBatchScan implements Scan, Batch, SupportsReportStatistics { this.localityPreferred = Spark3Util.isLocalityEnabled(io.value(), table.location(), options); this.batchReadsEnabled = Spark3Util.isVectorizationEnabled(table.properties(), options); this.batchSize = Spark3Util.batchSize(table.properties(), options); + // Allow reading timestamp without time zone as timestamp with time zone. 
Generally, this is not safe as timestamp + // without time zone is supposed to represent wall clock time semantics, i.e. no matter the reader/writer timezone + // 3PM should always be read as 3PM, but timestamp with time zone represents instant semantics, i.e the timestamp + // is adjusted so that the corresponding time in the reader timezone is displayed. However, at LinkedIn, all readers + // and writers are in the UTC timezone as our production machines are set to UTC. So, timestamp with/without time + // zone is the same. + // When set to false (default), we throw an exception at runtime + // "Spark does not support timestamp without time zone fields" if reading timestamp without time zone fields + this.readTimestampWithoutZone = options.getBoolean("read-timestamp-without-zone", false); } @Override @@ -131,7 +146,12 @@ public Batch toBatch() { @Override public StructType readSchema() { - return SparkSchemaUtil.convert(expectedSchema); + if (readSchema == null) { + Preconditions.checkArgument(readTimestampWithoutZone || !hasTimestampWithoutZone(expectedSchema), + "Spark does not support timestamp without time zone fields"); + this.readSchema = SparkSchemaUtil.convert(expectedSchema); + } + return readSchema; } @Override @@ -177,12 +197,20 @@ public PartitionReaderFactory createReaderFactory() { boolean onlyPrimitives = expectedSchema.columns().stream().allMatch(c -> c.type().isPrimitiveType()); + boolean hasTimestampWithoutZone = hasTimestampWithoutZone(expectedSchema); + boolean readUsingBatch = batchReadsEnabled && ((allOrcFileScanTasks && hasNoRowFilters) || - (allParquetFileScanTasks && atLeastOneColumn && onlyPrimitives)); + (allParquetFileScanTasks && atLeastOneColumn && onlyPrimitives && !hasTimestampWithoutZone)); return new ReaderFactory(readUsingBatch ? 
batchSize : 0); } + private static boolean hasTimestampWithoutZone(Schema schema) { + return TypeUtil.find(schema, t -> + t.typeId().equals(Type.TypeID.TIMESTAMP) && !((Types.TimestampType) t).shouldAdjustToUTC() + ) != null; + } + @Override public Statistics estimateStatistics() { if (table instanceof LegacyHiveTable) { diff --git a/spark3/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java b/spark3/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java index 9be9938387..1166f32c08 100644 --- a/spark3/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java +++ b/spark3/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java @@ -33,16 +33,11 @@ import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; -import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.data.GenericAppenderFactory; import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.Record; -import org.apache.iceberg.data.avro.DataWriter; -import org.apache.iceberg.data.orc.GenericOrcWriter; -import org.apache.iceberg.data.parquet.GenericParquetWriter; import org.apache.iceberg.hadoop.HadoopTables; import org.apache.iceberg.io.FileAppender; -import org.apache.iceberg.orc.ORC; -import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.spark.data.GenericsHelpers; @@ -188,33 +183,9 @@ public void writeUnpartitionedTable() throws IOException { this.records = testRecords(tableSchema); - switch (fileFormat) { - case AVRO: - try (FileAppender writer = Avro.write(localOutput(testFile)) - .createWriterFunc(DataWriter::create) - .schema(tableSchema) - .build()) { - writer.addAll(records); - } - break; - - case PARQUET: - try (FileAppender writer = Parquet.write(localOutput(testFile)) - .createWriterFunc(GenericParquetWriter::buildWriter) - .schema(tableSchema) - .build()) { - writer.addAll(records); - } - break; - - case ORC: - try (FileAppender writer = ORC.write(localOutput(testFile)) - .createWriterFunc(GenericOrcWriter::buildWriter) - .schema(tableSchema) - .build()) { - writer.addAll(records); - } - break; + try (FileAppender writer = new GenericAppenderFactory(tableSchema).newAppender( + localOutput(testFile), fileFormat)) { + writer.addAll(records); } DataFile file = DataFiles.builder(PartitionSpec.unpartitioned()) diff --git a/spark3/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone3.java b/spark3/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone3.java new file mode 100644 index 0000000000..4216aec027 --- /dev/null +++ b/spark3/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone3.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark.source; + +public class TestTimestampWithoutZone3 extends TestTimestampWithoutZone { + public TestTimestampWithoutZone3(String format, boolean vectorized) { + super(format, vectorized); + } +}
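A rough caller-side sketch of the APIs added above (illustrative only, not part of the patch; the schema, records, output file, and table location are assumed to be supplied by the surrounding code):

import java.io.File;
import java.io.IOException;
import java.util.List;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.FileAppender;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class GenericAppenderUsageSketch {
  // Write generic records to a local Parquet file; GenericAppenderFactory selects the
  // format-specific writer function (Avro/Parquet/ORC) behind a single newAppender call.
  static void writeLocalParquet(Schema schema, List<Record> records, File file) throws IOException {
    GenericAppenderFactory factory = new GenericAppenderFactory(schema)
        .set(TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, "134217728"); // optional writer property
    try (FileAppender<Record> appender = factory.newAppender(Files.localOutput(file), FileFormat.PARQUET)) {
      appender.addAll(records);
    }
  }

  // Opt in to reading Iceberg timestamp-without-zone columns as Spark TimestampType; without the
  // option the scan fails with "Spark does not support timestamp without time zone fields".
  static Dataset<Row> readTimestampWithoutZone(SparkSession spark, String tableLocation) {
    return spark.read().format("iceberg")
        .option("read-timestamp-without-zone", "true")
        .load(tableLocation);
  }
}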