common/src/main/java/org/apache/comet/parquet/AbstractColumnReader.java

@@ -63,7 +63,7 @@ public abstract class AbstractColumnReader implements AutoCloseable {
   /** A pointer to the native implementation of ColumnReader. */
   protected long nativeHandle;
 
-  public AbstractColumnReader(
+  AbstractColumnReader(
       DataType type,
       Type fieldType,
       ColumnDescriptor descriptor,
@@ -76,7 +76,7 @@ public AbstractColumnReader(
     this.useLegacyDateTimestamp = useLegacyDateTimestamp;
   }
 
-  public AbstractColumnReader(
+  AbstractColumnReader(
       DataType type,
       ColumnDescriptor descriptor,
       boolean useDecimal128,
@@ -85,7 +85,7 @@ public AbstractColumnReader(
     TypeUtil.checkParquetType(descriptor, type);
   }
 
-  public ColumnDescriptor getDescriptor() {
+  ColumnDescriptor getDescriptor() {
     return descriptor;
   }

common/src/main/java/org/apache/comet/parquet/BatchReader.java

@@ -186,7 +186,8 @@ public BatchReader(
   }
 
   /**
-   * @deprecated since 0.9.1, will be removed in 0.10.0.
+   * @deprecated since 0.10.0, will be removed in 0.11.0.
+   * @see <a href="https://github.com/apache/datafusion-comet/issues/2079">Comet Issue #2079</a>
    */
   public BatchReader(AbstractColumnReader[] columnReaders) {
     // Todo: set useDecimal128 and useLazyMaterialization
@@ -383,14 +384,16 @@ public void init() throws URISyntaxException, IOException {
   }
 
   /**
-   * @deprecated since 0.9.1, will be removed in 0.10.0.
+   * @deprecated since 0.10.0, will be removed in 0.11.0.
+   * @see <a href="https://github.com/apache/datafusion-comet/issues/2079">Comet Issue #2079</a>
    */
   public void setSparkSchema(StructType schema) {
     this.sparkSchema = schema;
   }
 
   /**
-   * @deprecated since 0.9.1, will be removed in 0.10.0.
+   * @deprecated since 0.10.0, will be removed in 0.11.0.
+   * @see <a href="https://github.com/apache/datafusion-comet/issues/2079">Comet Issue #2079</a>
    */
   public AbstractColumnReader[] getColumnReaders() {
     return columnReaders;

common/src/main/java/org/apache/comet/parquet/ColumnReader.java

@@ -93,7 +93,7 @@ public class ColumnReader extends AbstractColumnReader {
   private ArrowArray array = null;
   private ArrowSchema schema = null;
 
-  public ColumnReader(
+  ColumnReader(
       DataType type,
       ColumnDescriptor descriptor,
       CometSchemaImporter importer,
@@ -111,6 +111,8 @@ public ColumnReader(
    * Set the page reader for a new column chunk to read. Expects to call `readBatch` after this.
    *
    * @param pageReader the page reader for the new column chunk
+   * @deprecated since 0.10.0, will be removed in 0.11.0.
+   * @see <a href="https://github.com/apache/datafusion-comet/issues/2079">Comet Issue #2079</a>
    */
   public void setPageReader(PageReader pageReader) throws IOException {
     this.pageReader = pageReader;

common/src/main/java/org/apache/comet/parquet/ConstantColumnReader.java

@@ -38,26 +38,37 @@ public class ConstantColumnReader extends MetadataColumnReader {
   /** The constant value in the format of Object that are used to initialize this column reader. */
   private Object value;
 
-  public ConstantColumnReader(StructField field, int batchSize, boolean useDecimal128) {
+  ConstantColumnReader(StructField field, int batchSize, boolean useDecimal128) {
     this(field.dataType(), TypeUtil.convertToParquet(field), batchSize, useDecimal128);
     this.value =
         ResolveDefaultColumns.getExistenceDefaultValues(new StructType(new StructField[] {field}))[
             0];
     init(value);
   }
 
-  public ConstantColumnReader(
+  ConstantColumnReader(
       StructField field, int batchSize, InternalRow values, int index, boolean useDecimal128) {
     this(field.dataType(), TypeUtil.convertToParquet(field), batchSize, useDecimal128);
     init(values, index);
   }
 
+  /**
+   * @deprecated since 0.10.0, will be removed in 0.11.0.
+   * @see <a href="https://github.com/apache/datafusion-comet/issues/2079">Comet Issue #2079</a>
+   */
   public ConstantColumnReader(
       DataType type, ColumnDescriptor descriptor, Object value, boolean useDecimal128) {
     super(type, descriptor, useDecimal128, true);
     this.value = value;
   }
 
+  // Used by Iceberg
+  public ConstantColumnReader(
+      DataType type, ParquetColumnSpec spec, Object value, boolean useDecimal128) {
+    super(type, spec, useDecimal128, true);
+    this.value = value;
+  }
+
   ConstantColumnReader(
       DataType type, ColumnDescriptor descriptor, int batchSize, boolean useDecimal128) {
     super(type, descriptor, useDecimal128, true);

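The spec-based constructor above gives Iceberg a way to create a constant column reader (for example, for a partition value) without referencing any org.apache.parquet classes. A minimal sketch of a caller — the column shape and values are illustrative, not from this PR, and the ParquetColumnSpec constructor is assumed public, with the argument order used by Utils.descriptorToParquetColumnSpec later in this diff:

    import java.util.Collections;

    import org.apache.spark.sql.types.DataTypes;

    import org.apache.comet.parquet.ConstantColumnReader;
    import org.apache.comet.parquet.ParquetColumnSpec;

    public class ConstantColumnExample {
      public static void main(String[] args) {
        // Plain-Java description of an optional INT32 partition column; no
        // (possibly shaded) Parquet types cross the API boundary.
        ParquetColumnSpec spec =
            new ParquetColumnSpec(
                1,                          // field id
                new String[] {"part_id"},   // column path
                "INT32",                    // physical type name
                0,                          // type length (FIXED_LEN_BYTE_ARRAY only)
                false,                      // repeated?
                1,                          // max definition level
                0,                          // max repetition level
                null,                       // no logical type
                Collections.emptyMap());    // no logical type parameters

        // Surface the constant partition value 42 as a column.
        ConstantColumnReader reader =
            new ConstantColumnReader(DataTypes.IntegerType, spec, 42, false);
      }
    }
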
19 changes: 9 additions & 10 deletions common/src/main/java/org/apache/comet/parquet/FileReader.java

@@ -43,7 +43,6 @@
 import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.parquet.HadoopReadOptions;
 import org.apache.parquet.ParquetReadOptions;
 import org.apache.parquet.Preconditions;
@@ -130,15 +129,14 @@ public class FileReader implements Closeable {
   private RowGroupReader currentRowGroup = null;
   private InternalFileDecryptor fileDecryptor;
 
-  public FileReader(InputFile file, ParquetReadOptions options, ReadOptions cometOptions)
+  FileReader(InputFile file, ParquetReadOptions options, ReadOptions cometOptions)
       throws IOException {
     this(file, null, options, cometOptions, null);
   }
 
   /** This constructor is called from Apache Iceberg. */
   public FileReader(
-      Path path,
-      Configuration conf,
+      WrappedInputFile file,
       ReadOptions cometOptions,
       Map<String, String> properties,
       Long start,
@@ -147,9 +145,10 @@ public FileReader(
       byte[] fileAADPrefix)
       throws IOException {
     ParquetReadOptions options =
-        buildParquetReadOptions(conf, properties, start, length, fileEncryptionKey, fileAADPrefix);
+        buildParquetReadOptions(
+            new Configuration(), properties, start, length, fileEncryptionKey, fileAADPrefix);
     this.converter = new ParquetMetadataConverter(options);
-    this.file = CometInputFile.fromPath(path, conf);
+    this.file = file;
     this.f = file.newStream();
     this.options = options;
     this.cometOptions = cometOptions;
@@ -177,7 +176,7 @@ public FileReader(
     this.crc = options.usePageChecksumVerification() ? new CRC32() : null;
   }
 
-  public FileReader(
+  FileReader(
       InputFile file,
       ParquetReadOptions options,
       ReadOptions cometOptions,
@@ -186,7 +185,7 @@ public FileReader(
     this(file, null, options, cometOptions, metrics);
   }
 
-  public FileReader(
+  FileReader(
       InputFile file,
       ParquetMetadata footer,
       ParquetReadOptions options,
@@ -226,12 +225,12 @@ public FileReader(
   }
 
   /** Returns the footer of the Parquet file being read. */
-  public ParquetMetadata getFooter() {
+  ParquetMetadata getFooter() {
     return this.footer;
   }
 
   /** Returns the metadata of the Parquet file being read. */
-  public FileMetaData getFileMetaData() {
+  FileMetaData getFileMetaData() {
     return this.fileMetaData;
   }
 

common/src/main/java/org/apache/comet/parquet/LazyColumnReader.java

@@ -43,7 +43,7 @@ public class LazyColumnReader extends ColumnReader {
   // The lazy vector being updated.
   private final CometLazyVector vector;
 
-  public LazyColumnReader(
+  LazyColumnReader(
       DataType sparkReadType,
       ColumnDescriptor descriptor,
       CometSchemaImporter importer,

common/src/main/java/org/apache/comet/parquet/MetadataColumnReader.java

@@ -42,6 +42,10 @@ public class MetadataColumnReader extends AbstractColumnReader {
 
   private boolean isConstant;
 
+  /**
+   * @deprecated since 0.10.0, will be made package private in 0.11.0.
+   * @see <a href="https://github.com/apache/datafusion-comet/issues/2079">Comet Issue #2079</a>
+   */
   public MetadataColumnReader(
       DataType type, ColumnDescriptor descriptor, boolean useDecimal128, boolean isConstant) {
     // TODO: should we handle legacy dates & timestamps for metadata columns?
@@ -50,6 +54,15 @@ public MetadataColumnReader(
     this.isConstant = isConstant;
   }
 
+  // Used by Iceberg
+  public MetadataColumnReader(
+      DataType type, ParquetColumnSpec spec, boolean useDecimal128, boolean isConstant) {
+    // TODO: should we handle legacy dates & timestamps for metadata columns?
+    super(type, Utils.buildColumnDescriptor(spec), useDecimal128, false);
+
+    this.isConstant = isConstant;
+  }
+
   @Override
   public void setBatchSize(int batchSize) {
     close();

common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java

@@ -186,7 +186,7 @@ public NativeBatchReader(
     this.taskContext = TaskContext$.MODULE$.get();
   }
 
-  public NativeBatchReader(AbstractColumnReader[] columnReaders) {
+  private NativeBatchReader(AbstractColumnReader[] columnReaders) {
     // Todo: set useDecimal128 and useLazyMaterialization
     int numColumns = columnReaders.length;
     this.columnReaders = new AbstractColumnReader[numColumns];

common/src/main/java/org/apache/comet/parquet/NativeColumnReader.java

@@ -69,7 +69,7 @@ public class NativeColumnReader extends AbstractColumnReader {
   private long nativeBatchHandle = 0xDEADBEEFL;
   private final int columnNum;
 
-  public NativeColumnReader(
+  NativeColumnReader(
       long nativeBatchHandle,
       int columnNum,
       DataType type,

common/src/main/java/org/apache/comet/parquet/ParquetColumnSpec.java

@@ -21,6 +21,12 @@
 
 import java.util.Map;
 
+/**
+ * ParquetColumnSpec encapsulates the information within a Parquet ColumnDescriptor; utility
+ * methods can convert to and from a ColumnDescriptor. The only purpose of this class is to allow
+ * passing column descriptors between Comet and Iceberg. This is required because Iceberg shades
+ * Parquet, changing the package of Parquet classes and making them incompatible with Comet.
+ */
 public class ParquetColumnSpec {
 
   private final int fieldId;
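
To make the shading problem concrete, here is a sketch; the shaded package prefix shown is Iceberg's usual relocation and is an assumption, not taken from this PR:

    // Comet compiles against:       org.apache.parquet.column.ColumnDescriptor
    // Iceberg's runtime jar shades: org.apache.iceberg.shaded.org.apache.parquet.column.ColumnDescriptor
    // The JVM treats these as unrelated classes, so a descriptor built inside
    // Iceberg cannot cross Comet's API boundary. ParquetColumnSpec carries the
    // same information using only JDK types, which are never relocated.
    import java.util.Map;

    import org.apache.comet.parquet.ParquetColumnSpec;

    public class ShadingSafeSpec {
      public static void main(String[] args) {
        // TIMESTAMP(MICROS, adjusted to UTC) stored as INT64; the logical type
        // is encoded as a name plus string parameters instead of a Parquet class.
        ParquetColumnSpec spec =
            new ParquetColumnSpec(
                2,                            // field id
                new String[] {"event_time"},  // column path
                "INT64",                      // physical type name
                0,                            // type length
                false,                        // repeated?
                1,                            // max definition level
                0,                            // max repetition level
                "TimestampLogicalTypeAnnotation",
                Map.of("isAdjustedToUTC", "true", "unit", "MICROS"));
      }
    }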
Expand Down
13 changes: 12 additions & 1 deletion common/src/main/java/org/apache/comet/parquet/TypeUtil.java

@@ -31,9 +31,16 @@
 
 import org.apache.comet.CometConf;
 
+import static org.apache.comet.parquet.Utils.descriptorToParquetColumnSpec;
+
 public class TypeUtil {
 
-  /** Converts the input Spark 'field' into a Parquet column descriptor. */
+  /**
+   * Converts the input Spark 'field' into a Parquet column descriptor.
+   *
+   * @deprecated since 0.10.0, will be removed in 0.11.0.
+   * @see <a href="https://github.com/apache/datafusion-comet/issues/2079">Comet Issue #2079</a>
+   */
  public static ColumnDescriptor convertToParquet(StructField field) {
     Type.Repetition repetition;
     int maxDefinitionLevel;
@@ -105,6 +112,10 @@ public static ColumnDescriptor convertToParquet(StructField field) {
     return new ColumnDescriptor(path, builder.named(field.name()), 0, maxDefinitionLevel);
   }
 
+  public static ParquetColumnSpec convertToParquetSpec(StructField field) {
+    return descriptorToParquetColumnSpec(convertToParquet(field));
+  }
+
   /**
    * Check whether the Parquet 'descriptor' and Spark read type 'sparkType' are compatible. If not,
    * throw exception.

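A sketch of the migration path this sets up for callers (the field shape is illustrative): the deprecated ColumnDescriptor conversion gives way to the new spec-based variant.

    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.Metadata;
    import org.apache.spark.sql.types.StructField;

    import org.apache.comet.parquet.ParquetColumnSpec;
    import org.apache.comet.parquet.TypeUtil;

    public class ConvertToParquetSpecExample {
      public static void main(String[] args) {
        StructField field =
            new StructField("price", DataTypes.createDecimalType(10, 2), true, Metadata.empty());

        // Before (deprecated since 0.10.0): exposes org.apache.parquet types.
        // ColumnDescriptor descriptor = TypeUtil.convertToParquet(field);

        // After: the same column description in shading-safe form.
        ParquetColumnSpec spec = TypeUtil.convertToParquetSpec(field);
      }
    }
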
68 changes: 67 additions & 1 deletion common/src/main/java/org/apache/comet/parquet/Utils.java

@@ -19,6 +19,9 @@
 
 package org.apache.comet.parquet;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.parquet.schema.PrimitiveType;
@@ -54,8 +57,9 @@ public static ColumnReader getColumnReader(
   /**
    * This method is called from Apache Iceberg.
    *
-   * @deprecated since 0.9.1, will be removed in 0.10.0; use getColumnReader with ParquetColumnSpec
+   * @deprecated since 0.10.0, will be removed in 0.11.0; use getColumnReader with ParquetColumnSpec
    *     instead.
+   * @see <a href="https://github.com/apache/datafusion-comet/issues/2079">Comet Issue #2079</a>
    */
   public static ColumnReader getColumnReader(
       DataType type,
@@ -453,4 +457,66 @@ private static LogicalTypeAnnotation reconstructLogicalType(
       throw new IllegalArgumentException("Unknown logical type: " + logicalTypeName);
     }
   }
+
+  public static ParquetColumnSpec descriptorToParquetColumnSpec(ColumnDescriptor descriptor) {
+
+    String[] path = descriptor.getPath();
+    PrimitiveType primitiveType = descriptor.getPrimitiveType();
+    String physicalType = primitiveType.getPrimitiveTypeName().name();
+
+    int typeLength =
+        primitiveType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
+            ? primitiveType.getTypeLength()
+            : 0;
+
+    boolean isRepeated = primitiveType.getRepetition() == Type.Repetition.REPEATED;
+
+    String logicalTypeName = null;
+    Map<String, String> logicalTypeParams = new HashMap<>();
+    LogicalTypeAnnotation logicalType = primitiveType.getLogicalTypeAnnotation();
+
+    if (logicalType != null) {
+      logicalTypeName = logicalType.getClass().getSimpleName();
+
+      // Handle specific logical types
+      if (logicalType instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) {
+        LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimal =
+            (LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalType;
+        logicalTypeParams.put("precision", String.valueOf(decimal.getPrecision()));
+        logicalTypeParams.put("scale", String.valueOf(decimal.getScale()));
+      } else if (logicalType instanceof LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) {
+        LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestamp =
+            (LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) logicalType;
+        logicalTypeParams.put("isAdjustedToUTC", String.valueOf(timestamp.isAdjustedToUTC()));
+        logicalTypeParams.put("unit", timestamp.getUnit().name());
+      } else if (logicalType instanceof LogicalTypeAnnotation.TimeLogicalTypeAnnotation) {
+        LogicalTypeAnnotation.TimeLogicalTypeAnnotation time =
+            (LogicalTypeAnnotation.TimeLogicalTypeAnnotation) logicalType;
+        logicalTypeParams.put("isAdjustedToUTC", String.valueOf(time.isAdjustedToUTC()));
+        logicalTypeParams.put("unit", time.getUnit().name());
+      } else if (logicalType instanceof LogicalTypeAnnotation.IntLogicalTypeAnnotation) {
+        LogicalTypeAnnotation.IntLogicalTypeAnnotation intType =
+            (LogicalTypeAnnotation.IntLogicalTypeAnnotation) logicalType;
+        logicalTypeParams.put("isSigned", String.valueOf(intType.isSigned()));
+        logicalTypeParams.put("bitWidth", String.valueOf(intType.getBitWidth()));
+      }
+    }
+
+    int id = -1;
+    Type type = descriptor.getPrimitiveType();
+    if (type != null && type.getId() != null) {
+      id = type.getId().intValue();
+    }
+
+    return new ParquetColumnSpec(
+        id,
+        path,
+        physicalType,
+        typeLength,
+        isRepeated,
+        descriptor.getMaxDefinitionLevel(),
+        descriptor.getMaxRepetitionLevel(),
+        logicalTypeName,
+        logicalTypeParams);
+  }
 }
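
The mapping above can be exercised round-trip with its counterpart Utils.buildColumnDescriptor, referenced in the MetadataColumnReader change earlier in this diff and assumed publicly accessible here. A sketch with an illustrative DECIMAL column; the commented-out spec contents reflect the mapping in descriptorToParquetColumnSpec:

    import org.apache.parquet.column.ColumnDescriptor;
    import org.apache.parquet.schema.LogicalTypeAnnotation;
    import org.apache.parquet.schema.PrimitiveType;
    import org.apache.parquet.schema.Types;

    import org.apache.comet.parquet.ParquetColumnSpec;
    import org.apache.comet.parquet.Utils;

    public class SpecRoundTrip {
      public static void main(String[] args) {
        // DECIMAL(10, 2) stored as INT64, optional (max definition level 1).
        PrimitiveType price =
            Types.optional(PrimitiveType.PrimitiveTypeName.INT64)
                .as(LogicalTypeAnnotation.decimalType(/* scale */ 2, /* precision */ 10))
                .named("price");
        ColumnDescriptor descriptor =
            new ColumnDescriptor(new String[] {"price"}, price, 0, 1);

        // Comet side: flatten the descriptor into shading-safe values, e.g.
        // physicalType "INT64", logicalTypeName "DecimalLogicalTypeAnnotation",
        // logicalTypeParams {precision=10, scale=2}.
        ParquetColumnSpec spec = Utils.descriptorToParquetColumnSpec(descriptor);

        // Iceberg side: rebuild a descriptor from the spec before reading.
        ColumnDescriptor restored = Utils.buildColumnDescriptor(spec);
      }
    }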