diff --git a/spark/src/main/java/org/apache/iceberg/spark/PruneColumnsWithoutReordering.java b/spark/src/main/java/org/apache/iceberg/spark/PruneColumnsWithoutReordering.java index 653c27313ec3..c6984e2fe8cd 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/PruneColumnsWithoutReordering.java +++ b/spark/src/main/java/org/apache/iceberg/spark/PruneColumnsWithoutReordering.java @@ -202,11 +202,6 @@ public Type primitive(Type.PrimitiveType primitive) { "Cannot project decimal with incompatible precision: %s < %s", requestedDecimal.precision(), decimal.precision()); break; - case TIMESTAMP: - Types.TimestampType timestamp = (Types.TimestampType) primitive; - Preconditions.checkArgument(timestamp.shouldAdjustToUTC(), - "Cannot project timestamp (without time zone) as timestamptz (with time zone)"); - break; default: } diff --git a/spark/src/main/java/org/apache/iceberg/spark/SparkFixupTimestampType.java b/spark/src/main/java/org/apache/iceberg/spark/SparkFixupTimestampType.java new file mode 100644 index 000000000000..d90b4630773a --- /dev/null +++ b/spark/src/main/java/org/apache/iceberg/spark/SparkFixupTimestampType.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark; + +import org.apache.iceberg.Schema; +import org.apache.iceberg.types.FixupTypes; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.types.Types; + +/** + * By default Spark type {@link org.apache.iceberg.types.Types.TimestampType} should be converted to + * {@link Types.TimestampType#withZone()} iceberg type. 
However, it can also be converted to the + * {@link Types.TimestampType#withoutZone()} iceberg type by setting + * {@link SparkUtil#USE_TIMESTAMP_WITHOUT_TIME_ZONE_IN_NEW_TABLES} to 'true'. + */ +class SparkFixupTimestampType extends FixupTypes { + + private SparkFixupTimestampType(Schema referenceSchema) { + super(referenceSchema); + } + + static Schema fixup(Schema schema) { + return new Schema(TypeUtil.visit(schema, + new SparkFixupTimestampType(schema)).asStructType().fields()); + } + + @Override + public Type primitive(Type.PrimitiveType primitive) { + if (primitive.typeId() == Type.TypeID.TIMESTAMP) { + return Types.TimestampType.withoutZone(); + } + return primitive; + } + + @Override + protected boolean fixupPrimitive(Type.PrimitiveType type, Type source) { + return Type.TypeID.TIMESTAMP.equals(type.typeId()); + } +} diff --git a/spark/src/main/java/org/apache/iceberg/spark/SparkFixupTypes.java b/spark/src/main/java/org/apache/iceberg/spark/SparkFixupTypes.java index 2d3ea4c81f00..5508965af249 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/SparkFixupTypes.java +++ b/spark/src/main/java/org/apache/iceberg/spark/SparkFixupTypes.java @@ -52,6 +52,11 @@ protected boolean fixupPrimitive(Type.PrimitiveType type, Type source) { return true; } break; + case TIMESTAMP: + if (source.typeId() == Type.TypeID.TIMESTAMP) { + return true; + } + break; default: } return false; diff --git a/spark/src/main/java/org/apache/iceberg/spark/SparkSchemaUtil.java b/spark/src/main/java/org/apache/iceberg/spark/SparkSchemaUtil.java index 484c407e0247..b503ba634d85 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/SparkSchemaUtil.java +++ b/spark/src/main/java/org/apache/iceberg/spark/SparkSchemaUtil.java @@ -122,8 +122,31 @@ public static DataType convert(Type type) { * @throws IllegalArgumentException if the type cannot be converted */ public static Schema convert(StructType sparkType) { + return convert(sparkType, false); + } + + /** + * Convert a Spark {@link StructType struct} to a {@link Schema} with new field ids. + *
<p>
+ * This conversion assigns fresh ids. + *
<p>
+ * Some data types are represented as the same Spark type. These are converted to a default type. + *
<p>
+ * To convert using a reference schema for field ids and ambiguous types, use + * {@link #convert(Schema, StructType)}. + * + * @param sparkType a Spark StructType + * @param useTimestampWithoutZone whether timestamps should be stored as timestamp without time zone + * @return the equivalent Schema + * @throws IllegalArgumentException if the type cannot be converted + */ + public static Schema convert(StructType sparkType, boolean useTimestampWithoutZone) { Type converted = SparkTypeVisitor.visit(sparkType, new SparkTypeToType(sparkType)); - return new Schema(converted.asNestedType().asStructType().fields()); + Schema schema = new Schema(converted.asNestedType().asStructType().fields()); + if (useTimestampWithoutZone) { + schema = SparkFixupTimestampType.fixup(schema); + } + return schema; } /** diff --git a/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java b/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java index 53ca29363f8f..4d5c3ec9e48e 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java +++ b/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java @@ -20,21 +20,38 @@ package org.apache.iceberg.spark; import java.util.List; +import java.util.Map; import java.util.function.BiFunction; import java.util.function.Function; import java.util.stream.Collectors; import org.apache.iceberg.PartitionField; import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.hadoop.HadoopConfigurable; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.transforms.Transform; import org.apache.iceberg.transforms.UnknownTransform; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.types.Types; import org.apache.iceberg.util.Pair; +import org.apache.spark.sql.RuntimeConfig; import org.apache.spark.util.SerializableConfiguration; public class SparkUtil { + + public static final String HANDLE_TIMESTAMP_WITHOUT_TIMEZONE = + "spark.sql.iceberg.handle-timestamp-without-timezone"; + public static final String TIMESTAMP_WITHOUT_TIMEZONE_ERROR = String.format("Cannot handle timestamp without" + + " timezone fields in Spark. Spark does not natively support this type, but if you would like to handle all" + + " timestamps as timestamp with timezone, set '%s' to true. This will not change the underlying values stored" + + " but will change their displayed values in Spark. For more information, please see" + + " https://docs.databricks.com/spark/latest/dataframes-datasets/dates-timestamps.html#ansi-sql-and" + + "-spark-sql-timestamps", HANDLE_TIMESTAMP_WITHOUT_TIMEZONE); + public static final String USE_TIMESTAMP_WITHOUT_TIME_ZONE_IN_NEW_TABLES = + "spark.sql.iceberg.use-timestamp-without-timezone-in-new-tables"; + private SparkUtil() { } @@ -99,4 +116,58 @@ public static Pair catalogAndIdentifier(List nameParts, } } } + + /** + * Check whether the table schema contains a timestamp without timezone column + * @param schema table schema to check for a timestamp without timezone column + * @return boolean indicating if the schema contains a timestamp field without a timezone + */ + public static boolean hasTimestampWithoutZone(Schema schema) { + return TypeUtil.find(schema, t -> Types.TimestampType.withoutZone().equals(t)) != null; + } + + /** + * Allow reading/writing timestamp without time zone as timestamp with time zone.
Generally, + * this is not safe as timestamp without time zone is supposed to represent wall clock time semantics, + * i.e. no matter the reader/writer time zone, 3PM should always be read as 3PM, + * but timestamp with time zone represents instant semantics, i.e. the timestamp + * is adjusted so that the corresponding time in the reader timezone is displayed. + * When set to false (the default), an exception is thrown at runtime if timestamp without time zone + * fields are read. + * + * @param readerConfig table read options + * @param sessionConf spark session configurations + * @return boolean indicating if reading timestamps without timezone is allowed + */ + public static boolean canHandleTimestampWithoutZone(Map readerConfig, RuntimeConfig sessionConf) { + String readerOption = readerConfig.get(HANDLE_TIMESTAMP_WITHOUT_TIMEZONE); + if (readerOption != null) { + return Boolean.parseBoolean(readerOption); + } + String sessionConfValue = sessionConf.get(HANDLE_TIMESTAMP_WITHOUT_TIMEZONE, null); + if (sessionConfValue != null) { + return Boolean.parseBoolean(sessionConfValue); + } + return false; + } + + /** + * Check whether the Spark session config contains the {@link SparkUtil#USE_TIMESTAMP_WITHOUT_TIME_ZONE_IN_NEW_TABLES} + * property (false by default). + * If true, all timestamp fields in new tables will be stored as {@link Types.TimestampType#withoutZone()}; + * otherwise {@link Types.TimestampType#withZone()} will be used. + * + * @param sessionConf a spark runtime config + * @return true if the session config has the {@link SparkUtil#USE_TIMESTAMP_WITHOUT_TIME_ZONE_IN_NEW_TABLES} property + * set to true + */ + public static boolean useTimestampWithoutZoneInNewTables(RuntimeConfig sessionConf) { + String sessionConfValue = sessionConf.get(USE_TIMESTAMP_WITHOUT_TIME_ZONE_IN_NEW_TABLES, null); + if (sessionConfValue != null) { + return Boolean.parseBoolean(sessionConfValue); + } + return false; + } + } diff --git a/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java b/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java index b9e9dfade792..6a8be60eb078 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java +++ b/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java @@ -104,12 +104,7 @@ public DataType primitive(Type.PrimitiveType primitive) { throw new UnsupportedOperationException( "Spark does not support time fields"); case TIMESTAMP: - Types.TimestampType timestamp = (Types.TimestampType) primitive; - if (timestamp.shouldAdjustToUTC()) { - return TimestampType$.MODULE$; - } - throw new UnsupportedOperationException( - "Spark does not support timestamp without time zone fields"); + return TimestampType$.MODULE$; case STRING: return StringType$.MODULE$; case UUID: diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java index 8a45aabf5fc2..4ed6420a9aa4 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java +++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java @@ -105,6 +105,7 @@ public OrcValueReader primitive(Type.PrimitiveType iPrimitive, TypeDescriptio case DOUBLE: return OrcValueReaders.doubles(); case TIMESTAMP_INSTANT: + case TIMESTAMP: return SparkOrcValueReaders.timestampTzs(); case DECIMAL: return SparkOrcValueReaders.decimals(primitive.getPrecision(), primitive.getScale()); diff --git 
a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java index 73c391ad07c3..2c1edea1ffef 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java +++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java @@ -118,6 +118,7 @@ public SparkOrcValueWriter primitive(Type.PrimitiveType iPrimitive, TypeDescript case DECIMAL: return SparkOrcValueWriters.decimal(primitive.getPrecision(), primitive.getScale()); case TIMESTAMP_INSTANT: + case TIMESTAMP: return SparkOrcValueWriters.timestampTz(); default: throw new IllegalArgumentException("Unhandled type " + primitive); diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java index 33e1dac1235e..418c25993a7e 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java +++ b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java @@ -128,6 +128,7 @@ public Converter primitive(Type.PrimitiveType iPrimitive, TypeDescription primit primitiveValueReader = OrcValueReaders.doubles(); break; case TIMESTAMP_INSTANT: + case TIMESTAMP: primitiveValueReader = SparkOrcValueReaders.timestampTzs(); break; case DECIMAL: diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTest.java b/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTest.java index ae9c7fdc9af4..8cbf6c795956 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTest.java +++ b/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTest.java @@ -20,15 +20,20 @@ package org.apache.iceberg.spark.data; import java.io.IOException; +import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import org.apache.iceberg.Schema; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.spark.SparkUtil; import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.types.Types; import org.apache.iceberg.types.Types.ListType; import org.apache.iceberg.types.Types.LongType; import org.apache.iceberg.types.Types.MapType; import org.apache.iceberg.types.Types.StructType; +import org.apache.spark.sql.internal.SQLConf; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -185,4 +190,51 @@ public void testMixedTypes() throws IOException { writeAndValidate(schema); } + + @Test + public void testTimestampWithoutZone() throws IOException { + withSQLConf(ImmutableMap.of(SparkUtil.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE, "true"), () -> { + Schema schema = TypeUtil.assignIncreasingFreshIds(new Schema( + required(0, "id", LongType.get()), + optional(1, "ts_without_zone", Types.TimestampType.withoutZone()))); + + writeAndValidate(schema); + }); + } + + protected void withSQLConf(Map conf, Action action) throws IOException { + SQLConf sqlConf = SQLConf.get(); + + Map currentConfValues = Maps.newHashMap(); + conf.keySet().forEach(confKey -> { + if (sqlConf.contains(confKey)) { + String currentConfValue = sqlConf.getConfString(confKey); + currentConfValues.put(confKey, currentConfValue); + } + }); + + conf.forEach((confKey, confValue) -> { + if (SQLConf.staticConfKeys().contains(confKey)) { + throw new 
RuntimeException("Cannot modify the value of a static config: " + confKey); + } + sqlConf.setConfString(confKey, confValue); + }); + + try { + action.invoke(); + } finally { + conf.forEach((confKey, confValue) -> { + if (currentConfValues.containsKey(confKey)) { + sqlConf.setConfString(confKey, currentConfValues.get(confKey)); + } else { + sqlConf.unsetConf(confKey); + } + }); + } + } + + @FunctionalInterface + protected interface Action { + void invoke() throws IOException; + } } diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/GenericsHelpers.java b/spark/src/test/java/org/apache/iceberg/spark/data/GenericsHelpers.java index 0c4598a209e8..24724d3d12bf 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/data/GenericsHelpers.java +++ b/spark/src/test/java/org/apache/iceberg/spark/data/GenericsHelpers.java @@ -24,7 +24,9 @@ import java.sql.Timestamp; import java.time.Instant; import java.time.LocalDate; +import java.time.LocalDateTime; import java.time.OffsetDateTime; +import java.time.ZoneId; import java.time.ZoneOffset; import java.time.temporal.ChronoUnit; import java.util.Collection; @@ -122,13 +124,19 @@ private static void assertEqualsSafe(Type type, Object expected, Object actual) Assert.assertEquals("ISO-8601 date should be equal", expected.toString(), actual.toString()); break; case TIMESTAMP: - Assert.assertTrue("Should expect an OffsetDateTime", expected instanceof OffsetDateTime); Assert.assertTrue("Should be a Timestamp", actual instanceof Timestamp); Timestamp ts = (Timestamp) actual; // milliseconds from nanos has already been added by getTime OffsetDateTime actualTs = EPOCH.plusNanos( (ts.getTime() * 1_000_000) + (ts.getNanos() % 1_000_000)); - Assert.assertEquals("Timestamp should be equal", expected, actualTs); + Types.TimestampType timestampType = (Types.TimestampType) type; + if (timestampType.shouldAdjustToUTC()) { + Assert.assertTrue("Should expect an OffsetDateTime", expected instanceof OffsetDateTime); + Assert.assertEquals("Timestamp should be equal", expected, actualTs); + } else { + Assert.assertTrue("Should expect an LocalDateTime", expected instanceof LocalDateTime); + Assert.assertEquals("Timestamp should be equal", expected, actualTs.toLocalDateTime()); + } break; case STRING: Assert.assertTrue("Should be a String", actual instanceof String); @@ -241,9 +249,16 @@ private static void assertEqualsUnsafe(Type type, Object expected, Object actual Assert.assertEquals("Primitive value should be equal to expected", expectedDays, actual); break; case TIMESTAMP: - Assert.assertTrue("Should expect an OffsetDateTime", expected instanceof OffsetDateTime); - long expectedMicros = ChronoUnit.MICROS.between(EPOCH, (OffsetDateTime) expected); - Assert.assertEquals("Primitive value should be equal to expected", expectedMicros, actual); + Types.TimestampType timestampType = (Types.TimestampType) type; + if (timestampType.shouldAdjustToUTC()) { + Assert.assertTrue("Should expect an OffsetDateTime", expected instanceof OffsetDateTime); + long expectedMicros = ChronoUnit.MICROS.between(EPOCH, (OffsetDateTime) expected); + Assert.assertEquals("Primitive value should be equal to expected", expectedMicros, actual); + } else { + Assert.assertTrue("Should expect an LocalDateTime", expected instanceof LocalDateTime); + long expectedMicros = ChronoUnit.MICROS.between(EPOCH, ((LocalDateTime) expected).atZone(ZoneId.of("UTC"))); + Assert.assertEquals("Primitive value should be equal to expected", expectedMicros, actual); + } break; case STRING: Assert.assertTrue("Should 
be a UTF8String", actual instanceof UTF8String); diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java new file mode 100644 index 000000000000..482a711910ab --- /dev/null +++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark.source; + +import java.io.File; +import java.io.IOException; +import java.time.LocalDateTime; +import java.util.List; +import java.util.Locale; +import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.data.GenericAppenderFactory; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.hadoop.HadoopTables; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.SparkTestBase; +import org.apache.iceberg.spark.SparkUtil; +import org.apache.iceberg.spark.data.GenericsHelpers; +import org.apache.iceberg.types.Types; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.apache.iceberg.Files.localOutput; + +@RunWith(Parameterized.class) +public abstract class TestTimestampWithoutZone extends SparkTestBase { + private static final Configuration CONF = new Configuration(); + private static final HadoopTables TABLES = new HadoopTables(CONF); + + private static final Schema SCHEMA = new Schema( + Types.NestedField.required(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "ts", Types.TimestampType.withoutZone()), + Types.NestedField.optional(3, "data", Types.StringType.get()) + ); + + private static SparkSession spark = null; + + @BeforeClass + public static void startSpark() { + TestTimestampWithoutZone.spark = 
SparkSession.builder().master("local[2]").getOrCreate(); + } + + @AfterClass + public static void stopSpark() { + SparkSession currentSpark = TestTimestampWithoutZone.spark; + TestTimestampWithoutZone.spark = null; + currentSpark.stop(); + } + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + + private final String format; + private final boolean vectorized; + + @Parameterized.Parameters(name = "format = {0}, vectorized = {1}") + public static Object[][] parameters() { + return new Object[][] { + { "parquet", false }, + { "parquet", true }, + { "avro", false } + }; + } + + public TestTimestampWithoutZone(String format, boolean vectorized) { + this.format = format; + this.vectorized = vectorized; + } + + private File parent = null; + private File unpartitioned = null; + private List records = null; + + @Before + public void writeUnpartitionedTable() throws IOException { + this.parent = temp.newFolder("TestTimestampWithoutZone"); + this.unpartitioned = new File(parent, "unpartitioned"); + File dataFolder = new File(unpartitioned, "data"); + Assert.assertTrue("Mkdir should succeed", dataFolder.mkdirs()); + + Table table = TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), unpartitioned.toString()); + Schema tableSchema = table.schema(); // use the table schema because ids are reassigned + + FileFormat fileFormat = FileFormat.valueOf(format.toUpperCase(Locale.ENGLISH)); + + File testFile = new File(dataFolder, fileFormat.addExtension(UUID.randomUUID().toString())); + + // create records using the table's schema + this.records = testRecords(tableSchema); + + try (FileAppender writer = new GenericAppenderFactory(tableSchema).newAppender( + localOutput(testFile), fileFormat)) { + writer.addAll(records); + } + + DataFile file = DataFiles.builder(PartitionSpec.unpartitioned()) + .withRecordCount(records.size()) + .withFileSizeInBytes(testFile.length()) + .withPath(testFile.toString()) + .build(); + + table.newAppend().appendFile(file).commit(); + } + + @Test + public void testUnpartitionedTimestampWithoutZone() { + assertEqualsSafe(SCHEMA.asStruct(), records, read(unpartitioned.toString(), vectorized)); + } + + @Test + public void testUnpartitionedTimestampWithoutZoneProjection() { + Schema projection = SCHEMA.select("id", "ts"); + assertEqualsSafe(projection.asStruct(), + records.stream().map(r -> projectFlat(projection, r)).collect(Collectors.toList()), + read(unpartitioned.toString(), vectorized, "id", "ts")); + } + + @Rule + public ExpectedException exception = ExpectedException.none(); + + @Test + public void testUnpartitionedTimestampWithoutZoneError() { + AssertHelpers.assertThrows(String.format("Read operation performed on a timestamp without timezone field while " + + "'%s' set to false should throw exception", + SparkUtil.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE), + IllegalArgumentException.class, + SparkUtil.TIMESTAMP_WITHOUT_TIMEZONE_ERROR, + () -> spark.read().format("iceberg") + .option("vectorization-enabled", String.valueOf(vectorized)) + .option(SparkUtil.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE, "false") + .load(unpartitioned.toString()) + .collectAsList()); + } + + @Test + public void testUnpartitionedTimestampWithoutZoneAppend() { + spark.read().format("iceberg") + .option(SparkUtil.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE, "true") + .option("vectorization-enabled", String.valueOf(vectorized)) + .load(unpartitioned.toString()) + .write() + .format("iceberg") + .option(SparkUtil.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE, "true") + .mode(SaveMode.Append) + .save(unpartitioned.toString()); + + 
assertEqualsSafe(SCHEMA.asStruct(), + Stream.concat(records.stream(), records.stream()).collect(Collectors.toList()), + read(unpartitioned.toString(), vectorized)); + } + + @Test + public void testUnpartitionedTimestampWithoutZoneWriteError() { + String errorMessage = String.format("Write operation performed on a timestamp without timezone field while " + + "'%s' set to false should throw exception", + SparkUtil.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE); + Runnable writeOperation = () -> spark.read().format("iceberg") + .option(SparkUtil.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE, "true") + .option("vectorization-enabled", String.valueOf(vectorized)) + .load(unpartitioned.toString()) + .write() + .format("iceberg") + .option(SparkUtil.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE, "false") + .mode(SaveMode.Append) + .save(unpartitioned.toString()); + + AssertHelpers.assertThrows(errorMessage, IllegalArgumentException.class, + SparkUtil.TIMESTAMP_WITHOUT_TIMEZONE_ERROR, writeOperation); + + } + + @Test + public void testUnpartitionedTimestampWithoutZoneSessionProperties() { + withSQLConf(ImmutableMap.of(SparkUtil.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE, "true"), () -> { + spark.read().format("iceberg") + .option("vectorization-enabled", String.valueOf(vectorized)) + .load(unpartitioned.toString()) + .write() + .format("iceberg") + .mode(SaveMode.Append) + .save(unpartitioned.toString()); + + assertEqualsSafe(SCHEMA.asStruct(), + Stream.concat(records.stream(), records.stream()).collect(Collectors.toList()), + read(unpartitioned.toString(), vectorized)); + }); + } + + private static Record projectFlat(Schema projection, Record record) { + Record result = GenericRecord.create(projection); + List fields = projection.asStruct().fields(); + for (int i = 0; i < fields.size(); i += 1) { + Types.NestedField field = fields.get(i); + result.set(i, record.getField(field.name())); + } + return result; + } + + public static void assertEqualsSafe(Types.StructType struct, + List expected, List actual) { + Assert.assertEquals("Number of results should match expected", expected.size(), actual.size()); + for (int i = 0; i < expected.size(); i += 1) { + GenericsHelpers.assertEqualsSafe(struct, expected.get(i), actual.get(i)); + } + } + + private List testRecords(Schema schema) { + return Lists.newArrayList( + record(schema, 0L, parseToLocal("2017-12-22T09:20:44.294658"), "junction"), + record(schema, 1L, parseToLocal("2017-12-22T07:15:34.582910"), "alligator"), + record(schema, 2L, parseToLocal("2017-12-22T06:02:09.243857"), "forrest"), + record(schema, 3L, parseToLocal("2017-12-22T03:10:11.134509"), "clapping"), + record(schema, 4L, parseToLocal("2017-12-22T00:34:00.184671"), "brush"), + record(schema, 5L, parseToLocal("2017-12-21T22:20:08.935889"), "trap"), + record(schema, 6L, parseToLocal("2017-12-21T21:55:30.589712"), "element"), + record(schema, 7L, parseToLocal("2017-12-21T17:31:14.532797"), "limited"), + record(schema, 8L, parseToLocal("2017-12-21T15:21:51.237521"), "global"), + record(schema, 9L, parseToLocal("2017-12-21T15:02:15.230570"), "goldfish") + ); + } + + private static List read(String table, boolean vectorized) { + return read(table, vectorized, "*"); + } + + private static List read(String table, boolean vectorized, String select0, String... 
selectN) { + Dataset dataset = spark.read().format("iceberg") + .option("vectorization-enabled", String.valueOf(vectorized)) + .option(SparkUtil.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE, "true") + .load(table) + .select(select0, selectN); + return dataset.collectAsList(); + } + + private static LocalDateTime parseToLocal(String timestamp) { + return LocalDateTime.parse(timestamp); + } + + private static Record record(Schema schema, Object... values) { + Record rec = GenericRecord.create(schema); + for (int i = 0; i < values.length; i += 1) { + rec.set(i, values[i]); + } + return rec; + } +} diff --git a/spark2/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java b/spark2/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java index 9b86e004c06d..022b81d4e9b1 100644 --- a/spark2/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java +++ b/spark2/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java @@ -83,6 +83,10 @@ public Optional createWriter(String jobId, StructType dsStruct "Save mode %s is not supported", mode); Configuration conf = new Configuration(lazyBaseConf()); Table table = getTableAndResolveHadoopConfiguration(options, conf); + boolean handleTimestampWithoutZone = + SparkUtil.canHandleTimestampWithoutZone(options.asMap(), lazySparkSession().conf()); + Preconditions.checkArgument(handleTimestampWithoutZone || !SparkUtil.hasTimestampWithoutZone(table.schema()), + SparkUtil.TIMESTAMP_WITHOUT_TIMEZONE_ERROR); Schema writeSchema = SparkSchemaUtil.convert(table.schema(), dsStruct); TypeUtil.validateWriteSchema(table.schema(), writeSchema, checkNullability(options), checkOrdering(options)); SparkUtil.validatePartitionTransforms(table.spec()); diff --git a/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java b/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java index 7a9b73e82d43..edc03c3ef7e5 100644 --- a/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java +++ b/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java @@ -51,6 +51,7 @@ import org.apache.iceberg.spark.SparkFilters; import org.apache.iceberg.spark.SparkReadOptions; import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.spark.SparkUtil; import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.TableScanUtil; import org.apache.iceberg.util.Tasks; @@ -98,6 +99,7 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus private Filter[] pushedFilters = NO_FILTERS; private final boolean localityPreferred; private final int batchSize; + private final boolean readTimestampWithoutZone; // lazy variables private Schema schema = null; @@ -160,6 +162,8 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus this.batchSize = options.get(SparkReadOptions.VECTORIZATION_BATCH_SIZE).map(Integer::parseInt).orElseGet(() -> PropertyUtil.propertyAsInt(table.properties(), TableProperties.PARQUET_BATCH_SIZE, TableProperties.PARQUET_BATCH_SIZE_DEFAULT)); + RuntimeConfig sessionConf = SparkSession.active().conf(); + this.readTimestampWithoutZone = SparkUtil.canHandleTimestampWithoutZone(options.asMap(), sessionConf); } private Schema lazySchema() { @@ -183,6 +187,8 @@ private Expression filterExpression() { private StructType lazyType() { if (type == null) { + Preconditions.checkArgument(readTimestampWithoutZone || !SparkUtil.hasTimestampWithoutZone(lazySchema()), + SparkUtil.TIMESTAMP_WITHOUT_TIMEZONE_ERROR); this.type = SparkSchemaUtil.convert(lazySchema()); } return 
type; diff --git a/spark2/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone24.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone24.java new file mode 100644 index 000000000000..84c7d1aeb733 --- /dev/null +++ b/spark2/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone24.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark.source; + +public class TestTimestampWithoutZone24 extends TestTimestampWithoutZone { + public TestTimestampWithoutZone24(String format, boolean vectorized) { + super(format, vectorized); + } +} diff --git a/spark3/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark3/src/main/java/org/apache/iceberg/spark/SparkCatalog.java index e8c7db75a581..d099291fc353 100644 --- a/spark3/src/main/java/org/apache/iceberg/spark/SparkCatalog.java +++ b/spark3/src/main/java/org/apache/iceberg/spark/SparkCatalog.java @@ -87,6 +87,7 @@ public class SparkCatalog extends BaseCatalog { private SupportsNamespaces asNamespaceCatalog = null; private String[] defaultNamespace = null; private HadoopTables tables; + private boolean useTimestampsWithoutZone; /** * Build an Iceberg {@link Catalog} to be used by this Spark catalog adapter. 
@@ -128,7 +129,7 @@ public SparkTable loadTable(Identifier ident) throws NoSuchTableException { public SparkTable createTable(Identifier ident, StructType schema, Transform[] transforms, Map properties) throws TableAlreadyExistsException { - Schema icebergSchema = SparkSchemaUtil.convert(schema); + Schema icebergSchema = SparkSchemaUtil.convert(schema, useTimestampsWithoutZone); try { Catalog.TableBuilder builder = newBuilder(ident, icebergSchema); Table icebergTable = builder @@ -145,7 +146,7 @@ public SparkTable createTable(Identifier ident, StructType schema, @Override public StagedTable stageCreate(Identifier ident, StructType schema, Transform[] transforms, Map properties) throws TableAlreadyExistsException { - Schema icebergSchema = SparkSchemaUtil.convert(schema); + Schema icebergSchema = SparkSchemaUtil.convert(schema, useTimestampsWithoutZone); try { Catalog.TableBuilder builder = newBuilder(ident, icebergSchema); Transaction transaction = builder.withPartitionSpec(Spark3Util.toPartitionSpec(icebergSchema, transforms)) @@ -161,7 +162,7 @@ public StagedTable stageCreate(Identifier ident, StructType schema, Transform[] @Override public StagedTable stageReplace(Identifier ident, StructType schema, Transform[] transforms, Map properties) throws NoSuchTableException { - Schema icebergSchema = SparkSchemaUtil.convert(schema); + Schema icebergSchema = SparkSchemaUtil.convert(schema, useTimestampsWithoutZone); try { Catalog.TableBuilder builder = newBuilder(ident, icebergSchema); Transaction transaction = builder.withPartitionSpec(Spark3Util.toPartitionSpec(icebergSchema, transforms)) @@ -177,7 +178,7 @@ public StagedTable stageReplace(Identifier ident, StructType schema, Transform[] @Override public StagedTable stageCreateOrReplace(Identifier ident, StructType schema, Transform[] transforms, Map properties) { - Schema icebergSchema = SparkSchemaUtil.convert(schema); + Schema icebergSchema = SparkSchemaUtil.convert(schema, useTimestampsWithoutZone); Catalog.TableBuilder builder = newBuilder(ident, icebergSchema); Transaction transaction = builder.withPartitionSpec(Spark3Util.toPartitionSpec(icebergSchema, transforms)) .withLocation(properties.get("location")) @@ -387,7 +388,9 @@ public final void initialize(String name, CaseInsensitiveStringMap options) { Catalog catalog = buildIcebergCatalog(name, options); this.catalogName = name; - this.tables = new HadoopTables(SparkSession.active().sessionState().newHadoopConf()); + SparkSession sparkSession = SparkSession.active(); + this.useTimestampsWithoutZone = SparkUtil.useTimestampWithoutZoneInNewTables(sparkSession.conf()); + this.tables = new HadoopTables(sparkSession.sessionState().newHadoopConf()); this.icebergCatalog = cacheEnabled ? 
CachingCatalog.wrap(catalog) : catalog; if (catalog instanceof SupportsNamespaces) { this.asNamespaceCatalog = (SupportsNamespaces) catalog; diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java index baf40341cece..acd02037aced 100644 --- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java +++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java @@ -36,8 +36,10 @@ import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.hadoop.HadoopInputFile; import org.apache.iceberg.hadoop.Util; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.spark.Spark3Util; import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.spark.SparkUtil; import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.TableScanUtil; import org.apache.iceberg.util.Tasks; @@ -71,6 +73,7 @@ abstract class SparkBatchScan implements Scan, Batch, SupportsReportStatistics { private final Schema expectedSchema; private final List filterExpressions; private final int batchSize; + private final boolean readTimestampWithoutZone; private final CaseInsensitiveStringMap options; // lazy variables @@ -86,6 +89,9 @@ abstract class SparkBatchScan implements Scan, Batch, SupportsReportStatistics { this.localityPreferred = Spark3Util.isLocalityEnabled(table.io(), table.location(), options); this.batchSize = Spark3Util.batchSize(table.properties(), options); this.options = options; + + RuntimeConfig sessionConf = SparkSession.active().conf(); + this.readTimestampWithoutZone = SparkUtil.canHandleTimestampWithoutZone(options, sessionConf); } protected Table table() { @@ -120,6 +126,8 @@ public MicroBatchStream toMicroBatchStream(String checkpointLocation) { @Override public StructType readSchema() { if (readSchema == null) { + Preconditions.checkArgument(readTimestampWithoutZone || !SparkUtil.hasTimestampWithoutZone(expectedSchema), + SparkUtil.TIMESTAMP_WITHOUT_TIMEZONE_ERROR); this.readSchema = SparkSchemaUtil.convert(expectedSchema); } return readSchema; diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java index 19debd53c9cb..b23e0a7935cf 100644 --- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java +++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java @@ -51,6 +51,7 @@ class SparkWriteBuilder implements WriteBuilder, SupportsDynamicOverwrite, Suppo private final StructType dsSchema; private final CaseInsensitiveStringMap options; private final String overwriteMode; + private final boolean canHandleTimestampWithoutZone; private boolean overwriteDynamic = false; private boolean overwriteByFilter = false; private Expression overwriteExpr = null; @@ -66,6 +67,7 @@ class SparkWriteBuilder implements WriteBuilder, SupportsDynamicOverwrite, Suppo this.options = info.options(); this.overwriteMode = options.containsKey("overwrite-mode") ? 
options.get("overwrite-mode").toLowerCase(Locale.ROOT) : null; + this.canHandleTimestampWithoutZone = SparkUtil.canHandleTimestampWithoutZone(options, spark.conf()); } public WriteBuilder overwriteFiles(Scan scan, IsolationLevel writeIsolationLevel) { @@ -103,6 +105,9 @@ public WriteBuilder overwrite(Filter[] filters) { @Override public BatchWrite buildForBatch() { // Validate + Preconditions.checkArgument(canHandleTimestampWithoutZone || !SparkUtil.hasTimestampWithoutZone(table.schema()), + SparkUtil.TIMESTAMP_WITHOUT_TIMEZONE_ERROR); + Schema writeSchema = SparkSchemaUtil.convert(table.schema(), dsSchema); TypeUtil.validateWriteSchema(table.schema(), writeSchema, checkNullability(spark, options), checkOrdering(spark, options)); diff --git a/spark3/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone3.java b/spark3/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone3.java new file mode 100644 index 000000000000..4216aec02789 --- /dev/null +++ b/spark3/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone3.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark.source; + +public class TestTimestampWithoutZone3 extends TestTimestampWithoutZone { + public TestTimestampWithoutZone3(String format, boolean vectorized) { + super(format, vectorized); + } +} diff --git a/spark3/src/test/java/org/apache/iceberg/spark/sql/TestTimestampWithoutZone.java b/spark3/src/test/java/org/apache/iceberg/spark/sql/TestTimestampWithoutZone.java new file mode 100644 index 000000000000..b6e92c82abd7 --- /dev/null +++ b/spark3/src/test/java/org/apache/iceberg/spark/sql/TestTimestampWithoutZone.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.spark.sql; + +import java.sql.Timestamp; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.relocated.com.google.common.base.Joiner; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.spark.SparkCatalogTestBase; +import org.apache.iceberg.spark.SparkSessionCatalog; +import org.apache.iceberg.spark.SparkUtil; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; +import org.joda.time.DateTime; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runners.Parameterized; + +public class TestTimestampWithoutZone extends SparkCatalogTestBase { + + private static final String newTableName = "created_table"; + private final Map config; + + private static final Schema schema = new Schema( + Types.NestedField.required(1, "id", Types.LongType.get()), + Types.NestedField.required(2, "ts", Types.TimestampType.withoutZone()), + Types.NestedField.required(3, "tsz", Types.TimestampType.withZone()) + ); + + private final List values = ImmutableList.of( + row(1L, toTimestamp("2021-01-01T00:00:00.0"), toTimestamp("2021-02-01T00:00:00.0")), + row(2L, toTimestamp("2021-01-01T00:00:00.0"), toTimestamp("2021-02-01T00:00:00.0")), + row(3L, toTimestamp("2021-01-01T00:00:00.0"), toTimestamp("2021-02-01T00:00:00.0")) + ); + + @Parameterized.Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}") + public static Object[][] parameters() { + return new Object[][]{{"spark_catalog", + SparkSessionCatalog.class.getName(), + ImmutableMap.of( + "type", "hive", + "default-namespace", "default", + "parquet-enabled", "true", + "cache-enabled", "false" + )} + }; + } + + public TestTimestampWithoutZone(String catalogName, String implementation, Map config) { + super(catalogName, implementation, config); + this.config = config; + } + + @Before + public void createTables() { + validationCatalog.createTable(tableIdent, schema); + } + + @After + public void removeTables() { + validationCatalog.dropTable(tableIdent, true); + sql("DROP TABLE IF EXISTS %s", newTableName); + } + + @Test + public void testWriteTimestampWithoutZoneError() { + AssertHelpers.assertThrows( + String.format("Write operation performed on a timestamp without timezone field while " + + "'%s' is set to false should throw exception", SparkUtil.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE), + IllegalArgumentException.class, + SparkUtil.TIMESTAMP_WITHOUT_TIMEZONE_ERROR, + () -> sql("INSERT INTO %s VALUES %s", tableName, rowToSqlValues(values))); + } + + @Test + public void testAppendTimestampWithoutZone() { + withSQLConf(ImmutableMap.of(SparkUtil.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE, "true"), () -> { + sql("INSERT INTO %s VALUES %s", tableName, rowToSqlValues(values)); + + Assert.assertEquals("Should have " + values.size() + " rows", + (long) values.size(), scalarSql("SELECT count(*) FROM %s", tableName)); + + assertEquals("Row data should match expected", + values, sql("SELECT * FROM %s ORDER BY id", tableName)); + }); + } + + @Test + public void testCreateAsSelectWithTimestampWithoutZone() { 
withSQLConf(ImmutableMap.of(SparkUtil.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE, "true"), () -> { + sql("INSERT INTO %s VALUES %s", tableName, rowToSqlValues(values)); + + sql("CREATE TABLE %s USING iceberg AS SELECT * FROM %s", newTableName, tableName); + + Assert.assertEquals("Should have " + values.size() + " rows", (long) values.size(), + scalarSql("SELECT count(*) FROM %s", newTableName)); + + assertEquals("Row data should match expected", + sql("SELECT * FROM %s ORDER BY id", tableName), + sql("SELECT * FROM %s ORDER BY id", newTableName)); + }); + } + + @Test + public void testCreateNewTableShouldHaveTimestampWithZoneIcebergType() { + withSQLConf(ImmutableMap.of(SparkUtil.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE, "true"), () -> { + sql("INSERT INTO %s VALUES %s", tableName, rowToSqlValues(values)); + + sql("CREATE TABLE %s USING iceberg AS SELECT * FROM %s", newTableName, tableName); + + Assert.assertEquals("Should have " + values.size() + " rows", (long) values.size(), + scalarSql("SELECT count(*) FROM %s", newTableName)); + + assertEquals("Data from created table should match data from base table", + sql("SELECT * FROM %s ORDER BY id", tableName), + sql("SELECT * FROM %s ORDER BY id", newTableName)); + + Table createdTable = validationCatalog.loadTable(TableIdentifier.of("default", newTableName)); + assertFieldsType(createdTable.schema(), Types.TimestampType.withZone(), "ts", "tsz"); + }); + } + + @Test + public void testCreateNewTableShouldHaveTimestampWithoutZoneIcebergType() { + withSQLConf(ImmutableMap.of( + SparkUtil.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE, "true", + SparkUtil.USE_TIMESTAMP_WITHOUT_TIME_ZONE_IN_NEW_TABLES, "true"), () -> { + spark.sessionState().catalogManager().currentCatalog() + .initialize(catalog.name(), new CaseInsensitiveStringMap(config)); + sql("INSERT INTO %s VALUES %s", tableName, rowToSqlValues(values)); + + sql("CREATE TABLE %s USING iceberg AS SELECT * FROM %s", newTableName, tableName); + + Assert.assertEquals("Should have " + values.size() + " rows", (long) values.size(), + scalarSql("SELECT count(*) FROM %s", newTableName)); + + assertEquals("Row data should match expected", + sql("SELECT * FROM %s ORDER BY id", tableName), + sql("SELECT * FROM %s ORDER BY id", newTableName)); + Table createdTable = validationCatalog.loadTable(TableIdentifier.of("default", newTableName)); + assertFieldsType(createdTable.schema(), Types.TimestampType.withoutZone(), "ts", "tsz"); + }); + } + + private Timestamp toTimestamp(String value) { + return new Timestamp(DateTime.parse(value).getMillis()); + } + + private String rowToSqlValues(List rows) { + List rowValues = rows.stream().map(row -> { + List columns = Arrays.stream(row).map(value -> { + if (value instanceof Long) { + return value.toString(); + } else if (value instanceof Timestamp) { + return String.format("timestamp '%s'", value); + } + throw new RuntimeException("Type is not supported"); + }).collect(Collectors.toList()); + return "(" + Joiner.on(",").join(columns) + ")"; + }).collect(Collectors.toList()); + return Joiner.on(",").join(rowValues); + } + + private void assertFieldsType(Schema actual, Type.PrimitiveType expected, String... fields) { + actual.select(fields).asStruct().fields().forEach(field -> Assert.assertEquals(expected, field.type())); + } +}
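Usage note (editorial addition, not part of the patch): a minimal sketch of how a read could opt in to the new behavior, assuming an existing SparkSession named spark and a table location tablePath (both hypothetical). The key is the SparkUtil.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE constant introduced above; per the lookup order in SparkUtil.canHandleTimestampWithoutZone, the per-read option takes precedence over the session conf.

// Per-read option; overrides the session-level conf below.
Dataset<Row> df = spark.read()
    .format("iceberg")
    .option("spark.sql.iceberg.handle-timestamp-without-timezone", "true")
    .load(tablePath); // tablePath is an assumed table location

// Session-level alternative; consulted when the read/write option is not set.
spark.conf().set("spark.sql.iceberg.handle-timestamp-without-timezone", "true");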
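Similarly, a sketch of creating a table whose Spark TIMESTAMP columns are stored as Iceberg timestamp without zone, assuming a Spark 3 session with an Iceberg catalog named my_catalog (hypothetical). SparkCatalog reads SparkUtil.USE_TIMESTAMP_WITHOUT_TIME_ZONE_IN_NEW_TABLES during initialize(), so the conf must be in place before the catalog is first initialized; the spark3 SQL test above re-initializes the current catalog for exactly this reason.

// Must be set before SparkCatalog.initialize() runs, since the flag is cached there.
spark.conf().set("spark.sql.iceberg.use-timestamp-without-timezone-in-new-tables", "true");

// With the flag enabled, Spark TIMESTAMP maps to Iceberg timestamp without zone
// instead of the default timestamptz.
spark.sql("CREATE TABLE my_catalog.db.events (id BIGINT, ts TIMESTAMP) USING iceberg");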