Skip to content

Commit

Permalink
Add support for reading parquet file thanks to arrow-dataset Kotlin#576
Browse files Browse the repository at this point in the history
  • Loading branch information
fb64 committed Aug 13, 2024
1 parent 485f3ba commit 5ce70b9
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 10 deletions.
1 change: 1 addition & 0 deletions dataframe-arrow/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ dependencies {
implementation(libs.arrow.vector)
implementation(libs.arrow.format)
implementation(libs.arrow.memory)
implementation(libs.arrow.dataset)
implementation(libs.commonsCompress)
implementation(libs.kotlin.reflect)
implementation(libs.kotlin.datetimeJvm)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.jetbrains.kotlinx.dataframe.io

import org.apache.arrow.dataset.file.FileFormat
import org.apache.arrow.memory.RootAllocator
import org.apache.arrow.vector.ipc.ArrowReader
import org.apache.commons.compress.utils.SeekableInMemoryByteChannel
Expand Down Expand Up @@ -184,3 +185,11 @@ public fun DataFrame.Companion.readArrow(
*/
public fun ArrowReader.toDataFrame(nullability: NullabilityOptions = NullabilityOptions.Infer): AnyFrame =
DataFrame.Companion.readArrowImpl(this, nullability)

/**
* Read [Parquet](https://parquet.apache.org/) data from existing [url] by using [Arrow Dataset](https://arrow.apache.org/docs/java/dataset.html)
*/
public fun DataFrame.Companion.readParquet(
url: URL,
nullability: NullabilityOptions = NullabilityOptions.Infer,
): AnyFrame = readArrowDataset(url.toString(), FileFormat.PARQUET, nullability)
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
package org.jetbrains.kotlinx.dataframe.io

import org.apache.arrow.dataset.file.FileFormat
import org.apache.arrow.dataset.file.FileSystemDatasetFactory
import org.apache.arrow.dataset.jni.DirectReservationListener
import org.apache.arrow.dataset.jni.NativeMemoryPool
import org.apache.arrow.dataset.scanner.ScanOptions
import org.apache.arrow.memory.RootAllocator
import org.apache.arrow.vector.BigIntVector
import org.apache.arrow.vector.BitVector
Expand Down Expand Up @@ -383,3 +388,27 @@ internal fun DataFrame.Companion.readArrowImpl(
return flattened.concatKeepingSchema()
}
}

internal fun DataFrame.Companion.readArrowDataset(
fileUri: String,
fileFormat: FileFormat,
nullability: NullabilityOptions = NullabilityOptions.Infer,
): AnyFrame {
val scanOptions = ScanOptions(32768)
RootAllocator().use { allocator ->
FileSystemDatasetFactory(
allocator,
NativeMemoryPool.createListenable(DirectReservationListener.instance()),
fileFormat,
fileUri,
).use { datasetFactory ->
datasetFactory.finish().use { dataset ->
dataset.newScan(scanOptions).use { scanner ->
scanner.scanBatches().use { reader ->
return readArrow(reader, nullability)
}
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -627,4 +627,17 @@ internal class ArrowKtTest {
DataFrame.readArrow(dbArrowReader) shouldBe expected
}
}

@Test
fun testReadParquet() {
val path = testResource("test.arrow.parquet").path
val dataFrame = DataFrame.readParquet(URL("file:$path"))
dataFrame.rowsCount() shouldBe 300
assertEstimations(
exampleFrame = dataFrame,
expectedNullable = false,
hasNulls = false,
fromParquet = true,
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@ import kotlin.reflect.typeOf
* Assert that we have got the same data that was originally saved on example creation.
* Example generation project is currently located at https://github.com/Kopilov/arrow_example
*/
internal fun assertEstimations(exampleFrame: AnyFrame, expectedNullable: Boolean, hasNulls: Boolean) {
internal fun assertEstimations(
exampleFrame: AnyFrame,
expectedNullable: Boolean,
hasNulls: Boolean,
fromParquet: Boolean = false,
) {
/**
* In [exampleFrame] we get two concatenated batches. To assert the estimations, we should transform frame row number to batch row number
*/
Expand Down Expand Up @@ -136,14 +141,23 @@ internal fun assertEstimations(exampleFrame: AnyFrame, expectedNullable: Boolean
assertValueOrNull(iBatch(i), element, LocalDate.ofEpochDay(iBatch(i).toLong() * 30))
}

val datetimeCol = exampleFrame["date64"] as DataColumn<LocalDateTime?>
datetimeCol.type() shouldBe typeOf<LocalDateTime>().withNullability(expectedNullable)
datetimeCol.forEachIndexed { i, element ->
assertValueOrNull(
rowNumber = iBatch(i),
actual = element,
expected = LocalDateTime.ofEpochSecond(iBatch(i).toLong() * 60 * 60 * 24 * 30, 0, ZoneOffset.UTC),
)
if (fromParquet) {
// parquet format have only one type of date: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#date without time
val datetimeCol = exampleFrame["date64"] as DataColumn<LocalDate?>
datetimeCol.type() shouldBe typeOf<LocalDate>().withNullability(expectedNullable)
datetimeCol.forEachIndexed { i, element ->
assertValueOrNull(iBatch(i), element, LocalDate.ofEpochDay(iBatch(i).toLong() * 30))
}
} else {
val datetimeCol = exampleFrame["date64"] as DataColumn<LocalDateTime?>
datetimeCol.type() shouldBe typeOf<LocalDateTime>().withNullability(expectedNullable)
datetimeCol.forEachIndexed { i, element ->
assertValueOrNull(
rowNumber = iBatch(i),
actual = element,
expected = LocalDateTime.ofEpochSecond(iBatch(i).toLong() * 60 * 60 * 24 * 30, 0, ZoneOffset.UTC),
)
}
}

val timeSecCol = exampleFrame["time32_seconds"] as DataColumn<LocalTime?>
Expand Down
Binary file not shown.
3 changes: 2 additions & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ junit-platform = "1.10.2"
kotestAsserions = "5.5.4"

jsoup = "1.17.2"
arrow = "15.0.0"
arrow = "17.0.0"
docProcessor = "0.3.10"
simpleGit = "2.0.3"
dependencyVersions = "0.51.0"
Expand Down Expand Up @@ -104,6 +104,7 @@ jsoup = { group = "org.jsoup", name = "jsoup", version.ref = "jsoup" }
arrow-format = { group = "org.apache.arrow", name = "arrow-format", version.ref = "arrow" }
arrow-vector = { group = "org.apache.arrow", name = "arrow-vector", version.ref = "arrow" }
arrow-memory = { group = "org.apache.arrow", name = "arrow-memory-unsafe", version.ref = "arrow" }
arrow-dataset = { group = "org.apache.arrow", name = "arrow-dataset", version.ref = "arrow" }
arrow-c-data = { group = "org.apache.arrow", name = "arrow-c-data", version.ref = "arrow" }


Expand Down

0 comments on commit 5ce70b9

Please sign in to comment.