diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergFilterFactory.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergFilterFactory.java
index 33791c719bbe..a645874ce162 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergFilterFactory.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergFilterFactory.java
@@ -22,7 +22,6 @@
 import java.math.BigDecimal;
 import java.sql.Date;
 import java.sql.Timestamp;
-import java.time.Instant;
 import java.util.List;
 import java.util.stream.Collectors;
 import org.apache.hadoop.hive.ql.io.sarg.ExpressionTree;
@@ -172,15 +171,24 @@ private static BigDecimal hiveDecimalToBigDecimal(HiveDecimalWritable hiveDecima
     return hiveDecimalWritable.getHiveDecimal().bigDecimalValue().setScale(hiveDecimalWritable.scale());
   }
 
+  // Hive converts the literal to a Date with `java.sql.Date.valueOf(lit.toString())`, which creates a
+  // `java.util.Date` internally and therefore depends on TimeZone.getDefaultRef().
+  // To get back the expected date we convert through LocalDate, which sidesteps the time zone problem
+  // entirely because it is built from the year/month/day fields only.
   private static int daysFromDate(Date date) {
-    return DateTimeUtil.daysFromInstant(Instant.ofEpochMilli(date.getTime()));
+    return DateTimeUtil.daysFromDate(date.toLocalDate());
   }
 
+  // Hive converts the literal to a Timestamp with `java.sql.Timestamp.valueOf(lit.toString())`, which again
+  // creates a `java.util.Date` internally and therefore depends on TimeZone.getDefaultRef().
+  // To get back the expected timestamp we convert through LocalDateTime, which sidesteps the time zone problem
+  // because it is built from the year/month/day/hour/min/sec/nanos fields only.
   private static int daysFromTimestamp(Timestamp timestamp) {
-    return DateTimeUtil.daysFromInstant(timestamp.toInstant());
+    return DateTimeUtil.daysFromDate(timestamp.toLocalDateTime().toLocalDate());
   }
 
+  // We have to convert through LocalDateTime to get the micros. See the comment above.
   private static long microsFromTimestamp(Timestamp timestamp) {
-    return DateTimeUtil.microsFromInstant(timestamp.toInstant());
+    return DateTimeUtil.microsFromTimestamp(timestamp.toLocalDateTime());
   }
 }
diff --git a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
index 033e5075be1c..a67c5130cb2c 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
@@ -49,6 +49,7 @@
 import org.apache.iceberg.data.DeleteFilter;
 import org.apache.iceberg.data.GenericDeleteFilter;
 import org.apache.iceberg.data.IdentityPartitionConverters;
+import org.apache.iceberg.data.InternalRecordWrapper;
 import org.apache.iceberg.data.avro.DataReader;
 import org.apache.iceberg.data.orc.GenericOrcReader;
 import org.apache.iceberg.data.parquet.GenericParquetReaders;
@@ -284,8 +285,12 @@ private CloseableIterable<T> applyResidualFiltering(CloseableIterable<T> iter, E
     boolean applyResidual = !context.getConfiguration().getBoolean(InputFormatConfig.SKIP_RESIDUAL_FILTERING, false);
 
     if (applyResidual && residual != null && residual != Expressions.alwaysTrue()) {
+      // Date and timestamp values are not the correct type for Evaluator.
+      // Wrapping to return the expected type.
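+      // (e.g. the wrapper converts LocalDate values to the epoch-day ints the expression evaluator compares against)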
+      InternalRecordWrapper wrapper = new InternalRecordWrapper(readSchema.asStruct());
       Evaluator filter = new Evaluator(readSchema.asStruct(), residual, caseSensitive);
-      return CloseableIterable.filter(iter, record -> filter.eval((StructLike) record));
+      return CloseableIterable.filter(iter, record -> filter.eval(wrapper.wrap((StructLike) record)));
     } else {
       return iter;
     }
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergFilterFactory.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergFilterFactory.java
index 3d436dcbe323..3044f0467af3 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergFilterFactory.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergFilterFactory.java
@@ -218,7 +218,7 @@ public void testBooleanType() {
   @Test
   public void testDateType() {
     SearchArgument.Builder builder = SearchArgumentFactory.newBuilder();
-    Date gmtDate = new Date(LocalDate.of(2015, 11, 12).atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli());
+    Date gmtDate = Date.valueOf(LocalDate.of(2015, 11, 12));
     SearchArgument arg = builder.startAnd().equals("date", PredicateLeaf.Type.DATE, gmtDate).end().build();
 
     UnboundPredicate expected = Expressions.equal("date", Literal.of("2015-11-12").to(Types.DateType.get()).value());
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerTimezone.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerTimezone.java
new file mode 100644
index 000000000000..ea6b4d14f89d
--- /dev/null
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerTimezone.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.io.IOException;
+import java.text.DateFormat;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.TimeZone;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.io.TimestampWritable;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.common.DynFields;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.mr.TestHelper;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.types.Types;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.junit.runners.Parameterized.Parameter;
+import static org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+public class TestHiveIcebergStorageHandlerTimezone {
+  private static final Optional<ThreadLocal<DateFormat>> dateFormat =
+      Optional.ofNullable((ThreadLocal<DateFormat>) DynFields.builder()
+          .hiddenImpl(TimestampWritable.class, "threadLocalDateFormat")
+          .defaultAlwaysNull()
+          .buildStatic()
+          .get());
+
+  private static final Optional<ThreadLocal<TimeZone>> localTimeZone =
+      Optional.ofNullable((ThreadLocal<TimeZone>) DynFields.builder()
+          .hiddenImpl(DateWritable.class, "LOCAL_TIMEZONE")
+          .defaultAlwaysNull()
+          .buildStatic()
+          .get());
+
+  @Parameters(name = "timezone={0}")
+  public static Collection<String[]> parameters() {
+    return ImmutableList.of(
+        new String[] {"America/New_York"},
+        new String[] {"Asia/Kolkata"},
+        new String[] {"UTC/Greenwich"}
+    );
+  }
+
+  private static TestHiveShell shell;
+
+  private TestTables testTables;
+
+  @Parameter(0)
+  public String timezoneString;
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  @BeforeClass
+  public static void beforeClass() {
+    shell = HiveIcebergStorageHandlerTestUtils.shell();
+  }
+
+  @AfterClass
+  public static void afterClass() {
+    shell.stop();
+  }
+
+  @Before
+  public void before() throws IOException {
+    TimeZone.setDefault(TimeZone.getTimeZone(timezoneString));
+
+    // Magic to clear Hive's cached date format and local timezone, since both capture the JVM default
+    // timezone in the cached object
+    dateFormat.ifPresent(ThreadLocal::remove);
+    localTimeZone.ifPresent(ThreadLocal::remove);
+
+    this.testTables = HiveIcebergStorageHandlerTestUtils.testTables(shell, TestTables.TestTableType.HIVE_CATALOG, temp);
+    // Uses spark as the execution engine so we can detect if we unintentionally try to use a real execution engine
+    HiveIcebergStorageHandlerTestUtils.init(shell, testTables, temp, "spark");
+  }
+
+  @After
+  public void after() throws Exception {
+    HiveIcebergStorageHandlerTestUtils.close(shell);
+  }
+
+  @Test
+  public void testDateQuery() throws IOException {
+    Schema dateSchema = new Schema(optional(1, "d_date", Types.DateType.get()));
+
+    List<Record> records = TestHelper.RecordsBuilder.newInstance(dateSchema)
+        .add(LocalDate.of(2020, 1, 21))
+        .add(LocalDate.of(2020, 1, 24))
+        .build();
+
+    testTables.createTable(shell, "date_test", dateSchema, FileFormat.PARQUET, records);
+
+    List<Object[]> result = shell.executeStatement("SELECT * from date_test WHERE d_date='2020-01-21'");
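+    // Exactly one row (the one written as 2020-01-21) should match, regardless of the default timezone set in before()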
+    Assert.assertEquals(1, result.size());
+    Assert.assertEquals("2020-01-21", result.get(0)[0]);
+
+    result = shell.executeStatement("SELECT * from date_test WHERE d_date in ('2020-01-21', '2020-01-22')");
+    Assert.assertEquals(1, result.size());
+    Assert.assertEquals("2020-01-21", result.get(0)[0]);
+
+    result = shell.executeStatement("SELECT * from date_test WHERE d_date > '2020-01-21'");
+    Assert.assertEquals(1, result.size());
+    Assert.assertEquals("2020-01-24", result.get(0)[0]);
+
+    result = shell.executeStatement("SELECT * from date_test WHERE d_date='2020-01-20'");
+    Assert.assertEquals(0, result.size());
+  }
+
+  @Test
+  public void testTimestampQuery() throws IOException {
+    Schema timestampSchema = new Schema(optional(1, "d_ts", Types.TimestampType.withoutZone()));
+
+    List<Record> records = TestHelper.RecordsBuilder.newInstance(timestampSchema)
+        .add(LocalDateTime.of(2019, 1, 22, 9, 44, 54, 100000000))
+        .add(LocalDateTime.of(2019, 2, 22, 9, 44, 54, 200000000))
+        .build();
+
+    testTables.createTable(shell, "ts_test", timestampSchema, FileFormat.PARQUET, records);
+
+    List<Object[]> result = shell.executeStatement("SELECT d_ts FROM ts_test WHERE d_ts='2019-02-22 09:44:54.2'");
+    Assert.assertEquals(1, result.size());
+    Assert.assertEquals("2019-02-22 09:44:54.2", result.get(0)[0]);
+
+    result = shell.executeStatement(
+        "SELECT * FROM ts_test WHERE d_ts in ('2017-01-01 22:30:57.1', '2019-02-22 09:44:54.2')");
+    Assert.assertEquals(1, result.size());
+    Assert.assertEquals("2019-02-22 09:44:54.2", result.get(0)[0]);
+
+    result = shell.executeStatement("SELECT d_ts FROM ts_test WHERE d_ts < '2019-02-22 09:44:54.2'");
+    Assert.assertEquals(1, result.size());
+    Assert.assertEquals("2019-01-22 09:44:54.1", result.get(0)[0]);
+
+    result = shell.executeStatement("SELECT * FROM ts_test WHERE d_ts='2017-01-01 22:30:57.3'");
+    Assert.assertEquals(0, result.size());
+  }
+}
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
index ff120a31bfc2..fcb4311f8bab 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
@@ -152,11 +152,7 @@ public void before() throws IOException {
 
   @After
   public void after() throws Exception {
-    shell.closeSession();
-    shell.metastore().reset();
-    // HiveServer2 thread pools are using thread local Hive -> HMSClient objects. These are not cleaned up when the
-    // HiveServer2 is stopped. Only Finalizer closes the HMS connections.
-    System.gc();
+    HiveIcebergStorageHandlerTestUtils.close(shell);
     // Mixing mr and tez jobs within the same JVM can cause problems. Mr jobs set the ExecMapper status to done=false
     // at the beginning and to done=true at the end. However, tez jobs also rely on this value to see if they should
     // proceed, but they do not reset it to done=false at the beginning. Therefore, without calling this after each test
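For context, a minimal, self-contained sketch of the pitfall the new HiveIcebergFilterFactory comments describe (not part of the patch; the class name DatePitfallDemo is made up, everything else is standard JDK). With an east-of-UTC default timezone, deriving the day from Date.getTime() lands on the previous UTC day, while toLocalDate() reads back the date fields directly:

import java.sql.Date;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.TimeZone;

public class DatePitfallDemo {
  public static void main(String[] args) {
    // Same setup as the parameterized test: an east-of-UTC default timezone
    TimeZone.setDefault(TimeZone.getTimeZone("Asia/Kolkata"));

    // Hive-style literal conversion: the millis represent local midnight, i.e. 2015-11-11T18:30:00Z
    Date date = Date.valueOf("2015-11-12");

    // Millis-based conversion, mirroring the removed daysFromInstant(Instant.ofEpochMilli(...)) path:
    // the UTC calendar day is one day off
    LocalDate viaInstant = Instant.ofEpochMilli(date.getTime()).atZone(ZoneOffset.UTC).toLocalDate();

    // Field-based conversion, mirroring the new date.toLocalDate() path: no timezone involved
    LocalDate viaFields = date.toLocalDate();

    System.out.println(viaInstant + " vs " + viaFields); // prints: 2015-11-11 vs 2015-11-12
  }
}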