-
Notifications
You must be signed in to change notification settings - Fork 3k
Add support for reading/writing timestamps without timezone. #2757
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 22 commits
3268799
4b9a190
f5036fb
bac19c6
ab0bf3a
f8a5293
ee386b9
fc6ee0e
e537c0b
abb607e
4bee290
0259262
1579abc
4f37486
bafaffb
7acaec0
cee72a0
459ce89
d14b2b2
398a2a0
3f6b0f2
bc316c4
c5917c6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,57 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| package org.apache.iceberg.spark; | ||
|
|
||
| import org.apache.iceberg.Schema; | ||
| import org.apache.iceberg.types.FixupTypes; | ||
| import org.apache.iceberg.types.Type; | ||
| import org.apache.iceberg.types.TypeUtil; | ||
| import org.apache.iceberg.types.Types; | ||
|
|
||
| /** | ||
| * By default Spark type {@link org.apache.iceberg.types.Types.TimestampType} should be converted to | ||
| * {@link Types.TimestampType#withZone()} iceberg type. But we also can convert | ||
| * {@link org.apache.iceberg.types.Types.TimestampType} to {@link Types.TimestampType#withoutZone()} iceberg type | ||
| * by setting {@link SparkUtil#USE_TIMESTAMP_WITHOUT_TIME_ZONE_IN_NEW_TABLES} to 'true' | ||
| */ | ||
| class SparkFixupTimestampType extends FixupTypes { | ||
|
|
||
| private SparkFixupTimestampType(Schema referenceSchema) { | ||
| super(referenceSchema); | ||
| } | ||
|
|
||
| static Schema fixup(Schema schema) { | ||
| return new Schema(TypeUtil.visit(schema, | ||
| new SparkFixupTimestampType(schema)).asStructType().fields()); | ||
| } | ||
|
|
||
| @Override | ||
| public Type primitive(Type.PrimitiveType primitive) { | ||
| if (primitive.typeId() == Type.TypeID.TIMESTAMP) { | ||
| return Types.TimestampType.withoutZone(); | ||
| } | ||
| return primitive; | ||
| } | ||
|
|
||
| @Override | ||
| protected boolean fixupPrimitive(Type.PrimitiveType type, Type source) { | ||
| return Type.TypeID.TIMESTAMP.equals(type.typeId()); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,15 +20,20 @@ | |
| package org.apache.iceberg.spark.data; | ||
|
|
||
| import java.io.IOException; | ||
| import java.util.Map; | ||
| import java.util.concurrent.atomic.AtomicInteger; | ||
| import org.apache.iceberg.Schema; | ||
| import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; | ||
| import org.apache.iceberg.relocated.com.google.common.collect.Lists; | ||
| import org.apache.iceberg.relocated.com.google.common.collect.Maps; | ||
| import org.apache.iceberg.spark.SparkUtil; | ||
| import org.apache.iceberg.types.TypeUtil; | ||
| import org.apache.iceberg.types.Types; | ||
| import org.apache.iceberg.types.Types.ListType; | ||
| import org.apache.iceberg.types.Types.LongType; | ||
| import org.apache.iceberg.types.Types.MapType; | ||
| import org.apache.iceberg.types.Types.StructType; | ||
| import org.apache.spark.sql.internal.SQLConf; | ||
| import org.junit.Rule; | ||
| import org.junit.Test; | ||
| import org.junit.rules.TemporaryFolder; | ||
|
|
@@ -185,4 +190,51 @@ public void testMixedTypes() throws IOException { | |
|
|
||
| writeAndValidate(schema); | ||
| } | ||
|
|
||
| @Test | ||
| public void testTimestampWithoutZone() throws IOException { | ||
| withSQLConf(ImmutableMap.of(SparkUtil.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE, "true"), () -> { | ||
| Schema schema = TypeUtil.assignIncreasingFreshIds(new Schema( | ||
| required(0, "id", LongType.get()), | ||
| optional(1, "ts_without_zone", Types.TimestampType.withoutZone()))); | ||
|
|
||
| writeAndValidate(schema); | ||
| }); | ||
| } | ||
|
|
||
| protected void withSQLConf(Map<String, String> conf, Action action) throws IOException { | ||
| SQLConf sqlConf = SQLConf.get(); | ||
|
|
||
| Map<String, String> currentConfValues = Maps.newHashMap(); | ||
| conf.keySet().forEach(confKey -> { | ||
| if (sqlConf.contains(confKey)) { | ||
| String currentConfValue = sqlConf.getConfString(confKey); | ||
| currentConfValues.put(confKey, currentConfValue); | ||
| } | ||
| }); | ||
|
|
||
| conf.forEach((confKey, confValue) -> { | ||
| if (SQLConf.staticConfKeys().contains(confKey)) { | ||
| throw new RuntimeException("Cannot modify the value of a static config: " + confKey); | ||
| } | ||
| sqlConf.setConfString(confKey, confValue); | ||
| }); | ||
|
|
||
| try { | ||
| action.invoke(); | ||
| } finally { | ||
| conf.forEach((confKey, confValue) -> { | ||
| if (currentConfValues.containsKey(confKey)) { | ||
| sqlConf.setConfString(confKey, currentConfValues.get(confKey)); | ||
| } else { | ||
| sqlConf.unsetConf(confKey); | ||
| } | ||
| }); | ||
| } | ||
| } | ||
|
|
||
| @FunctionalInterface | ||
| protected interface Action { | ||
| void invoke() throws IOException; | ||
| } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As a follow up PR (in a separate PR either before or after this one is merged), particularly if you're looking for some more work to do to contribute to the project, you might explore if this combination of I'm not 100% sure how that would look, maybe an interface like Again, just copying it for now is fine, but it would be nice to reduce the code duplication and make this easier for others to use in the future. Your exploration might find that it’s better to not do that (I’m more of a Scala developer myself and so to me it feels like a mixin). Something to think about for later though!
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am fully agree with you, it can be moved to separate interface with static method and placed in some general package like @FunctionalInterface
public interface ConfigurableTestSQLConf {
void invoke() throws IOException;
static void withSQLConf(Map<String, String> conf, ConfigurableTestSQLConf action) throws IOException {
SQLConf sqlConf = SQLConf.get();
Map<String, String> currentConfValues = Maps.newHashMap();
conf.keySet().forEach(confKey -> {
if (sqlConf.contains(confKey)) {
String currentConfValue = sqlConf.getConfString(confKey);
currentConfValues.put(confKey, currentConfValue);
}
});
conf.forEach((confKey, confValue) -> {
if (SQLConf.staticConfKeys().contains(confKey)) {
throw new RuntimeException("Cannot modify the value of a static config: " + confKey);
}
sqlConf.setConfString(confKey, confValue);
});
try {
action.invoke();
} finally {
conf.forEach((confKey, confValue) -> {
if (currentConfValues.containsKey(confKey)) {
sqlConf.setConfString(confKey, currentConfValues.get(confKey));
} else {
sqlConf.unsetConf(confKey);
}
});
}
}
}But this part better to do in separate PR, because other packages will be affected |
||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: I think this is overly indented.
When continuing an expression on a second line, we usually indent 4 spaces (as opposed to the normal 2 spaces for changes in scope etc).
So I think that
new SparkFixup…on this line should align the n of new with the second r from return above it (4 spaces in from the start of the wordreturn).There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will fix it asap, where can I find code style guide?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have pushed fix
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you use IntelliJ, you can have it set up to point to the style guide and auto format it for you: http://iceberg.apache.org/community/#setting-up-ide-and-code-style
Otherwise, in general I believe the rules come from here: https://github.com/apache/iceberg/blob/master/.baseline/idea/intellij-java-palantir-style.xml
There are admittedly a number of these cases, so auto formatting seems like the best idea. If you don’t use IntelliJ, I think this command can be run from command line somehow but I’m not 100% sure.
Thanks for all the work on this so far. It’s very close!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the plug-in that is used in general for your info. Specifically, this one would be the checkstyle one, but we also use error prone as well: https://github.com/palantir/gradle-baseline
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks a lot, I am using IntelliJ so it is really helpful