HiveExpressions.java
@@ -19,6 +19,10 @@

package org.apache.iceberg.hive.legacy;

import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
@@ -29,6 +33,7 @@
import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.expressions.UnboundPredicate;
import org.apache.iceberg.expressions.UnboundTerm;
import org.apache.iceberg.types.Type;


class HiveExpressions {
@@ -238,6 +243,7 @@ public <T> Expression predicate(UnboundPredicate<T> pred) {
}

private static class ExpressionToPartitionFilterString extends ExpressionVisitors.ExpressionVisitor<String> {
private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
private static final ExpressionToPartitionFilterString INSTANCE = new ExpressionToPartitionFilterString();

private ExpressionToPartitionFilterString() {
@@ -274,26 +280,29 @@ public String or(String leftResult, String rightResult) {

@Override
public <T> String predicate(BoundPredicate<T> pred) {
throw new IllegalStateException("Bound predicate not expected: " + pred.getClass().getName());
}

@Override
public <T> String predicate(UnboundPredicate<T> pred) {
switch (pred.op()) {
case LT:
case LT_EQ:
case GT:
case GT_EQ:
case EQ:
case NOT_EQ:
return getBinaryExpressionString(pred.ref().name(), pred.op(), pred.literal());
return getBinaryExpressionString(pred);
default:
throw new IllegalStateException("Unexpected operator in Hive partition filter string: " + pred.op());
}
}

private <T> String getBinaryExpressionString(String columnName, Expression.Operation op, Literal<T> lit) {
return String.format("( %s %s %s )", columnName, getOperationString(op), getLiteralValue(lit));
@Override
public <T> String predicate(UnboundPredicate<T> pred) {
throw new IllegalStateException("Unbound predicate not expected: " + pred.getClass().getName());
}

private <T> String getBinaryExpressionString(BoundPredicate<T> pred) {
String columnName = pred.ref().field().name();
String opName = getOperationString(pred.op());
String litValue = getLiteralValue(pred.asLiteralPredicate().literal(), pred.ref().type());
return String.format("( %s %s %s )", columnName, opName, litValue);
}

private String getOperationString(Expression.Operation op) {
@@ -315,8 +324,17 @@ private String getOperationString(Expression.Operation op) {
}
}

private <T> String getLiteralValue(Literal<T> lit) {
private <T> String getLiteralValue(Literal<T> lit, Type type) {
Object value = lit.value();
switch (type.typeId()) {
case DATE:
value = EPOCH.plus((Integer) value, ChronoUnit.DAYS).toLocalDate().toString();
break;
case TIMESTAMP:
// This format appears to match the string literal Hive uses for timestamp partition values
value = EPOCH.plus((Long) value, ChronoUnit.MICROS).toLocalDateTime().toString().replace('T', ' ');
break;
}
if (value instanceof String) {
String escapedString = ((String) value).replace("'", "\\'");
return String.format("'%s'", escapedString);
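
Side note on the conversion above: Iceberg stores DATE literals as days since the Unix epoch (an int) and TIMESTAMP literals as microseconds since the epoch (a long), so getLiteralValue(...) has to turn them back into the lexical forms Hive uses in partition filter strings. A minimal standalone sketch of that conversion (the class name is made up; only the two EPOCH.plus(...) expressions mirror the patch):

import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;

public class PartitionLiteralDemo {
  private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);

  public static void main(String[] args) {
    // Iceberg DATE literal: days since 1970-01-01
    String date = EPOCH.plus(18000, ChronoUnit.DAYS).toLocalDate().toString();
    System.out.println(date); // 2019-04-14

    // Iceberg TIMESTAMP literal: microseconds since 1970-01-01T00:00:00Z,
    // rendered as ISO-8601 with the 'T' replaced by a space
    String ts = EPOCH.plus(1555243200000000L, ChronoUnit.MICROS)
        .toLocalDateTime().toString().replace('T', ' ');
    System.out.println(ts); // 2019-04-14 12:00
  }
}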
LegacyHiveTableOperations.java
@@ -36,6 +36,7 @@
import org.apache.iceberg.StructLike;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.expressions.Binder;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.hadoop.HadoopFileIO;
@@ -61,6 +62,7 @@ public class LegacyHiveTableOperations extends BaseMetastoreTableOperations {
private final String databaseName;
private final String tableName;
private final Configuration conf;
private Schema schema;

private FileIO fileIO;

@@ -69,6 +71,7 @@ protected LegacyHiveTableOperations(Configuration conf, HiveClientPool metaClien
this.metaClients = metaClients;
this.databaseName = database;
this.tableName = table;
this.schema = null;
}

@Override
@@ -86,7 +89,7 @@ protected void doRefresh() {
org.apache.hadoop.hive.metastore.api.Table hiveTable =
metaClients.run(client -> client.getTable(databaseName, tableName));

Schema schema = LegacyHiveTableUtils.getSchema(hiveTable);
this.schema = LegacyHiveTableUtils.getSchema(hiveTable);
PartitionSpec spec = LegacyHiveTableUtils.getPartitionSpec(hiveTable, schema);

Map<String, String> tableProperties = Maps.newHashMap(LegacyHiveTableUtils.getTableProperties(hiveTable));
@@ -181,7 +184,8 @@ private List<DirectoryInfo> getDirectoryInfosByFilter(Expression expression) {
partitions = metaClients.run(client -> client.listPartitionsByFilter(
databaseName, tableName, null, (short) -1));
} else {
String partitionFilterString = HiveExpressions.toPartitionFilterString(simplified);
Expression boundExpression = Binder.bind(schema.asStruct(), simplified, false);
String partitionFilterString = HiveExpressions.toPartitionFilterString(boundExpression);
LOG.info("Listing partitions for {}.{} with filter string: {}", databaseName, tableName, partitionFilterString);
partitions = metaClients.run(
client -> client.listPartitionsByFilter(databaseName, tableName, partitionFilterString, (short) -1));
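
Side note on the Binder.bind(...) call: the filter-string visitor now reads the column name and type off bound references (pred.ref().field().name(), pred.ref().type()), so the expression must be bound against the table schema before being rendered. A minimal sketch, with a made-up schema and predicate:

import org.apache.iceberg.Schema;
import org.apache.iceberg.expressions.Binder;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.types.Types;

public class BindingDemo {
  public static void main(String[] args) {
    // Hypothetical single-column partition schema.
    Schema schema = new Schema(
        Types.NestedField.optional(1, "pDateCol", Types.DateType.get()));

    // Unbound predicate: the column is referenced by name only, with no type attached.
    Expression unbound = Expressions.equal("pDateCol", 18000);

    // Binding resolves the reference against the schema and attaches the field
    // type, which getLiteralValue(...) needs for DATE/TIMESTAMP rendering;
    // the third argument is caseSensitive.
    Expression bound = Binder.bind(schema.asStruct(), unbound, false);
    System.out.println(bound);
  }
}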
TestHiveExpressions.java
@@ -36,7 +36,6 @@
import static org.apache.iceberg.expressions.Expressions.notNull;
import static org.apache.iceberg.expressions.Expressions.or;
import static org.apache.iceberg.hive.legacy.HiveExpressions.simplifyPartitionFilter;
import static org.apache.iceberg.hive.legacy.HiveExpressions.toPartitionFilterString;


public class TestHiveExpressions {

can we add tests for DATE and TIMESTAMP (and other datatypes we should support)?

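A hypothetical test along those lines, sketched under the assumption that toPartitionFilterString now takes a bound expression and that the DATE rendering from this patch is in place (the schema and expected string are illustrative):

@Test
public void testToPartitionFilterStringDateLiteral() {
  // Hypothetical partition schema with a single DATE column.
  Schema schema = new Schema(
      Types.NestedField.optional(1, "pDateCol", Types.DateType.get()));
  // 18000 days since the epoch == 2019-04-14.
  Expression bound = Binder.bind(
      schema.asStruct(), equal("pDateCol", 18000), false);
  Assert.assertEquals("( pDateCol = '2019-04-14' )", toPartitionFilterString(bound));
}
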
@@ -118,10 +117,4 @@ public void testSimplifyRemoveNonPartitionColumnsWithinNot2() {
Expression expected = alwaysTrue();
Assert.assertEquals(expected.toString(), simplifyPartitionFilter(input, ImmutableSet.of("pcol")).toString());
}

@Test
public void testToPartitionFilterStringEscapeStringLiterals() {

why remove?

Expression input = equal("pcol", "s'1");
Assert.assertEquals("( pcol = 's\\'1' )", toPartitionFilterString(input));
}
}
TestLegacyHiveTableScan.java
@@ -25,6 +25,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.LocalDate;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -38,7 +39,6 @@
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.avro.AvroSchemaUtil;
@@ -69,6 +69,11 @@ public class TestLegacyHiveTableScan extends HiveMetastoreTest {
new FieldSchema("pcol", "string", ""),
new FieldSchema("pIntCol", "int", ""));

private static final List<FieldSchema> PARTITION_COLUMNS_2 = ImmutableList.of(
new FieldSchema("pcol", "string", ""),
new FieldSchema("pIntCol", "int", ""),
new FieldSchema("pDateCol", "date", ""));

private static HiveCatalog legacyCatalog;
private static Path dbPath;

@@ -140,14 +145,26 @@ public void testHiveScanMultiPartitionWithFilter() throws Exception {
hiveScan(table, Expressions.equal("pcol", "ds")));
}

@Test
public void testHiveScanMultiPartitionWithFilter2() throws Exception {
String tableName = "multi_partition_with_filter2";
Table table = createTable(tableName, DATA_COLUMNS, PARTITION_COLUMNS_2);
addPartition(table, ImmutableList.of("ds", 1, LocalDate.of(2019, 4, 14)), AVRO, "A");
addPartition(table, ImmutableList.of("ds", 1, LocalDate.of(2021, 6, 2)), AVRO, "B");
// 18000 is the number of days since the epoch for 2019-04-14;
// this representation matches how Iceberg internally stores the value in DateLiteral.
filesMatch(
ImmutableMap.of("pcol=ds/pIntCol=1/pDateCol=2019-04-14/A", AVRO),
hiveScan(table, Expressions.equal("pDateCol", 18000)));
}
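
For reference, the constant can be derived instead of hard-coded (illustrative snippet, not part of the patch):

// LocalDate.toEpochDay() returns days since 1970-01-01, the same
// representation Iceberg uses internally for DATE literals.
long epochDay = LocalDate.of(2019, 4, 14).toEpochDay(); // == 18000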

@Test
public void testHiveScanNonStringPartitionQuery() throws Exception {
String tableName = "multi_partition_with_filter_on_non_string_partition_cols";
Table table = createTable(tableName, DATA_COLUMNS, PARTITION_COLUMNS);
AssertHelpers.assertThrows(
"Filtering on non string partition is not supported by ORM layer and we can enable direct sql only on mysql",
RuntimeException.class, "Failed to get partition info",
() -> hiveScan(table, Expressions.and(Expressions.equal("pcol", "ds"), Expressions.equal("pIntCol", "1"))));
filesMatch(
ImmutableMap.of(),
hiveScan(table, Expressions.and(Expressions.equal("pcol", "ds"), Expressions.equal("pIntCol", 1))));
}

@Test