diff --git a/rust/datafusion/examples/flight_server.rs b/rust/datafusion/examples/flight_server.rs index 75b470d7087..79660dd1871 100644 --- a/rust/datafusion/examples/flight_server.rs +++ b/rust/datafusion/examples/flight_server.rs @@ -21,9 +21,9 @@ use futures::Stream; use tonic::transport::Server; use tonic::{Request, Response, Status, Streaming}; +use datafusion::datasource::parquet::ParquetTable; use datafusion::datasource::TableProvider; use datafusion::prelude::*; -use datafusion::{datasource::parquet::ParquetTable, physical_plan::collect}; use arrow_flight::{ flight_service_server::FlightService, flight_service_server::FlightServiceServer, @@ -80,7 +80,7 @@ impl FlightService for FlightServiceImpl { request: Request, ) -> Result, Status> { let ticket = request.into_inner(); - match String::from_utf8(ticket.ticket.to_vec()) { + match std::str::from_utf8(&ticket.ticket) { Ok(sql) => { println!("do_get: {}", sql); @@ -94,28 +94,22 @@ impl FlightService for FlightServiceImpl { "alltypes_plain", &format!("{}/alltypes_plain.parquet", testdata), ) - .map_err(|e| to_tonic_err(&e))?; + .map_err(to_tonic_err)?; - // create the query plan - let plan = ctx - .create_logical_plan(&sql) - .and_then(|plan| ctx.optimize(&plan)) - .and_then(|plan| ctx.create_physical_plan(&plan)) - .map_err(|e| to_tonic_err(&e))?; + // create the DataFrame + let df = ctx.sql(sql).map_err(to_tonic_err)?; // execute the query - let results = - collect(plan.clone()).await.map_err(|e| to_tonic_err(&e))?; + let results = df.collect().await.map_err(to_tonic_err)?; if results.is_empty() { return Err(Status::internal("There were no results from ticket")); } // add an initial FlightData message that sends schema let options = arrow::ipc::writer::IpcWriteOptions::default(); - let schema = plan.schema(); let schema_flight_data = arrow_flight::utils::flight_data_from_arrow_schema( - schema.as_ref(), + &df.schema().clone().into(), &options, ); @@ -197,7 +191,7 @@ impl FlightService for FlightServiceImpl { } } -fn to_tonic_err(e: &datafusion::error::DataFusionError) -> Status { +fn to_tonic_err(e: datafusion::error::DataFusionError) -> Status { Status::internal(format!("{:?}", e)) }