From f431f72c0ffab834796b7d3e2f034008c09f866d Mon Sep 17 00:00:00 2001 From: Soumyakanti Das Date: Tue, 8 Nov 2022 16:14:09 -0800 Subject: [PATCH 01/15] Remove partition columns from the parquet schema --- .../files/parquet_partition/pcol=100/000000_0 | Bin 0 -> 761 bytes .../files/parquet_partition/pcol=200/000000_0 | Bin 0 -> 761 bytes .../files/parquet_partition/pcol=300/000000_0 | Bin 0 -> 761 bytes .../io/parquet/ParquetRecordReaderBase.java | 22 ++++- .../clientpositive/parquet_partition_col.q | 15 +++ .../llap/parquet_partition_col.q.out | 87 ++++++++++++++++++ 6 files changed, 123 insertions(+), 1 deletion(-) create mode 100644 data/files/parquet_partition/pcol=100/000000_0 create mode 100644 data/files/parquet_partition/pcol=200/000000_0 create mode 100644 data/files/parquet_partition/pcol=300/000000_0 create mode 100644 ql/src/test/queries/clientpositive/parquet_partition_col.q create mode 100644 ql/src/test/results/clientpositive/llap/parquet_partition_col.q.out diff --git a/data/files/parquet_partition/pcol=100/000000_0 b/data/files/parquet_partition/pcol=100/000000_0 new file mode 100644 index 0000000000000000000000000000000000000000..fe3dc6a5288005e30c3381698bcefa6c1abb6a7f GIT binary patch literal 761 zcmZuvUyIW~5TE?9g!4}Cz+HABLJ_W1@N(&KZ7Brd-i}ggvD#PJBpWY~HZ{$i=%d`z zp&;CgPwFR7Q2ZGB3H%Pe3xczosC|&x3_HJ>-*0DjGda57AwZgF=a(118ZNOhY!C;) z1OUr2&6xUuq0xZ5` za%KqpLSk|XpaEMQ%bBfp*I*@xu-2_$mpWvOG~h0skzu|2W{|sTi;|<`t0(E1e6)&A zCO=<%lvF)ctTFSyJ z)4BAX=L^X;9!~csB1)4q0=849=XoqMZz1NAT|4xA&)=Jlk0#UOdk=t-x+WH-^p-`Q z$z_>F>`EfCl>~OPPAhRQ$`@y{Sf%;G%j9DbJ^NcWT#Dk!sVw*AMYAqxcDxR*W;;Ah zv$)ysb;GzD#r-6f$$>~Zk&I%$-|hOo-y0l;gTXo<%dl=`7feWCgd%LI;Lc8$ZA+mDds|AW#cHoIO)~Dlv{R?EEADwg z5D$Wgp42~}2mge7Q7?iQe}TWjgD;s;3-XeX_r3Rhc`wQM@J@>WsiQsl_5Pyf5eMBW zaRDpWLSap?WB29K+=$SMP#fuyv!W4zK`h71;pvsZFlZXBN2*jj4wu zvN%_npT~3MpQQ`MHttRK#-WI#SOD8D!}CFVe-S%2si@@DueQ6p#O=4VGc{@I>Xi^Q>N`)LVWF zN4+^ai4$4xcG@%97P1>j743&nODG|OZo3@>L8o^x>-F|$Q7GjN-pBVX2GsCr#`x)f E10Z~j&j0`b literal 0 HcmV?d00001 diff --git a/data/files/parquet_partition/pcol=300/000000_0 b/data/files/parquet_partition/pcol=300/000000_0 new file mode 100644 index 0000000000000000000000000000000000000000..a16616e8d3ae23e0d7049fe0d98f6840f30f1beb GIT binary patch literal 761 zcmZWn&ui2`6n^>95Na(gIKvD?2tti4nKZzJ9rUw*6n)24)yIk#3@W`0?|b;S-hU0Q<;B?he|;yEm`uBlPtn zK;yeAXNtfX6O~hd8qlREXL_x>21`MNYuyI6s7WSB18&hd8MV953b`$}C^NE)@d4&*i(9xH2OLcB>zhhcti+d&*{11;{h)(FF zWBPsQDlv9|1z@Yl2B0rAa*e)J$qnetEv%vCHV4$)5gm2re$ybUyQR>r+ikpcwYO63 zqe>j;yjL1I9ZRW8ti@8AVg%LXy)>&-`{x1Q$Jf!ZHY`wnfPT^lrI%8aK3Ay@e9U$p zmq{f`r%E#6JS}p;Ht)}FO@lCrlMvW$8_tSIq)r}Wf^8o;uHzoeCdbp+$(?(^NShM{ zm2j42k&0!Nglr=Y(v<*qy$wroCoJ-FQLd6AcT({%2%r2-8!dzK(V3_YveIr-?4HxZ zVRsj2NgCOMzPE_HFdD>>h!2Cf7m6@)2cGA;Zhv^R7!D5?aS%l}xsUJL47iGYPw}V! 
E4J0>;R{#J2 literal 0 HcmV?d00001 diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java index a665c2586a34..ec5c5fd524f8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java @@ -40,14 +40,18 @@ import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.MessageTypeParser; +import org.apache.parquet.schema.Type; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; public abstract class ParquetRecordReaderBase { public static final Logger LOG = LoggerFactory.getLogger(ParquetRecordReaderBase.class); @@ -196,7 +200,8 @@ public FilterCompat.Filter setFilter(final JobConf conf, MessageType schema) { // Create the Parquet FilterPredicate without including columns that do not exist // on the schema (such as partition columns). - FilterPredicate p = ParquetFilterPredicateConverter.toFilterPredicate(sarg, schema, columns); + MessageType newSchema = getSchemaWithoutPartitionAndVirtualColumns(conf, names, schema); + FilterPredicate p = ParquetFilterPredicateConverter.toFilterPredicate(sarg, newSchema, columns); if (p != null) { // Filter may have sensitive information. Do not send to debug. LOG.debug("PARQUET predicate push down generated."); @@ -209,6 +214,21 @@ public FilterCompat.Filter setFilter(final JobConf conf, MessageType schema) { } } + private MessageType getSchemaWithoutPartitionAndVirtualColumns(JobConf conf, String[] names, MessageType schema) { + String schemaEvolutionColumnNames = conf.get(IOConstants.SCHEMA_EVOLUTION_COLUMNS); + Set partitionAndVirtualColumns = new HashSet<>(Arrays.asList(names)); + partitionAndVirtualColumns.removeAll(new HashSet<>(Arrays.asList(schemaEvolutionColumnNames.split(",")))); + + List newFields = new ArrayList<>(); + for (Type field: schema.getFields()) { + if(!partitionAndVirtualColumns.contains(field.getName())) { + newFields.add(field); + } + } + + return new MessageType(schema.getName(), newFields); + } + public List getFilteredBlocks() { return filteredBlocks; } diff --git a/ql/src/test/queries/clientpositive/parquet_partition_col.q b/ql/src/test/queries/clientpositive/parquet_partition_col.q new file mode 100644 index 000000000000..5ae8d2fb02ea --- /dev/null +++ b/ql/src/test/queries/clientpositive/parquet_partition_col.q @@ -0,0 +1,15 @@ + + create external table test( + strcol string, + intcol integer + ) partitioned by (pcol int) + stored as parquet + location '../../data/files/parquet_partition'; + + msck repair table test; + + select * from test; + + explain select * from test where pcol=100 and intcol=2; + + select * from test where pcol=100 and intcol=2; diff --git a/ql/src/test/results/clientpositive/llap/parquet_partition_col.q.out b/ql/src/test/results/clientpositive/llap/parquet_partition_col.q.out new file mode 100644 index 000000000000..b669625c23d8 --- /dev/null +++ b/ql/src/test/results/clientpositive/llap/parquet_partition_col.q.out @@ -0,0 +1,87 @@ +PREHOOK: query: create external table test( + strcol string, + intcol integer + ) partitioned by (pcol int) + stored as parquet + location 
'../../data/files/parquet_partition' +PREHOOK: type: CREATETABLE +#### A masked pattern was here #### +PREHOOK: Output: database:default +PREHOOK: Output: default@test +POSTHOOK: query: create external table test( + strcol string, + intcol integer + ) partitioned by (pcol int) + stored as parquet + location '../../data/files/parquet_partition' +POSTHOOK: type: CREATETABLE +#### A masked pattern was here #### +POSTHOOK: Output: database:default +POSTHOOK: Output: default@test +PREHOOK: query: msck repair table test +PREHOOK: type: MSCK +PREHOOK: Output: default@test +POSTHOOK: query: msck repair table test +POSTHOOK: type: MSCK +POSTHOOK: Output: default@test +Partitions not in metastore: test:pcol=100 test:pcol=200 test:pcol=300 +#### A masked pattern was here #### +PREHOOK: query: select * from test +PREHOOK: type: QUERY +PREHOOK: Input: default@test +PREHOOK: Input: default@test@pcol=100 +PREHOOK: Input: default@test@pcol=200 +PREHOOK: Input: default@test@pcol=300 +#### A masked pattern was here #### +POSTHOOK: query: select * from test +POSTHOOK: type: QUERY +POSTHOOK: Input: default@test +POSTHOOK: Input: default@test@pcol=100 +POSTHOOK: Input: default@test@pcol=200 +POSTHOOK: Input: default@test@pcol=300 +#### A masked pattern was here #### +a 1 100 +b 2 100 +c 3 200 +d 4 200 +e 5 300 +f 6 300 +PREHOOK: query: explain select * from test where pcol=100 and intcol=2 +PREHOOK: type: QUERY +PREHOOK: Input: default@test +PREHOOK: Input: default@test@pcol=100 +#### A masked pattern was here #### +POSTHOOK: query: explain select * from test where pcol=100 and intcol=2 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@test +POSTHOOK: Input: default@test@pcol=100 +#### A masked pattern was here #### +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + TableScan + alias: test + filterExpr: ((intcol = 2) and (pcol = 100)) (type: boolean) + Filter Operator + predicate: (intcol = 2) (type: boolean) + Select Operator + expressions: strcol (type: string), 2 (type: int), 100 (type: int) + outputColumnNames: _col0, _col1, _col2 + ListSink + +PREHOOK: query: select * from test where pcol=100 and intcol=2 +PREHOOK: type: QUERY +PREHOOK: Input: default@test +PREHOOK: Input: default@test@pcol=100 +#### A masked pattern was here #### +POSTHOOK: query: select * from test where pcol=100 and intcol=2 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@test +POSTHOOK: Input: default@test@pcol=100 +#### A masked pattern was here #### +b 2 100 From 22ecffb6b69b46a4fec63a1edaad617b6d00a144 Mon Sep 17 00:00:00 2001 From: Soumyakanti Das Date: Tue, 8 Nov 2022 21:03:53 -0800 Subject: [PATCH 02/15] Add PARTITION_COLUMNS to IOConstants, to add partition column info to the conf from TableScanOp --- .../hadoop/hive/ql/exec/FetchOperator.java | 1 + .../apache/hadoop/hive/ql/exec/Utilities.java | 12 ++++++++++ .../hadoop/hive/ql/io/HiveInputFormat.java | 2 ++ .../apache/hadoop/hive/ql/io/IOConstants.java | 1 + .../io/parquet/ParquetRecordReaderBase.java | 23 ++++++++++--------- 5 files changed, 28 insertions(+), 11 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java index c3fc4094504b..b4eded614042 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java @@ -143,6 +143,7 @@ public FetchOperator(FetchWork work, JobConf job, Operator operator, if (operator instanceof 
TableScanOperator) { Utilities.addTableSchemaToConf(job, (TableScanOperator) operator); + Utilities.setPartitionColumnsToConf(job, (TableScanOperator) operator); } this.vcCols = vcCols; this.hasVC = vcCols != null && !vcCols.isEmpty(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 0eb568614e5d..2f16b7f26389 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -4272,6 +4272,18 @@ public static void addTableSchemaToConf(Configuration conf, LOG.info("schema.evolution.columns and schema.evolution.columns.types not available"); } } + public static void setPartitionColumnsToConf(Configuration conf, TableScanOperator tableScanOp) { + List partitionColsList = tableScanOp.getConf().getTableMetadata().getPartColNames(); + if (partitionColsList.size() > 0) { + conf.set(IOConstants.PARTITION_COLUMNS, String.join(",", partitionColsList)); + } else { + LOG.info(IOConstants.PARTITION_COLUMNS + " not available"); + } + } + + public static void unsetPartitionColumnsInConf(Configuration conf) { + conf.unset(IOConstants.PARTITION_COLUMNS); + } /** * Create row key and value object inspectors for reduce vectorization. diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java index 6f38d680a867..82ba0ea26264 100755 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java @@ -911,6 +911,7 @@ public static void pushFiltersAndAsOf(JobConf jobConf, TableScanOperator tableSc jobConf.unset(TableScanDesc.FILTER_EXPR_CONF_STR); Utilities.unsetSchemaEvolution(jobConf); + Utilities.unsetPartitionColumnsInConf(jobConf); TableScanDesc scanDesc = tableScan.getConf(); if (scanDesc == null) { @@ -918,6 +919,7 @@ public static void pushFiltersAndAsOf(JobConf jobConf, TableScanOperator tableSc } Utilities.addTableSchemaToConf(jobConf, tableScan); + Utilities.setPartitionColumnsToConf(jobConf, tableScan); // construct column name list and types for reference by filter push down Utilities.setColumnNameList(jobConf, tableScan); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/IOConstants.java b/ql/src/java/org/apache/hadoop/hive/ql/io/IOConstants.java index 2be864e752e2..a9f41260a99b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/IOConstants.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/IOConstants.java @@ -47,6 +47,7 @@ public final class IOConstants { */ public static final String SCHEMA_EVOLUTION_COLUMNS = "schema.evolution.columns"; public static final String SCHEMA_EVOLUTION_COLUMNS_TYPES = "schema.evolution.columns.types"; + public static final String PARTITION_COLUMNS = "partition.columns"; @VisibleForTesting public static final String CUSTOM_TEXT_SERDE = "CustomTextSerde"; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java index ec5c5fd524f8..e8f88c88938c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java @@ -200,7 +200,7 @@ public FilterCompat.Filter setFilter(final JobConf conf, MessageType schema) { // Create the Parquet FilterPredicate without including columns that do not exist // on the schema (such as partition columns). 
- MessageType newSchema = getSchemaWithoutPartitionAndVirtualColumns(conf, names, schema); + MessageType newSchema = getSchemaWithoutPartitionColumns(conf, schema); FilterPredicate p = ParquetFilterPredicateConverter.toFilterPredicate(sarg, newSchema, columns); if (p != null) { // Filter may have sensitive information. Do not send to debug. @@ -214,19 +214,20 @@ public FilterCompat.Filter setFilter(final JobConf conf, MessageType schema) { } } - private MessageType getSchemaWithoutPartitionAndVirtualColumns(JobConf conf, String[] names, MessageType schema) { - String schemaEvolutionColumnNames = conf.get(IOConstants.SCHEMA_EVOLUTION_COLUMNS); - Set partitionAndVirtualColumns = new HashSet<>(Arrays.asList(names)); - partitionAndVirtualColumns.removeAll(new HashSet<>(Arrays.asList(schemaEvolutionColumnNames.split(",")))); + private MessageType getSchemaWithoutPartitionColumns(JobConf conf, MessageType schema) { + String partCols = conf.get(IOConstants.PARTITION_COLUMNS); + if (partCols != null && partCols.length() > 0) { + Set partitionColumns = new HashSet<>(Arrays.asList(partCols.split(","))); + List newFields = new ArrayList<>(); - List newFields = new ArrayList<>(); - for (Type field: schema.getFields()) { - if(!partitionAndVirtualColumns.contains(field.getName())) { - newFields.add(field); + for (Type field: schema.getFields()) { + if(!partitionColumns.contains(field.getName())) { + newFields.add(field); + } } + return new MessageType(schema.getName(), newFields); } - - return new MessageType(schema.getName(), newFields); + return schema; } public List getFilteredBlocks() { From 5a901cab90d8b287fedda40fb002a5ccf623934a Mon Sep 17 00:00:00 2001 From: Soumyakanti Das Date: Wed, 9 Nov 2022 01:38:51 -0800 Subject: [PATCH 03/15] Check nulls for TableMetadata, resolve List.size() code smell --- .../java/org/apache/hadoop/hive/ql/exec/Utilities.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 2f16b7f26389..62fd7e4c0c40 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -161,6 +161,7 @@ import org.apache.hadoop.hive.ql.plan.PlanUtils; import org.apache.hadoop.hive.ql.plan.ReduceWork; import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.plan.TableScanDesc; import org.apache.hadoop.hive.ql.secrets.URISecretSource; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.stats.StatsFactory; @@ -4273,9 +4274,12 @@ public static void addTableSchemaToConf(Configuration conf, } } public static void setPartitionColumnsToConf(Configuration conf, TableScanOperator tableScanOp) { - List partitionColsList = tableScanOp.getConf().getTableMetadata().getPartColNames(); - if (partitionColsList.size() > 0) { - conf.set(IOConstants.PARTITION_COLUMNS, String.join(",", partitionColsList)); + TableScanDesc scanDesc = tableScanOp.getConf(); + if (scanDesc != null && scanDesc.getTableMetadata() != null) { + List partitionColsList = scanDesc.getTableMetadata().getPartColNames(); + if (!partitionColsList.isEmpty()) { + conf.set(IOConstants.PARTITION_COLUMNS, String.join(",", partitionColsList)); + } } else { LOG.info(IOConstants.PARTITION_COLUMNS + " not available"); } From c6b94cc7da2ad0b65d3a8747fb57650bce46a16c Mon Sep 17 00:00:00 2001 From: Soumyakanti Das Date: Thu, 10 Nov 2022 10:25:50 -0800 Subject: [PATCH 04/15] 
Address review comments --- .../hadoop/hive/ql/exec/FetchOperator.java | 2 +- .../apache/hadoop/hive/ql/exec/Utilities.java | 18 ++++- .../hadoop/hive/ql/io/HiveInputFormat.java | 2 +- .../io/parquet/ParquetRecordReaderBase.java | 2 +- .../clientpositive/parquet_partition_col.q | 7 +- .../llap/parquet_partition_col.q.out | 78 +++++++++++++++++++ 6 files changed, 102 insertions(+), 7 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java index b4eded614042..0ba0faef91a7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java @@ -143,7 +143,7 @@ public FetchOperator(FetchWork work, JobConf job, Operator operator, if (operator instanceof TableScanOperator) { Utilities.addTableSchemaToConf(job, (TableScanOperator) operator); - Utilities.setPartitionColumnsToConf(job, (TableScanOperator) operator); + Utilities.setPartitionColumnsInConf(job, (TableScanOperator) operator); } this.vcCols = vcCols; this.hasVC = vcCols != null && !vcCols.isEmpty(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 62fd7e4c0c40..2088c5a97472 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -4273,18 +4273,30 @@ public static void addTableSchemaToConf(Configuration conf, LOG.info("schema.evolution.columns and schema.evolution.columns.types not available"); } } - public static void setPartitionColumnsToConf(Configuration conf, TableScanOperator tableScanOp) { + + /** + * Sets partition column names to {@link IOConstants#PARTITION_COLUMNS}, if available. 
+ * + * @param conf JobConf + * @param tableScanOp TableScanOperator object + */ + public static void setPartitionColumnsInConf(Configuration conf, TableScanOperator tableScanOp) { TableScanDesc scanDesc = tableScanOp.getConf(); if (scanDesc != null && scanDesc.getTableMetadata() != null) { List partitionColsList = scanDesc.getTableMetadata().getPartColNames(); if (!partitionColsList.isEmpty()) { conf.set(IOConstants.PARTITION_COLUMNS, String.join(",", partitionColsList)); + } else { + LOG.info(IOConstants.PARTITION_COLUMNS + " not available"); } - } else { - LOG.info(IOConstants.PARTITION_COLUMNS + " not available"); } } + /** + * Unsets partition column names from {@link IOConstants#PARTITION_COLUMNS} + * + * @param conf JobConf + */ public static void unsetPartitionColumnsInConf(Configuration conf) { conf.unset(IOConstants.PARTITION_COLUMNS); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java index 82ba0ea26264..db707f8da3a5 100755 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java @@ -919,7 +919,7 @@ public static void pushFiltersAndAsOf(JobConf jobConf, TableScanOperator tableSc } Utilities.addTableSchemaToConf(jobConf, tableScan); - Utilities.setPartitionColumnsToConf(jobConf, tableScan); + Utilities.setPartitionColumnsInConf(jobConf, tableScan); // construct column name list and types for reference by filter push down Utilities.setColumnNameList(jobConf, tableScan); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java index e8f88c88938c..2f8efa88fd44 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java @@ -216,7 +216,7 @@ public FilterCompat.Filter setFilter(final JobConf conf, MessageType schema) { private MessageType getSchemaWithoutPartitionColumns(JobConf conf, MessageType schema) { String partCols = conf.get(IOConstants.PARTITION_COLUMNS); - if (partCols != null && partCols.length() > 0) { + if (partCols != null && !partCols.isEmpty()) { Set partitionColumns = new HashSet<>(Arrays.asList(partCols.split(","))); List newFields = new ArrayList<>(); diff --git a/ql/src/test/queries/clientpositive/parquet_partition_col.q b/ql/src/test/queries/clientpositive/parquet_partition_col.q index 5ae8d2fb02ea..33fc3e4bc1b1 100644 --- a/ql/src/test/queries/clientpositive/parquet_partition_col.q +++ b/ql/src/test/queries/clientpositive/parquet_partition_col.q @@ -11,5 +11,10 @@ select * from test; explain select * from test where pcol=100 and intcol=2; - select * from test where pcol=100 and intcol=2; + + explain select * from test where PCOL=200 and intcol=3; + select * from test where PCOL=200 and intcol=3; + + explain select * from test where `pCol`=300 and intcol=5; + select * from test where `pCol`=300 and intcol=5; \ No newline at end of file diff --git a/ql/src/test/results/clientpositive/llap/parquet_partition_col.q.out b/ql/src/test/results/clientpositive/llap/parquet_partition_col.q.out index b669625c23d8..79ab0d177c23 100644 --- a/ql/src/test/results/clientpositive/llap/parquet_partition_col.q.out +++ b/ql/src/test/results/clientpositive/llap/parquet_partition_col.q.out @@ -85,3 +85,81 @@ POSTHOOK: Input: default@test POSTHOOK: Input: default@test@pcol=100 #### A masked 
pattern was here #### b 2 100 +PREHOOK: query: explain select * from test where PCOL=200 and intcol=3 +PREHOOK: type: QUERY +PREHOOK: Input: default@test +PREHOOK: Input: default@test@pcol=200 +#### A masked pattern was here #### +POSTHOOK: query: explain select * from test where PCOL=200 and intcol=3 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@test +POSTHOOK: Input: default@test@pcol=200 +#### A masked pattern was here #### +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + TableScan + alias: test + filterExpr: ((intcol = 3) and (pcol = 200)) (type: boolean) + Filter Operator + predicate: (intcol = 3) (type: boolean) + Select Operator + expressions: strcol (type: string), 3 (type: int), 200 (type: int) + outputColumnNames: _col0, _col1, _col2 + ListSink + +PREHOOK: query: select * from test where PCOL=200 and intcol=3 +PREHOOK: type: QUERY +PREHOOK: Input: default@test +PREHOOK: Input: default@test@pcol=200 +#### A masked pattern was here #### +POSTHOOK: query: select * from test where PCOL=200 and intcol=3 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@test +POSTHOOK: Input: default@test@pcol=200 +#### A masked pattern was here #### +c 3 200 +PREHOOK: query: explain select * from test where `pCol`=300 and intcol=5 +PREHOOK: type: QUERY +PREHOOK: Input: default@test +PREHOOK: Input: default@test@pcol=300 +#### A masked pattern was here #### +POSTHOOK: query: explain select * from test where `pCol`=300 and intcol=5 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@test +POSTHOOK: Input: default@test@pcol=300 +#### A masked pattern was here #### +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + TableScan + alias: test + filterExpr: ((intcol = 5) and (pcol = 300)) (type: boolean) + Filter Operator + predicate: (intcol = 5) (type: boolean) + Select Operator + expressions: strcol (type: string), 5 (type: int), 300 (type: int) + outputColumnNames: _col0, _col1, _col2 + ListSink + +PREHOOK: query: select * from test where `pCol`=300 and intcol=5 +PREHOOK: type: QUERY +PREHOOK: Input: default@test +PREHOOK: Input: default@test@pcol=300 +#### A masked pattern was here #### +POSTHOOK: query: select * from test where `pCol`=300 and intcol=5 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@test +POSTHOOK: Input: default@test@pcol=300 +#### A masked pattern was here #### +e 5 300 From 5f766f678786b84b27aad23374b91d82c6fe2409 Mon Sep 17 00:00:00 2001 From: Soumyakanti Das Date: Thu, 10 Nov 2022 16:27:24 -0800 Subject: [PATCH 05/15] Change log to debug --- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 2088c5a97472..2885c96cb2c1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -4287,7 +4287,9 @@ public static void setPartitionColumnsInConf(Configuration conf, TableScanOperat if (!partitionColsList.isEmpty()) { conf.set(IOConstants.PARTITION_COLUMNS, String.join(",", partitionColsList)); } else { - LOG.info(IOConstants.PARTITION_COLUMNS + " not available"); + if (LOG.isDebugEnabled()) { + LOG.debug(IOConstants.PARTITION_COLUMNS + " not available"); + } } } } From d1908ce0feb168386a978359301557beac64b884 Mon Sep 17 00:00:00 2001 From: Stamatis Zampetakis 
Date: Mon, 14 Nov 2022 15:48:55 +0100 Subject: [PATCH 06/15] Revert fix based on new config properties --- .../hadoop/hive/ql/exec/FetchOperator.java | 1 - .../apache/hadoop/hive/ql/exec/Utilities.java | 30 ------------------- .../hadoop/hive/ql/io/HiveInputFormat.java | 2 -- .../apache/hadoop/hive/ql/io/IOConstants.java | 1 - .../io/parquet/ParquetRecordReaderBase.java | 23 +------------- 5 files changed, 1 insertion(+), 56 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java index 0ba0faef91a7..c3fc4094504b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java @@ -143,7 +143,6 @@ public FetchOperator(FetchWork work, JobConf job, Operator operator, if (operator instanceof TableScanOperator) { Utilities.addTableSchemaToConf(job, (TableScanOperator) operator); - Utilities.setPartitionColumnsInConf(job, (TableScanOperator) operator); } this.vcCols = vcCols; this.hasVC = vcCols != null && !vcCols.isEmpty(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 2885c96cb2c1..0eb568614e5d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -161,7 +161,6 @@ import org.apache.hadoop.hive.ql.plan.PlanUtils; import org.apache.hadoop.hive.ql.plan.ReduceWork; import org.apache.hadoop.hive.ql.plan.TableDesc; -import org.apache.hadoop.hive.ql.plan.TableScanDesc; import org.apache.hadoop.hive.ql.secrets.URISecretSource; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.stats.StatsFactory; @@ -4274,35 +4273,6 @@ public static void addTableSchemaToConf(Configuration conf, } } - /** - * Sets partition column names to {@link IOConstants#PARTITION_COLUMNS}, if available. - * - * @param conf JobConf - * @param tableScanOp TableScanOperator object - */ - public static void setPartitionColumnsInConf(Configuration conf, TableScanOperator tableScanOp) { - TableScanDesc scanDesc = tableScanOp.getConf(); - if (scanDesc != null && scanDesc.getTableMetadata() != null) { - List partitionColsList = scanDesc.getTableMetadata().getPartColNames(); - if (!partitionColsList.isEmpty()) { - conf.set(IOConstants.PARTITION_COLUMNS, String.join(",", partitionColsList)); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug(IOConstants.PARTITION_COLUMNS + " not available"); - } - } - } - } - - /** - * Unsets partition column names from {@link IOConstants#PARTITION_COLUMNS} - * - * @param conf JobConf - */ - public static void unsetPartitionColumnsInConf(Configuration conf) { - conf.unset(IOConstants.PARTITION_COLUMNS); - } - /** * Create row key and value object inspectors for reduce vectorization. 
* The row object inspector used by ReduceWork needs to be a **standard** diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java index db707f8da3a5..6f38d680a867 100755 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java @@ -911,7 +911,6 @@ public static void pushFiltersAndAsOf(JobConf jobConf, TableScanOperator tableSc jobConf.unset(TableScanDesc.FILTER_EXPR_CONF_STR); Utilities.unsetSchemaEvolution(jobConf); - Utilities.unsetPartitionColumnsInConf(jobConf); TableScanDesc scanDesc = tableScan.getConf(); if (scanDesc == null) { @@ -919,7 +918,6 @@ public static void pushFiltersAndAsOf(JobConf jobConf, TableScanOperator tableSc } Utilities.addTableSchemaToConf(jobConf, tableScan); - Utilities.setPartitionColumnsInConf(jobConf, tableScan); // construct column name list and types for reference by filter push down Utilities.setColumnNameList(jobConf, tableScan); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/IOConstants.java b/ql/src/java/org/apache/hadoop/hive/ql/io/IOConstants.java index a9f41260a99b..2be864e752e2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/IOConstants.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/IOConstants.java @@ -47,7 +47,6 @@ public final class IOConstants { */ public static final String SCHEMA_EVOLUTION_COLUMNS = "schema.evolution.columns"; public static final String SCHEMA_EVOLUTION_COLUMNS_TYPES = "schema.evolution.columns.types"; - public static final String PARTITION_COLUMNS = "partition.columns"; @VisibleForTesting public static final String CUSTOM_TEXT_SERDE = "CustomTextSerde"; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java index 2f8efa88fd44..a665c2586a34 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java @@ -40,18 +40,14 @@ import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.MessageTypeParser; -import org.apache.parquet.schema.Type; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; public abstract class ParquetRecordReaderBase { public static final Logger LOG = LoggerFactory.getLogger(ParquetRecordReaderBase.class); @@ -200,8 +196,7 @@ public FilterCompat.Filter setFilter(final JobConf conf, MessageType schema) { // Create the Parquet FilterPredicate without including columns that do not exist // on the schema (such as partition columns). - MessageType newSchema = getSchemaWithoutPartitionColumns(conf, schema); - FilterPredicate p = ParquetFilterPredicateConverter.toFilterPredicate(sarg, newSchema, columns); + FilterPredicate p = ParquetFilterPredicateConverter.toFilterPredicate(sarg, schema, columns); if (p != null) { // Filter may have sensitive information. Do not send to debug. 
LOG.debug("PARQUET predicate push down generated."); @@ -214,22 +209,6 @@ public FilterCompat.Filter setFilter(final JobConf conf, MessageType schema) { } } - private MessageType getSchemaWithoutPartitionColumns(JobConf conf, MessageType schema) { - String partCols = conf.get(IOConstants.PARTITION_COLUMNS); - if (partCols != null && !partCols.isEmpty()) { - Set partitionColumns = new HashSet<>(Arrays.asList(partCols.split(","))); - List newFields = new ArrayList<>(); - - for (Type field: schema.getFields()) { - if(!partitionColumns.contains(field.getName())) { - newFields.add(field); - } - } - return new MessageType(schema.getName(), newFields); - } - return schema; - } - public List getFilteredBlocks() { return filteredBlocks; } From 7e16714b28fb3611bff1d9f35be79c400197055b Mon Sep 17 00:00:00 2001 From: Stamatis Zampetakis Date: Mon, 14 Nov 2022 15:44:04 +0100 Subject: [PATCH 07/15] Do not create filters on pruned schema columns Parquet supports column pruning and this information is captured by ReadContext#getRequestedSchema. Creating and applying filters on columns that are not present in the requested Parquet schema can lead to wrong results since missing columns are populated with null values. Align predicate push-down and column pruning optimizations to use the same schema ("requestedSchema") to avoid evaluating predicates on nulls. --- .../hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java index a665c2586a34..3f559d6d5370 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java @@ -134,9 +134,9 @@ protected ParquetInputSplit getSplit( return null; } - FilterCompat.Filter filter = setFilter(jobConf, fileMetaData.getSchema()); + FilterCompat.Filter filter = setFilter(jobConf, readContext.getRequestedSchema()); if (filter != null) { - filteredBlocks = RowGroupFilter.filterRowGroups(filter, splitGroup, fileMetaData.getSchema()); + filteredBlocks = RowGroupFilter.filterRowGroups(filter, splitGroup, readContext.getRequestedSchema()); if (filteredBlocks.isEmpty()) { LOG.debug("All row groups are dropped due to filter predicates"); return null; From 8ea38f5aa8a061ede6b3a6e1809022cbe66b034c Mon Sep 17 00:00:00 2001 From: Stamatis Zampetakis Date: Thu, 17 Nov 2022 15:48:00 +0100 Subject: [PATCH 08/15] Revert "Do not create filters on pruned schema columns" This reverts commit 7e16714b28fb3611bff1d9f35be79c400197055b. The approach caused various failures especially to tests with schema evolutions so as explained in the JIRA cannot be used. 
--- .../hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java index 3f559d6d5370..a665c2586a34 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java @@ -134,9 +134,9 @@ protected ParquetInputSplit getSplit( return null; } - FilterCompat.Filter filter = setFilter(jobConf, readContext.getRequestedSchema()); + FilterCompat.Filter filter = setFilter(jobConf, fileMetaData.getSchema()); if (filter != null) { - filteredBlocks = RowGroupFilter.filterRowGroups(filter, splitGroup, readContext.getRequestedSchema()); + filteredBlocks = RowGroupFilter.filterRowGroups(filter, splitGroup, fileMetaData.getSchema()); if (filteredBlocks.isEmpty()) { LOG.debug("All row groups are dropped due to filter predicates"); return null; From 736d5cdff2f7ae56f4f0b1ee6b9fa017311c0797 Mon Sep 17 00:00:00 2001 From: Stamatis Zampetakis Date: Mon, 21 Nov 2022 16:29:52 +0100 Subject: [PATCH 09/15] Revert "Revert fix based on new config properties" This reverts commit d1908ce0feb168386a978359301557beac64b884. --- .../hadoop/hive/ql/exec/FetchOperator.java | 1 + .../apache/hadoop/hive/ql/exec/Utilities.java | 30 +++++++++++++++++++ .../hadoop/hive/ql/io/HiveInputFormat.java | 2 ++ .../apache/hadoop/hive/ql/io/IOConstants.java | 1 + .../io/parquet/ParquetRecordReaderBase.java | 23 +++++++++++++- 5 files changed, 56 insertions(+), 1 deletion(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java index c3fc4094504b..0ba0faef91a7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java @@ -143,6 +143,7 @@ public FetchOperator(FetchWork work, JobConf job, Operator operator, if (operator instanceof TableScanOperator) { Utilities.addTableSchemaToConf(job, (TableScanOperator) operator); + Utilities.setPartitionColumnsInConf(job, (TableScanOperator) operator); } this.vcCols = vcCols; this.hasVC = vcCols != null && !vcCols.isEmpty(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 0eb568614e5d..2885c96cb2c1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -161,6 +161,7 @@ import org.apache.hadoop.hive.ql.plan.PlanUtils; import org.apache.hadoop.hive.ql.plan.ReduceWork; import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.plan.TableScanDesc; import org.apache.hadoop.hive.ql.secrets.URISecretSource; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.stats.StatsFactory; @@ -4273,6 +4274,35 @@ public static void addTableSchemaToConf(Configuration conf, } } + /** + * Sets partition column names to {@link IOConstants#PARTITION_COLUMNS}, if available. 
+ * + * @param conf JobConf + * @param tableScanOp TableScanOperator object + */ + public static void setPartitionColumnsInConf(Configuration conf, TableScanOperator tableScanOp) { + TableScanDesc scanDesc = tableScanOp.getConf(); + if (scanDesc != null && scanDesc.getTableMetadata() != null) { + List partitionColsList = scanDesc.getTableMetadata().getPartColNames(); + if (!partitionColsList.isEmpty()) { + conf.set(IOConstants.PARTITION_COLUMNS, String.join(",", partitionColsList)); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug(IOConstants.PARTITION_COLUMNS + " not available"); + } + } + } + } + + /** + * Unsets partition column names from {@link IOConstants#PARTITION_COLUMNS} + * + * @param conf JobConf + */ + public static void unsetPartitionColumnsInConf(Configuration conf) { + conf.unset(IOConstants.PARTITION_COLUMNS); + } + /** * Create row key and value object inspectors for reduce vectorization. * The row object inspector used by ReduceWork needs to be a **standard** diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java index 6f38d680a867..db707f8da3a5 100755 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java @@ -911,6 +911,7 @@ public static void pushFiltersAndAsOf(JobConf jobConf, TableScanOperator tableSc jobConf.unset(TableScanDesc.FILTER_EXPR_CONF_STR); Utilities.unsetSchemaEvolution(jobConf); + Utilities.unsetPartitionColumnsInConf(jobConf); TableScanDesc scanDesc = tableScan.getConf(); if (scanDesc == null) { @@ -918,6 +919,7 @@ public static void pushFiltersAndAsOf(JobConf jobConf, TableScanOperator tableSc } Utilities.addTableSchemaToConf(jobConf, tableScan); + Utilities.setPartitionColumnsInConf(jobConf, tableScan); // construct column name list and types for reference by filter push down Utilities.setColumnNameList(jobConf, tableScan); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/IOConstants.java b/ql/src/java/org/apache/hadoop/hive/ql/io/IOConstants.java index 2be864e752e2..a9f41260a99b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/IOConstants.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/IOConstants.java @@ -47,6 +47,7 @@ public final class IOConstants { */ public static final String SCHEMA_EVOLUTION_COLUMNS = "schema.evolution.columns"; public static final String SCHEMA_EVOLUTION_COLUMNS_TYPES = "schema.evolution.columns.types"; + public static final String PARTITION_COLUMNS = "partition.columns"; @VisibleForTesting public static final String CUSTOM_TEXT_SERDE = "CustomTextSerde"; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java index a665c2586a34..2f8efa88fd44 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java @@ -40,14 +40,18 @@ import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.MessageTypeParser; +import org.apache.parquet.schema.Type; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; public abstract class ParquetRecordReaderBase { 
public static final Logger LOG = LoggerFactory.getLogger(ParquetRecordReaderBase.class); @@ -196,7 +200,8 @@ public FilterCompat.Filter setFilter(final JobConf conf, MessageType schema) { // Create the Parquet FilterPredicate without including columns that do not exist // on the schema (such as partition columns). - FilterPredicate p = ParquetFilterPredicateConverter.toFilterPredicate(sarg, schema, columns); + MessageType newSchema = getSchemaWithoutPartitionColumns(conf, schema); + FilterPredicate p = ParquetFilterPredicateConverter.toFilterPredicate(sarg, newSchema, columns); if (p != null) { // Filter may have sensitive information. Do not send to debug. LOG.debug("PARQUET predicate push down generated."); @@ -209,6 +214,22 @@ public FilterCompat.Filter setFilter(final JobConf conf, MessageType schema) { } } + private MessageType getSchemaWithoutPartitionColumns(JobConf conf, MessageType schema) { + String partCols = conf.get(IOConstants.PARTITION_COLUMNS); + if (partCols != null && !partCols.isEmpty()) { + Set partitionColumns = new HashSet<>(Arrays.asList(partCols.split(","))); + List newFields = new ArrayList<>(); + + for (Type field: schema.getFields()) { + if(!partitionColumns.contains(field.getName())) { + newFields.add(field); + } + } + return new MessageType(schema.getName(), newFields); + } + return schema; + } + public List getFilteredBlocks() { return filteredBlocks; } From 45d02f8dc5325be44897a5c5dc750006029865f1 Mon Sep 17 00:00:00 2001 From: Stamatis Zampetakis Date: Mon, 21 Nov 2022 17:02:20 +0100 Subject: [PATCH 10/15] Set/Get "partition.column" using existing variable in serdeConstants --- .../apache/hadoop/hive/ql/exec/Utilities.java | 35 +++++++++---------- .../hadoop/hive/ql/io/HiveInputFormat.java | 1 - .../apache/hadoop/hive/ql/io/IOConstants.java | 1 - .../io/parquet/ParquetRecordReaderBase.java | 21 ++++++----- 4 files changed, 27 insertions(+), 31 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 2885c96cb2c1..4743bd47d420 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -4275,32 +4275,31 @@ public static void addTableSchemaToConf(Configuration conf, } /** - * Sets partition column names to {@link IOConstants#PARTITION_COLUMNS}, if available. - * - * @param conf JobConf - * @param tableScanOp TableScanOperator object + * Sets partition column names to the configuration, if there is available info in the operator. 
*/ public static void setPartitionColumnsInConf(Configuration conf, TableScanOperator tableScanOp) { TableScanDesc scanDesc = tableScanOp.getConf(); - if (scanDesc != null && scanDesc.getTableMetadata() != null) { - List partitionColsList = scanDesc.getTableMetadata().getPartColNames(); - if (!partitionColsList.isEmpty()) { - conf.set(IOConstants.PARTITION_COLUMNS, String.join(",", partitionColsList)); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug(IOConstants.PARTITION_COLUMNS + " not available"); - } - } + Table metadata = scanDesc.getTableMetadata(); + if (metadata == null) { + return; + } + List partCols = metadata.getPartCols(); + if (partCols != null && !partCols.isEmpty()) { + conf.set(serdeConstants.LIST_PARTITION_COLUMNS, MetaStoreUtils.getColumnNamesFromFieldSchema(partCols)); } } /** - * Unsets partition column names from {@link IOConstants#PARTITION_COLUMNS} - * - * @param conf JobConf + * Returns a list with partition column names present in the configuration, + * or empty if there is no such information available. */ - public static void unsetPartitionColumnsInConf(Configuration conf) { - conf.unset(IOConstants.PARTITION_COLUMNS); + public static List getPartitionColumnNames(Configuration conf) { + String colNames = conf.get(serdeConstants.LIST_PARTITION_COLUMNS); + if (colNames != null) { + return splitColNames(new ArrayList<>(), colNames); + } else { + return Collections.emptyList(); + } } /** diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java index db707f8da3a5..00d359049387 100755 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java @@ -911,7 +911,6 @@ public static void pushFiltersAndAsOf(JobConf jobConf, TableScanOperator tableSc jobConf.unset(TableScanDesc.FILTER_EXPR_CONF_STR); Utilities.unsetSchemaEvolution(jobConf); - Utilities.unsetPartitionColumnsInConf(jobConf); TableScanDesc scanDesc = tableScan.getConf(); if (scanDesc == null) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/IOConstants.java b/ql/src/java/org/apache/hadoop/hive/ql/io/IOConstants.java index a9f41260a99b..2be864e752e2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/IOConstants.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/IOConstants.java @@ -47,7 +47,6 @@ public final class IOConstants { */ public static final String SCHEMA_EVOLUTION_COLUMNS = "schema.evolution.columns"; public static final String SCHEMA_EVOLUTION_COLUMNS_TYPES = "schema.evolution.columns.types"; - public static final String PARTITION_COLUMNS = "partition.columns"; @VisibleForTesting public static final String CUSTOM_TEXT_SERDE = "CustomTextSerde"; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java index 2f8efa88fd44..a9e907b96f95 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java @@ -16,6 +16,7 @@ import com.google.common.base.Strings; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.io.IOConstants; import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport; import org.apache.hadoop.hive.ql.io.parquet.read.ParquetFilterPredicateConverter; @@ -215,19 +216,17 @@ public 
FilterCompat.Filter setFilter(final JobConf conf, MessageType schema) { } private MessageType getSchemaWithoutPartitionColumns(JobConf conf, MessageType schema) { - String partCols = conf.get(IOConstants.PARTITION_COLUMNS); - if (partCols != null && !partCols.isEmpty()) { - Set partitionColumns = new HashSet<>(Arrays.asList(partCols.split(","))); - List newFields = new ArrayList<>(); - - for (Type field: schema.getFields()) { - if(!partitionColumns.contains(field.getName())) { - newFields.add(field); - } + List partCols = Utilities.getPartitionColumnNames(conf); + if (partCols.isEmpty()) { + return schema; + } + List newFields = new ArrayList<>(); + for (Type field : schema.getFields()) { + if (!partCols.contains(field.getName())) { + newFields.add(field); } - return new MessageType(schema.getName(), newFields); } - return schema; + return new MessageType(schema.getName(), newFields); } public List getFilteredBlocks() { From 880dbf57aa4269c6a6543326944ac03aa4c51133 Mon Sep 17 00:00:00 2001 From: Stamatis Zampetakis Date: Mon, 21 Nov 2022 17:05:45 +0100 Subject: [PATCH 11/15] Rename to setPartitionColumnNames to align with existing APIs Various existing APIs: setColumnNameList setColumnTypeList getColumnNames --- ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java | 2 +- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java | 2 +- ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java index 0ba0faef91a7..063a587c8a33 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java @@ -143,7 +143,7 @@ public FetchOperator(FetchWork work, JobConf job, Operator operator, if (operator instanceof TableScanOperator) { Utilities.addTableSchemaToConf(job, (TableScanOperator) operator); - Utilities.setPartitionColumnsInConf(job, (TableScanOperator) operator); + Utilities.setPartitionColumnNames(job, (TableScanOperator) operator); } this.vcCols = vcCols; this.hasVC = vcCols != null && !vcCols.isEmpty(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 4743bd47d420..afd2c48ad6f6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -4277,7 +4277,7 @@ public static void addTableSchemaToConf(Configuration conf, /** * Sets partition column names to the configuration, if there is available info in the operator. 
*/ - public static void setPartitionColumnsInConf(Configuration conf, TableScanOperator tableScanOp) { + public static void setPartitionColumnNames(Configuration conf, TableScanOperator tableScanOp) { TableScanDesc scanDesc = tableScanOp.getConf(); Table metadata = scanDesc.getTableMetadata(); if (metadata == null) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java index 00d359049387..de93573e303a 100755 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java @@ -918,7 +918,7 @@ public static void pushFiltersAndAsOf(JobConf jobConf, TableScanOperator tableSc } Utilities.addTableSchemaToConf(jobConf, tableScan); - Utilities.setPartitionColumnsInConf(jobConf, tableScan); + Utilities.setPartitionColumnNames(jobConf, tableScan); // construct column name list and types for reference by filter push down Utilities.setColumnNameList(jobConf, tableScan); From c310060df7309df3949cbc4345ab9050812e0b98 Mon Sep 17 00:00:00 2001 From: Stamatis Zampetakis Date: Mon, 21 Nov 2022 17:10:27 +0100 Subject: [PATCH 12/15] Drop seemingly redundant call from FetchOperator FetchTask#initFetch already sets the partition columns among other things. Column name, types, etc, are not set in the constructor so setting partitions here seems a bit out of place. --- ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java | 1 - 1 file changed, 1 deletion(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java index 063a587c8a33..c3fc4094504b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java @@ -143,7 +143,6 @@ public FetchOperator(FetchWork work, JobConf job, Operator operator, if (operator instanceof TableScanOperator) { Utilities.addTableSchemaToConf(job, (TableScanOperator) operator); - Utilities.setPartitionColumnNames(job, (TableScanOperator) operator); } this.vcCols = vcCols; this.hasVC = vcCols != null && !vcCols.isEmpty(); From 2cb2d99e0dd6bc83cf13c0e649157cfd9e603929 Mon Sep 17 00:00:00 2001 From: Stamatis Zampetakis Date: Mon, 21 Nov 2022 17:21:06 +0100 Subject: [PATCH 13/15] Remove explains from tests since the changes are not targeting the query plan --- .../clientpositive/parquet_partition_col.q | 32 ++--- .../llap/parquet_partition_col.q.out | 124 ++---------------- 2 files changed, 22 insertions(+), 134 deletions(-) diff --git a/ql/src/test/queries/clientpositive/parquet_partition_col.q b/ql/src/test/queries/clientpositive/parquet_partition_col.q index 33fc3e4bc1b1..695daa12f99a 100644 --- a/ql/src/test/queries/clientpositive/parquet_partition_col.q +++ b/ql/src/test/queries/clientpositive/parquet_partition_col.q @@ -1,20 +1,12 @@ - - create external table test( - strcol string, - intcol integer - ) partitioned by (pcol int) - stored as parquet - location '../../data/files/parquet_partition'; - - msck repair table test; - - select * from test; - - explain select * from test where pcol=100 and intcol=2; - select * from test where pcol=100 and intcol=2; - - explain select * from test where PCOL=200 and intcol=3; - select * from test where PCOL=200 and intcol=3; - - explain select * from test where `pCol`=300 and intcol=5; - select * from test where `pCol`=300 and intcol=5; \ No newline at end of file +create external table test( + strcol string, + intcol integer 
+) partitioned by (pcol int) +stored as parquet +location '../../data/files/parquet_partition'; + +msck repair table test; + +select * from test where pcol=100 and intcol=2; +select * from test where PCOL=200 and intcol=3; +select * from test where `pCol`=300 and intcol=5; diff --git a/ql/src/test/results/clientpositive/llap/parquet_partition_col.q.out b/ql/src/test/results/clientpositive/llap/parquet_partition_col.q.out index 79ab0d177c23..1583792dbb5a 100644 --- a/ql/src/test/results/clientpositive/llap/parquet_partition_col.q.out +++ b/ql/src/test/results/clientpositive/llap/parquet_partition_col.q.out @@ -1,19 +1,19 @@ PREHOOK: query: create external table test( - strcol string, - intcol integer - ) partitioned by (pcol int) - stored as parquet - location '../../data/files/parquet_partition' + strcol string, + intcol integer +) partitioned by (pcol int) +stored as parquet +location '../../data/files/parquet_partition' PREHOOK: type: CREATETABLE #### A masked pattern was here #### PREHOOK: Output: database:default PREHOOK: Output: default@test POSTHOOK: query: create external table test( - strcol string, - intcol integer - ) partitioned by (pcol int) - stored as parquet - location '../../data/files/parquet_partition' + strcol string, + intcol integer +) partitioned by (pcol int) +stored as parquet +location '../../data/files/parquet_partition' POSTHOOK: type: CREATETABLE #### A masked pattern was here #### POSTHOOK: Output: database:default @@ -26,54 +26,6 @@ POSTHOOK: type: MSCK POSTHOOK: Output: default@test Partitions not in metastore: test:pcol=100 test:pcol=200 test:pcol=300 #### A masked pattern was here #### -PREHOOK: query: select * from test -PREHOOK: type: QUERY -PREHOOK: Input: default@test -PREHOOK: Input: default@test@pcol=100 -PREHOOK: Input: default@test@pcol=200 -PREHOOK: Input: default@test@pcol=300 -#### A masked pattern was here #### -POSTHOOK: query: select * from test -POSTHOOK: type: QUERY -POSTHOOK: Input: default@test -POSTHOOK: Input: default@test@pcol=100 -POSTHOOK: Input: default@test@pcol=200 -POSTHOOK: Input: default@test@pcol=300 -#### A masked pattern was here #### -a 1 100 -b 2 100 -c 3 200 -d 4 200 -e 5 300 -f 6 300 -PREHOOK: query: explain select * from test where pcol=100 and intcol=2 -PREHOOK: type: QUERY -PREHOOK: Input: default@test -PREHOOK: Input: default@test@pcol=100 -#### A masked pattern was here #### -POSTHOOK: query: explain select * from test where pcol=100 and intcol=2 -POSTHOOK: type: QUERY -POSTHOOK: Input: default@test -POSTHOOK: Input: default@test@pcol=100 -#### A masked pattern was here #### -STAGE DEPENDENCIES: - Stage-0 is a root stage - -STAGE PLANS: - Stage: Stage-0 - Fetch Operator - limit: -1 - Processor Tree: - TableScan - alias: test - filterExpr: ((intcol = 2) and (pcol = 100)) (type: boolean) - Filter Operator - predicate: (intcol = 2) (type: boolean) - Select Operator - expressions: strcol (type: string), 2 (type: int), 100 (type: int) - outputColumnNames: _col0, _col1, _col2 - ListSink - PREHOOK: query: select * from test where pcol=100 and intcol=2 PREHOOK: type: QUERY PREHOOK: Input: default@test @@ -85,34 +37,6 @@ POSTHOOK: Input: default@test POSTHOOK: Input: default@test@pcol=100 #### A masked pattern was here #### b 2 100 -PREHOOK: query: explain select * from test where PCOL=200 and intcol=3 -PREHOOK: type: QUERY -PREHOOK: Input: default@test -PREHOOK: Input: default@test@pcol=200 -#### A masked pattern was here #### -POSTHOOK: query: explain select * from test where PCOL=200 and intcol=3 -POSTHOOK: type: QUERY 
-POSTHOOK: Input: default@test -POSTHOOK: Input: default@test@pcol=200 -#### A masked pattern was here #### -STAGE DEPENDENCIES: - Stage-0 is a root stage - -STAGE PLANS: - Stage: Stage-0 - Fetch Operator - limit: -1 - Processor Tree: - TableScan - alias: test - filterExpr: ((intcol = 3) and (pcol = 200)) (type: boolean) - Filter Operator - predicate: (intcol = 3) (type: boolean) - Select Operator - expressions: strcol (type: string), 3 (type: int), 200 (type: int) - outputColumnNames: _col0, _col1, _col2 - ListSink - PREHOOK: query: select * from test where PCOL=200 and intcol=3 PREHOOK: type: QUERY PREHOOK: Input: default@test @@ -124,34 +48,6 @@ POSTHOOK: Input: default@test POSTHOOK: Input: default@test@pcol=200 #### A masked pattern was here #### c 3 200 -PREHOOK: query: explain select * from test where `pCol`=300 and intcol=5 -PREHOOK: type: QUERY -PREHOOK: Input: default@test -PREHOOK: Input: default@test@pcol=300 -#### A masked pattern was here #### -POSTHOOK: query: explain select * from test where `pCol`=300 and intcol=5 -POSTHOOK: type: QUERY -POSTHOOK: Input: default@test -POSTHOOK: Input: default@test@pcol=300 -#### A masked pattern was here #### -STAGE DEPENDENCIES: - Stage-0 is a root stage - -STAGE PLANS: - Stage: Stage-0 - Fetch Operator - limit: -1 - Processor Tree: - TableScan - alias: test - filterExpr: ((intcol = 5) and (pcol = 300)) (type: boolean) - Filter Operator - predicate: (intcol = 5) (type: boolean) - Select Operator - expressions: strcol (type: string), 5 (type: int), 300 (type: int) - outputColumnNames: _col0, _col1, _col2 - ListSink - PREHOOK: query: select * from test where `pCol`=300 and intcol=5 PREHOOK: type: QUERY PREHOOK: Input: default@test From 79e88ee273981435d54a3615d46005b6530163fc Mon Sep 17 00:00:00 2001 From: Stamatis Zampetakis Date: Mon, 21 Nov 2022 17:55:18 +0100 Subject: [PATCH 14/15] Add some comments in the qtest repro --- .../clientpositive/parquet_partition_col.q | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/ql/src/test/queries/clientpositive/parquet_partition_col.q b/ql/src/test/queries/clientpositive/parquet_partition_col.q index 695daa12f99a..ae3f2bd06ff7 100644 --- a/ql/src/test/queries/clientpositive/parquet_partition_col.q +++ b/ql/src/test/queries/clientpositive/parquet_partition_col.q @@ -1,3 +1,28 @@ +-- The peculiarity of this test is that the partitioning column exists inside each individual Parquet +-- file (under data/files/parquet_partition) and at the same time it is also present in the directory +-- structure. +-- +-- The schema of the Parquet files are shown below: +-- { +-- "type" : "record", +-- "name" : "hive_schema", +-- "fields" : [ { +-- "name" : "strcol", +-- "type" : [ "null", "string" ], +-- "default" : null +-- }, { +-- "name" : "intcol", +-- "type" : [ "null", "int" ], +-- "default" : null +-- }, { +-- "name" : "pcol", +-- "type" : [ "null", "int" ], +-- "default" : null +-- } ] +-- } +-- The test case necessitates the table to be external with location already specified; we don't +-- want the data to be reloaded cause it will change the actual problem. 
+ create external table test( strcol string, intcol integer From b8f94fbbacbdb22c9d0c84693224d6dc970ddd3d Mon Sep 17 00:00:00 2001 From: Stamatis Zampetakis Date: Mon, 21 Nov 2022 17:56:57 +0100 Subject: [PATCH 15/15] Remove unused imports from ParquetRecordReaderBase --- .../hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java index a9e907b96f95..529c13871e35 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java @@ -47,12 +47,9 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; public abstract class ParquetRecordReaderBase { public static final Logger LOG = LoggerFactory.getLogger(ParquetRecordReaderBase.class);
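A brief recap of how the final version of the fix fits together (an informal sketch assembled from the hunks above, with method bodies elided; see Utilities.java, HiveInputFormat.java and ParquetRecordReaderBase.java in the patches for the complete code):

    // HiveInputFormat.pushFiltersAndAsOf(): record the scan's partition column names in the JobConf.
    // Utilities.setPartitionColumnNames() stores them under serdeConstants.LIST_PARTITION_COLUMNS
    // using MetaStoreUtils.getColumnNamesFromFieldSchema().
    Utilities.addTableSchemaToConf(jobConf, tableScan);
    Utilities.setPartitionColumnNames(jobConf, tableScan);

    // ParquetRecordReaderBase.setFilter(): read the names back with Utilities.getPartitionColumnNames(conf),
    // strip those fields from the file schema, and only then convert the SearchArgument, so a leaf such as
    // `pcol = 100` is dropped while residual predicates such as `intcol = 2` are still pushed down to Parquet.
    MessageType newSchema = getSchemaWithoutPartitionColumns(conf, schema);
    FilterPredicate p = ParquetFilterPredicateConverter.toFilterPredicate(sarg, newSchema, columns);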