Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,21 @@ project.
- Permission control
- Built-in `pg_catalog` tables
- Built-in postgres functions for common meta queries
- [x] DBeaver compatibility
- [x] pgcli compatibility
- `datafusion-postgres-cli`: A cli tool starts a postgres compatible server for
datafusion supported file formats, just like python's `SimpleHTTPServer`.
- `arrow-pg`: A data type mapping, encoding/decoding library for arrow and
postgres(pgwire) data types.

See `auth.rs` for complete implementation examples using `DfAuthSource`.

## Supported Database Clients

- Database Clients
- [x] DBeaver
- [x] pgcli
- BI
- [x] Metabase

## Quick Start

### The Library `datafusion-postgres`
Expand Down
90 changes: 62 additions & 28 deletions datafusion-postgres/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@ use std::sync::Arc;

use crate::auth::{AuthManager, Permission, ResourceType};
use crate::sql::{
parse, rewrite, AliasDuplicatedProjectionRewrite, BlacklistSqlRewriter, FixArrayLiteral,
PrependUnqualifiedPgTableName, RemoveTableFunctionQualifier, RemoveUnsupportedTypes,
ResolveUnqualifiedIdentifer, RewriteArrayAnyAllOperation, SqlStatementRewriteRule,
parse, rewrite, AliasDuplicatedProjectionRewrite, BlacklistSqlRewriter,
CurrentUserVariableToSessionUserFunctionCall, FixArrayLiteral, PrependUnqualifiedPgTableName,
RemoveTableFunctionQualifier, RemoveUnsupportedTypes, ResolveUnqualifiedIdentifer,
RewriteArrayAnyAllOperation, SqlStatementRewriteRule,
};
use async_trait::async_trait;
use datafusion::arrow::datatypes::DataType;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::common::ToDFSchema;
use datafusion::error::DataFusionError;
use datafusion::logical_expr::LogicalPlan;
use datafusion::prelude::*;
use datafusion::sql::parser::Statement;
Expand Down Expand Up @@ -107,6 +110,7 @@ impl DfSessionService {
Arc::new(PrependUnqualifiedPgTableName),
Arc::new(FixArrayLiteral),
Arc::new(RemoveTableFunctionQualifier),
Arc::new(CurrentUserVariableToSessionUserFunctionCall),
];
let parser = Arc::new(Parser {
session_context: session_context.clone(),
Expand Down Expand Up @@ -420,13 +424,15 @@ impl DfSessionService {
let resp = Self::mock_show_response("statement_timeout", &timeout_str)?;
Ok(Some(Response::Query(resp)))
}
_ => Err(PgWireError::UserError(Box::new(
pgwire::error::ErrorInfo::new(
"ERROR".to_string(),
"42704".to_string(),
format!("Unrecognized SHOW command: {query_lower}"),
),
))),
"show transaction isolation level" => {
let resp = Self::mock_show_response("transaction_isolation", "read_committed")?;
Ok(Some(Response::Query(resp)))
}
_ => {
info!("Unsupported show statement: {query_lower}");
let resp = Self::mock_show_response("unsupported_show_statement", "")?;
Ok(Some(Response::Query(resp)))
}
}
} else {
Ok(None)
Expand Down Expand Up @@ -714,24 +720,15 @@ pub struct Parser {
sql_rewrite_rules: Vec<Arc<dyn SqlStatementRewriteRule>>,
}

#[async_trait]
impl QueryParser for Parser {
type Statement = (String, LogicalPlan);

async fn parse_sql<C>(
&self,
_client: &C,
sql: &str,
_types: &[Type],
) -> PgWireResult<Self::Statement> {
log::debug!("Received parse extended query: {sql}"); // Log for debugging

impl Parser {
fn try_shortcut_parse_plan(&self, sql: &str) -> Result<Option<LogicalPlan>, DataFusionError> {
// Check for transaction commands that shouldn't be parsed by DataFusion
let sql_lower = sql.to_lowercase();
let sql_trimmed = sql_lower.trim();

if matches!(
sql_trimmed,
"begin"
"" | "begin"
| "begin transaction"
| "begin work"
| "start transaction"
Expand All @@ -747,13 +744,50 @@ impl QueryParser for Parser {
) {
// Return a dummy plan for transaction commands - they'll be handled by transaction handler
let dummy_schema = datafusion::common::DFSchema::empty();
let dummy_plan = datafusion::logical_expr::LogicalPlan::EmptyRelation(
return Ok(Some(LogicalPlan::EmptyRelation(
datafusion::logical_expr::EmptyRelation {
produce_one_row: false,
schema: std::sync::Arc::new(dummy_schema),
schema: Arc::new(dummy_schema),
},
);
return Ok((sql.to_string(), dummy_plan));
)));
}

// show statement may not be supported by datafusion
if sql_trimmed.starts_with("show") {
// Return a dummy plan for transaction commands - they'll be handled by transaction handler
let show_schema =
Arc::new(Schema::new(vec![Field::new("show", DataType::Utf8, false)]));
let df_schema = show_schema.to_dfschema()?;
return Ok(Some(LogicalPlan::EmptyRelation(
datafusion::logical_expr::EmptyRelation {
produce_one_row: true,
schema: Arc::new(df_schema),
},
)));
}

Ok(None)
}
}

#[async_trait]
impl QueryParser for Parser {
type Statement = (String, LogicalPlan);

async fn parse_sql<C>(
&self,
_client: &C,
sql: &str,
_types: &[Type],
) -> PgWireResult<Self::Statement> {
log::debug!("Received parse extended query: {sql}"); // Log for debugging

// Check for transaction commands that shouldn't be parsed by DataFusion
if let Some(plan) = self
.try_shortcut_parse_plan(sql)
.map_err(|e| PgWireError::ApiError(Box::new(e)))?
{
return Ok((sql.to_string(), plan));
}

let mut statements = parse(sql).map_err(|e| PgWireError::ApiError(Box::new(e)))?;
Expand Down
Loading
Loading