Merged
23 commits
3268799
spark: Add in support to read timestamp without timezone from parquet
bkahloon Feb 28, 2021
4b9a190
spark: Remove ORC vectorized test for reading timestamp without timezone
bkahloon Feb 28, 2021
f5036fb
Spark: address PR comments
bkahloon Mar 4, 2021
bac19c6
Spark: fix build failure due to import of all iceberg packages
bkahloon Mar 6, 2021
ab0bf3a
Spark: remove unused imports and try to fix package import ordering
bkahloon Mar 7, 2021
f8a5293
Spark: fix code formatting issue
bkahloon Mar 8, 2021
ee386b9
Spark: fix code formatting issue
bkahloon Mar 8, 2021
fc6ee0e
Spark: Fix formatting error of long line
bkahloon Mar 9, 2021
e537c0b
Spark: Fix formatting error of long line
bkahloon Mar 9, 2021
abb607e
Add support for writing timestamps without timezone.
sshkvar Jun 11, 2021
4bee290
Added missed check for handling timestamp without zone for Writer in …
sshkvar Jun 17, 2021
0259262
Added missed check for handling timestamp without zone for Writer in …
sshkvar Jun 17, 2021
1579abc
Address PR comments.
sshkvar Jun 29, 2021
4f37486
Address PR comments.
sshkvar Jun 30, 2021
bafaffb
Address PR comments.
sshkvar Jul 1, 2021
7acaec0
Address PR comments.
sshkvar Jul 1, 2021
cee72a0
Address a few little cleanups
sshkvar Jul 12, 2021
459ce89
Address a few little cleanups
sshkvar Jul 12, 2021
d14b2b2
Address a few little cleanups
sshkvar Jul 12, 2021
398a2a0
fix for `'lambda arguments' has incorrect indentation level 12, expec…
sshkvar Jul 12, 2021
3f6b0f2
fix for `incorrect indentation level 6, expected level should be 8.`
sshkvar Jul 12, 2021
bc316c4
Added withSQLConf method to AvroDataTest.java as suggested in the PR …
sshkvar Jul 13, 2021
c5917c6
Fixed over-indentation as requested in PR
sshkvar Jul 15, 2021
@@ -202,11 +202,6 @@ public Type primitive(Type.PrimitiveType primitive) {
"Cannot project decimal with incompatible precision: %s < %s",
requestedDecimal.precision(), decimal.precision());
break;
case TIMESTAMP:
Types.TimestampType timestamp = (Types.TimestampType) primitive;
Preconditions.checkArgument(timestamp.shouldAdjustToUTC(),
"Cannot project timestamp (without time zone) as timestamptz (with time zone)");
break;
default:
}

@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.spark;

import org.apache.iceberg.Schema;
import org.apache.iceberg.types.FixupTypes;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;

/**
* By default, the Spark timestamp type is converted to the Iceberg {@link Types.TimestampType#withZone()} type.
* It can instead be converted to the {@link Types.TimestampType#withoutZone()} type by setting
* {@link SparkUtil#USE_TIMESTAMP_WITHOUT_TIME_ZONE_IN_NEW_TABLES} to 'true'.
*/
class SparkFixupTimestampType extends FixupTypes {

private SparkFixupTimestampType(Schema referenceSchema) {
super(referenceSchema);
}

static Schema fixup(Schema schema) {
return new Schema(TypeUtil.visit(schema,
new SparkFixupTimestampType(schema)).asStructType().fields());
}

@Override
public Type primitive(Type.PrimitiveType primitive) {
if (primitive.typeId() == Type.TypeID.TIMESTAMP) {
return Types.TimestampType.withoutZone();
}
return primitive;
}

@Override
protected boolean fixupPrimitive(Type.PrimitiveType type, Type source) {
return Type.TypeID.TIMESTAMP.equals(type.typeId());
}
}
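
For illustration only, a minimal sketch of what the fixup does to a converted schema. The example schema below is made up, and fixup(Schema) is package-private, so this would only compile from inside the org.apache.iceberg.spark package:

import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

// Hypothetical input: a plain Spark-to-Iceberg conversion yields timestamptz (with zone)
// for every timestamp column, because Spark has only one timestamp type.
Schema converted = new Schema(
    Types.NestedField.required(1, "id", Types.LongType.get()),
    Types.NestedField.optional(2, "event_ts", Types.TimestampType.withZone()));

// fixup rewrites every timestamp field to timestamp without zone; all other fields
// are returned as-is.
Schema fixed = SparkFixupTimestampType.fixup(converted);
// fixed.findField("event_ts").type() is now Types.TimestampType.withoutZone()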
@@ -52,6 +52,11 @@ protected boolean fixupPrimitive(Type.PrimitiveType type, Type source) {
return true;
}
break;
case TIMESTAMP:
if (source.typeId() == Type.TypeID.TIMESTAMP) {
return true;
}
break;
default:
}
return false;
@@ -122,8 +122,31 @@ public static DataType convert(Type type) {
* @throws IllegalArgumentException if the type cannot be converted
*/
public static Schema convert(StructType sparkType) {
return convert(sparkType, false);
}

/**
* Convert a Spark {@link StructType struct} to a {@link Schema} with new field ids.
* <p>
* This conversion assigns fresh ids.
* <p>
* Some data types are represented as the same Spark type. These are converted to a default type.
* <p>
* To convert using a reference schema for field ids and ambiguous types, use
* {@link #convert(Schema, StructType)}.
*
* @param sparkType a Spark StructType
* @param useTimestampWithoutZone whether timestamp fields should be stored without timezone
* @return the equivalent Schema
* @throws IllegalArgumentException if the type cannot be converted
*/
public static Schema convert(StructType sparkType, boolean useTimestampWithoutZone) {
Type converted = SparkTypeVisitor.visit(sparkType, new SparkTypeToType(sparkType));
return new Schema(converted.asNestedType().asStructType().fields());
Schema schema = new Schema(converted.asNestedType().asStructType().fields());
if (useTimestampWithoutZone) {
schema = SparkFixupTimestampType.fixup(schema);
}
return schema;
}
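
As a usage sketch of the new overload: the enclosing class is not named in this diff, so SparkSchemaUtil is an assumption here, as is the example Spark schema.

import org.apache.iceberg.Schema;
import org.apache.iceberg.spark.SparkSchemaUtil;  // assumed enclosing class, not shown in this diff
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// Spark exposes a single timestamp type, so the flag decides which Iceberg variant it maps to.
StructType sparkType = new StructType(new StructField[] {
    new StructField("id", DataTypes.LongType, false, Metadata.empty()),
    new StructField("event_ts", DataTypes.TimestampType, true, Metadata.empty())
});

Schema withZone = SparkSchemaUtil.convert(sparkType);           // event_ts becomes timestamptz
Schema withoutZone = SparkSchemaUtil.convert(sparkType, true);  // event_ts becomes timestamp without zone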

/**
71 changes: 71 additions & 0 deletions spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
@@ -20,21 +20,38 @@
package org.apache.iceberg.spark;

import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopConfigurable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.transforms.Transform;
import org.apache.iceberg.transforms.UnknownTransform;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Pair;
import org.apache.spark.sql.RuntimeConfig;
import org.apache.spark.util.SerializableConfiguration;

public class SparkUtil {

public static final String HANDLE_TIMESTAMP_WITHOUT_TIMEZONE =
"spark.sql.iceberg.handle-timestamp-without-timezone";
public static final String TIMESTAMP_WITHOUT_TIMEZONE_ERROR = String.format("Cannot handle timestamp without" +
" timezone fields in Spark. Spark does not natively support this type but if you would like to handle all" +
" timestamps as timestamp with timezone set '%s' to true. This will not change the underlying values stored" +
" but will change their displayed values in Spark. For more information please see" +
" https://docs.databricks.com/spark/latest/dataframes-datasets/dates-timestamps.html#ansi-sql-and" +
"-spark-sql-timestamps", HANDLE_TIMESTAMP_WITHOUT_TIMEZONE);
public static final String USE_TIMESTAMP_WITHOUT_TIME_ZONE_IN_NEW_TABLES =
"spark.sql.iceberg.use-timestamp-without-timezone-in-new-tables";

private SparkUtil() {
}

@@ -99,4 +116,58 @@ public static <C, T> Pair<C, T> catalogAndIdentifier(List<String> nameParts,
}
}
}

/**
* Checks whether the table schema contains a timestamp without timezone column.
*
* @param schema the table schema to check
* @return true if the schema contains a timestamp field without a timezone
*/
public static boolean hasTimestampWithoutZone(Schema schema) {
return TypeUtil.find(schema, t -> Types.TimestampType.withoutZone().equals(t)) != null;
}

/**
* Checks whether timestamp without time zone may be read/written as timestamp with time zone. Generally,
* this is not safe: timestamp without time zone has wall-clock semantics (3PM should always be read as 3PM,
* no matter the reader/writer timezone), while timestamp with time zone has instant semantics (the value is
* adjusted so that the corresponding time in the reader's timezone is displayed).
* When this returns false (the default), reading or writing timestamp without time zone fields fails at
* runtime with "Spark does not support timestamp without time zone fields".
*
* @param readerConfig table read options
* @param sessionConf Spark session configuration
* @return true if handling timestamps without timezone is allowed
*/
public static boolean canHandleTimestampWithoutZone(Map<String, String> readerConfig, RuntimeConfig sessionConf) {
String readerOption = readerConfig.get(HANDLE_TIMESTAMP_WITHOUT_TIMEZONE);
if (readerOption != null) {
return Boolean.parseBoolean(readerOption);
}
String sessionConfValue = sessionConf.get(HANDLE_TIMESTAMP_WITHOUT_TIMEZONE, null);
if (sessionConfValue != null) {
return Boolean.parseBoolean(sessionConfValue);
}
return false;
}
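
A hedged sketch of the resolution order implemented above: a per-read/write option wins over the session config, and with neither set the method returns false. The session setup and option values are illustrative.

import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder().master("local").getOrCreate();

// Session-level default for this Spark session.
spark.conf().set(SparkUtil.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE, "true");

// A per-operation option takes precedence over the session config.
Map<String, String> readOptions =
    ImmutableMap.of(SparkUtil.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE, "false");

boolean allowed = SparkUtil.canHandleTimestampWithoutZone(readOptions, spark.conf());
// allowed == false: the option overrides the session setting.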

/**
* Checks whether the Spark session config contains the
* {@link SparkUtil#USE_TIMESTAMP_WITHOUT_TIME_ZONE_IN_NEW_TABLES} property (default: false).
* If true, all timestamp fields in new tables will be stored as {@link Types.TimestampType#withoutZone()};
* otherwise {@link Types.TimestampType#withZone()} will be used.
*
* @param sessionConf a Spark runtime config
* @return true if the session config sets {@link SparkUtil#USE_TIMESTAMP_WITHOUT_TIME_ZONE_IN_NEW_TABLES} to true
*/
public static boolean useTimestampWithoutZoneInNewTables(RuntimeConfig sessionConf) {
String sessionConfValue = sessionConf.get(USE_TIMESTAMP_WITHOUT_TIME_ZONE_IN_NEW_TABLES, null);
if (sessionConfValue != null) {
return Boolean.parseBoolean(sessionConfValue);
}
return false;
}

}
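
And a brief sketch of the new-tables flag, reusing the spark session from the previous sketch; the catalog and table names are made up, and this assumes an Iceberg catalog named demo is configured for the session:

spark.conf().set(SparkUtil.USE_TIMESTAMP_WITHOUT_TIME_ZONE_IN_NEW_TABLES, "true");

// With the flag set, the TIMESTAMP column below should be created as Iceberg
// timestamp (without zone) rather than timestamptz.
spark.sql("CREATE TABLE demo.db.events (id BIGINT, event_ts TIMESTAMP) USING iceberg");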
@@ -104,12 +104,7 @@ public DataType primitive(Type.PrimitiveType primitive) {
throw new UnsupportedOperationException(
"Spark does not support time fields");
case TIMESTAMP:
Types.TimestampType timestamp = (Types.TimestampType) primitive;
if (timestamp.shouldAdjustToUTC()) {
return TimestampType$.MODULE$;
}
throw new UnsupportedOperationException(
"Spark does not support timestamp without time zone fields");
return TimestampType$.MODULE$;
case STRING:
return StringType$.MODULE$;
case UUID:
@@ -105,6 +105,7 @@ public OrcValueReader<?> primitive(Type.PrimitiveType iPrimitive, TypeDescriptio
case DOUBLE:
return OrcValueReaders.doubles();
case TIMESTAMP_INSTANT:
case TIMESTAMP:
return SparkOrcValueReaders.timestampTzs();
case DECIMAL:
return SparkOrcValueReaders.decimals(primitive.getPrecision(), primitive.getScale());
@@ -118,6 +118,7 @@ public SparkOrcValueWriter primitive(Type.PrimitiveType iPrimitive, TypeDescript
case DECIMAL:
return SparkOrcValueWriters.decimal(primitive.getPrecision(), primitive.getScale());
case TIMESTAMP_INSTANT:
case TIMESTAMP:
return SparkOrcValueWriters.timestampTz();
default:
throw new IllegalArgumentException("Unhandled type " + primitive);
@@ -128,6 +128,7 @@ public Converter primitive(Type.PrimitiveType iPrimitive, TypeDescription primit
primitiveValueReader = OrcValueReaders.doubles();
break;
case TIMESTAMP_INSTANT:
case TIMESTAMP:
primitiveValueReader = SparkOrcValueReaders.timestampTzs();
break;
case DECIMAL:
@@ -20,15 +20,20 @@
package org.apache.iceberg.spark.data;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iceberg.Schema;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.SparkUtil;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.types.Types.ListType;
import org.apache.iceberg.types.Types.LongType;
import org.apache.iceberg.types.Types.MapType;
import org.apache.iceberg.types.Types.StructType;
import org.apache.spark.sql.internal.SQLConf;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -185,4 +190,51 @@ public void testMixedTypes() throws IOException {

writeAndValidate(schema);
}

@Test
public void testTimestampWithoutZone() throws IOException {
withSQLConf(ImmutableMap.of(SparkUtil.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE, "true"), () -> {
Schema schema = TypeUtil.assignIncreasingFreshIds(new Schema(
required(0, "id", LongType.get()),
optional(1, "ts_without_zone", Types.TimestampType.withoutZone())));

writeAndValidate(schema);
});
}

protected void withSQLConf(Map<String, String> conf, Action action) throws IOException {
SQLConf sqlConf = SQLConf.get();

Map<String, String> currentConfValues = Maps.newHashMap();
conf.keySet().forEach(confKey -> {
if (sqlConf.contains(confKey)) {
String currentConfValue = sqlConf.getConfString(confKey);
currentConfValues.put(confKey, currentConfValue);
}
});

conf.forEach((confKey, confValue) -> {
if (SQLConf.staticConfKeys().contains(confKey)) {
throw new RuntimeException("Cannot modify the value of a static config: " + confKey);
}
sqlConf.setConfString(confKey, confValue);
});

try {
action.invoke();
} finally {
conf.forEach((confKey, confValue) -> {
if (currentConfValues.containsKey(confKey)) {
sqlConf.setConfString(confKey, currentConfValues.get(confKey));
} else {
sqlConf.unsetConf(confKey);
}
});
}
}

@FunctionalInterface
protected interface Action {
void invoke() throws IOException;
}
Contributor

As a follow-up (in a separate PR, either before or after this one is merged), particularly if you're looking for more work to contribute to the project, you might explore whether this combination of withSQLConf and the corresponding @FunctionalInterface can be abstracted into its own interface that one could mix into tests.

I'm not 100% sure how that would look, maybe an interface like ConfigurableTestSQLConf or something?

Again, just copying it for now is fine, but it would be nice to reduce the code duplication and make this easier for others to use in the future. Your exploration might find that it’s better to not do that (I’m more of a Scala developer myself and so to me it feels like a mixin). Something to think about for later though!

Contributor Author

I fully agree with you; it could be moved to a separate interface with a static method and placed in some general package, like:

@FunctionalInterface
public interface ConfigurableTestSQLConf {

  void invoke() throws IOException;

  static void withSQLConf(Map<String, String> conf, ConfigurableTestSQLConf action) throws IOException {
    SQLConf sqlConf = SQLConf.get();

    Map<String, String> currentConfValues = Maps.newHashMap();
    conf.keySet().forEach(confKey -> {
      if (sqlConf.contains(confKey)) {
        String currentConfValue = sqlConf.getConfString(confKey);
        currentConfValues.put(confKey, currentConfValue);
      }
    });

    conf.forEach((confKey, confValue) -> {
      if (SQLConf.staticConfKeys().contains(confKey)) {
        throw new RuntimeException("Cannot modify the value of a static config: " + confKey);
      }
      sqlConf.setConfString(confKey, confValue);
    });

    try {
      action.invoke();
    } finally {
      conf.forEach((confKey, confValue) -> {
        if (currentConfValues.containsKey(confKey)) {
          sqlConf.setConfString(confKey, currentConfValues.get(confKey));
        } else {
          sqlConf.unsetConf(confKey);
        }
      });
    }
  }
}

But it's better to do this part in a separate PR, because other packages will be affected.
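
If it were extracted as proposed, a test could then call it roughly like this (a hypothetical sketch based on the suggestion above, not merged code; schema stands for any test schema containing a timestamp without zone field):

ConfigurableTestSQLConf.withSQLConf(
    ImmutableMap.of(SparkUtil.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE, "true"),
    () -> writeAndValidate(schema));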

}
@@ -24,7 +24,9 @@
import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import java.util.Collection;
@@ -122,13 +124,19 @@ private static void assertEqualsSafe(Type type, Object expected, Object actual)
Assert.assertEquals("ISO-8601 date should be equal", expected.toString(), actual.toString());
break;
case TIMESTAMP:
Assert.assertTrue("Should expect an OffsetDateTime", expected instanceof OffsetDateTime);
Assert.assertTrue("Should be a Timestamp", actual instanceof Timestamp);
Timestamp ts = (Timestamp) actual;
// milliseconds from nanos has already been added by getTime
OffsetDateTime actualTs = EPOCH.plusNanos(
(ts.getTime() * 1_000_000) + (ts.getNanos() % 1_000_000));
Assert.assertEquals("Timestamp should be equal", expected, actualTs);
Types.TimestampType timestampType = (Types.TimestampType) type;
if (timestampType.shouldAdjustToUTC()) {
Assert.assertTrue("Should expect an OffsetDateTime", expected instanceof OffsetDateTime);
Assert.assertEquals("Timestamp should be equal", expected, actualTs);
} else {
Assert.assertTrue("Should expect an LocalDateTime", expected instanceof LocalDateTime);
Assert.assertEquals("Timestamp should be equal", expected, actualTs.toLocalDateTime());
}
break;
case STRING:
Assert.assertTrue("Should be a String", actual instanceof String);
@@ -241,9 +249,16 @@ private static void assertEqualsUnsafe(Type type, Object expected, Object actual
Assert.assertEquals("Primitive value should be equal to expected", expectedDays, actual);
break;
case TIMESTAMP:
Assert.assertTrue("Should expect an OffsetDateTime", expected instanceof OffsetDateTime);
long expectedMicros = ChronoUnit.MICROS.between(EPOCH, (OffsetDateTime) expected);
Assert.assertEquals("Primitive value should be equal to expected", expectedMicros, actual);
Types.TimestampType timestampType = (Types.TimestampType) type;
if (timestampType.shouldAdjustToUTC()) {
Assert.assertTrue("Should expect an OffsetDateTime", expected instanceof OffsetDateTime);
long expectedMicros = ChronoUnit.MICROS.between(EPOCH, (OffsetDateTime) expected);
Assert.assertEquals("Primitive value should be equal to expected", expectedMicros, actual);
} else {
Assert.assertTrue("Should expect an LocalDateTime", expected instanceof LocalDateTime);
long expectedMicros = ChronoUnit.MICROS.between(EPOCH, ((LocalDateTime) expected).atZone(ZoneId.of("UTC")));
Assert.assertEquals("Primitive value should be equal to expected", expectedMicros, actual);
}
break;
case STRING:
Assert.assertTrue("Should be a UTF8String", actual instanceof UTF8String);