Minimize the dependency on SessionState #11420

Open
jayzhan211 opened this issue Jul 12, 2024 · 2 comments
Labels
enhancement New feature or request

Comments

@jayzhan211
Contributor

jayzhan211 commented Jul 12, 2024

Is your feature request related to a problem or challenge?

Many functions in datafusion-core take SessionState as an argument but rely on only a small part of it. This adds an unnecessary dependency, which blocks us from extracting modules out of core (#10782).

For example, if we want to pull CatalogProvider out of core, we need to pull out TableProvider first. But TableProvider has a scan function that takes SessionState, which contains CatalogProviderList, so there is a circular dependency. Similar issues are already mentioned in #11182.

Describe the solution you'd like

I think we need to redesign those functions that take SessionState and minimize the dependencies for them.

Taking one of the scan functions as an example, we can see that it needs only state.config_options().explain and state.execution_props(), not the whole SessionState:

async fn scan(
    &self,
    state: &SessionState,
    projection: Option<&Vec<usize>>,
    _filters: &[Expr],
    _limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
    let mut partitions = vec![];
    for arc_inner_vec in self.batches.iter() {
        let inner_vec = arc_inner_vec.read().await;
        partitions.push(inner_vec.clone())
    }
    let mut exec =
        MemoryExec::try_new(&partitions, self.schema(), projection.cloned())?;
    let show_sizes = state.config_options().explain.show_sizes;
    exec = exec.with_show_sizes(show_sizes);
    // add sort information if present
    let sort_order = self.sort_order.lock();
    if !sort_order.is_empty() {
        let df_schema = DFSchema::try_from(self.schema.as_ref().clone())?;
        let file_sort_order = sort_order
            .iter()
            .map(|sort_exprs| {
                create_physical_sort_exprs(
                    sort_exprs,
                    &df_schema,
                    state.execution_props(),
                )
            })
            .collect::<Result<Vec<_>>>()?;
        exec = exec.with_sort_information(file_sort_order);
    }
    Ok(Arc::new(exec))
}

In this case, we can create a TableProviderContext that encapsulates a subset of the information from SessionState.

pub struct TableProviderContext {
    explain_options: ExplainOptions,
    execution_props: ExecutionProps, // maybe hold a reference to avoid a copy
}
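As a rough illustration of the idea, here is a minimal, self-contained sketch of a scan-like function that receives only such a narrow context. Every type below is a simplified stand-in written for this example and not the real DataFusion struct; the `table_provider_context` and `describe_scan` names, the `query_id` and `catalog_names` fields, and the use of borrowed fields are all assumptions.

```rust
// Hypothetical stand-ins for the real DataFusion types (assumed, heavily simplified).
#[derive(Debug, Clone, Default, PartialEq)]
pub struct ExplainOptions {
    pub show_sizes: bool,
}

#[derive(Debug, Clone, Default, PartialEq)]
pub struct ExecutionProps {
    pub query_id: u64, // placeholder field, not part of the real struct
}

/// Narrow context as proposed above; it borrows its fields to avoid copies.
pub struct TableProviderContext<'a> {
    pub explain_options: &'a ExplainOptions,
    pub execution_props: &'a ExecutionProps,
}

/// Stand-in for the full session state, which also carries catalog information.
pub struct SessionState {
    pub explain: ExplainOptions,
    pub execution_props: ExecutionProps,
    pub catalog_names: Vec<String>, // stands in for CatalogProviderList
}

impl SessionState {
    /// Hypothetical helper: project the state down to what a table provider needs.
    pub fn table_provider_context(&self) -> TableProviderContext<'_> {
        TableProviderContext {
            explain_options: &self.explain,
            execution_props: &self.execution_props,
        }
    }
}

/// A scan-like function that never sees the catalog part of the state.
pub fn describe_scan(ctx: &TableProviderContext<'_>, partitions: usize) -> String {
    format!(
        "MemoryExec: partitions={}, show_sizes={}",
        partitions, ctx.explain_options.show_sizes
    )
}
```

With this shape, the code that implements the table provider depends only on the two option structs, so it could in principle live in a crate that does not know about catalogs at all.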

The same idea applies to PhysicalPlanner: we usually need only PhysicalOptimizeRules or other information about the plan.

#[async_trait]
pub trait PhysicalPlanner: Send + Sync {
    /// Create a physical plan from a logical plan
    async fn create_physical_plan(
        &self,
        logical_plan: &LogicalPlan,
        session_state: &SessionState,
    ) -> Result<Arc<dyn ExecutionPlan>>;

    /// Create a physical expression from a logical expression
    /// suitable for evaluation
    ///
    /// `expr`: the expression to convert
    ///
    /// `input_dfschema`: the logical plan schema for evaluating `expr`
    fn create_physical_expr(
        &self,
        expr: &Expr,
        input_dfschema: &DFSchema,
        session_state: &SessionState,
    ) -> Result<Arc<dyn PhysicalExpr>>;
}

Likewise, create_initial_plan in DefaultPhysicalPlanner needs only ExecutionOptions and nothing else:

let planning_concurrency = session_state
    .config_options()
    .execution
    .planning_concurrency;
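To make the point concrete, the sketch below shows a planning helper whose signature asks only for the execution options. The `ExecutionOptions` struct here is a one-field stand-in, and `plan_batches` with its batching logic is invented for illustration; it is not how DefaultPhysicalPlanner actually schedules its work.

```rust
/// Hypothetical stand-in for the `execution` section of the config options.
#[derive(Debug, Clone)]
pub struct ExecutionOptions {
    pub planning_concurrency: usize,
}

/// Illustrative helper: split `num_children` planning tasks into batches of at
/// most `planning_concurrency` tasks each. It depends on `ExecutionOptions`
/// only, not on a whole session state.
pub fn plan_batches(num_children: usize, options: &ExecutionOptions) -> Vec<usize> {
    // Treat a concurrency of 0 as 1 so the loop always makes progress.
    let width = options.planning_concurrency.max(1);
    let mut batches = Vec::new();
    let mut remaining = num_children;
    while remaining > 0 {
        let n = remaining.min(width);
        batches.push(n);
        remaining -= n;
    }
    batches
}
```

A caller that does hold a full session state would pass just the `execution` part of its config, which keeps the planner code independent of everything else in the state.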

Describe alternatives you've considered

No response

Additional context

No response

@jayzhan211 jayzhan211 added the enhancement New feature or request label Jul 12, 2024
@alamb
Contributor

alamb commented Jul 12, 2024

In theory I think you are right.

However, I am worried about removing SessionState from the scan method, as it is so widely used and has everything people need. In fact, here is at least one request to use SessionState more: #11193 from @cisaacson

Here is one alternate proposal #10782 (comment) (basically treat SessionState as part of the catalog API).

Another potentially hacky idea is to pass SessionState as a dyn Any:

async fn scan(
    &self,
    state: &dyn Any,
    projection: Option<&Vec<usize>>,
    _filters: &[Expr],
    _limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {

This would break the explicit dependency, but implementors of scan could still get the SessionState by downcasting:

let state = state.downcast_ref::<SessionState>().unwrap();
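For reference, a minimal sketch of this downcasting pattern using only `std::any::Any` might look like the following. The `SessionState` here is a one-field stand-in and the free function `scan` is a simplified assumption, not the TableProvider method; it returns an error instead of calling `unwrap()` so that a caller passing the wrong type does not cause a panic.

```rust
use std::any::Any;

/// Hypothetical stand-in for the real SessionState.
pub struct SessionState {
    pub show_sizes: bool,
}

/// The signature mentions only `dyn Any`, so it carries no compile-time
/// dependency on `SessionState`; the implementor recovers it at runtime.
pub fn scan(state: &dyn Any) -> Result<String, String> {
    let state = state
        .downcast_ref::<SessionState>()
        .ok_or_else(|| "expected a SessionState".to_string())?;
    Ok(format!("show_sizes={}", state.show_sizes))
}
```

The trade-off is that a mismatch between what the caller passes and what the implementor expects is detected only at runtime.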

🤔

@cisaacson
Contributor

@alamb I agree, removing it or using Any would make lots of things challenging. The idea of state for a SessionContext that is accessible from a variety of places is important.

The easiest way to address this is to move SessionState to a smaller sub-crate like common-runtime or even common-traits (although the latter would be complicated as many traits depend on structs and other types).

I see why it is used in so many places, and I do support the idea that it is important to do so.

Perhaps we could just identify all truly common types like this and move them into a sub-crate; that could be done without too much effort. Such a change could also incorporate other kinds of common objects. That way, any time someone wants to break something out of core, the most likely common dependencies would already be available in the smaller sub-crate.
