Improve and test dataframe API examples in docs (#11290)
* Improve and test dataframe API examples in docs

* Update introduction with pointer to user guide

* Make example consistent

* Make read_csv comment consistent

* clarifications

* prettier + tweaks

* Update docs/source/library-user-guide/using-the-dataframe-api.md

Co-authored-by: Eric Fredine <[email protected]>

* Update docs/source/library-user-guide/using-the-dataframe-api.md

Co-authored-by: Eric Fredine <[email protected]>

---------

Co-authored-by: Eric Fredine <[email protected]>
alamb and efredine committed Jul 9, 2024
1 parent c018c74 commit 1e0c06e
Showing 4 changed files with 239 additions and 89 deletions.
1 change: 1 addition & 0 deletions datafusion-examples/README.md
@@ -55,6 +55,7 @@ cargo run --example dataframe
- [`composed_extension_codec`](examples/composed_extension_codec.rs): Example of using multiple extension codecs for serialization / deserialization
- [`csv_sql_streaming.rs`](examples/csv_sql_streaming.rs): Build and run a streaming query plan from a SQL statement against a local CSV file
- [`custom_datasource.rs`](examples/custom_datasource.rs): Run queries against a custom datasource (TableProvider)
- [`custom_file_format.rs`](examples/custom_file_format.rs): Write data to a custom file format
- [`dataframe-to-s3.rs`](examples/external_dependency/dataframe-to-s3.rs): Run a query using a DataFrame against a parquet file from S3 and write the results back to S3
- [`dataframe.rs`](examples/dataframe.rs): Run a query using a DataFrame against a local parquet file
- [`dataframe_in_memory.rs`](examples/dataframe_in_memory.rs): Run a query using a DataFrame against data in memory
8 changes: 7 additions & 1 deletion datafusion/core/src/lib.rs
@@ -641,5 +641,11 @@ doc_comment::doctest!(
#[cfg(doctest)]
doc_comment::doctest!(
    "../../../docs/source/library-user-guide/using-the-sql-api.md",
    library_user_guide_sql_api
);

#[cfg(doctest)]
doc_comment::doctest!(
"../../../docs/source/library-user-guide/using-the-dataframe-api.md",
library_user_guide_dataframe_api
);
302 changes: 220 additions & 82 deletions docs/source/library-user-guide/using-the-dataframe-api.md
@@ -19,129 +19,267 @@

# Using the DataFrame API

The [Users Guide] introduces the [`DataFrame`] API and this section describes
that API in more depth.

## What is a DataFrame?

As described in the [Users Guide], DataFusion [`DataFrame`]s are modeled after
the [Pandas DataFrame] interface, and are implemented as a thin wrapper over a
[`LogicalPlan`] that adds functionality for building and executing those plans.

The simplest possible `DataFrame` is one that scans a table, and that table can
be in a file or in memory.
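
For example, here is a minimal sketch of a `DataFrame` that simply scans a CSV
file (it uses the same `tests/data/example.csv` file as the other examples on
this page):

```rust
use datafusion::prelude::*;
use datafusion::error::Result;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // create a DataFrame that scans all rows of a CSV file
    let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    // execute the plan and print the results
    df.show().await?;
    Ok(())
}
```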

## How to generate a DataFrame

You can construct [`DataFrame`]s programmatically using the API, similarly to
other DataFrame APIs. For example, you can read an in-memory `RecordBatch` into
a `DataFrame`:

```rust
use std::sync::Arc;
use datafusion::prelude::*;
use datafusion::arrow::array::{ArrayRef, Int32Array};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Register an in-memory table containing the following data
    // id | bank_account
    // ---|-------------
    // 1  | 9000
    // 2  | 8000
    // 3  | 7000
    let data = RecordBatch::try_from_iter(vec![
        ("id", Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef),
        ("bank_account", Arc::new(Int32Array::from(vec![9000, 8000, 7000]))),
    ])?;
    // Create a DataFrame that scans the user table, finds
    // all users with a bank account of at least 8000,
    // and sorts the results by bank account in descending order
    let dataframe = ctx
        .read_batch(data)?
        .filter(col("bank_account").gt_eq(lit(8000)))?       // bank_account >= 8000
        .sort(vec![col("bank_account").sort(false, true)])?; // ORDER BY bank_account DESC

    Ok(())
}
```

You can _also_ generate a `DataFrame` from a SQL query and use the `DataFrame`'s
APIs to manipulate the output of the query:

```rust
use std::sync::Arc;
use datafusion::prelude::*;
use datafusion::assert_batches_eq;
use datafusion::arrow::array::{ArrayRef, Int32Array};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Register the same in-memory table as in the previous example
    let data = RecordBatch::try_from_iter(vec![
        ("id", Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef),
        ("bank_account", Arc::new(Int32Array::from(vec![9000, 8000, 7000]))),
    ])?;
    ctx.register_batch("users", data)?;
    // Create a DataFrame using SQL
    let dataframe = ctx.sql("SELECT * FROM users;")
        .await?
        // Note we can filter the output of the query using the DataFrame API
        .filter(col("bank_account").gt_eq(lit(8000)))?; // bank_account >= 8000

    let results = &dataframe.collect().await?;

    // use the `assert_batches_eq` macro to show the output
    assert_batches_eq!(
        vec![
            "+----+--------------+",
            "| id | bank_account |",
            "+----+--------------+",
            "| 1  | 9000         |",
            "| 2  | 8000         |",
            "+----+--------------+",
        ],
        &results
    );
    Ok(())
}
```

## Collect / Streaming Exec

DataFusion [`DataFrame`]s are "lazy", meaning they do no processing until they
are executed, which allows for additional optimizations.
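
Because execution is deferred, you can build up a `DataFrame` and inspect its
plan without running it. The following is a minimal sketch of this (it assumes
the same `tests/data/example.csv` file used by the other examples on this
page):

```rust
use datafusion::prelude::*;
use datafusion::error::Result;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // building the DataFrame plans the query but does not run it
    let df = ctx
        .read_csv("tests/data/example.csv", CsvReadOptions::new())
        .await?
        .filter(col("a").gt(lit(1)))?;
    // the LogicalPlan can be inspected without executing the query
    println!("{}", df.logical_plan().display_indent());
    // the query only runs when the DataFrame is executed, e.g. with `collect`
    let _batches = df.collect().await?;
    Ok(())
}
```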

You can run a `DataFrame` in one of three ways:

1. `collect`: executes the query and buffers all the output into a `Vec<RecordBatch>`
2. `execute_stream`: begins execution and returns a `SendableRecordBatchStream` which incrementally computes output on each call to `next()`
3. `cache`: executes the query and buffers the output into a new in-memory `DataFrame` (shown in an example below)

To collect all outputs into a memory buffer, use the `collect` method:

```rust
use datafusion::prelude::*;
use datafusion::error::Result;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // read the contents of a CSV file into a DataFrame
    let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    // execute the query and collect the results as a Vec<RecordBatch>
    let batches = df.collect().await?;
    for record_batch in batches {
        println!("{record_batch:?}");
    }
    Ok(())
}
```

Use `execute_stream` to incrementally generate output, one `RecordBatch` at a time:

```rust
use datafusion::prelude::*;
use datafusion::error::Result;
use futures::stream::StreamExt;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // read example.csv file into a DataFrame
    let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    // begin execution (returns quickly, does not compute results)
    let mut stream = df.execute_stream().await?;
    // results are returned incrementally as they are computed
    while let Some(record_batch) = stream.next().await {
        println!("{record_batch:?}");
    }
    Ok(())
}
```
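
The third option, `cache`, executes the query like `collect` but stores the
results in a new in-memory `DataFrame`, which is useful when the same
intermediate result is reused several times. A minimal sketch (again assuming
the `tests/data/example.csv` file used above):

```rust
use datafusion::prelude::*;
use datafusion::error::Result;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // read example.csv file into a DataFrame
    let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    // execute the query and buffer the results in a new in-memory DataFrame
    let cached = df.cache().await?;
    // subsequent operations run against the buffered data,
    // not the original CSV file
    let batches = cached.filter(col("a").gt(lit(1)))?.collect().await?;
    println!("{batches:?}");
    Ok(())
}
```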

## Write DataFrame to Files

You can also write the contents of a `DataFrame` to a file. When writing a file,
DataFusion executes the `DataFrame` and streams the results to the output.
DataFusion comes with support for writing `csv`, `json`, `arrow`, `avro`, and
`parquet` files, and also supports writing custom file formats via the API (see
[`custom_file_format.rs`] for an example).

For example, to read a CSV file and write it to a parquet file, use the
[`DataFrame::write_parquet`] method:

```rust
use datafusion::prelude::*;
use datafusion::error::Result;
use datafusion::dataframe::DataFrameWriteOptions;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // read example.csv file into a DataFrame
    let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    // stream the contents of the DataFrame to the `example.parquet` file
    df.write_parquet(
        "example.parquet",
        DataFrameWriteOptions::new(),
        None, // writer_options
    ).await?;
    Ok(())
}
```

[`custom_file_format.rs`]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/custom_file_format.rs

The output file will look like (Example Output):

```sql
> select * from '../datafusion/core/example.parquet';
+---+---+---+
| a | b | c |
+---+---+---+
| 1 | 2 | 3 |
+---+---+---+
```
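
Writing to the other supported formats works the same way. For example, here is
a minimal sketch of writing the same data to a CSV file with the
`DataFrame::write_csv` method (the output file name `example_out.csv` is just
illustrative):

```rust
use datafusion::prelude::*;
use datafusion::error::Result;
use datafusion::dataframe::DataFrameWriteOptions;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // read example.csv file into a DataFrame
    let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    // stream the contents of the DataFrame to the `example_out.csv` file
    df.write_csv(
        "example_out.csv",
        DataFrameWriteOptions::new(),
        None, // writer_options
    ).await?;
    Ok(())
}
```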

## Relationship between `LogicalPlan`s and `DataFrame`s

The `DataFrame` struct is defined like this:

```rust
use datafusion::execution::session_state::SessionState;
use datafusion::logical_expr::LogicalPlan;

pub struct DataFrame {
    // state required to execute a LogicalPlan
    session_state: Box<SessionState>,
    // LogicalPlan that describes the computation to perform
    plan: LogicalPlan,
}
```

As shown above, a `DataFrame` is a thin wrapper around a `LogicalPlan`, so you
can easily go back and forth between them:
```rust
use datafusion::prelude::*;
use datafusion::error::Result;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // read example.csv file into a DataFrame
    let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    // You can easily get the LogicalPlan from the DataFrame
    let (_state, plan) = df.into_parts();
    // Just combine the LogicalPlan with a SessionContext and you get a DataFrame
    let new_df = DataFrame::new(ctx.state(), plan);
    Ok(())
}
```

In fact, using the [`DataFrame`]'s methods you can create the same
[`LogicalPlan`]s as when using the [`LogicalPlanBuilder`]:

```rust
use datafusion::prelude::*;
use datafusion::error::Result;
use datafusion::logical_expr::LogicalPlanBuilder;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // read example.csv file into a DataFrame
    let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    // Create a new DataFrame that selects columns `a` and `b`
    // and sorts the results by `a`
    let new_df = df.select(vec![col("a"), col("b")])?
        .sort(vec![col("a")])?;
    // Build the same plan using the LogicalPlanBuilder
    // Similar to `SELECT a, b FROM example.csv ORDER BY a`
    let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    let (_state, plan) = df.into_parts(); // get the DataFrame's LogicalPlan
    let plan = LogicalPlanBuilder::from(plan)
        .project(vec![col("a"), col("b")])?
        .sort(vec![col("a")])?
        .build()?;
    // prove they are the same
    assert_eq!(new_df.logical_plan(), &plan);
    Ok(())
}
```

[users guide]: ../user-guide/dataframe.md
[pandas dataframe]: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html
[`dataframe`]: https://docs.rs/datafusion/latest/datafusion/dataframe/struct.DataFrame.html
[`logicalplan`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/enum.LogicalPlan.html
[`logicalplanbuilder`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/struct.LogicalPlanBuilder.html
[`dataframe::write_parquet`]: https://docs.rs/datafusion/latest/datafusion/dataframe/struct.DataFrame.html#method.write_parquet