Skip to content

Commit 67edfed

Browse files
authored
refactor: remove custom information_schema and use datafusion built-in (#85)
* refactor: remove custom information_schema and use datafusion built-in * refactor: remove unused code
1 parent efa9cdf commit 67edfed

File tree

4 files changed

+4
-204
lines changed

4 files changed

+4
-204
lines changed

datafusion-postgres-cli/src/main.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use std::sync::Arc;
55
use datafusion::execution::options::{
66
ArrowReadOptions, AvroReadOptions, CsvReadOptions, NdJsonReadOptions, ParquetReadOptions,
77
};
8-
use datafusion::prelude::SessionContext;
8+
use datafusion::prelude::{SessionConfig, SessionContext};
99
use datafusion_postgres::{serve, ServerOptions}; // Assuming the crate name is `datafusion_postgres`
1010
use structopt::StructOpt;
1111

@@ -179,7 +179,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
179179
let mut opts = Opt::from_args();
180180
opts.include_directory_files()?;
181181

182-
let session_context = SessionContext::new();
182+
let session_config = SessionConfig::new().with_information_schema(true);
183+
let session_context = SessionContext::new_with_config(session_config);
183184

184185
setup_session_context(&session_context, &opts).await?;
185186

datafusion-postgres/src/handlers.rs

Lines changed: 1 addition & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ use pgwire::api::{ClientInfo, NoopErrorHandler, PgWireServerHandlers, Type};
1919
use tokio::sync::Mutex;
2020

2121
use crate::datatypes;
22-
use crate::information_schema::{columns_df, schemata_df, tables_df};
2322
use pgwire::error::{PgWireError, PgWireResult};
2423

2524
pub struct HandlerFactory(pub Arc<DfSessionService>);
@@ -91,31 +90,6 @@ impl DfSessionService {
9190
Ok(QueryResponse::new(Arc::new(fields), Box::pin(row_stream)))
9291
}
9392

94-
// Mock pg_namespace response
95-
async fn mock_pg_namespace<'a>(&self) -> PgWireResult<QueryResponse<'a>> {
96-
let fields = Arc::new(vec![FieldInfo::new(
97-
"nspname".to_string(),
98-
None,
99-
None,
100-
Type::VARCHAR,
101-
FieldFormat::Text,
102-
)]);
103-
104-
let fields_ref = fields.clone();
105-
let rows = self
106-
.session_context
107-
.catalog_names()
108-
.into_iter()
109-
.map(move |name| {
110-
let mut encoder = pgwire::api::results::DataRowEncoder::new(fields_ref.clone());
111-
encoder.encode_field(&Some(&name))?; // Return catalog_name as a schema
112-
encoder.finish()
113-
});
114-
115-
let row_stream = futures::stream::iter(rows);
116-
Ok(QueryResponse::new(fields.clone(), Box::pin(row_stream)))
117-
}
118-
11993
async fn try_respond_set_statements<'a>(
12094
&self,
12195
query_lower: &str,
@@ -189,39 +163,6 @@ impl DfSessionService {
189163
Ok(None)
190164
}
191165
}
192-
193-
async fn try_respond_information_schema<'a>(
194-
&self,
195-
query_lower: &str,
196-
) -> PgWireResult<Option<Response<'a>>> {
197-
if query_lower.contains("information_schema.schemata") {
198-
let df = schemata_df(&self.session_context)
199-
.await
200-
.map_err(|e| PgWireError::ApiError(Box::new(e)))?;
201-
let resp = datatypes::encode_dataframe(df, &Format::UnifiedText).await?;
202-
return Ok(Some(Response::Query(resp)));
203-
} else if query_lower.contains("information_schema.tables") {
204-
let df = tables_df(&self.session_context)
205-
.await
206-
.map_err(|e| PgWireError::ApiError(Box::new(e)))?;
207-
let resp = datatypes::encode_dataframe(df, &Format::UnifiedText).await?;
208-
return Ok(Some(Response::Query(resp)));
209-
} else if query_lower.contains("information_schema.columns") {
210-
let df = columns_df(&self.session_context)
211-
.await
212-
.map_err(|e| PgWireError::ApiError(Box::new(e)))?;
213-
let resp = datatypes::encode_dataframe(df, &Format::UnifiedText).await?;
214-
return Ok(Some(Response::Query(resp)));
215-
}
216-
217-
// Handle pg_catalog.pg_namespace for pgcli compatibility
218-
if query_lower.contains("pg_catalog.pg_namespace") {
219-
let resp = self.mock_pg_namespace().await?;
220-
return Ok(Some(Response::Query(resp)));
221-
}
222-
223-
Ok(None)
224-
}
225166
}
226167

227168
#[async_trait]
@@ -241,10 +182,6 @@ impl SimpleQueryHandler for DfSessionService {
241182
return Ok(vec![resp]);
242183
}
243184

244-
if let Some(resp) = self.try_respond_information_schema(&query_lower).await? {
245-
return Ok(vec![resp]);
246-
}
247-
248185
let df = self
249186
.session_context
250187
.sql(query)
@@ -361,11 +298,8 @@ impl ExtendedQueryHandler for DfSessionService {
361298
return Ok(resp);
362299
}
363300

364-
if let Some(resp) = self.try_respond_information_schema(&query).await? {
365-
return Ok(resp);
366-
}
367-
368301
let (_, plan) = &portal.statement.statement;
302+
369303
let param_types = plan
370304
.get_parameter_types()
371305
.map_err(|e| PgWireError::ApiError(Box::new(e)))?;

datafusion-postgres/src/information_schema.rs

Lines changed: 0 additions & 134 deletions
This file was deleted.

datafusion-postgres/src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
mod datatypes;
22
mod encoder;
33
mod handlers;
4-
mod information_schema;
54

65
use std::sync::Arc;
76

0 commit comments

Comments
 (0)