Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.math.BigDecimal;
import java.sql.Date;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.hive.ql.io.sarg.ExpressionTree;
Expand Down Expand Up @@ -172,15 +171,24 @@ private static BigDecimal hiveDecimalToBigDecimal(HiveDecimalWritable hiveDecima
return hiveDecimalWritable.getHiveDecimal().bigDecimalValue().setScale(hiveDecimalWritable.scale());
}

// Hive uses `java.sql.Date.valueOf(lit.toString());` to convert a literal to Date
// Which uses `java.util.Date()` internally to create the object and that uses the TimeZone.getDefaultRef()
// To get back the expected date we have to use the LocalDate which gets rid of the TimeZone misery as it uses
// the year/month/day to generate the object
private static int daysFromDate(Date date) {
return DateTimeUtil.daysFromInstant(Instant.ofEpochMilli(date.getTime()));
return DateTimeUtil.daysFromDate(date.toLocalDate());
}

// Hive uses `java.sql.Timestamp.valueOf(lit.toString());` to convert a literal to Timestamp
// Which again uses `java.util.Date()` internally to create the object which uses the TimeZone.getDefaultRef()
// To get back the expected timestamp we have to use the LocalDateTime which gets rid of the TimeZone misery
// as it uses the year/month/day/hour/min/sec/nanos to generate the object
private static int daysFromTimestamp(Timestamp timestamp) {
return DateTimeUtil.daysFromInstant(timestamp.toInstant());
return DateTimeUtil.daysFromDate(timestamp.toLocalDateTime().toLocalDate());
}

// We have to use the LocalDateTime to get the micros. See the comment above.
private static long microsFromTimestamp(Timestamp timestamp) {
return DateTimeUtil.microsFromInstant(timestamp.toInstant());
return DateTimeUtil.microsFromTimestamp(timestamp.toLocalDateTime());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.iceberg.data.DeleteFilter;
import org.apache.iceberg.data.GenericDeleteFilter;
import org.apache.iceberg.data.IdentityPartitionConverters;
import org.apache.iceberg.data.InternalRecordWrapper;
import org.apache.iceberg.data.avro.DataReader;
import org.apache.iceberg.data.orc.GenericOrcReader;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
Expand Down Expand Up @@ -284,8 +285,11 @@ private CloseableIterable<T> applyResidualFiltering(CloseableIterable<T> iter, E
boolean applyResidual = !context.getConfiguration().getBoolean(InputFormatConfig.SKIP_RESIDUAL_FILTERING, false);

if (applyResidual && residual != null && residual != Expressions.alwaysTrue()) {
// Date and timestamp values are not the correct type for Evaluator.
// Wrapping to return the expected type.
InternalRecordWrapper wrapper = new InternalRecordWrapper(readSchema.asStruct());
Evaluator filter = new Evaluator(readSchema.asStruct(), residual, caseSensitive);
return CloseableIterable.filter(iter, record -> filter.eval((StructLike) record));
return CloseableIterable.filter(iter, record -> filter.eval(wrapper.wrap((StructLike) record)));
} else {
return iter;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ public void testBooleanType() {
@Test
public void testDateType() {
SearchArgument.Builder builder = SearchArgumentFactory.newBuilder();
Date gmtDate = new Date(LocalDate.of(2015, 11, 12).atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli());
Date gmtDate = Date.valueOf(LocalDate.of(2015, 11, 12));
SearchArgument arg = builder.startAnd().equals("date", PredicateLeaf.Type.DATE, gmtDate).end().build();

UnboundPredicate expected = Expressions.equal("date", Literal.of("2015-11-12").to(Types.DateType.get()).value());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.mr.hive;

import java.io.IOException;
import java.text.DateFormat;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.TimeZone;
import org.apache.hadoop.hive.serde2.io.DateWritable;
import org.apache.hadoop.hive.serde2.io.TimestampWritable;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.common.DynFields;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.mr.TestHelper;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.types.Types;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.junit.runners.Parameterized.Parameter;
import static org.junit.runners.Parameterized.Parameters;

@RunWith(Parameterized.class)
public class TestHiveIcebergStorageHandlerTimezone {
private static final Optional<ThreadLocal<DateFormat>> dateFormat =
Optional.ofNullable((ThreadLocal<DateFormat>) DynFields.builder()
.hiddenImpl(TimestampWritable.class, "threadLocalDateFormat")
.defaultAlwaysNull()
.buildStatic()
.get());

private static final Optional<ThreadLocal<TimeZone>> localTimeZone =
Optional.ofNullable((ThreadLocal<TimeZone>) DynFields.builder()
.hiddenImpl(DateWritable.class, "LOCAL_TIMEZONE")
.defaultAlwaysNull()
.buildStatic()
.get());

@Parameters(name = "timezone={0}")
public static Collection<Object[]> parameters() {
return ImmutableList.of(
new String[] {"America/New_York"},
new String[] {"Asia/Kolkata"},
new String[] {"UTC/Greenwich"}
);
}

private static TestHiveShell shell;

private TestTables testTables;

@Parameter(0)
public String timezoneString;

@Rule
public TemporaryFolder temp = new TemporaryFolder();

@BeforeClass
public static void beforeClass() {
shell = HiveIcebergStorageHandlerTestUtils.shell();
}

@AfterClass
public static void afterClass() {
shell.stop();
}

@Before
public void before() throws IOException {
TimeZone.setDefault(TimeZone.getTimeZone(timezoneString));

// Magic to clean cached date format and local timezone for Hive where the default timezone is used/stored in the
// cached object
dateFormat.ifPresent(ThreadLocal::remove);
localTimeZone.ifPresent(ThreadLocal::remove);

this.testTables = HiveIcebergStorageHandlerTestUtils.testTables(shell, TestTables.TestTableType.HIVE_CATALOG, temp);
// Uses spark as an engine so we can detect if we unintentionally try to use any execution engines
HiveIcebergStorageHandlerTestUtils.init(shell, testTables, temp, "spark");
}

@After
public void after() throws Exception {
HiveIcebergStorageHandlerTestUtils.close(shell);
}

@Test
public void testDateQuery() throws IOException {
Schema dateSchema = new Schema(optional(1, "d_date", Types.DateType.get()));

List<Record> records = TestHelper.RecordsBuilder.newInstance(dateSchema)
.add(LocalDate.of(2020, 1, 21))
.add(LocalDate.of(2020, 1, 24))
.build();

testTables.createTable(shell, "date_test", dateSchema, FileFormat.PARQUET, records);

List<Object[]> result = shell.executeStatement("SELECT * from date_test WHERE d_date='2020-01-21'");
Assert.assertEquals(1, result.size());
Assert.assertEquals("2020-01-21", result.get(0)[0]);

result = shell.executeStatement("SELECT * from date_test WHERE d_date in ('2020-01-21', '2020-01-22')");
Assert.assertEquals(1, result.size());
Assert.assertEquals("2020-01-21", result.get(0)[0]);

result = shell.executeStatement("SELECT * from date_test WHERE d_date > '2020-01-21'");
Assert.assertEquals(1, result.size());
Assert.assertEquals("2020-01-24", result.get(0)[0]);

result = shell.executeStatement("SELECT * from date_test WHERE d_date='2020-01-20'");
Assert.assertEquals(0, result.size());
}

@Test
public void testTimestampQuery() throws IOException {
Schema timestampSchema = new Schema(optional(1, "d_ts", Types.TimestampType.withoutZone()));

List<Record> records = TestHelper.RecordsBuilder.newInstance(timestampSchema)
.add(LocalDateTime.of(2019, 1, 22, 9, 44, 54, 100000000))
.add(LocalDateTime.of(2019, 2, 22, 9, 44, 54, 200000000))
.build();

testTables.createTable(shell, "ts_test", timestampSchema, FileFormat.PARQUET, records);

List<Object[]> result = shell.executeStatement("SELECT d_ts FROM ts_test WHERE d_ts='2019-02-22 09:44:54.2'");
Assert.assertEquals(1, result.size());
Assert.assertEquals("2019-02-22 09:44:54.2", result.get(0)[0]);

result = shell.executeStatement(
"SELECT * FROM ts_test WHERE d_ts in ('2017-01-01 22:30:57.1', '2019-02-22 09:44:54.2')");
Assert.assertEquals(1, result.size());
Assert.assertEquals("2019-02-22 09:44:54.2", result.get(0)[0]);

result = shell.executeStatement("SELECT d_ts FROM ts_test WHERE d_ts < '2019-02-22 09:44:54.2'");
Assert.assertEquals(1, result.size());
Assert.assertEquals("2019-01-22 09:44:54.1", result.get(0)[0]);

result = shell.executeStatement("SELECT * FROM ts_test WHERE d_ts='2017-01-01 22:30:57.3'");
Assert.assertEquals(0, result.size());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -152,11 +152,7 @@ public void before() throws IOException {

@After
public void after() throws Exception {
shell.closeSession();
shell.metastore().reset();
// HiveServer2 thread pools are using thread local Hive -> HMSClient objects. These are not cleaned up when the
// HiveServer2 is stopped. Only Finalizer closes the HMS connections.
System.gc();
HiveIcebergStorageHandlerTestUtils.close(shell);
// Mixing mr and tez jobs within the same JVM can cause problems. Mr jobs set the ExecMapper status to done=false
// at the beginning and to done=true at the end. However, tez jobs also rely on this value to see if they should
// proceed, but they do not reset it to done=false at the beginning. Therefore, without calling this after each test
Expand Down