Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
60 commits
Select commit Hold shift + click to select a range
9535117
Stash changes. Maybe runs TPC-H q1?
mbutrovich Nov 8, 2024
22b648d
filters?
mbutrovich Nov 8, 2024
8e47562
Enable filter pushdown with TableParquetOptions.
mbutrovich Nov 9, 2024
196311e
Clippy.
mbutrovich Nov 9, 2024
fb68558
Fix Q1.
mbutrovich Nov 9, 2024
0b0d6e8
add partial support for multiple parquet files
andygrove Nov 9, 2024
2027755
Merge branch 'native_parquet2' into df-parquet-exec
mbutrovich Nov 9, 2024
ef9f8f5
Merge pull request #1 from andygrove/df-parquet-exec
mbutrovich Nov 9, 2024
95d69fa
Clippy
mbutrovich Nov 9, 2024
0830840
partitioning
andygrove Nov 9, 2024
d7396e4
merge
andygrove Nov 9, 2024
4e525fc
fix
andygrove Nov 9, 2024
ef54934
fix
andygrove Nov 9, 2024
e52fe77
Merge pull request #2 from andygrove/df-parquet-exec
mbutrovich Nov 9, 2024
16033d9
upmerge
andygrove Nov 11, 2024
ad46821
Merge remote-tracking branch 'apache/main' into comet-parquet-exec
andygrove Nov 11, 2024
38e32f7
wip - CometNativeScan (#1076)
parthchandra Nov 12, 2024
311bc9e
Revert "wip - CometNativeScan (#1076)"
andygrove Nov 12, 2024
bd68db8
wip - CometNativeScan (#1078)
parthchandra Nov 13, 2024
33d2b23
[comet-parquet-exec] Fix compilation errors in Rust tests, remove som…
andygrove Nov 13, 2024
eafda43
[comet-parquet-exec] Pass Spark's partitions to DF's ParquetExec (#1081)
mbutrovich Nov 13, 2024
786250a
update some stability plans (#1083)
andygrove Nov 14, 2024
8a0df9d
[comet-parquet-exec] Handle CometNativeScan RDD when DataSourceRDD in…
mbutrovich Nov 15, 2024
1cca8d6
feat: Hook DataFusion Parquet native scan with Comet execution (#1094)
viirya Nov 19, 2024
c3ad26e
fix: Support partition values in feature branch comet-parquet-exec (#…
viirya Nov 22, 2024
4de51a8
fix: Use filePath instead of pathUri (#1124)
viirya Nov 29, 2024
29b2b77
fix: [comet-parquet-exec] Use RDD partition index (#1120)
viirya Dec 2, 2024
ab09337
[comet-parquet-exec] Comet parquet exec 2 (copy of Parth's PR) (#1138)
andygrove Dec 4, 2024
e3672f7
[comet-parquet-exec] Add unit test for reading a struct field from Pa…
andygrove Dec 4, 2024
e0d8077
[comet-parquet-exec] Simplify schema logic for CometNativeScan (#1142)
mbutrovich Dec 5, 2024
bf5a2c6
clippy (#1140)
parthchandra Dec 5, 2024
bd797f5
feat: [comet-parquet-exec] Schema adapter fixes (#1139)
andygrove Dec 6, 2024
5401de0
[comet-parquet-exec] Change path handling to fix URL decoding (#1149)
mbutrovich Dec 9, 2024
3131a1d
Add CometNativeScanExec support to CheckParquetScan. (#1160)
mbutrovich Dec 10, 2024
b63570b
fix: use inputRDD to get outputPartitions in CometScanExec (#1162)
parthchandra Dec 11, 2024
06cdd22
Revert "fix: use inputRDD to get outputPartitions in CometScanExec (#…
andygrove Dec 12, 2024
8563edf
fix: [comet-parquet-exec] fix regressions original comet native scal …
parthchandra Dec 13, 2024
2686a4b
feat: [comet-parquet-exec] Use Datafusion based record batch reader f…
parthchandra Dec 17, 2024
3c43234
[comet-parquet-exec] Merge upstream/main and resolve conflicts (#1183)
mbutrovich Dec 20, 2024
c6f4985
fix: [comet-parquet-exec] Fix timestamp cast errors (#1191)
andygrove Dec 20, 2024
3d70b2e
fix: Simplify native scan config (#1225)
parthchandra Jan 6, 2025
7488af8
[comet-parquet-exec] fix: fix various bugs in casting between struct …
andygrove Jan 7, 2025
78e2820
[comet-parquet-exec] Disable DPP in stability tests when full native …
andygrove Jan 7, 2025
6ab514f
[comet-parquet-exec] Fix regressions in DisableAQECometShuffleSuite (…
andygrove Jan 8, 2025
01b5917
fix: Set scan implementation choice via environment variable (#1231)
parthchandra Jan 8, 2025
27b4e81
[comet-parquet-exec] Move type conversion logic for ParquetExec out o…
mbutrovich Jan 9, 2025
b49c17b
chore: [comet-parquet-exec] Unit test fixes, default scan impl to nat…
parthchandra Jan 13, 2025
6df59bd
chore: Upgrade to DataFusion 44.0.0-rc2 (#1154) (#1272)
andygrove Jan 13, 2025
274566f
test: temporarily disable plan stability tests (#1274)
parthchandra Jan 13, 2025
6cafe28
fix: handle loading of complex types into CometVector correctly in ic…
parthchandra Jan 14, 2025
017963a
Merge branch 'main' into comet-parquet-exec
parthchandra Jan 14, 2025
285396c
Fix build after merge
parthchandra Jan 14, 2025
b3703f5
Fix tests after merge
parthchandra Jan 15, 2025
2c83bdd
Fix plans after merge
parthchandra Jan 15, 2025
79717b8
fix partition id in execute plan after merge (from Andy Grove)
parthchandra Jan 15, 2025
c137a74
fix regression
andygrove Jan 16, 2025
069099e
fix
andygrove Jan 16, 2025
5d0c693
fix merge issue around spark-expr refactoring
andygrove Jan 16, 2025
e59f25e
Revert config change
andygrove Jan 16, 2025
1b45a02
update configs.md
andygrove Jan 16, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.slf4j.LoggerFactory;

import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.schema.Type;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.TimestampNTZType$;

Expand All @@ -36,6 +37,9 @@ public abstract class AbstractColumnReader implements AutoCloseable {
/** The Spark data type. */
protected final DataType type;

/** The Parquet field type corresponding to this column. */
protected final Type fieldType;

/** Parquet column descriptor. */
protected final ColumnDescriptor descriptor;

Expand All @@ -61,13 +65,23 @@ public abstract class AbstractColumnReader implements AutoCloseable {

/**
 * Creates a column reader with an explicit Parquet field type.
 *
 * <p>Note: unlike the descriptor-only constructor, this one does NOT call
 * {@code TypeUtil.checkParquetType}, so no Spark/Parquet type compatibility check is performed
 * here.
 *
 * @param type the Spark data type this column is read as
 * @param fieldType the Parquet field type of the column (may be null; the delegating
 *     constructor passes null when only a column descriptor is available)
 * @param descriptor the Parquet column descriptor for this column
 * @param useDecimal128 presumably forces decimals to be represented as 128-bit values —
 *     confirm against field documentation
 * @param useLegacyDateTimestamp presumably selects the legacy date/timestamp rebase behavior —
 *     confirm against field documentation
 */
public AbstractColumnReader(
DataType type,
Type fieldType,
ColumnDescriptor descriptor,
boolean useDecimal128,
boolean useLegacyDateTimestamp) {
this.type = type;
this.fieldType = fieldType;
this.descriptor = descriptor;
this.useDecimal128 = useDecimal128;
this.useLegacyDateTimestamp = useLegacyDateTimestamp;
}

/**
 * Creates a column reader without a Parquet field type ({@code fieldType} is set to null).
 *
 * <p>This overload additionally validates that the Parquet column described by
 * {@code descriptor} is compatible with the requested Spark {@code type} via
 * {@code TypeUtil.checkParquetType}.
 *
 * @param type the Spark data type this column is read as
 * @param descriptor the Parquet column descriptor for this column
 * @param useDecimal128 presumably forces decimals to be represented as 128-bit values —
 *     confirm against field documentation
 * @param useLegacyDateTimestamp presumably selects the legacy date/timestamp rebase behavior —
 *     confirm against field documentation
 */
public AbstractColumnReader(
DataType type,
ColumnDescriptor descriptor,
boolean useDecimal128,
boolean useLegacyDateTimestamp) {
this(type, null, descriptor, useDecimal128, useLegacyDateTimestamp);
TypeUtil.checkParquetType(descriptor, type);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ public void init() throws URISyntaxException, IOException {
requestedSchema =
CometParquetReadSupport.clipParquetSchema(
requestedSchema, sparkSchema, isCaseSensitive, useFieldId, ignoreMissingIds);
if (requestedSchema.getColumns().size() != sparkSchema.size()) {
if (requestedSchema.getFieldCount() != sparkSchema.size()) {
throw new IllegalArgumentException(
String.format(
"Spark schema has %d columns while " + "Parquet schema has %d columns",
Expand Down
52 changes: 52 additions & 0 deletions common/src/main/java/org/apache/comet/parquet/Native.java
Original file line number Diff line number Diff line change
Expand Up @@ -234,4 +234,56 @@ public static native void setPageV2(
* @param handle the handle to the native Parquet column reader
*/
public static native void closeColumnReader(long handle);

///////////// Arrow Native Parquet Reader APIs
// TODO: Add partitionValues(?), improve requiredColumns to use a projection mask that corresponds
// to arrow.
// Add batch size, datetimeRebaseModeSpec, metrics(how?)...

/**
 * Initialize a record batch reader for a PartitionedFile.
 *
 * @param filePath path to the Parquet file to read
 * @param fileSize total size of the file in bytes
 * @param start byte offset at which this split/partition begins
 * @param length number of bytes in this split/partition
 * @param requiredSchema the projected read schema, serialized to bytes
 *     (NOTE(review): serialization format is not visible here — presumably a serialized
 *     Arrow schema; confirm against the native implementation)
 * @param sessionTimezone the Spark session timezone id
 *     (NOTE(review): presumably used for timestamp conversion — confirm)
 * @return a handle to the record batch reader, used in subsequent calls.
 */
public static native long initRecordBatchReader(
String filePath,
long fileSize,
long start,
long length,
byte[] requiredSchema,
String sessionTimezone);

// arrow native version of read batch
/**
 * Read the next batch of data into memory on native side.
 *
 * @param handle the handle returned by {@code initRecordBatchReader}
 * @return the number of rows read
 */
public static native int readNextRecordBatch(long handle);

// arrow native equivalent of currentBatch. 'columnNum' is number of the column in the record
// batch
/**
 * Load the column corresponding to columnNum in the currently loaded record batch into JVM.
 *
 * @param handle the handle returned by {@code initRecordBatchReader}
 * @param columnNum zero-based(?) index of the column within the current record batch —
 *     NOTE(review): base index not visible here, confirm against callers
 * @param arrayAddr native memory address the Arrow array (FFI ArrowArray) is exported to —
 *     NOTE(review): presumed Arrow C Data Interface, confirm
 * @param schemaAddr native memory address the Arrow schema (FFI ArrowSchema) is exported to —
 *     NOTE(review): presumed Arrow C Data Interface, confirm
 */
public static native void currentColumnBatch(
long handle, int columnNum, long arrayAddr, long schemaAddr);

// arrow native version to close record batch reader

/**
 * Close the record batch reader and free its native resources. The handle must not be used
 * after this call.
 *
 * @param handle the handle returned by {@code initRecordBatchReader}
 */
public static native void closeRecordBatchReader(long handle);
}
Loading
Loading