From 45071eba1c5b24dc5ca1ce7991ca1590bd935ac0 Mon Sep 17 00:00:00 2001 From: Adam Szita Date: Wed, 14 Apr 2021 15:14:17 +0200 Subject: [PATCH 1/2] HIVE-25216 Change-Id: I22a8ea3dfeb1340ffa53a51e76d237638e907bb8 --- .../mr/hive/HiveIcebergInputFormat.java | 53 ++- .../iceberg/mr/hive/HiveIcebergSerDe.java | 4 - .../HiveIcebergVectorizedRecordReader.java | 74 ++++ .../mr/hive/vector/HiveVectorizedReader.java | 138 +++++++ .../vector/VectorizedRowBatchIterator.java | 91 ++++ .../AbstractMapredIcebergRecordReader.java | 70 ++++ .../mr/mapred/MapredIcebergInputFormat.java | 58 +-- .../mr/mapreduce/IcebergInputFormat.java | 52 ++- ...stHiveIcebergStorageHandlerWithEngine.java | 25 +- .../positive/vectorized_iceberg_read.q | 34 ++ .../positive/vectorized_iceberg_read.q.out | 170 ++++++++ .../ql/exec/vector/VectorizedRowBatchCtx.java | 390 +++++++++--------- 12 files changed, 912 insertions(+), 247 deletions(-) create mode 100644 iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveIcebergVectorizedRecordReader.java create mode 100644 iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java create mode 100644 iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/VectorizedRowBatchIterator.java create mode 100644 iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapred/AbstractMapredIcebergRecordReader.java create mode 100644 iceberg/iceberg-handler/src/test/queries/positive/vectorized_iceberg_read.q create mode 100644 iceberg/iceberg-handler/src/test/results/positive/vectorized_iceberg_read.q.out diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java index 9a966598e8f7..a2a614f972e9 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java @@ -23,7 +23,11 @@ import java.util.Arrays; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.SerializationUtilities; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedSupport; import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; @@ -34,20 +38,43 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; +import org.apache.iceberg.common.DynConstructors; import org.apache.iceberg.data.Record; import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.hive.MetastoreUtil; import org.apache.iceberg.mr.InputFormatConfig; +import org.apache.iceberg.mr.mapred.AbstractMapredIcebergRecordReader; import org.apache.iceberg.mr.mapred.Container; import org.apache.iceberg.mr.mapred.MapredIcebergInputFormat; +import org.apache.iceberg.mr.mapreduce.IcebergInputFormat; import org.apache.iceberg.mr.mapreduce.IcebergSplit; +import org.apache.iceberg.mr.mapreduce.IcebergSplitContainer; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.util.SerializationUtil; import org.slf4j.Logger; import 
org.slf4j.LoggerFactory;
 
 public class HiveIcebergInputFormat extends MapredIcebergInputFormat<Record>
-        implements CombineHiveInputFormat.AvoidSplitCombination {
+        implements CombineHiveInputFormat.AvoidSplitCombination, VectorizedInputFormatInterface {
 
   private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergInputFormat.class);
+  private static final String HIVE_VECTORIZED_RECORDREADER_CLASS =
+      "org.apache.iceberg.mr.hive.vector.HiveIcebergVectorizedRecordReader";
+  private static final DynConstructors.Ctor<AbstractMapredIcebergRecordReader> HIVE_VECTORIZED_RECORDREADER_CTOR;
+
+  static {
+    if (MetastoreUtil.hive3PresentOnClasspath()) {
+      HIVE_VECTORIZED_RECORDREADER_CTOR = DynConstructors.builder(AbstractMapredIcebergRecordReader.class)
+          .impl(HIVE_VECTORIZED_RECORDREADER_CLASS,
+              IcebergInputFormat.class,
+              IcebergSplit.class,
+              JobConf.class,
+              Reporter.class)
+          .build();
+    } else {
+      HIVE_VECTORIZED_RECORDREADER_CTOR = null;
+    }
+  }
 
   @Override
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
@@ -77,11 +104,33 @@ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
   public RecordReader<Void, Container<Record>> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
     job.set(InputFormatConfig.SELECTED_COLUMNS, job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, ""));
-    return super.getRecordReader(split, job, reporter);
+
+    if (HiveConf.getBoolVar(job, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED) && Utilities.getIsVectorized(job)) {
+      Preconditions.checkArgument(MetastoreUtil.hive3PresentOnClasspath(), "Vectorization only supported for Hive 3+");
+
+      job.setEnum(InputFormatConfig.IN_MEMORY_DATA_MODEL, InputFormatConfig.InMemoryDataModel.HIVE);
+      job.setBoolean(InputFormatConfig.SKIP_RESIDUAL_FILTERING, true);
+
+      IcebergSplit icebergSplit = ((IcebergSplitContainer) split).icebergSplit();
+      // bogus cast for favouring code reuse over syntax
+      return (RecordReader) HIVE_VECTORIZED_RECORDREADER_CTOR.newInstance(
+          new org.apache.iceberg.mr.mapreduce.IcebergInputFormat<>(),
+          icebergSplit,
+          job,
+          reporter);
+    } else {
+      return super.getRecordReader(split, job, reporter);
+    }
   }
 
   @Override
   public boolean shouldSkipCombine(Path path, Configuration conf) {
     return true;
   }
+
+  @Override
+  public VectorizedSupport.Support[] getSupportedFeatures() {
+    return new VectorizedSupport.Support[]{ VectorizedSupport.Support.DECIMAL_64 };
+  }
+
 }
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
index 643564a12a73..2638328c3b20 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
@@ -91,10 +91,6 @@ public void initialize(@Nullable Configuration configuration, Properties serDePr
     // executor, but serDeProperties are populated by HiveIcebergStorageHandler.configureInputJobProperties() and
     // the resulting properties are serialized and distributed to the executors
 
-    // temporarily disabling vectorization in Tez, since it doesn't work with projection pruning (fix: TEZ-4248)
-    // TODO: remove this once TEZ-4248 has been released and the Tez dependencies updated here
-    assertNotVectorizedTez(configuration);
-
     if (serDeProperties.get(InputFormatConfig.TABLE_SCHEMA) != null) {
       this.tableSchema = SchemaParser.fromJson((String) serDeProperties.get(InputFormatConfig.TABLE_SCHEMA));
       if
(serDeProperties.get(InputFormatConfig.PARTITION_SPEC) != null) {
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveIcebergVectorizedRecordReader.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveIcebergVectorizedRecordReader.java
new file mode 100644
index 000000000000..ddabc27932fd
--- /dev/null
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveIcebergVectorizedRecordReader.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive.vector;
+
+import java.io.IOException;
+import org.apache.hadoop.hive.llap.LlapHiveUtils;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.iceberg.mr.mapred.AbstractMapredIcebergRecordReader;
+import org.apache.iceberg.mr.mapreduce.IcebergSplit;
+
+/**
+ * An MR1 (mapred) API wrapper for transferring VectorizedRowBatches produced by
+ * IcebergInputFormat$IcebergRecordReader, which relies on the MR2 (mapreduce) API.
+ */
+public final class HiveIcebergVectorizedRecordReader extends AbstractMapredIcebergRecordReader<VectorizedRowBatch> {
+
+  private final JobConf job;
+
+  public HiveIcebergVectorizedRecordReader(
+      org.apache.iceberg.mr.mapreduce.IcebergInputFormat<?> mapreduceInputFormat, IcebergSplit split,
+      JobConf job, Reporter reporter) throws IOException {
+    super(mapreduceInputFormat, split, job, reporter);
+    this.job = job;
+  }
+
+  @Override
+  public boolean next(Void key, VectorizedRowBatch value) throws IOException {
+    try {
+      if (innerReader.nextKeyValue()) {
+        VectorizedRowBatch newBatch = (VectorizedRowBatch) innerReader.getCurrentValue();
+        value.cols = newBatch.cols;
+        value.endOfFile = newBatch.endOfFile;
+        value.selectedInUse = newBatch.selectedInUse;
+        value.size = newBatch.size;
+        return true;
+      } else {
+        return false;
+      }
+    } catch (InterruptedException ie) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(ie);
+    }
+  }
+
+  @Override
+  public VectorizedRowBatch createValue() {
+    return LlapHiveUtils.findMapWork(job).getVectorizedRowBatchCtx().createVectorizedRowBatch();
+  }
+
+  @Override
+  public long getPos() {
+    return -1;
+  }
+
+}
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
new file mode 100644
index 000000000000..a7f157410284
--- /dev/null
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ + +package org.apache.iceberg.mr.hive.vector; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import org.apache.commons.lang3.ArrayUtils; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.io.orc.OrcSplit; +import org.apache.hadoop.hive.ql.io.orc.VectorizedOrcInputFormat; +import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.PartitionField; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.mr.mapred.MapredIcebergInputFormat; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +/** + * Utility class to create vectorized readers for Hive. + * As per the file format of the task, it will create a matching vectorized record reader that is already implemented + * in Hive. It will also do some tweaks on the produced vectors for Iceberg's use e.g. partition column handling. + */ +public class HiveVectorizedReader { + + + private HiveVectorizedReader() { + + } + + public static CloseableIterable reader(InputFile inputFile, FileScanTask task, Map idToConstant, + TaskAttemptContext context) { + JobConf job = (JobConf) context.getConfiguration(); + Path path = new Path(inputFile.location()); + FileFormat format = task.file().format(); + Reporter reporter = ((MapredIcebergInputFormat.CompatibilityTaskAttemptContextImpl) context).getLegacyReporter(); + + // Hive by default requires partition columns to be read too. This is not required for identity partition + // columns, as we will add this as constants later. + + int[] partitionColIndices = null; + Object[] partitionValues = null; + PartitionSpec partitionSpec = task.spec(); + + if (!partitionSpec.isUnpartitioned()) { + List readColumnIds = ColumnProjectionUtils.getReadColumnIDs(job); + + List fields = partitionSpec.fields(); + List partitionColIndicesList = Lists.newLinkedList(); + List partitionValuesList = Lists.newLinkedList(); + + for (PartitionField field : fields) { + if (field.transform().isIdentity()) { + // Skip reading identity partition columns from source file... 
+ int hiveColIndex = field.sourceId() - 1; + readColumnIds.remove((Integer) hiveColIndex); + + // ...and use the corresponding constant value instead + partitionColIndicesList.add(hiveColIndex); + partitionValuesList.add(idToConstant.get(field.sourceId())); + } + } + + partitionColIndices = ArrayUtils.toPrimitive(partitionColIndicesList.toArray(new Integer[0])); + partitionValues = partitionValuesList.toArray(new Object[0]); + + ColumnProjectionUtils.setReadColumns(job, readColumnIds); + } + + try { + switch (format) { + case ORC: + InputSplit split = new OrcSplit(path, null, task.start(), task.length(), (String[]) null, null, + false, false, com.google.common.collect.Lists.newArrayList(), 0, task.length(), path.getParent(), null); + RecordReader recordReader = null; + + recordReader = new VectorizedOrcInputFormat().getRecordReader(split, job, reporter); + return createVectorizedRowBatchIterable(recordReader, job, partitionColIndices, partitionValues); + + default: + throw new UnsupportedOperationException("Vectorized Hive reading unimplemented for format: " + format); + } + + } catch (IOException ioe) { + throw new RuntimeException("Error creating vectorized record reader for " + inputFile, ioe); + } + } + + private static CloseableIterable createVectorizedRowBatchIterable( + RecordReader hiveRecordReader, JobConf job, int[] partitionColIndices, + Object[] partitionValues) { + + VectorizedRowBatchIterator iterator = + new VectorizedRowBatchIterator(hiveRecordReader, job, partitionColIndices, partitionValues); + + return new CloseableIterable() { + + @Override + public CloseableIterator iterator() { + return iterator; + } + + @Override + public void close() throws IOException { + iterator.close(); + } + }; + } + +} diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/VectorizedRowBatchIterator.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/VectorizedRowBatchIterator.java new file mode 100644 index 000000000000..6497d5d26c59 --- /dev/null +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/VectorizedRowBatchIterator.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iceberg.mr.hive.vector; + +import java.io.IOException; +import org.apache.hadoop.hive.llap.LlapHiveUtils; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.iceberg.io.CloseableIterator; + +/** + * Iterator wrapper around Hive's VectorizedRowBatch producer (MRv1 implementing) record readers. 
+ */ +public final class VectorizedRowBatchIterator implements CloseableIterator { + + private final RecordReader recordReader; + private final NullWritable key; + private final VectorizedRowBatch batch; + private final VectorizedRowBatchCtx vrbCtx; + private final int[] partitionColIndices; + private final Object[] partitionValues; + private boolean advanced = false; + + VectorizedRowBatchIterator(RecordReader recordReader, JobConf job, + int[] partitionColIndices, Object[] partitionValues) { + this.recordReader = recordReader; + this.key = recordReader.createKey(); + this.batch = recordReader.createValue(); + this.vrbCtx = LlapHiveUtils.findMapWork(job).getVectorizedRowBatchCtx(); + this.partitionColIndices = partitionColIndices; + this.partitionValues = partitionValues; + } + + @Override + public void close() throws IOException { + this.recordReader.close(); + } + + private void advance() { + if (!advanced) { + try { + + if (!recordReader.next(key, batch)) { + batch.size = 0; + } + // Fill partition values + if (partitionColIndices != null) { + for (int i = 0; i < partitionColIndices.length; ++i) { + int colIdx = partitionColIndices[i]; + vrbCtx.addPartitionColsToBatch(batch.cols[colIdx], partitionValues[i], partitionColIndices[i]); + } + } + } catch (IOException ioe) { + throw new RuntimeException(ioe); + } + advanced = true; + } + } + + @Override + public boolean hasNext() { + advance(); + return batch.size > 0; + } + + @Override + public VectorizedRowBatch next() { + advance(); + advanced = false; + return batch; + } +} diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapred/AbstractMapredIcebergRecordReader.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapred/AbstractMapredIcebergRecordReader.java new file mode 100644 index 000000000000..92e9c4a688dc --- /dev/null +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapred/AbstractMapredIcebergRecordReader.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.iceberg.mr.mapred;
+
+import java.io.IOException;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.iceberg.mr.mapreduce.IcebergSplit;
+
+@SuppressWarnings("checkstyle:VisibilityModifier")
+public abstract class AbstractMapredIcebergRecordReader<T> implements RecordReader<Void, T> {
+
+  protected final org.apache.hadoop.mapreduce.RecordReader<Void, ?> innerReader;
+
+  public AbstractMapredIcebergRecordReader(org.apache.iceberg.mr.mapreduce.IcebergInputFormat<?> mapreduceInputFormat,
+      IcebergSplit split, JobConf job, Reporter reporter) throws IOException {
+    TaskAttemptContext context = MapredIcebergInputFormat.newTaskAttemptContext(job, reporter);
+
+    try {
+      innerReader = mapreduceInputFormat.createRecordReader(split, context);
+      innerReader.initialize(split, context);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    }
+
+  }
+
+  @Override
+  public Void createKey() {
+    return null;
+  }
+
+  @Override
+  public float getProgress() throws IOException {
+    try {
+      return innerReader.getProgress();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (innerReader != null) {
+      innerReader.close();
+    }
+  }
+
+}
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapred/MapredIcebergInputFormat.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapred/MapredIcebergInputFormat.java
index 073edd87906b..822ce52a7e46 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapred/MapredIcebergInputFormat.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapred/MapredIcebergInputFormat.java
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.util.Optional;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
@@ -78,23 +79,14 @@ public RecordReader<Void, Container<T>> getRecordReader(InputSplit split, JobCon
     return new MapredIcebergRecordReader<>(innerInputFormat, icebergSplit, job, reporter);
   }
 
-  private static final class MapredIcebergRecordReader<T> implements RecordReader<Void, Container<T>> {
-    private final org.apache.hadoop.mapreduce.RecordReader<Void, T> innerReader;
+  private static final class MapredIcebergRecordReader<T> extends AbstractMapredIcebergRecordReader<Container<T>> {
+
     private final long splitLength; // for getPos()
 
     MapredIcebergRecordReader(org.apache.iceberg.mr.mapreduce.IcebergInputFormat<T> mapreduceInputFormat,
                               IcebergSplit split, JobConf job, Reporter reporter) throws IOException {
-      TaskAttemptContext context = newTaskAttemptContext(job, reporter);
-
-      try {
-        innerReader = mapreduceInputFormat.createRecordReader(split, context);
-        innerReader.initialize(split, context);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new RuntimeException(e);
-      }
-
+      super(mapreduceInputFormat, split, job, reporter);
       splitLength = split.getLength();
     }
 
@@ -102,7 +94,7 @@ private static final class MapredIcebergRecordReader<T> implements RecordReader<
     public boolean next(Void key, Container<T> value) throws IOException {
       try {
         if (innerReader.nextKeyValue()) {
-          value.set(innerReader.getCurrentValue());
+          value.set((T) innerReader.getCurrentValue());
           return true;
         }
       } catch (InterruptedException
ie) { @@ -113,11 +105,6 @@ public boolean next(Void key, Container value) throws IOException { return false; } - @Override - public Void createKey() { - return null; - } - @Override public Container createValue() { return new Container<>(); @@ -128,22 +115,6 @@ public long getPos() throws IOException { return (long) (splitLength * getProgress()); } - @Override - public float getProgress() throws IOException { - try { - return innerReader.getProgress(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } - } - - @Override - public void close() throws IOException { - if (innerReader != null) { - innerReader.close(); - } - } } private static JobContext newJobContext(JobConf job) { @@ -153,11 +124,26 @@ private static JobContext newJobContext(JobConf job) { return new JobContextImpl(job, jobID); } - private static TaskAttemptContext newTaskAttemptContext(JobConf job, Reporter reporter) { + public static TaskAttemptContext newTaskAttemptContext(JobConf job, Reporter reporter) { TaskAttemptID taskAttemptID = Optional.ofNullable(TaskAttemptID.forName(job.get(JobContext.TASK_ATTEMPT_ID))) .orElseGet(TaskAttemptID::new); - return new TaskAttemptContextImpl(job, taskAttemptID, toStatusReporter(reporter)); + return new CompatibilityTaskAttemptContextImpl(job, taskAttemptID, reporter); + } + + // Saving the Reporter instance here as it is required for Hive vectorized readers. + public static class CompatibilityTaskAttemptContextImpl extends TaskAttemptContextImpl { + + private final Reporter legacyReporter; + + public CompatibilityTaskAttemptContextImpl(Configuration conf, TaskAttemptID taskId, Reporter reporter) { + super(conf, taskId, toStatusReporter(reporter)); + this.legacyReporter = reporter; + } + + public Reporter getLegacyReporter() { + return legacyReporter; + } } private static StatusReporter toStatusReporter(Reporter reporter) { diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java index 0a431891014d..c526edef5c4f 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java @@ -47,6 +47,7 @@ import org.apache.iceberg.TableProperties; import org.apache.iceberg.TableScan; import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.common.DynMethods; import org.apache.iceberg.data.DeleteFilter; import org.apache.iceberg.data.GenericDeleteFilter; import org.apache.iceberg.data.IdentityPartitionConverters; @@ -59,6 +60,7 @@ import org.apache.iceberg.expressions.Evaluator; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.hive.MetastoreUtil; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.io.FileIO; @@ -166,6 +168,24 @@ public RecordReader createRecordReader(InputSplit split, TaskAttemptCon } private static final class IcebergRecordReader extends RecordReader { + + private static final String HIVE_VECTORIZED_READER_CLASS = "org.apache.iceberg.mr.hive.vector.HiveVectorizedReader"; + private static final DynMethods.StaticMethod HIVE_VECTORIZED_READER_BUILDER; + + static { + if (MetastoreUtil.hive3PresentOnClasspath()) { + HIVE_VECTORIZED_READER_BUILDER = DynMethods.builder("reader") + 
.impl(HIVE_VECTORIZED_READER_CLASS, + InputFile.class, + FileScanTask.class, + Map.class, + TaskAttemptContext.class) + .buildStatic(); + } else { + HIVE_VECTORIZED_READER_BUILDER = null; + } + } + private TaskAttemptContext context; private Schema tableSchema; private Schema expectedSchema; @@ -173,7 +193,7 @@ private static final class IcebergRecordReader extends RecordReader private boolean caseSensitive; private InputFormatConfig.InMemoryDataModel inMemoryDataModel; private Iterator tasks; - private T currentRow; + private T current; private CloseableIterator currentIterator; private FileIO io; private EncryptionManager encryptionManager; @@ -200,7 +220,7 @@ public void initialize(InputSplit split, TaskAttemptContext newContext) { public boolean nextKeyValue() throws IOException { while (true) { if (currentIterator.hasNext()) { - currentRow = currentIterator.next(); + current = currentIterator.next(); return true; } else if (tasks.hasNext()) { currentIterator.close(); @@ -219,7 +239,7 @@ public Void getCurrentKey() { @Override public T getCurrentValue() { - return currentRow; + return current; } @Override @@ -267,9 +287,10 @@ private CloseableIterable openTask(FileScanTask currentTask, Schema readSchem private CloseableIterable open(FileScanTask currentTask, Schema readSchema) { switch (inMemoryDataModel) { case PIG: - case HIVE: // TODO: Support Pig and Hive object models for IcebergInputFormat throw new UnsupportedOperationException("Pig and Hive object models are not supported."); + case HIVE: + return openTask(currentTask, readSchema); case GENERIC: DeleteFilter deletes = new GenericDeleteFilter(io, currentTask, tableSchema, readSchema); Schema requiredSchema = deletes.requiredSchema(); @@ -344,24 +365,33 @@ private CloseableIterable newOrcIterable(InputFile inputFile, FileScanTask ta Map idToConstant = constantsMap(task, IdentityPartitionConverters::convertConstant); Schema readSchemaWithoutConstantAndMetadataFields = TypeUtil.selectNot(readSchema, Sets.union(idToConstant.keySet(), MetadataColumns.metadataFieldIds())); - ORC.ReadBuilder orcReadBuilder = ORC.read(inputFile) - .project(readSchemaWithoutConstantAndMetadataFields) - .filter(task.residual()) - .caseSensitive(caseSensitive) - .split(task.start(), task.length()); + + CloseableIterable orcIterator = null; // ORC does not support reuse containers yet switch (inMemoryDataModel) { case PIG: - case HIVE: // TODO: implement value readers for Pig and Hive throw new UnsupportedOperationException("ORC support not yet supported for Pig and Hive"); + case HIVE: + if (MetastoreUtil.hive3PresentOnClasspath()) { + orcIterator = HIVE_VECTORIZED_READER_BUILDER.invoke(inputFile, task, idToConstant, context); + } else { + throw new UnsupportedOperationException("Vectorized read is unsupported for Hive 2 integration."); + } + break; case GENERIC: + ORC.ReadBuilder orcReadBuilder = ORC.read(inputFile) + .project(readSchemaWithoutConstantAndMetadataFields) + .filter(task.residual()) + .caseSensitive(caseSensitive) + .split(task.start(), task.length()); orcReadBuilder.createReaderFunc( fileSchema -> GenericOrcReader.buildReader( readSchema, fileSchema, idToConstant)); + orcIterator = orcReadBuilder.build(); } - return applyResidualFiltering(orcReadBuilder.build(), task.residual(), readSchema); + return applyResidualFiltering(orcIterator, task.residual(), readSchema); } private Map constantsMap(FileScanTask task, BiFunction converter) { diff --git 
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java index dd4ffbe27ef9..7c98e4687934 100644 --- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java +++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java @@ -37,6 +37,7 @@ import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.StatsSetupConst; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy; @@ -54,6 +55,7 @@ import org.apache.iceberg.data.Record; import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.hive.HiveSchemaUtil; +import org.apache.iceberg.hive.MetastoreUtil; import org.apache.iceberg.mr.Catalogs; import org.apache.iceberg.mr.InputFormatConfig; import org.apache.iceberg.mr.TestHelper; @@ -129,7 +131,7 @@ public class TestHiveIcebergStorageHandlerWithEngine { StatsSetupConst.TOTAL_SIZE, SnapshotSummary.TOTAL_FILE_SIZE_PROP ); - @Parameters(name = "fileFormat={0}, engine={1}, catalog={2}") + @Parameters(name = "fileFormat={0}, engine={1}, catalog={2}, isVectorized={3}") public static Collection parameters() { Collection testParams = new ArrayList<>(); String javaVersion = System.getProperty("java.specification.version"); @@ -139,7 +141,11 @@ public static Collection parameters() { for (String engine : EXECUTION_ENGINES) { // include Tez tests only for Java 8 if (javaVersion.equals("1.8")) { - testParams.add(new Object[] {fileFormat, engine, TestTables.TestTableType.HIVE_CATALOG}); + testParams.add(new Object[] {fileFormat, engine, TestTables.TestTableType.HIVE_CATALOG, false}); + // test for vectorization=ON in case of ORC format and Tez engine + if (fileFormat == FileFormat.ORC && "tez".equals(engine) && MetastoreUtil.hive3PresentOnClasspath()) { + testParams.add(new Object[] {fileFormat, engine, TestTables.TestTableType.HIVE_CATALOG, true}); + } } } } @@ -148,7 +154,7 @@ public static Collection parameters() { // skip HiveCatalog tests as they are added before for (TestTables.TestTableType testTableType : TestTables.ALL_TABLE_TYPES) { if (!TestTables.TestTableType.HIVE_CATALOG.equals(testTableType)) { - testParams.add(new Object[]{FileFormat.PARQUET, "tez", testTableType}); + testParams.add(new Object[]{FileFormat.PARQUET, "tez", testTableType, false}); } } @@ -168,6 +174,9 @@ public static Collection parameters() { @Parameter(2) public TestTables.TestTableType testTableType; + @Parameter(3) + public boolean isVectorized; + @Rule public TemporaryFolder temp = new TemporaryFolder(); @@ -188,6 +197,7 @@ public static void afterClass() { public void before() throws IOException { testTables = HiveIcebergStorageHandlerTestUtils.testTables(shell, testTableType, temp); HiveIcebergStorageHandlerTestUtils.init(shell, testTables, temp, executionEngine); + HiveConf.setBoolVar(shell.getHiveConf(), HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, isVectorized); } @After @@ -407,6 +417,10 @@ public void testCBOWithSelfJoin() throws IOException { public void testJoinTablesSupportedTypes() throws IOException { for (int i = 0; i < SUPPORTED_TYPES.size(); i++) { Type type = 
SUPPORTED_TYPES.get(i);
+      if (type == Types.TimestampType.withZone() && isVectorized) {
+        // ORC/TIMESTAMP_INSTANT is not a supported vectorized type for Hive
+        continue;
+      }
       // TODO: remove this filter when issue #1881 is resolved
       if (type == Types.UUIDType.get() && fileFormat == FileFormat.PARQUET) {
         continue;
       }
@@ -430,6 +444,10 @@ public void testJoinTablesSupportedTypes() throws IOException {
   public void testSelectDistinctFromTable() throws IOException {
     for (int i = 0; i < SUPPORTED_TYPES.size(); i++) {
       Type type = SUPPORTED_TYPES.get(i);
+      if (type == Types.TimestampType.withZone() && isVectorized) {
+        // ORC/TIMESTAMP_INSTANT is not a supported vectorized type for Hive
+        continue;
+      }
       // TODO: remove this filter when issue #1881 is resolved
       if (type == Types.UUIDType.get() && fileFormat == FileFormat.PARQUET) {
         continue;
       }
@@ -978,6 +996,7 @@ public void testYearTransform() throws IOException {
 
   @Test
   public void testMonthTransform() throws IOException {
+    Assume.assumeTrue("ORC/TIMESTAMP_INSTANT is not a supported vectorized type for Hive", !isVectorized);
     Schema schema = new Schema(
         optional(1, "id", Types.LongType.get()),
         optional(2, "part_field", Types.TimestampType.withZone()));
diff --git a/iceberg/iceberg-handler/src/test/queries/positive/vectorized_iceberg_read.q b/iceberg/iceberg-handler/src/test/queries/positive/vectorized_iceberg_read.q
new file mode 100644
index 000000000000..8daa187cb31a
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/queries/positive/vectorized_iceberg_read.q
@@ -0,0 +1,34 @@
+set hive.vectorized.execution.enabled=true;
+
+drop table if exists tbl_ice_orc;
+create external table tbl_ice_orc(a int, b string) stored by iceberg stored as orc;
+insert into table tbl_ice_orc values (1, 'one'), (2, 'two'), (3, 'three'), (4, 'four'), (5, 'five'), (111, 'one'), (22, 'two'), (11, 'one'), (44444, 'four'), (44, 'four');
+analyze table tbl_ice_orc compute statistics for columns;
+
+explain select b, max(a) from tbl_ice_orc group by b;
+select b, max(a) from tbl_ice_orc group by b;
+
+
+
+create external table tbl_ice_orc_all_types (
+    t_float FLOAT,
+    t_double DOUBLE,
+    t_boolean BOOLEAN,
+    t_int INT,
+    t_bigint BIGINT,
+    t_binary BINARY,
+    t_string STRING,
+    t_timestamp TIMESTAMP,
+    t_date DATE,
+    t_decimal DECIMAL(4,2)
+ ) stored by iceberg stored as orc;
+
+insert into tbl_ice_orc_all_types values (1.1, 1.2, false, 4, 567890123456789, '6', "col7", cast('2012-10-03 19:58:08' as timestamp), date('1234-09-09'), cast('10.01' as decimal(4,2)));
+
+explain select max(t_float), t_double, t_boolean, t_int, t_bigint, t_binary, t_string, t_timestamp, t_date, t_decimal from tbl_ice_orc_all_types
+    group by t_double, t_boolean, t_int, t_bigint, t_binary, t_string, t_timestamp, t_date, t_decimal;
+select max(t_float), t_double, t_boolean, t_int, t_bigint, t_binary, t_string, t_timestamp, t_date, t_decimal from tbl_ice_orc_all_types
+    group by t_double, t_boolean, t_int, t_bigint, t_binary, t_string, t_timestamp, t_date, t_decimal;
+
+drop table tbl_ice_orc;
+drop table tbl_ice_orc_all_types;
\ No newline at end of file
diff --git a/iceberg/iceberg-handler/src/test/results/positive/vectorized_iceberg_read.q.out b/iceberg/iceberg-handler/src/test/results/positive/vectorized_iceberg_read.q.out
new file mode 100644
index 000000000000..d22042bd646a
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/results/positive/vectorized_iceberg_read.q.out
@@ -0,0 +1,170 @@
+PREHOOK: query: drop table if exists tbl_ice_orc
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table if exists
tbl_ice_orc +POSTHOOK: type: DROPTABLE +PREHOOK: query: create external table tbl_ice_orc(a int, b string) stored by iceberg stored as orc +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@tbl_ice_orc +POSTHOOK: query: create external table tbl_ice_orc(a int, b string) stored by iceberg stored as orc +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@tbl_ice_orc +PREHOOK: query: insert into table tbl_ice_orc values (1, 'one'), (2, 'two'), (3, 'three'), (4, 'four'), (5, 'five'), (111, 'one'), (22, 'two'), (11, 'one'), (44444, 'four'), (44, 'four') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@tbl_ice_orc +POSTHOOK: query: insert into table tbl_ice_orc values (1, 'one'), (2, 'two'), (3, 'three'), (4, 'four'), (5, 'five'), (111, 'one'), (22, 'two'), (11, 'one'), (44444, 'four'), (44, 'four') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@tbl_ice_orc +PREHOOK: query: analyze table tbl_ice_orc compute statistics for columns +PREHOOK: type: ANALYZE_TABLE +PREHOOK: Input: default@tbl_ice_orc +PREHOOK: Output: default@tbl_ice_orc +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: analyze table tbl_ice_orc compute statistics for columns +POSTHOOK: type: ANALYZE_TABLE +POSTHOOK: Input: default@tbl_ice_orc +POSTHOOK: Output: default@tbl_ice_orc +POSTHOOK: Output: hdfs://### HDFS PATH ### +PREHOOK: query: explain select b, max(a) from tbl_ice_orc group by b +PREHOOK: type: QUERY +PREHOOK: Input: default@tbl_ice_orc +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: explain select b, max(a) from tbl_ice_orc group by b +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tbl_ice_orc +POSTHOOK: Output: hdfs://### HDFS PATH ### +Plan optimized by CBO. 
+ +Vertex dependency in root stage +Reducer 2 <- Map 1 (SIMPLE_EDGE) + +Stage-0 + Fetch Operator + limit:-1 + Stage-1 + Reducer 2 vectorized + File Output Operator [FS_10] + Group By Operator [GBY_9] (rows=5 width=92) + Output:["_col0","_col1"],aggregations:["max(VALUE._col0)"],keys:KEY._col0 + <-Map 1 [SIMPLE_EDGE] vectorized + SHUFFLE [RS_8] + PartitionCols:_col0 + Group By Operator [GBY_7] (rows=5 width=92) + Output:["_col0","_col1"],aggregations:["max(a)"],keys:b + TableScan [TS_0] (rows=10 width=92) + default@tbl_ice_orc,tbl_ice_orc,Tbl:COMPLETE,Col:COMPLETE,Output:["a","b"] + +PREHOOK: query: select b, max(a) from tbl_ice_orc group by b +PREHOOK: type: QUERY +PREHOOK: Input: default@tbl_ice_orc +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: select b, max(a) from tbl_ice_orc group by b +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tbl_ice_orc +POSTHOOK: Output: hdfs://### HDFS PATH ### +five 5 +four 44444 +one 111 +three 3 +two 22 +PREHOOK: query: create external table tbl_ice_orc_all_types ( + t_float FLOAT, + t_double DOUBLE, + t_boolean BOOLEAN, + t_int INT, + t_bigint BIGINT, + t_binary BINARY, + t_string STRING, + t_timestamp TIMESTAMP, + t_date DATE, + t_decimal DECIMAL(4,2) + ) stored by iceberg stored as orc +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@tbl_ice_orc_all_types +POSTHOOK: query: create external table tbl_ice_orc_all_types ( + t_float FLOAT, + t_double DOUBLE, + t_boolean BOOLEAN, + t_int INT, + t_bigint BIGINT, + t_binary BINARY, + t_string STRING, + t_timestamp TIMESTAMP, + t_date DATE, + t_decimal DECIMAL(4,2) + ) stored by iceberg stored as orc +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@tbl_ice_orc_all_types +PREHOOK: query: insert into tbl_ice_orc_all_types values (1.1, 1.2, false, 4, 567890123456789, '6', "col7", cast('2012-10-03 19:58:08' as timestamp), date('1234-09-09'), cast('10.01' as decimal(4,2))) +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@tbl_ice_orc_all_types +POSTHOOK: query: insert into tbl_ice_orc_all_types values (1.1, 1.2, false, 4, 567890123456789, '6', "col7", cast('2012-10-03 19:58:08' as timestamp), date('1234-09-09'), cast('10.01' as decimal(4,2))) +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@tbl_ice_orc_all_types +PREHOOK: query: explain select max(t_float), t_double, t_boolean, t_int, t_bigint, t_binary, t_string, t_timestamp, t_date, t_decimal from tbl_ice_orc_all_types + group by t_double, t_boolean, t_int, t_bigint, t_binary, t_string, t_timestamp, t_date, t_decimal +PREHOOK: type: QUERY +PREHOOK: Input: default@tbl_ice_orc_all_types +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: explain select max(t_float), t_double, t_boolean, t_int, t_bigint, t_binary, t_string, t_timestamp, t_date, t_decimal from tbl_ice_orc_all_types + group by t_double, t_boolean, t_int, t_bigint, t_binary, t_string, t_timestamp, t_date, t_decimal +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tbl_ice_orc_all_types +POSTHOOK: Output: hdfs://### HDFS PATH ### +Plan optimized by CBO. 
+ +Vertex dependency in root stage +Reducer 2 <- Map 1 (SIMPLE_EDGE) + +Stage-0 + Fetch Operator + limit:-1 + Stage-1 + Reducer 2 vectorized + File Output Operator [FS_11] + Select Operator [SEL_10] (rows=1 width=564) + Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"] + Group By Operator [GBY_9] (rows=1 width=564) + Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["max(VALUE._col0)"],keys:KEY._col0, KEY._col1, KEY._col2, KEY._col3, KEY._col4, KEY._col5, KEY._col6, KEY._col7, KEY._col8 + <-Map 1 [SIMPLE_EDGE] vectorized + SHUFFLE [RS_8] + PartitionCols:_col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8 + Group By Operator [GBY_7] (rows=1 width=564) + Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["max(t_float)"],keys:t_double, t_boolean, t_int, t_bigint, t_binary, t_string, t_timestamp, t_date, t_decimal + TableScan [TS_0] (rows=1 width=564) + default@tbl_ice_orc_all_types,tbl_ice_orc_all_types,Tbl:COMPLETE,Col:NONE,Output:["t_float","t_double","t_boolean","t_int","t_bigint","t_binary","t_string","t_timestamp","t_date","t_decimal"] + +PREHOOK: query: select max(t_float), t_double, t_boolean, t_int, t_bigint, t_binary, t_string, t_timestamp, t_date, t_decimal from tbl_ice_orc_all_types + group by t_double, t_boolean, t_int, t_bigint, t_binary, t_string, t_timestamp, t_date, t_decimal +PREHOOK: type: QUERY +PREHOOK: Input: default@tbl_ice_orc_all_types +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: select max(t_float), t_double, t_boolean, t_int, t_bigint, t_binary, t_string, t_timestamp, t_date, t_decimal from tbl_ice_orc_all_types + group by t_double, t_boolean, t_int, t_bigint, t_binary, t_string, t_timestamp, t_date, t_decimal +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tbl_ice_orc_all_types +POSTHOOK: Output: hdfs://### HDFS PATH ### +1.1 1.2 false 4 567890123456789 6 col7 2012-10-03 19:58:08 1234-09-02 10.01 +PREHOOK: query: drop table tbl_ice_orc +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@tbl_ice_orc +PREHOOK: Output: default@tbl_ice_orc +POSTHOOK: query: drop table tbl_ice_orc +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@tbl_ice_orc +POSTHOOK: Output: default@tbl_ice_orc +PREHOOK: query: drop table tbl_ice_orc_all_types +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@tbl_ice_orc_all_types +PREHOOK: Output: default@tbl_ice_orc_all_types +POSTHOOK: query: drop table tbl_ice_orc_all_types +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@tbl_ice_orc_all_types +POSTHOOK: Output: default@tbl_ice_orc_all_types diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java index d8dcbe5755d5..45b1cefd05cb 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java @@ -21,6 +21,7 @@ import java.util.Arrays; import java.util.LinkedHashMap; import java.util.Map; +import java.util.stream.IntStream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -395,203 +396,210 @@ public void addPartitionColsToBatch(VectorizedRowBatch batch, Object[] partition public void addPartitionColsToBatch(ColumnVector[] cols, Object[] partitionValues) { - if (partitionValues != null) { - for (int i = 0; i < partitionColumnCount; i++) { - Object value = partitionValues[i]; - 
- int colIndex = dataColumnCount + i; - String partitionColumnName = rowColumnNames[colIndex]; - PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) rowColumnTypeInfos[colIndex]; - switch (primitiveTypeInfo.getPrimitiveCategory()) { - case BOOLEAN: { - LongColumnVector lcv = (LongColumnVector) cols[colIndex]; - if (value == null) { - lcv.noNulls = false; - lcv.isNull[0] = true; - lcv.isRepeating = true; - } else { - lcv.fill((Boolean) value == true ? 1 : 0); - } - } - break; - - case BYTE: { - LongColumnVector lcv = (LongColumnVector) cols[colIndex]; - if (value == null) { - lcv.noNulls = false; - lcv.isNull[0] = true; - lcv.isRepeating = true; - } else { - lcv.fill((Byte) value); - } - } - break; - - case SHORT: { - LongColumnVector lcv = (LongColumnVector) cols[colIndex]; - if (value == null) { - lcv.noNulls = false; - lcv.isNull[0] = true; - lcv.isRepeating = true; - } else { - lcv.fill((Short) value); - } - } - break; - - case INT: { - LongColumnVector lcv = (LongColumnVector) cols[colIndex]; - if (value == null) { - lcv.noNulls = false; - lcv.isNull[0] = true; - lcv.isRepeating = true; - } else { - lcv.fill((Integer) value); - } - } - break; - - case LONG: { - LongColumnVector lcv = (LongColumnVector) cols[colIndex]; - if (value == null) { - lcv.noNulls = false; - lcv.isNull[0] = true; - lcv.isRepeating = true; - } else { - lcv.fill((Long) value); - } - } - break; - - case DATE: { - LongColumnVector lcv = (LongColumnVector) cols[colIndex]; - if (value == null) { - lcv.noNulls = false; - lcv.isNull[0] = true; - lcv.isRepeating = true; - } else { - lcv.fill(DateWritableV2.dateToDays((Date) value)); - } - } - break; - - case TIMESTAMP: { - TimestampColumnVector lcv = (TimestampColumnVector) cols[colIndex]; - if (value == null) { - lcv.noNulls = false; - lcv.isNull[0] = true; - lcv.isRepeating = true; - } else { - lcv.fill(((Timestamp) value).toSqlTimestamp()); - } - } - break; - - case INTERVAL_YEAR_MONTH: { - LongColumnVector lcv = (LongColumnVector) cols[colIndex]; - if (value == null) { - lcv.noNulls = false; - lcv.isNull[0] = true; - lcv.isRepeating = true; - } else { - lcv.fill(((HiveIntervalYearMonth) value).getTotalMonths()); - } - } + int[] colIndices = IntStream.range(dataColumnCount, dataColumnCount + partitionColumnCount).toArray(); - case INTERVAL_DAY_TIME: { - IntervalDayTimeColumnVector icv = (IntervalDayTimeColumnVector) cols[colIndex]; - if (value == null) { - icv.noNulls = false; - icv.isNull[0] = true; - icv.isRepeating = true; - } else { - icv.fill(((HiveIntervalDayTime) value)); - } - } + if (partitionValues != null) { + for (int i = 0; i < colIndices.length; i++) { + int colIndex = colIndices[i]; + addPartitionColsToBatch(cols[colIndex], partitionValues[i], colIndex); + } + } + } - case FLOAT: { - DoubleColumnVector dcv = (DoubleColumnVector) cols[colIndex]; - if (value == null) { - dcv.noNulls = false; - dcv.isNull[0] = true; - dcv.isRepeating = true; - } else { - dcv.fill((Float) value); - } - } - break; - - case DOUBLE: { - DoubleColumnVector dcv = (DoubleColumnVector) cols[colIndex]; - if (value == null) { - dcv.noNulls = false; - dcv.isNull[0] = true; - dcv.isRepeating = true; - } else { - dcv.fill((Double) value); - } - } - break; - - case DECIMAL: { - DataTypePhysicalVariation dataTypePhysicalVariation = rowDataTypePhysicalVariations != null ? 
- rowDataTypePhysicalVariations[colIndex] : DataTypePhysicalVariation.NONE; - - if (dataTypePhysicalVariation == DataTypePhysicalVariation.DECIMAL_64) { - Decimal64ColumnVector dv = (Decimal64ColumnVector) cols[colIndex]; - if (value == null) { - dv.noNulls = false; - dv.isNull[0] = true; - dv.isRepeating = true; - } else { - dv.fill(((HiveDecimal) value).longValue()); - } - } else { - DecimalColumnVector dv = (DecimalColumnVector) cols[colIndex]; - if (value == null) { - dv.noNulls = false; - dv.isNull[0] = true; - dv.isRepeating = true; - } else { - dv.fill((HiveDecimal) value); - } - } + public void addPartitionColsToBatch(ColumnVector col, Object value, int colIndex) + { + String partitionColumnName = rowColumnNames[colIndex]; + PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) rowColumnTypeInfos[colIndex]; + switch (primitiveTypeInfo.getPrimitiveCategory()) { + case BOOLEAN: { + LongColumnVector lcv = (LongColumnVector) col; + if (value == null) { + lcv.noNulls = false; + lcv.isNull[0] = true; + lcv.isRepeating = true; + } else { + lcv.fill((Boolean) value == true ? 1 : 0); + } + } + break; + + case BYTE: { + LongColumnVector lcv = (LongColumnVector) col; + if (value == null) { + lcv.noNulls = false; + lcv.isNull[0] = true; + lcv.isRepeating = true; + } else { + lcv.fill((Byte) value); + } + } + break; + + case SHORT: { + LongColumnVector lcv = (LongColumnVector) col; + if (value == null) { + lcv.noNulls = false; + lcv.isNull[0] = true; + lcv.isRepeating = true; + } else { + lcv.fill((Short) value); + } + } + break; + + case INT: { + LongColumnVector lcv = (LongColumnVector) col; + if (value == null) { + lcv.noNulls = false; + lcv.isNull[0] = true; + lcv.isRepeating = true; + } else { + lcv.fill((Integer) value); + } + } + break; + + case LONG: { + LongColumnVector lcv = (LongColumnVector) col; + if (value == null) { + lcv.noNulls = false; + lcv.isNull[0] = true; + lcv.isRepeating = true; + } else { + lcv.fill((Long) value); + } + } + break; + + case DATE: { + LongColumnVector lcv = (LongColumnVector) col; + if (value == null) { + lcv.noNulls = false; + lcv.isNull[0] = true; + lcv.isRepeating = true; + } else { + lcv.fill(DateWritableV2.dateToDays((Date) value)); + } + } + break; + + case TIMESTAMP: { + TimestampColumnVector lcv = (TimestampColumnVector) col; + if (value == null) { + lcv.noNulls = false; + lcv.isNull[0] = true; + lcv.isRepeating = true; + } else { + lcv.fill(((Timestamp) value).toSqlTimestamp()); + } + } + break; + + case INTERVAL_YEAR_MONTH: { + LongColumnVector lcv = (LongColumnVector) col; + if (value == null) { + lcv.noNulls = false; + lcv.isNull[0] = true; + lcv.isRepeating = true; + } else { + lcv.fill(((HiveIntervalYearMonth) value).getTotalMonths()); + } + } + break; + + case INTERVAL_DAY_TIME: { + IntervalDayTimeColumnVector icv = (IntervalDayTimeColumnVector) col; + if (value == null) { + icv.noNulls = false; + icv.isNull[0] = true; + icv.isRepeating = true; + } else { + icv.fill(((HiveIntervalDayTime) value)); + } + } + break; + + case FLOAT: { + DoubleColumnVector dcv = (DoubleColumnVector) col; + if (value == null) { + dcv.noNulls = false; + dcv.isNull[0] = true; + dcv.isRepeating = true; + } else { + dcv.fill((Float) value); + } + } + break; + + case DOUBLE: { + DoubleColumnVector dcv = (DoubleColumnVector) col; + if (value == null) { + dcv.noNulls = false; + dcv.isNull[0] = true; + dcv.isRepeating = true; + } else { + dcv.fill((Double) value); + } + } + break; + + case DECIMAL: { + DataTypePhysicalVariation dataTypePhysicalVariation = 
rowDataTypePhysicalVariations != null ? + rowDataTypePhysicalVariations[colIndex] : DataTypePhysicalVariation.NONE; + + if (dataTypePhysicalVariation == DataTypePhysicalVariation.DECIMAL_64) { + Decimal64ColumnVector dv = (Decimal64ColumnVector) col; + if (value == null) { + dv.noNulls = false; + dv.isNull[0] = true; + dv.isRepeating = true; + } else { + dv.fill(((HiveDecimal) value).longValue()); } - break; - - case BINARY: { - BytesColumnVector bcv = (BytesColumnVector) cols[colIndex]; - byte[] bytes = (byte[]) value; - if (bytes == null) { - bcv.noNulls = false; - bcv.isNull[0] = true; - bcv.isRepeating = true; - } else { - bcv.fill(bytes); - } - } - break; - - case STRING: - case CHAR: - case VARCHAR: { - BytesColumnVector bcv = (BytesColumnVector) cols[colIndex]; - String sVal = value.toString(); - if (sVal == null) { - bcv.noNulls = false; - bcv.isNull[0] = true; - bcv.isRepeating = true; - } else { - bcv.fill(sVal.getBytes()); - } + } else { + DecimalColumnVector dv = (DecimalColumnVector) col; + if (value == null) { + dv.noNulls = false; + dv.isNull[0] = true; + dv.isRepeating = true; + } else { + dv.fill((HiveDecimal) value); } - break; - - default: - throw new RuntimeException("Unable to recognize the partition type " + primitiveTypeInfo.getPrimitiveCategory() + - " for column " + partitionColumnName); + } + } + break; + + case BINARY: { + BytesColumnVector bcv = (BytesColumnVector) col; + byte[] bytes = (byte[]) value; + if (bytes == null) { + bcv.noNulls = false; + bcv.isNull[0] = true; + bcv.isRepeating = true; + } else { + bcv.fill(bytes); } } + break; + + case STRING: + case CHAR: + case VARCHAR: { + BytesColumnVector bcv = (BytesColumnVector) col; + String sVal = value.toString(); + if (sVal == null) { + bcv.noNulls = false; + bcv.isNull[0] = true; + bcv.isRepeating = true; + } else { + bcv.fill(sVal.getBytes()); + } + } + break; + + default: + throw new RuntimeException("Unable to recognize the partition type " + primitiveTypeInfo.getPrimitiveCategory() + + " for column " + partitionColumnName); } } From 2a3588518253ab604a425c49b2b289a71a060a5a Mon Sep 17 00:00:00 2001 From: Adam Szita Date: Tue, 22 Jun 2021 11:58:46 +0200 Subject: [PATCH 2/2] Removing negative test asserting vectorization Change-Id: I33841a1c9604159db912da8672c8d79e83fdbd22 --- .../src/test/queries/negative/create_iceberg_table_failure.q | 2 -- .../test/results/negative/create_iceberg_table_failure.q.out | 5 ----- 2 files changed, 7 deletions(-) delete mode 100644 iceberg/iceberg-handler/src/test/queries/negative/create_iceberg_table_failure.q delete mode 100644 iceberg/iceberg-handler/src/test/results/negative/create_iceberg_table_failure.q.out diff --git a/iceberg/iceberg-handler/src/test/queries/negative/create_iceberg_table_failure.q b/iceberg/iceberg-handler/src/test/queries/negative/create_iceberg_table_failure.q deleted file mode 100644 index 8d8bbecf09b4..000000000000 --- a/iceberg/iceberg-handler/src/test/queries/negative/create_iceberg_table_failure.q +++ /dev/null @@ -1,2 +0,0 @@ -set hive.vectorized.execution.enabled=true; -CREATE EXTERNAL TABLE ice_t (i int, s string, ts timestamp, d date) STORED BY ICEBERG; \ No newline at end of file diff --git a/iceberg/iceberg-handler/src/test/results/negative/create_iceberg_table_failure.q.out b/iceberg/iceberg-handler/src/test/results/negative/create_iceberg_table_failure.q.out deleted file mode 100644 index 8267e64d9e37..000000000000 --- a/iceberg/iceberg-handler/src/test/results/negative/create_iceberg_table_failure.q.out +++ /dev/null @@ -1,5 +0,0 @@ 
-PREHOOK: query: CREATE EXTERNAL TABLE ice_t (i int, s string, ts timestamp, d date) STORED BY ICEBERG
-PREHOOK: type: CREATETABLE
-PREHOOK: Output: database:default
-PREHOOK: Output: default@ice_t
-FAILED: Execution Error, return code 40000 from org.apache.hadoop.hive.ql.ddl.DDLTask. java.lang.UnsupportedOperationException: Vectorized execution on Tez is currently not supported when using Iceberg tables. Please set hive.vectorized.execution.enabled=false and rerun the query.