diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergFilterFactory.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergFilterFactory.java index a645874ce162..33791c719bbe 100644 --- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergFilterFactory.java +++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergFilterFactory.java @@ -22,6 +22,7 @@ import java.math.BigDecimal; import java.sql.Date; import java.sql.Timestamp; +import java.time.Instant; import java.util.List; import java.util.stream.Collectors; import org.apache.hadoop.hive.ql.io.sarg.ExpressionTree; @@ -171,24 +172,15 @@ private static BigDecimal hiveDecimalToBigDecimal(HiveDecimalWritable hiveDecima return hiveDecimalWritable.getHiveDecimal().bigDecimalValue().setScale(hiveDecimalWritable.scale()); } - // Hive uses `java.sql.Date.valueOf(lit.toString());` to convert a literal to Date - // Which uses `java.util.Date()` internally to create the object and that uses the TimeZone.getDefaultRef() - // To get back the expected date we have to use the LocalDate which gets rid of the TimeZone misery as it uses - // the year/month/day to generate the object private static int daysFromDate(Date date) { - return DateTimeUtil.daysFromDate(date.toLocalDate()); + return DateTimeUtil.daysFromInstant(Instant.ofEpochMilli(date.getTime())); } - // Hive uses `java.sql.Timestamp.valueOf(lit.toString());` to convert a literal to Timestamp - // Which again uses `java.util.Date()` internally to create the object which uses the TimeZone.getDefaultRef() - // To get back the expected timestamp we have to use the LocalDateTime which gets rid of the TimeZone misery - // as it uses the year/month/day/hour/min/sec/nanos to generate the object private static int daysFromTimestamp(Timestamp timestamp) { - return DateTimeUtil.daysFromDate(timestamp.toLocalDateTime().toLocalDate()); + return DateTimeUtil.daysFromInstant(timestamp.toInstant()); } - // We have to use the LocalDateTime to get the micros. See the comment above. private static long microsFromTimestamp(Timestamp timestamp) { - return DateTimeUtil.microsFromTimestamp(timestamp.toLocalDateTime()); + return DateTimeUtil.microsFromInstant(timestamp.toInstant()); } } diff --git a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java index a67c5130cb2c..033e5075be1c 100644 --- a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java +++ b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java @@ -49,7 +49,6 @@ import org.apache.iceberg.data.DeleteFilter; import org.apache.iceberg.data.GenericDeleteFilter; import org.apache.iceberg.data.IdentityPartitionConverters; -import org.apache.iceberg.data.InternalRecordWrapper; import org.apache.iceberg.data.avro.DataReader; import org.apache.iceberg.data.orc.GenericOrcReader; import org.apache.iceberg.data.parquet.GenericParquetReaders; @@ -285,11 +284,8 @@ private CloseableIterable applyResidualFiltering(CloseableIterable iter, E boolean applyResidual = !context.getConfiguration().getBoolean(InputFormatConfig.SKIP_RESIDUAL_FILTERING, false); if (applyResidual && residual != null && residual != Expressions.alwaysTrue()) { - // Date and timestamp values are not the correct type for Evaluator. - // Wrapping to return the expected type. - InternalRecordWrapper wrapper = new InternalRecordWrapper(readSchema.asStruct()); Evaluator filter = new Evaluator(readSchema.asStruct(), residual, caseSensitive); - return CloseableIterable.filter(iter, record -> filter.eval(wrapper.wrap((StructLike) record))); + return CloseableIterable.filter(iter, record -> filter.eval((StructLike) record)); } else { return iter; } diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergFilterFactory.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergFilterFactory.java index 3044f0467af3..3d436dcbe323 100644 --- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergFilterFactory.java +++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergFilterFactory.java @@ -218,7 +218,7 @@ public void testBooleanType() { @Test public void testDateType() { SearchArgument.Builder builder = SearchArgumentFactory.newBuilder(); - Date gmtDate = Date.valueOf(LocalDate.of(2015, 11, 12)); + Date gmtDate = new Date(LocalDate.of(2015, 11, 12).atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli()); SearchArgument arg = builder.startAnd().equals("date", PredicateLeaf.Type.DATE, gmtDate).end().build(); UnboundPredicate expected = Expressions.equal("date", Literal.of("2015-11-12").to(Types.DateType.get()).value()); diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerTimezone.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerTimezone.java deleted file mode 100644 index ea6b4d14f89d..000000000000 --- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerTimezone.java +++ /dev/null @@ -1,172 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iceberg.mr.hive; - -import java.io.IOException; -import java.text.DateFormat; -import java.time.LocalDate; -import java.time.LocalDateTime; -import java.util.Collection; -import java.util.List; -import java.util.Optional; -import java.util.TimeZone; -import org.apache.hadoop.hive.serde2.io.DateWritable; -import org.apache.hadoop.hive.serde2.io.TimestampWritable; -import org.apache.iceberg.FileFormat; -import org.apache.iceberg.Schema; -import org.apache.iceberg.common.DynFields; -import org.apache.iceberg.data.Record; -import org.apache.iceberg.mr.TestHelper; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; -import org.apache.iceberg.types.Types; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import static org.apache.iceberg.types.Types.NestedField.optional; -import static org.junit.runners.Parameterized.Parameter; -import static org.junit.runners.Parameterized.Parameters; - -@RunWith(Parameterized.class) -public class TestHiveIcebergStorageHandlerTimezone { - private static final Optional> dateFormat = - Optional.ofNullable((ThreadLocal) DynFields.builder() - .hiddenImpl(TimestampWritable.class, "threadLocalDateFormat") - .defaultAlwaysNull() - .buildStatic() - .get()); - - private static final Optional> localTimeZone = - Optional.ofNullable((ThreadLocal) DynFields.builder() - .hiddenImpl(DateWritable.class, "LOCAL_TIMEZONE") - .defaultAlwaysNull() - .buildStatic() - .get()); - - @Parameters(name = "timezone={0}") - public static Collection parameters() { - return ImmutableList.of( - new String[] {"America/New_York"}, - new String[] {"Asia/Kolkata"}, - new String[] {"UTC/Greenwich"} - ); - } - - private static TestHiveShell shell; - - private TestTables testTables; - - @Parameter(0) - public String timezoneString; - - @Rule - public TemporaryFolder temp = new TemporaryFolder(); - - @BeforeClass - public static void beforeClass() { - shell = HiveIcebergStorageHandlerTestUtils.shell(); - } - - @AfterClass - public static void afterClass() { - shell.stop(); - } - - @Before - public void before() throws IOException { - TimeZone.setDefault(TimeZone.getTimeZone(timezoneString)); - - // Magic to clean cached date format and local timezone for Hive where the default timezone is used/stored in the - // cached object - dateFormat.ifPresent(ThreadLocal::remove); - localTimeZone.ifPresent(ThreadLocal::remove); - - this.testTables = HiveIcebergStorageHandlerTestUtils.testTables(shell, TestTables.TestTableType.HIVE_CATALOG, temp); - // Uses spark as an engine so we can detect if we unintentionally try to use any execution engines - HiveIcebergStorageHandlerTestUtils.init(shell, testTables, temp, "spark"); - } - - @After - public void after() throws Exception { - HiveIcebergStorageHandlerTestUtils.close(shell); - } - - @Test - public void testDateQuery() throws IOException { - Schema dateSchema = new Schema(optional(1, "d_date", Types.DateType.get())); - - List records = TestHelper.RecordsBuilder.newInstance(dateSchema) - .add(LocalDate.of(2020, 1, 21)) - .add(LocalDate.of(2020, 1, 24)) - .build(); - - testTables.createTable(shell, "date_test", dateSchema, FileFormat.PARQUET, records); - - List result = shell.executeStatement("SELECT * from date_test WHERE d_date='2020-01-21'"); - Assert.assertEquals(1, result.size()); - Assert.assertEquals("2020-01-21", result.get(0)[0]); - - result = shell.executeStatement("SELECT * from date_test WHERE d_date in ('2020-01-21', '2020-01-22')"); - Assert.assertEquals(1, result.size()); - Assert.assertEquals("2020-01-21", result.get(0)[0]); - - result = shell.executeStatement("SELECT * from date_test WHERE d_date > '2020-01-21'"); - Assert.assertEquals(1, result.size()); - Assert.assertEquals("2020-01-24", result.get(0)[0]); - - result = shell.executeStatement("SELECT * from date_test WHERE d_date='2020-01-20'"); - Assert.assertEquals(0, result.size()); - } - - @Test - public void testTimestampQuery() throws IOException { - Schema timestampSchema = new Schema(optional(1, "d_ts", Types.TimestampType.withoutZone())); - - List records = TestHelper.RecordsBuilder.newInstance(timestampSchema) - .add(LocalDateTime.of(2019, 1, 22, 9, 44, 54, 100000000)) - .add(LocalDateTime.of(2019, 2, 22, 9, 44, 54, 200000000)) - .build(); - - testTables.createTable(shell, "ts_test", timestampSchema, FileFormat.PARQUET, records); - - List result = shell.executeStatement("SELECT d_ts FROM ts_test WHERE d_ts='2019-02-22 09:44:54.2'"); - Assert.assertEquals(1, result.size()); - Assert.assertEquals("2019-02-22 09:44:54.2", result.get(0)[0]); - - result = shell.executeStatement( - "SELECT * FROM ts_test WHERE d_ts in ('2017-01-01 22:30:57.1', '2019-02-22 09:44:54.2')"); - Assert.assertEquals(1, result.size()); - Assert.assertEquals("2019-02-22 09:44:54.2", result.get(0)[0]); - - result = shell.executeStatement("SELECT d_ts FROM ts_test WHERE d_ts < '2019-02-22 09:44:54.2'"); - Assert.assertEquals(1, result.size()); - Assert.assertEquals("2019-01-22 09:44:54.1", result.get(0)[0]); - - result = shell.executeStatement("SELECT * FROM ts_test WHERE d_ts='2017-01-01 22:30:57.3'"); - Assert.assertEquals(0, result.size()); - } -} diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java index fcb4311f8bab..ff120a31bfc2 100644 --- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java +++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java @@ -152,7 +152,11 @@ public void before() throws IOException { @After public void after() throws Exception { - HiveIcebergStorageHandlerTestUtils.close(shell); + shell.closeSession(); + shell.metastore().reset(); + // HiveServer2 thread pools are using thread local Hive -> HMSClient objects. These are not cleaned up when the + // HiveServer2 is stopped. Only Finalizer closes the HMS connections. + System.gc(); // Mixing mr and tez jobs within the same JVM can cause problems. Mr jobs set the ExecMapper status to done=false // at the beginning and to done=true at the end. However, tez jobs also rely on this value to see if they should // proceed, but they do not reset it to done=false at the beginning. Therefore, without calling this after each test