ParquetDictionary.java (new file)
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources.parquet;

import org.apache.spark.sql.execution.vectorized.Dictionary;

/**
 * Wraps a Parquet dictionary so it can be handed to a ColumnVector through the
 * Spark-side Dictionary interface, keeping Parquet types out of the
 * vectorized execution code.
 */
public final class ParquetDictionary implements Dictionary {
  private org.apache.parquet.column.Dictionary dictionary;

  public ParquetDictionary(org.apache.parquet.column.Dictionary dictionary) {
    this.dictionary = dictionary;
  }

  @Override
  public int decodeToInt(int id) {
    return dictionary.decodeToInt(id);
  }

  @Override
  public long decodeToLong(int id) {
    return dictionary.decodeToLong(id);
  }

  @Override
  public float decodeToFloat(int id) {
    return dictionary.decodeToFloat(id);
  }

  @Override
  public double decodeToDouble(int id) {
    return dictionary.decodeToDouble(id);
  }

  @Override
  public byte[] decodeToBinary(int id) {
    return dictionary.decodeToBinary(id).getBytes();
  }
}
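A quick, hedged sketch of how the adapter composes (not part of this diff). The stub Parquet dictionary below is hypothetical, built only to exercise the int path, and is assumed to live in the same package as ParquetDictionary; real dictionaries are decoded from dictionary pages.

import org.apache.parquet.column.Encoding;
import org.apache.spark.sql.execution.vectorized.Dictionary;

// Hypothetical stub: overrides only what the int path needs.
final class StubIntDictionary extends org.apache.parquet.column.Dictionary {
  private final int[] values;

  StubIntDictionary(int[] values) {
    super(Encoding.PLAIN_DICTIONARY);
    this.values = values;
  }

  @Override
  public int getMaxId() {
    return values.length - 1;
  }

  @Override
  public int decodeToInt(int id) {
    return values[id];
  }
}

final class AdapterDemo {
  public static void main(String[] args) {
    Dictionary sparkSide =
        new ParquetDictionary(new StubIntDictionary(new int[] {10, 20, 30}));
    // Callers see only the Spark-side interface, not parquet-column types.
    System.out.println(sparkSide.decodeToInt(2)); // prints 30
  }
}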
@@ -169,7 +169,7 @@ void readBatch(int total, ColumnVector column) throws IOException {
       // Column vector supports lazy decoding of dictionary values so just set the dictionary.
       // We can't do this if rowId != 0 AND the column doesn't have a dictionary (i.e. some
       // non-dictionary encoded values have already been added).
-      column.setDictionary(dictionary);
+      column.setDictionary(new ParquetDictionary(dictionary));
     } else {
       decodeDictionaryIds(rowId, num, column, dictionaryIds);
     }
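To make the lazy path above concrete, here is a hedged, simplified stand-in for a column vector (hypothetical class, not in the PR): rows keep only their small dictionary ids, and a value goes through the Dictionary only when it is actually read.

import org.apache.spark.sql.execution.vectorized.Dictionary;

final class LazyIntColumn {
  private final int[] dictionaryIds;   // per-row ids, cheap to load in bulk
  private final Dictionary dictionary; // decodes an id to a value on demand

  LazyIntColumn(int[] dictionaryIds, Dictionary dictionary) {
    this.dictionaryIds = dictionaryIds;
    this.dictionary = dictionary;
  }

  int getInt(int rowId) {
    // Rows that are never read (e.g. filtered out) are never decoded,
    // which is what setDictionary buys over eager decodeDictionaryIds.
    return dictionary.decodeToInt(dictionaryIds[rowId]);
  }
}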
@@ -154,12 +154,6 @@ public float getProgress() throws IOException, InterruptedException {
     return (float) rowsReturned / totalRowCount;
   }

-  /**
-   * Returns the ColumnarBatch object that will be used for all rows returned by this reader.
-   * This object is reused. Calling this enables the vectorized reader. This should be called
-   * before any calls to nextKeyValue/nextBatch.
-   */
-
   // Creates a columnar batch that includes the schema from the data files and the additional
   // partition columns appended to the end of the batch.
   // For example, if the data contains two columns, with 2 partition columns:
@@ -204,12 +198,17 @@ public void initBatch(StructType partitionColumns, InternalRow partitionValues)
     initBatch(DEFAULT_MEMORY_MODE, partitionColumns, partitionValues);
   }

+  /**
+   * Returns the ColumnarBatch object that will be used for all rows returned by this reader.
+   * This object is reused. Calling this enables the vectorized reader. This should be called
+   * before any calls to nextKeyValue/nextBatch.
+   */
   public ColumnarBatch resultBatch() {
     if (columnarBatch == null) initBatch();
     return columnarBatch;
   }

-  /*
+  /**
    * Can be called before any rows are returned to enable returning columnar batches directly.
    */
   public void enableReturningBatches() {
@@ -237,9 +236,7 @@ public boolean nextBatch() throws IOException {
   }

   private void initializeInternal() throws IOException, UnsupportedOperationException {
-    /**
-     * Check that the requested schema is supported.
-     */
+    // Check that the requested schema is supported.
     missingColumns = new boolean[requestedSchema.getFieldCount()];
     for (int i = 0; i < requestedSchema.getFieldCount(); ++i) {
       Type t = requestedSchema.getFields().get(i);
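Putting the reader methods above together, a hedged usage sketch. The diff view does not name the file, so this assumes Spark's VectorizedParquetRecordReader and this branch's ColumnarBatch in org.apache.spark.sql.execution.vectorized; reader construction and split/configuration plumbing are elided.

import java.io.IOException;

import org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader;
import org.apache.spark.sql.execution.vectorized.ColumnarBatch;

final class BatchReadExample {
  // `reader` is assumed to be already initialized against a Parquet split.
  static long countRows(VectorizedParquetRecordReader reader) throws IOException {
    ColumnarBatch batch = reader.resultBatch(); // enables the vectorized path; one reused batch
    long rows = 0;
    while (reader.nextBatch()) {                // refills `batch` in place
      rows += batch.numRows();
    }
    return rows;
  }
}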
@@ -20,8 +20,6 @@
 import java.math.BigInteger;

 import com.google.common.annotations.VisibleForTesting;
-import org.apache.parquet.column.Dictionary;
-import org.apache.parquet.io.api.Binary;

 import org.apache.spark.memory.MemoryMode;
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -313,8 +311,8 @@ private void throwUnsupportedException(int requiredCapacity, Throwable cause) {
   }

   /**
-   * Ensures that there is enough storage to store capcity elements. That is, the put() APIs
-   * must work for all rowIds < capcity.
+   * Ensures that there is enough storage to store capacity elements. That is, the put() APIs
+   * must work for all rowIds < capacity.
    */
   protected abstract void reserveInternal(int capacity);

@@ -479,7 +477,6 @@

   /**
    * Sets values from [rowId, rowId + count) to [src + srcIndex, src + srcIndex + count)
-   * src should contain `count` doubles written as ieee format.
    */
   public abstract void putFloats(int rowId, int count, float[] src, int srcIndex);

@@ -506,7 +503,6 @@

   /**
    * Sets values from [rowId, rowId + count) to [src + srcIndex, src + srcIndex + count)
-   * src should contain `count` doubles written as ieee format.
    */
   public abstract void putDoubles(int rowId, int count, double[] src, int srcIndex);

@@ -628,8 +624,8 @@ public final UTF8String getUTF8String(int rowId) {
       ColumnVector.Array a = getByteArray(rowId);
       return UTF8String.fromBytes(a.byteArray, a.byteArrayOffset, a.length);
     } else {
-      Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(rowId));
-      return UTF8String.fromBytes(v.getBytes());
+      byte[] bytes = dictionary.decodeToBinary(dictionaryIds.getDictId(rowId));
+      return UTF8String.fromBytes(bytes);
     }
   }

@@ -643,8 +639,7 @@ public final byte[] getBinary(int rowId) {
       System.arraycopy(array.byteArray, array.byteArrayOffset, bytes, 0, bytes.length);
       return bytes;
     } else {
-      Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(rowId));
-      return v.getBytes();
+      return dictionary.decodeToBinary(dictionaryIds.getDictId(rowId));
     }
   }

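The two hunks above are the payoff of the byte[]-returning interface: the string and binary read paths no longer touch Parquet's Binary type. A hedged distillation (hypothetical helper class, not in the PR; `dict` and `id` stand in for state the column vector already holds):

import org.apache.spark.sql.execution.vectorized.Dictionary;
import org.apache.spark.unsafe.types.UTF8String;

final class StringDecode {
  static UTF8String decode(Dictionary dict, int id) {
    byte[] bytes = dict.decodeToBinary(id); // raw UTF-8 bytes straight from the dictionary
    return UTF8String.fromBytes(bytes);     // no intermediate parquet Binary object
  }
}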
Dictionary.java (new file)
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.vectorized;

/**
 * The interface for a dictionary that a ColumnVector uses to decode
 * dictionary-encoded values.
 */
public interface Dictionary {

  int decodeToInt(int id);

  long decodeToLong(int id);

  float decodeToFloat(int id);

  double decodeToDouble(int id);

  byte[] decodeToBinary(int id);
}
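For illustration, a hedged sample implementation of the new interface (not in the PR, e.g. for a unit test): a dictionary over a fixed set of strings, where only the binary path is meaningful and the other methods throw, mirroring how a real dictionary serves a single physical type.

import java.nio.charset.StandardCharsets;

import org.apache.spark.sql.execution.vectorized.Dictionary;

final class TestStringDictionary implements Dictionary {
  private final String[] values;

  TestStringDictionary(String[] values) {
    this.values = values;
  }

  @Override
  public int decodeToInt(int id) { throw new UnsupportedOperationException(); }

  @Override
  public long decodeToLong(int id) { throw new UnsupportedOperationException(); }

  @Override
  public float decodeToFloat(int id) { throw new UnsupportedOperationException(); }

  @Override
  public double decodeToDouble(int id) { throw new UnsupportedOperationException(); }

  @Override
  public byte[] decodeToBinary(int id) {
    return values[id].getBytes(StandardCharsets.UTF_8);
  }
}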