Binary file added data/files/parquet_partition/pcol=100/000000_0
Binary file added data/files/parquet_partition/pcol=200/000000_0
Binary file added data/files/parquet_partition/pcol=300/000000_0
29 changes: 29 additions & 0 deletions ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -161,6 +161,7 @@
import org.apache.hadoop.hive.ql.plan.PlanUtils;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.hive.ql.secrets.URISecretSource;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.stats.StatsFactory;
@@ -4273,6 +4274,34 @@ public static void addTableSchemaToConf(Configuration conf,
}
}

/**
* Sets the partition column names in the configuration, if the information is available in the operator.
*/
public static void setPartitionColumnNames(Configuration conf, TableScanOperator tableScanOp) {
TableScanDesc scanDesc = tableScanOp.getConf();
Table metadata = scanDesc.getTableMetadata();
if (metadata == null) {
return;
}
List<FieldSchema> partCols = metadata.getPartCols();
if (partCols != null && !partCols.isEmpty()) {
conf.set(serdeConstants.LIST_PARTITION_COLUMNS, MetaStoreUtils.getColumnNamesFromFieldSchema(partCols));
}
}

/**
* Returns the list of partition column names present in the configuration,
* or an empty list if no such information is available.
*/
public static List<String> getPartitionColumnNames(Configuration conf) {
String colNames = conf.get(serdeConstants.LIST_PARTITION_COLUMNS);
if (colNames != null) {
return splitColNames(new ArrayList<>(), colNames);
} else {
return Collections.emptyList();
}
}

/**
* Create row key and value object inspectors for reduce vectorization.
* The row object inspector used by ReduceWork needs to be a **standard**
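The two helpers above form a simple round trip through the job configuration: setPartitionColumnNames writes the partition column names of the scanned table under serdeConstants.LIST_PARTITION_COLUMNS, and getPartitionColumnNames reads them back on the task side. Below is a minimal sketch of that round trip (not part of the patch); it assumes the names are serialized as a single comma-separated string, which appears to be the format produced by MetaStoreUtils.getColumnNamesFromFieldSchema.

import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.serde.serdeConstants;

public class PartitionColumnNamesRoundTrip {
  public static void main(String[] args) {
    Configuration conf = new Configuration();

    // Normally Utilities.setPartitionColumnNames(conf, tableScanOp) writes this entry
    // from the table metadata; the value is set by hand here to keep the sketch small.
    conf.set(serdeConstants.LIST_PARTITION_COLUMNS, "pcol");

    // On the read path the Parquet record reader retrieves the names so it can
    // prune them from the schema used for predicate push down.
    List<String> partCols = Utilities.getPartitionColumnNames(conf);
    System.out.println(partCols); // expected: [pcol]
  }
}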
@@ -918,6 +918,7 @@ public static void pushFiltersAndAsOf(JobConf jobConf, TableScanOperator tableSc
}

Utilities.addTableSchemaToConf(jobConf, tableScan);
Utilities.setPartitionColumnNames(jobConf, tableScan);

// construct column name list and types for reference by filter push down
Utilities.setColumnNameList(jobConf, tableScan);
@@ -16,6 +16,7 @@
import com.google.common.base.Strings;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.io.IOConstants;
import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
import org.apache.hadoop.hive.ql.io.parquet.read.ParquetFilterPredicateConverter;
@@ -40,6 +41,7 @@
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;
import org.apache.parquet.schema.Type;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -196,7 +198,8 @@ public FilterCompat.Filter setFilter(final JobConf conf, MessageType schema) {

// Create the Parquet FilterPredicate without including columns that do not exist
// on the schema (such as partition columns).
FilterPredicate p = ParquetFilterPredicateConverter.toFilterPredicate(sarg, schema, columns);
MessageType newSchema = getSchemaWithoutPartitionColumns(conf, schema);
FilterPredicate p = ParquetFilterPredicateConverter.toFilterPredicate(sarg, newSchema, columns);
if (p != null) {
// Filter may have sensitive information. Do not send to debug.
LOG.debug("PARQUET predicate push down generated.");
@@ -209,6 +212,20 @@ public FilterCompat.Filter setFilter(final JobConf conf, MessageType schema) {
}
}

private MessageType getSchemaWithoutPartitionColumns(JobConf conf, MessageType schema) {
List<String> partCols = Utilities.getPartitionColumnNames(conf);
if (partCols.isEmpty()) {
return schema;
}
List<Type> newFields = new ArrayList<>();
for (Type field : schema.getFields()) {
if (!partCols.contains(field.getName())) {
newFields.add(field);
}
}
return new MessageType(schema.getName(), newFields);
}

public List<BlockMetaData> getFilteredBlocks() {
return filteredBlocks;
}
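For illustration only (not part of the patch), the snippet below reproduces the schema-pruning step performed by getSchemaWithoutPartitionColumns as a self-contained program, assuming only parquet-column on the classpath. The column names mirror the test files added under data/files/parquet_partition: the partition column pcol exists in the file schema, so it has to be removed before the SearchArgument is converted into a Parquet FilterPredicate.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;
import org.apache.parquet.schema.Type;

public class DropPartitionColumnDemo {
  public static void main(String[] args) {
    // File schema as written into the test Parquet files: the partition column
    // pcol is present inside the file as well as in the directory layout.
    MessageType schema = MessageTypeParser.parseMessageType(
        "message hive_schema {"
        + " optional binary strcol (UTF8);"
        + " optional int32 intcol;"
        + " optional int32 pcol;"
        + " }");

    // In the patch these names come from Utilities.getPartitionColumnNames(conf).
    List<String> partCols = Arrays.asList("pcol");

    // Keep only the fields that are not partition columns.
    List<Type> newFields = new ArrayList<>();
    for (Type field : schema.getFields()) {
      if (!partCols.contains(field.getName())) {
        newFields.add(field);
      }
    }
    MessageType pruned = new MessageType(schema.getName(), newFields);

    // Prints the schema without pcol; a FilterPredicate built against this schema
    // can no longer reference the partition column.
    System.out.println(pruned);
  }
}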
37 changes: 37 additions & 0 deletions ql/src/test/queries/clientpositive/parquet_partition_col.q
@@ -0,0 +1,37 @@
-- The peculiarity of this test is that the partitioning column exists inside each individual Parquet
-- file (under data/files/parquet_partition) and at the same time it is also present in the directory
-- structure.
--
-- The schema of the Parquet files is shown below:
-- {
-- "type" : "record",
-- "name" : "hive_schema",
-- "fields" : [ {
-- "name" : "strcol",
-- "type" : [ "null", "string" ],
-- "default" : null
-- }, {
-- "name" : "intcol",
-- "type" : [ "null", "int" ],
-- "default" : null
-- }, {
-- "name" : "pcol",
-- "type" : [ "null", "int" ],
-- "default" : null
-- } ]
-- }
-- The test case requires the table to be external with the location specified up front; we don't
-- want the data to be reloaded because that would alter the very problem being tested.

create external table test(
strcol string,
intcol integer
) partitioned by (pcol int)
stored as parquet
location '../../data/files/parquet_partition';

msck repair table test;

select * from test where pcol=100 and intcol=2;
select * from test where PCOL=200 and intcol=3;
select * from test where `pCol`=300 and intcol=5;
@@ -0,0 +1,61 @@
PREHOOK: query: create external table test(
strcol string,
intcol integer
) partitioned by (pcol int)
stored as parquet
location '../../data/files/parquet_partition'
PREHOOK: type: CREATETABLE
#### A masked pattern was here ####
PREHOOK: Output: database:default
PREHOOK: Output: default@test
POSTHOOK: query: create external table test(
strcol string,
intcol integer
) partitioned by (pcol int)
stored as parquet
location '../../data/files/parquet_partition'
POSTHOOK: type: CREATETABLE
#### A masked pattern was here ####
POSTHOOK: Output: database:default
POSTHOOK: Output: default@test
PREHOOK: query: msck repair table test
PREHOOK: type: MSCK
PREHOOK: Output: default@test
POSTHOOK: query: msck repair table test
POSTHOOK: type: MSCK
POSTHOOK: Output: default@test
Partitions not in metastore: test:pcol=100 test:pcol=200 test:pcol=300
#### A masked pattern was here ####
PREHOOK: query: select * from test where pcol=100 and intcol=2
PREHOOK: type: QUERY
PREHOOK: Input: default@test
PREHOOK: Input: default@test@pcol=100
#### A masked pattern was here ####
POSTHOOK: query: select * from test where pcol=100 and intcol=2
POSTHOOK: type: QUERY
POSTHOOK: Input: default@test
POSTHOOK: Input: default@test@pcol=100
#### A masked pattern was here ####
b 2 100
PREHOOK: query: select * from test where PCOL=200 and intcol=3
PREHOOK: type: QUERY
PREHOOK: Input: default@test
PREHOOK: Input: default@test@pcol=200
#### A masked pattern was here ####
POSTHOOK: query: select * from test where PCOL=200 and intcol=3
POSTHOOK: type: QUERY
POSTHOOK: Input: default@test
POSTHOOK: Input: default@test@pcol=200
#### A masked pattern was here ####
c 3 200
PREHOOK: query: select * from test where `pCol`=300 and intcol=5
PREHOOK: type: QUERY
PREHOOK: Input: default@test
PREHOOK: Input: default@test@pcol=300
#### A masked pattern was here ####
POSTHOOK: query: select * from test where `pCol`=300 and intcol=5
POSTHOOK: type: QUERY
POSTHOOK: Input: default@test
POSTHOOK: Input: default@test@pcol=300
#### A masked pattern was here ####
e 5 300