@@ -22,7 +22,6 @@
import java.math.BigDecimal;
import java.sql.Date;
import java.sql.Timestamp;
-import java.time.Instant;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.hive.ql.io.sarg.ExpressionTree;
@@ -172,15 +171,24 @@ private static BigDecimal hiveDecimalToBigDecimal(HiveDecimalWritable hiveDecima
return hiveDecimalWritable.getHiveDecimal().bigDecimalValue().setScale(hiveDecimalWritable.scale());
}

+// Hive uses `java.sql.Date.valueOf(lit.toString());` to convert a literal to a Date,
+// which internally creates a `java.util.Date` that relies on TimeZone.getDefaultRef().
+// To get back the expected date we have to use LocalDate, which gets rid of the time zone
+// misery by building the object from the year/month/day only.
private static int daysFromDate(Date date) {
-return DateTimeUtil.daysFromInstant(Instant.ofEpochMilli(date.getTime()));
+return DateTimeUtil.daysFromDate(date.toLocalDate());
}

+// Hive uses `java.sql.Timestamp.valueOf(lit.toString());` to convert a literal to a Timestamp,
+// which again internally creates a `java.util.Date` that relies on TimeZone.getDefaultRef().
+// To get back the expected timestamp we have to use LocalDateTime, which gets rid of the time zone
+// misery by building the object from the year/month/day/hour/min/sec/nanos only.
private static int daysFromTimestamp(Timestamp timestamp) {
-return DateTimeUtil.daysFromInstant(timestamp.toInstant());
+return DateTimeUtil.daysFromDate(timestamp.toLocalDateTime().toLocalDate());
}

+// We have to use LocalDateTime to get the micros. See the comment above.
private static long microsFromTimestamp(Timestamp timestamp) {
-return DateTimeUtil.microsFromInstant(timestamp.toInstant());
+return DateTimeUtil.microsFromTimestamp(timestamp.toLocalDateTime());
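For illustration (not part of this PR), here is a minimal standalone sketch of the two conversion paths, assuming a JVM whose default time zone is CET:

    import java.sql.Timestamp;
    import java.time.Instant;
    import java.time.LocalDateTime;
    import java.util.TimeZone;

    public class TimestampZoneDemo {
      public static void main(String[] args) {
        // Timestamp.valueOf() interprets the literal's wall-clock fields in the
        // JVM default time zone, just like Hive's literal conversion.
        TimeZone.setDefault(TimeZone.getTimeZone("CET"));
        Timestamp ts = Timestamp.valueOf("2012-10-02 05:16:17.123456");

        // Old path: toInstant() applies the default zone, shifting the epoch
        // value by the zone offset (CET observes UTC+2 in early October).
        Instant instant = ts.toInstant();
        System.out.println(instant); // 2012-10-02T03:16:17.123456Z

        // New path: toLocalDateTime() keeps the literal's wall-clock fields,
        // independent of the default zone.
        LocalDateTime localDateTime = ts.toLocalDateTime();
        System.out.println(localDateTime); // 2012-10-02T05:16:17.123456
      }
    }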
Contributor:
@pvary This change breaks a test:

./gradlew clean :iceberg-mr:test --tests org.apache.iceberg.mr.hive.TestHiveIcebergFilterFactory
...
> Task :iceberg-mr:test FAILED

org.apache.iceberg.mr.hive.TestHiveIcebergFilterFactory > testTimestampType FAILED
    java.lang.AssertionError: expected:<1349154977123456> but was:<1349129777123456>
        at org.junit.Assert.fail(Assert.java:88)
        at org.junit.Assert.failNotEquals(Assert.java:834)
        at org.junit.Assert.assertEquals(Assert.java:118)
        at org.junit.Assert.assertEquals(Assert.java:144)
        at org.apache.iceberg.mr.hive.TestHiveIcebergFilterFactory.assertPredicatesMatch(TestHiveIcebergFilterFactory.java:268)
        at org.apache.iceberg.mr.hive.TestHiveIcebergFilterFactory.testTimestampType(TestHiveIcebergFilterFactory.java:248)

16 tests completed, 1 failed

This happens when running in non-UTC environments. I assume the test may need to change to adjust to the changes being made in #2278 to handle predicate pushdown for Timestamp.withZone().

I'm surprised this is not caught by the CI checks, but maybe the CI runs in UTC - is there a way we can run the tests in a few additional time zones to validate?

Thanks!

Contributor Author:
I'm even more surprised to see that, since I run my tests in CET and they pass without a problem. What time zone are you using? Are you using stock Hive?

Thanks,
Peter

Contributor (@edgarRd, Feb 27, 2021):
I'm in PST. If you replace https://github.com/apache/iceberg/blob/master/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergFilterFactory.java#L240-L242 with:

    TimeZone defaultTz = TimeZone.getDefault();
    try {
      TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles"));
      UnboundPredicate actual = (UnboundPredicate) HiveIcebergFilterFactory.generateFilterExpression(arg);
      assertPredicatesMatch(expected, actual);
    } finally {
      TimeZone.setDefault(defaultTz);
    }

to set the TimeZone, you should be able to repro - conversely, if I use "UTC" instead of "America/Los_Angeles" the test passes.

I'm running the unit test out of the master branch, with:

./gradlew clean :iceberg-mr:test --tests org.apache.iceberg.mr.hive.TestHiveIcebergFilterFactory

Contributor:
Interestingly, if I use "CET" it also fails:

org.apache.iceberg.mr.hive.TestHiveIcebergFilterFactory > testTimestampType FAILED
    java.lang.AssertionError: expected:<1349154977123456> but was:<1349162177123456>
        at org.junit.Assert.fail(Assert.java:88)
        at org.junit.Assert.failNotEquals(Assert.java:834)
        at org.junit.Assert.assertEquals(Assert.java:118)
        at org.junit.Assert.assertEquals(Assert.java:144)
        at org.apache.iceberg.mr.hive.TestHiveIcebergFilterFactory.assertPredicatesMatch(TestHiveIcebergFilterFactory.java:266)
        at org.apache.iceberg.mr.hive.TestHiveIcebergFilterFactory.testTimestampType(TestHiveIcebergFilterFactory.java:246)

16 tests completed, 1 failed

For CET, you can see the difference between the expected value and the actual value is exactly 2 hrs in microseconds (7.2 × 10^9) - the actual value is ahead of the expected one (which is in UTC).
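For reference, a quick sanity check of that gap, using the values from the failure output above:

    // expected/actual micros copied from the assertion failure above
    long expectedMicros = 1349154977123456L;
    long actualMicros = 1349162177123456L;
    // 2 h = 2 * 3600 s * 1_000_000 µs/s = 7_200_000_000 µs
    System.out.println(actualMicros - expectedMicros); // 7200000000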

Contributor Author:
Maybe IntelliJ made fun of me, or I messed up my settings. I will definitely check this out soon.

Thanks!

Contributor Author:
If you have some time, could you please check that with #2278 the tests run correctly on your side?
In the meantime I'll try to find out what the process is for reverting the changes.

Thanks,
Peter

Contributor Author:
@edgarRd: Pushed the fix here #2283. Could you please confirm that this issue is fixed?

Thanks,
Peter

Contributor:
Thanks for the fix @pvary - the test works now.

}
}
@@ -49,6 +49,7 @@
import org.apache.iceberg.data.DeleteFilter;
import org.apache.iceberg.data.GenericDeleteFilter;
import org.apache.iceberg.data.IdentityPartitionConverters;
+import org.apache.iceberg.data.InternalRecordWrapper;
import org.apache.iceberg.data.avro.DataReader;
import org.apache.iceberg.data.orc.GenericOrcReader;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
@@ -284,8 +285,11 @@ private CloseableIterable<T> applyResidualFiltering(CloseableIterable<T> iter, E
boolean applyResidual = !context.getConfiguration().getBoolean(InputFormatConfig.SKIP_RESIDUAL_FILTERING, false);

if (applyResidual && residual != null && residual != Expressions.alwaysTrue()) {
+// Date and timestamp values are not the correct type for Evaluator.
+// Wrapping to return the expected type.
+InternalRecordWrapper wrapper = new InternalRecordWrapper(readSchema.asStruct());
Evaluator filter = new Evaluator(readSchema.asStruct(), residual, caseSensitive);
-return CloseableIterable.filter(iter, record -> filter.eval((StructLike) record));
+return CloseableIterable.filter(iter, record -> filter.eval(wrapper.wrap((StructLike) record)));
} else {
return iter;
}
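To illustrate why the wrapping is needed, a hypothetical standalone sketch (not code from this PR) using Iceberg's generic data classes: generic Records carry dates as LocalDate, while Evaluator compares against Iceberg's internal representation (days since epoch), so records are wrapped before evaluation:

    import java.time.LocalDate;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.data.GenericRecord;
    import org.apache.iceberg.data.InternalRecordWrapper;
    import org.apache.iceberg.data.Record;
    import org.apache.iceberg.expressions.Evaluator;
    import org.apache.iceberg.expressions.Expressions;
    import org.apache.iceberg.types.Types;

    import static org.apache.iceberg.types.Types.NestedField.optional;

    public class ResidualWrapDemo {
      public static void main(String[] args) {
        Schema schema = new Schema(optional(1, "d_date", Types.DateType.get()));

        // Generic records store date values as java.time.LocalDate ...
        Record record = GenericRecord.create(schema);
        record.setField("d_date", LocalDate.of(2020, 1, 21));

        // ... while Evaluator expects the internal representation (int days
        // since epoch), so the record is wrapped before evaluation.
        InternalRecordWrapper wrapper = new InternalRecordWrapper(schema.asStruct());
        Evaluator filter = new Evaluator(schema.asStruct(),
            Expressions.equal("d_date", "2020-01-21"), true);

        System.out.println(filter.eval(wrapper.wrap(record))); // true
      }
    }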
Expand Up @@ -218,7 +218,7 @@ public void testBooleanType() {
@Test
public void testDateType() {
SearchArgument.Builder builder = SearchArgumentFactory.newBuilder();
-Date gmtDate = new Date(LocalDate.of(2015, 11, 12).atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli());
+Date gmtDate = Date.valueOf(LocalDate.of(2015, 11, 12));
SearchArgument arg = builder.startAnd().equals("date", PredicateLeaf.Type.DATE, gmtDate).end().build();

UnboundPredicate expected = Expressions.equal("date", Literal.of("2015-11-12").to(Types.DateType.get()).value());
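A minimal sketch (hypothetical, not part of this PR) of why the fixture changed, assuming a non-UTC default zone such as America/Los_Angeles:

    import java.sql.Date;
    import java.time.LocalDate;
    import java.time.ZoneOffset;
    import java.util.TimeZone;

    public class DateFixtureDemo {
      public static void main(String[] args) {
        TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles"));

        // Old fixture: the epoch millis of 2015-11-12T00:00Z; viewed in PST this
        // Date renders as the previous day, so day-based conversions are off by one.
        Date fromUtcMillis = new Date(
            LocalDate.of(2015, 11, 12).atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli());
        System.out.println(fromUtcMillis); // 2015-11-11

        // New fixture: built directly from year/month/day in the default zone,
        // so it always renders as 2015-11-12.
        Date fromLocalDate = Date.valueOf(LocalDate.of(2015, 11, 12));
        System.out.println(fromLocalDate); // 2015-11-12
      }
    }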
@@ -0,0 +1,172 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.mr.hive;

import java.io.IOException;
import java.text.DateFormat;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.TimeZone;
import org.apache.hadoop.hive.serde2.io.DateWritable;
import org.apache.hadoop.hive.serde2.io.TimestampWritable;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.common.DynFields;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.mr.TestHelper;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.types.Types;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.junit.runners.Parameterized.Parameter;
import static org.junit.runners.Parameterized.Parameters;

@RunWith(Parameterized.class)
public class TestHiveIcebergStorageHandlerTimezone {
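// Reflective handles to Hive's thread-local caches (the fields may be absent in some Hive versions,
// hence defaultAlwaysNull); they are cleared in before() so each run picks up the new default time zone.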
private static final Optional<ThreadLocal<DateFormat>> dateFormat =
Optional.ofNullable((ThreadLocal<DateFormat>) DynFields.builder()
.hiddenImpl(TimestampWritable.class, "threadLocalDateFormat")
.defaultAlwaysNull()
.buildStatic()
.get());

private static final Optional<ThreadLocal<TimeZone>> localTimeZone =
Optional.ofNullable((ThreadLocal<TimeZone>) DynFields.builder()
.hiddenImpl(DateWritable.class, "LOCAL_TIMEZONE")
.defaultAlwaysNull()
.buildStatic()
.get());

@Parameters(name = "timezone={0}")
public static Collection<Object[]> parameters() {
return ImmutableList.of(
new String[] {"America/New_York"},
new String[] {"Asia/Kolkata"},
new String[] {"UTC/Greenwich"}
);
}

private static TestHiveShell shell;

private TestTables testTables;

@Parameter(0)
public String timezoneString;

@Rule
public TemporaryFolder temp = new TemporaryFolder();

@BeforeClass
public static void beforeClass() {
shell = HiveIcebergStorageHandlerTestUtils.shell();
}

@AfterClass
public static void afterClass() {
shell.stop();
}

@Before
public void before() throws IOException {
TimeZone.setDefault(TimeZone.getTimeZone(timezoneString));

// Magic to clear the cached date format and local time zone for Hive, where the default time zone is
// used/stored in the cached object
dateFormat.ifPresent(ThreadLocal::remove);
localTimeZone.ifPresent(ThreadLocal::remove);

this.testTables = HiveIcebergStorageHandlerTestUtils.testTables(shell, TestTables.TestTableType.HIVE_CATALOG, temp);
// Uses spark as the execution engine so we can detect if we unintentionally try to use any execution engine
HiveIcebergStorageHandlerTestUtils.init(shell, testTables, temp, "spark");
}

@After
public void after() throws Exception {
HiveIcebergStorageHandlerTestUtils.close(shell);
}

@Test
public void testDateQuery() throws IOException {
Schema dateSchema = new Schema(optional(1, "d_date", Types.DateType.get()));

List<Record> records = TestHelper.RecordsBuilder.newInstance(dateSchema)
.add(LocalDate.of(2020, 1, 21))
.add(LocalDate.of(2020, 1, 24))
.build();

testTables.createTable(shell, "date_test", dateSchema, FileFormat.PARQUET, records);

List<Object[]> result = shell.executeStatement("SELECT * from date_test WHERE d_date='2020-01-21'");
Assert.assertEquals(1, result.size());
Assert.assertEquals("2020-01-21", result.get(0)[0]);

result = shell.executeStatement("SELECT * from date_test WHERE d_date in ('2020-01-21', '2020-01-22')");
Assert.assertEquals(1, result.size());
Assert.assertEquals("2020-01-21", result.get(0)[0]);

result = shell.executeStatement("SELECT * from date_test WHERE d_date > '2020-01-21'");
Assert.assertEquals(1, result.size());
Assert.assertEquals("2020-01-24", result.get(0)[0]);

result = shell.executeStatement("SELECT * from date_test WHERE d_date='2020-01-20'");
Assert.assertEquals(0, result.size());
}

@Test
public void testTimestampQuery() throws IOException {
Schema timestampSchema = new Schema(optional(1, "d_ts", Types.TimestampType.withoutZone()));

List<Record> records = TestHelper.RecordsBuilder.newInstance(timestampSchema)
.add(LocalDateTime.of(2019, 1, 22, 9, 44, 54, 100000000))
.add(LocalDateTime.of(2019, 2, 22, 9, 44, 54, 200000000))
.build();

testTables.createTable(shell, "ts_test", timestampSchema, FileFormat.PARQUET, records);

List<Object[]> result = shell.executeStatement("SELECT d_ts FROM ts_test WHERE d_ts='2019-02-22 09:44:54.2'");
Assert.assertEquals(1, result.size());
Assert.assertEquals("2019-02-22 09:44:54.2", result.get(0)[0]);

result = shell.executeStatement(
"SELECT * FROM ts_test WHERE d_ts in ('2017-01-01 22:30:57.1', '2019-02-22 09:44:54.2')");
Assert.assertEquals(1, result.size());
Assert.assertEquals("2019-02-22 09:44:54.2", result.get(0)[0]);

result = shell.executeStatement("SELECT d_ts FROM ts_test WHERE d_ts < '2019-02-22 09:44:54.2'");
Assert.assertEquals(1, result.size());
Assert.assertEquals("2019-01-22 09:44:54.1", result.get(0)[0]);

result = shell.executeStatement("SELECT * FROM ts_test WHERE d_ts='2017-01-01 22:30:57.3'");
Assert.assertEquals(0, result.size());
}
}
@@ -152,11 +152,7 @@

@After
public void after() throws Exception {
-shell.closeSession();
-shell.metastore().reset();
-// HiveServer2 thread pools are using thread local Hive -> HMSClient objects. These are not cleaned up when the
-// HiveServer2 is stopped. Only Finalizer closes the HMS connections.
-System.gc();
+HiveIcebergStorageHandlerTestUtils.close(shell);
// Mixing mr and tez jobs within the same JVM can cause problems. Mr jobs set the ExecMapper status to done=false
// at the beginning and to done=true at the end. However, tez jobs also rely on this value to see if they should
// proceed, but they do not reset it to done=false at the beginning. Therefore, without calling this after each test