Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,64 @@ Datafusion](https://github.com/apache/arrow-datafusion) query engine.
It was originally an example of the [pgwire](https://github.com/sunng87/pgwire)
project.

## Usage

At the moment, this tools is designed as a command-line application that serves
any JSON/CSV/Arrow files as table, and expose them via Postgres compatible
protocol, with which you can connect using psql or language drivers to execute
`SELECT` queries against them.

```
datafusion-postgres 0.1.0
A postgres interface for datatfusion. Serve any CSV/JSON/Arrow files as tables.

USAGE:
datafusion-postgres [OPTIONS]

FLAGS:
-h, --help Prints help information
-V, --version Prints version information

OPTIONS:
-a <arrow-tables>... Arrow files to register as table, using syntax `table_name:file_path`
-c <csv-tables>... CSV files to register as table, using syntax `table_name:file_path`
-j <json-tables>... JSON files to register as table, using syntax `table_name:file_path`
```

For example, we use this command to host `ETTm1.csv` dataset as table `ettm1`.

```
datafusion-postgres -c ettm1:ETTm1.csv
Loaded ETTm1.csv as table ettm1
Listening to 127.0.0.1:5432

```

Then connect to it via `psql`:

```
psql -h 127.0.0.1 -p 5432 -U postgres
psql (16.2, server 0.20.0)
WARNING: psql major version 16, server major version 0.20.
Some psql features might not work.
Type "help" for help.

postgres=> select * from ettm1 limit 10;
date | HUFL | HULL | MUFL | MULL | LUFL | LULL | OT
----------------------------+--------------------+--------------------+--------------------+---------------------+-------------------+--------------------+--------------------
2016-07-01 00:00:00.000000 | 5.827000141143799 | 2.009000062942505 | 1.5989999771118164 | 0.4620000123977661 | 4.203000068664552 | 1.3400000333786009 | 30.5310001373291
2016-07-01 00:15:00.000000 | 5.760000228881836 | 2.075999975204468 | 1.4919999837875366 | 0.4259999990463257 | 4.263999938964844 | 1.4010000228881836 | 30.459999084472656
2016-07-01 00:30:00.000000 | 5.760000228881836 | 1.9420000314712524 | 1.4919999837875366 | 0.3910000026226044 | 4.234000205993652 | 1.309999942779541 | 30.038000106811523
2016-07-01 00:45:00.000000 | 5.760000228881836 | 1.9420000314712524 | 1.4919999837875366 | 0.4259999990463257 | 4.234000205993652 | 1.309999942779541 | 27.01300048828125
2016-07-01 01:00:00.000000 | 5.692999839782715 | 2.075999975204468 | 1.4919999837875366 | 0.4259999990463257 | 4.142000198364259 | 1.371000051498413 | 27.78700065612793
2016-07-01 01:15:00.000000 | 5.492000102996826 | 1.9420000314712524 | 1.4570000171661377 | 0.3910000026226044 | 4.111999988555908 | 1.2790000438690186 | 27.716999053955078
2016-07-01 01:30:00.000000 | 5.357999801635742 | 1.875 | 1.350000023841858 | 0.35499998927116394 | 3.928999900817871 | 1.3400000333786009 | 27.645999908447266
2016-07-01 01:45:00.000000 | 5.1570000648498535 | 1.8079999685287482 | 1.350000023841858 | 0.3199999928474426 | 3.806999921798706 | 1.2790000438690186 | 27.083999633789066
2016-07-01 02:00:00.000000 | 5.1570000648498535 | 1.741000056266785 | 1.2790000438690186 | 0.35499998927116394 | 3.776999950408936 | 1.218000054359436 | 27.78700065612793
2016-07-01 02:15:00.000000 | 5.1570000648498535 | 1.8079999685287482 | 1.350000023841858 | 0.4259999990463257 | 3.776999950408936 | 1.187999963760376 | 27.506000518798828
(10 rows)
```

## License

This library is released under Apache license.
2 changes: 2 additions & 0 deletions src/datatypes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ fn encode_value(
idx: usize,
) -> PgWireResult<()> {
match arr.data_type() {
DataType::Null => encoder.encode_field(&None::<i8>)?,
DataType::Boolean => encoder.encode_field(&get_bool_value(arr, idx))?,
DataType::Int8 => encoder.encode_field(&get_i8_value(arr, idx))?,
DataType::Int16 => encoder.encode_field(&get_i16_value(arr, idx))?,
Expand Down Expand Up @@ -274,6 +275,7 @@ fn encode_value(
}
},
DataType::List(field) => match field.data_type() {
DataType::Null => encoder.encode_field(&None::<i8>)?,
DataType::Boolean => encoder.encode_field(&get_bool_list_value(arr, idx))?,
DataType::Int8 => encoder.encode_field(&get_i8_list_value(arr, idx))?,
DataType::Int16 => encoder.encode_field(&get_i16_list_value(arr, idx))?,
Expand Down
23 changes: 21 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::sync::Arc;

use datafusion::execution::options::{CsvReadOptions, NdJsonReadOptions};
use datafusion::execution::options::{ArrowReadOptions, CsvReadOptions, NdJsonReadOptions};
use datafusion::prelude::SessionContext;
use pgwire::api::auth::noop::NoopStartupHandler;
use pgwire::api::{MakeHandler, StatelessMakeHandler};
Expand All @@ -14,13 +14,18 @@ mod handlers;
#[derive(Debug, StructOpt)]
#[structopt(
name = "datafusion-postgres",
about = "A postgres interface for datatfusion"
about = "A postgres interface for datatfusion. Serve any CSV/JSON/Arrow files as tables."
)]
struct Opt {
/// CSV files to register as table, using syntax `table_name:file_path`
#[structopt(short)]
csv_tables: Vec<String>,
/// JSON files to register as table, using syntax `table_name:file_path`
#[structopt(short)]
json_tables: Vec<String>,
/// Arrow files to register as table, using syntax `table_name:file_path`
#[structopt(short)]
arrow_tables: Vec<String>,
}

fn parse_table_def(table_def: &str) -> (&str, &str) {
Expand All @@ -34,13 +39,15 @@ async fn main() {
let opts = Opt::from_args();

let session_context = SessionContext::new();

for (table_name, table_path) in opts.csv_tables.iter().map(|s| parse_table_def(s.as_ref())) {
session_context
.register_csv(table_name, table_path, CsvReadOptions::default())
.await
.unwrap_or_else(|e| panic!("Failed to register table: {table_name}, {e}"));
println!("Loaded {} as table {}", table_path, table_name);
}

for (table_name, table_path) in opts.json_tables.iter().map(|s| parse_table_def(s.as_ref())) {
session_context
.register_json(table_name, table_path, NdJsonReadOptions::default())
Expand All @@ -49,6 +56,18 @@ async fn main() {
println!("Loaded {} as table {}", table_path, table_name);
}

for (table_name, table_path) in opts
.arrow_tables
.iter()
.map(|s| parse_table_def(s.as_ref()))
{
session_context
.register_arrow(table_name, table_path, ArrowReadOptions::default())
.await
.unwrap_or_else(|e| panic!("Failed to register table: {table_name}, {e}"));
println!("Loaded {} as table {}", table_path, table_name);
}

let processor = Arc::new(StatelessMakeHandler::new(Arc::new(
handlers::DfSessionService::new(session_context),
)));
Expand Down