@@ -23,6 +23,7 @@
import org.slf4j.LoggerFactory;

import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.schema.Type;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.TimestampNTZType$;

@@ -36,6 +37,9 @@ public abstract class AbstractColumnReader implements AutoCloseable {
/** The Spark data type. */
protected final DataType type;

/** The Parquet field type. */
protected final Type fieldType;

/** Parquet column descriptor. */
protected final ColumnDescriptor descriptor;

@@ -61,13 +65,23 @@ public abstract class AbstractColumnReader implements AutoCloseable {

public AbstractColumnReader(
DataType type,
Type fieldType,
ColumnDescriptor descriptor,
boolean useDecimal128,
boolean useLegacyDateTimestamp) {
this.type = type;
this.fieldType = fieldType;
this.descriptor = descriptor;
this.useDecimal128 = useDecimal128;
this.useLegacyDateTimestamp = useLegacyDateTimestamp;
}

public AbstractColumnReader(
DataType type,
ColumnDescriptor descriptor,
boolean useDecimal128,
boolean useLegacyDateTimestamp) {
this(type, null, descriptor, useDecimal128, useLegacyDateTimestamp);
TypeUtil.checkParquetType(descriptor, type);
}

@@ -72,6 +72,7 @@
import org.apache.comet.shims.ShimBatchReader;
import org.apache.comet.shims.ShimFileFormat;
import org.apache.comet.vector.CometVector;
import org.apache.comet.vector.NativeUtil;

/**
* A vectorized Parquet reader that reads a Parquet file in a batched fashion.
@@ -94,6 +95,7 @@
public class NativeBatchReader extends RecordReader<Void, ColumnarBatch> implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(NativeBatchReader.class);
protected static final BufferAllocator ALLOCATOR = new RootAllocator();
private NativeUtil nativeUtil = new NativeUtil();

private Configuration conf;
private int capacity;
@@ -266,7 +268,8 @@ public void init() throws URISyntaxException, IOException {

//// Create Column readers
List<ColumnDescriptor> columns = requestedSchema.getColumns();
int numColumns = columns.size();
[PR author comment] This yields leaf fields, which is not what we want.

List<Type> fields = requestedSchema.getFields();
int numColumns = fields.size();
if (partitionSchema != null) numColumns += partitionSchema.size();
columnReaders = new AbstractColumnReader[numColumns];
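
As the review comment above points out, MessageType.getColumns() flattens the requested schema down to its leaf columns, while getFields() returns the top-level fields that line up with sparkSchema.fields() and the columnReaders array. A minimal standalone sketch (not part of the PR) using a hypothetical nested schema:

import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class FieldsVsColumnsSketch {
  public static void main(String[] args) {
    // Hypothetical requested schema: one primitive field and one nested (struct) field.
    MessageType requestedSchema =
        MessageTypeParser.parseMessageType(
            "message spark_schema {\n"
                + "  required int32 id;\n"
                + "  required group point { required double x; required double y; }\n"
                + "}");

    // Top-level fields: [id, point] -> one entry per Spark column / column reader.
    System.out.println(requestedSchema.getFields().size());  // 2

    // Leaf columns: [id, point.x, point.y] -> one extra entry for every nested leaf.
    System.out.println(requestedSchema.getColumns().size()); // 3
  }
}

With a nested column in the requested schema, sizing columnReaders from getColumns() would allocate one reader per leaf instead of one per top-level field.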

@@ -454,6 +457,7 @@ public void close() throws IOException {
importer.close();
importer = null;
}
nativeUtil.close();
Native.closeRecordBatchReader(this.handle);
}

@@ -469,19 +473,23 @@ private int loadNextBatch() throws Throwable {
importer = new CometSchemaImporter(ALLOCATOR);

List<ColumnDescriptor> columns = requestedSchema.getColumns();
for (int i = 0; i < columns.size(); i++) {
[PR author comment] This also operates on leaf fields, which, again, is not what we want.

List<Type> fields = requestedSchema.getFields();
for (int i = 0; i < fields.size(); i++) {
// TODO: (ARROW NATIVE) check this. Currently not handling missing columns correctly?
if (missingColumns[i]) continue;
if (columnReaders[i] != null) columnReaders[i].close();
// TODO: (ARROW NATIVE) handle tz, datetime & int96 rebase
DataType dataType = sparkSchema.fields()[i].dataType();
Type field = fields.get(i);
NativeColumnReader reader =
new NativeColumnReader(
this.handle,
i,
dataType,
columns.get(i),
field,
null,
importer,
nativeUtil,
capacity,
useDecimal128,
useLegacyDateTimestamp);
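
For a nested top-level field there is no single leaf ColumnDescriptor, which is presumably why this loop now hands NativeColumnReader the field's Type and a null descriptor. Continuing the hypothetical schema from the sketch above (again, not part of the PR):

// Reuses `requestedSchema` from the FieldsVsColumnsSketch above (hypothetical schema).
Type pointField = requestedSchema.getFields().get(1);      // required group point { x, y }
boolean isLeaf = pointField.isPrimitive();                 // false: no one-to-one ColumnDescriptor exists
String[] firstLeafPath =
    requestedSchema.getColumns().get(1).getPath();         // ["point", "x"]: a leaf, not the whole field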
@@ -27,12 +27,8 @@
import org.apache.arrow.c.CometSchemaImporter;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.dictionary.Dictionary;
import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.*;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.Type;
import org.apache.spark.sql.types.DataType;

import org.apache.comet.vector.*;
@@ -65,6 +61,7 @@ public class NativeColumnReader extends AbstractColumnReader {
boolean hadNull;

private final CometSchemaImporter importer;
private final NativeUtil nativeUtil;

private ArrowArray array = null;
private ArrowSchema schema = null;
@@ -76,14 +73,17 @@ public NativeColumnReader(
long nativeBatchHandle,
int columnNum,
DataType type,
Type fieldType,
ColumnDescriptor descriptor,
CometSchemaImporter importer,
NativeUtil nativeUtil,
int batchSize,
boolean useDecimal128,
boolean useLegacyDateTimestamp) {
super(type, descriptor, useDecimal128, useLegacyDateTimestamp);
super(type, fieldType, descriptor, useDecimal128, useLegacyDateTimestamp);
assert batchSize > 0 : "Batch size must be positive, found " + batchSize;
this.batchSize = batchSize;
this.nativeUtil = nativeUtil;
this.importer = importer;
this.nativeBatchHandle = nativeBatchHandle;
this.columnNum = columnNum;
@@ -94,13 +94,13 @@ public NativeColumnReader(
// Override in order to avoid creation of JVM side column readers
protected void initNative() {
LOG.debug(
"Native column reader " + String.join(".", this.descriptor.getPath()) + " is initialized");
"Native column reader {} is initialized", String.join(".", this.type.catalogString()));
nativeHandle = 0;
}

@Override
public void readBatch(int total) {
LOG.debug("Reading column batch of size = " + total);
LOG.debug("Reading column batch of size = {}", total);

this.currentNumValues = total;
}
@@ -131,10 +131,7 @@ public CometDecodedVector loadVector() {
currentVector.close();
}

LogicalTypeAnnotation logicalTypeAnnotation =
descriptor.getPrimitiveType().getLogicalTypeAnnotation();
boolean isUuid =
logicalTypeAnnotation instanceof LogicalTypeAnnotation.UUIDLogicalTypeAnnotation;
// TODO: ARROW NATIVE : Handle Uuid?

array = ArrowArray.allocateNew(ALLOCATOR);
schema = ArrowSchema.allocateNew(ALLOCATOR);
@@ -144,47 +141,19 @@

Native.currentColumnBatch(nativeBatchHandle, columnNum, arrayAddr, schemaAddr);

FieldVector vector = importer.importVector(array, schema);
ArrowArray[] arrays = {array};
ArrowSchema[] schemas = {schema};

DictionaryEncoding dictionaryEncoding = vector.getField().getDictionary();

CometPlainVector cometVector = new CometPlainVector(vector, useDecimal128);
CometDecodedVector cometVector =
(CometDecodedVector)
scala.collection.JavaConverters.seqAsJavaList(nativeUtil.importVector(arrays, schemas))
.get(0);

// Update whether the current vector contains any null values. This is used in the following
// batch(s) to determine whether we can skip loading the native vector.
hadNull = cometVector.hasNull();

if (dictionaryEncoding == null) {
if (dictionary != null) {
// This means the column was using dictionary encoding but now has fall-back to plain
// encoding, on the native side. Setting 'dictionary' to null here, so we can use it as
// a condition to check if we can re-use vector later.
dictionary = null;
}
// Either the column is not dictionary encoded, or it was using dictionary encoding but
// a new data page has switched back to use plain encoding. For both cases we should
// return plain vector.
currentVector = cometVector;
return currentVector;
}

// We should already re-initiate `CometDictionary` here because `Data.importVector` API will
// release the previous dictionary vector and create a new one.
Dictionary arrowDictionary = importer.getProvider().lookup(dictionaryEncoding.getId());
CometPlainVector dictionaryVector =
new CometPlainVector(arrowDictionary.getVector(), useDecimal128, isUuid);
[PR author comment] This is valid only for primitive types.

if (dictionary != null) {
dictionary.setDictionaryVector(dictionaryVector);
} else {
dictionary = new CometDictionary(dictionaryVector);
}

currentVector =
new CometDictionaryVector(
cometVector, dictionary, importer.getProvider(), useDecimal128, false, isUuid);

currentVector =
new CometDictionaryVector(cometVector, dictionary, importer.getProvider(), useDecimal128);
currentVector = cometVector;
return currentVector;
}
}
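
The dictionary-handling logic deleted above is now presumably handled inside nativeUtil.importVector(). For orientation, here is a sketch that mirrors the removed inline code (import through the CometSchemaImporter, then wrap as a plain or dictionary vector); it is an assumption about what the shared helper does, not the actual NativeUtil implementation:

import org.apache.arrow.c.ArrowArray;
import org.apache.arrow.c.ArrowSchema;
import org.apache.arrow.c.CometSchemaImporter;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.dictionary.Dictionary;
import org.apache.arrow.vector.types.pojo.DictionaryEncoding;

import org.apache.comet.vector.CometDecodedVector;
import org.apache.comet.vector.CometDictionary;
import org.apache.comet.vector.CometDictionaryVector;
import org.apache.comet.vector.CometPlainVector;

// Hypothetical helper mirroring the logic that used to be inline in loadVector().
final class ImportSketch {
  static CometDecodedVector importOne(
      CometSchemaImporter importer, ArrowArray array, ArrowSchema schema, boolean useDecimal128) {
    FieldVector vector = importer.importVector(array, schema);
    DictionaryEncoding encoding = vector.getField().getDictionary();
    CometPlainVector values = new CometPlainVector(vector, useDecimal128);
    if (encoding == null) {
      // Plain-encoded column (or a column that fell back from dictionary encoding).
      return values;
    }
    // Dictionary-encoded column: look up the dictionary imported alongside the values.
    Dictionary arrowDictionary = importer.getProvider().lookup(encoding.getId());
    CometPlainVector dictionaryVector =
        new CometPlainVector(arrowDictionary.getVector(), useDecimal128);
    CometDictionary dictionary = new CometDictionary(dictionaryVector);
    return new CometDictionaryVector(values, dictionary, importer.getProvider(), useDecimal128);
  }
}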
@@ -232,7 +232,7 @@ public DictionaryProvider getDictionaryProvider() {
* @param useDecimal128 Whether to use Decimal128 for decimal column
* @return `CometVector` implementation
*/
protected static CometVector getVector(
public static CometVector getVector(
ValueVector vector, boolean useDecimal128, DictionaryProvider dictionaryProvider) {
if (vector instanceof StructVector) {
return new CometStructVector(vector, useDecimal128, dictionaryProvider);
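
Making getVector public allows callers outside this class, such as the shared NativeUtil import path used by the readers above, to wrap an already-imported Arrow vector. A usage sketch, assuming the factory lives on CometVector as the surrounding context suggests (the class is not named in this hunk):

import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.dictionary.DictionaryProvider;

import org.apache.comet.vector.CometVector;

// Hypothetical caller: wraps an Arrow vector imported elsewhere into Comet's vector types,
// e.g. a StructVector becomes a CometStructVector per the instanceof dispatch above.
final class GetVectorSketch {
  static CometVector wrap(ValueVector imported, DictionaryProvider provider, boolean useDecimal128) {
    return CometVector.getVector(imported, useDecimal128, provider);
  }
}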