Add `enable_url_table` as an argument to SessionStateBuilder #12394

Open
Tracked by #12550
alamb opened this issue Sep 9, 2024 · 10 comments

alamb (Contributor) commented Sep 9, 2024

I wonder if it would be possible here to add `enable_url_table` as an argument to `SessionStateBuilder`.

So it could look like:

```rust
let session_state = SessionStateBuilder::new()
    .with_default_features()
    .with_config(cfg)
    .enable_url_table()
    .build();
let ctx = SessionContext::from(session_state);
```

🤔

(we can do this as a follow-on as well)

Originally posted by @alamb in #11035 (comment)

alamb (Contributor Author) commented Sep 9, 2024

@goldmedal says:

> Sure. I think it's a good idea. We could have a flag in the builder; if it's true, we wrap the catalog list implicitly. It may need some additional implementation and test cases, so I'd prefer to do it in a follow-up PR.
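To make "wrap the catalog list implicitly" concrete, here is a minimal std-only sketch. `SimpleCatalog`, `UrlFactory`, `MemoryCatalog`, `PathFactory`, `DynamicCatalog` and `build_catalog` are all hypothetical, simplified stand-ins (tables are plain `String`s), not DataFusion's real traits. Registered tables are looked up first, and only on a miss is the name handed to the URL factory; the builder flag just decides whether that wrapper is applied.

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Hypothetical, simplified traits; tables are modelled as plain strings.
trait SimpleCatalog {
    fn table(&self, name: &str) -> Option<String>;
}

trait UrlFactory {
    /// Returns a table if `name` can be treated as a file path or URL.
    fn try_new(&self, name: &str) -> Option<String>;
}

struct MemoryCatalog {
    tables: HashMap<String, String>,
}

impl SimpleCatalog for MemoryCatalog {
    fn table(&self, name: &str) -> Option<String> {
        self.tables.get(name).cloned()
    }
}

/// Treats names with a scheme or a known file extension as URLs.
struct PathFactory;

impl UrlFactory for PathFactory {
    fn try_new(&self, name: &str) -> Option<String> {
        let looks_like_url =
            name.contains("://") || name.ends_with(".csv") || name.ends_with(".parquet");
        looks_like_url.then(|| format!("listing table for {name}"))
    }
}

/// Registered tables win; only on a miss is the name handed to the factory.
struct DynamicCatalog {
    inner: Arc<dyn SimpleCatalog>,
    factory: Arc<dyn UrlFactory>,
}

impl SimpleCatalog for DynamicCatalog {
    fn table(&self, name: &str) -> Option<String> {
        self.inner.table(name).or_else(|| self.factory.try_new(name))
    }
}

/// What a builder flag could do: wrap the catalog only when enabled.
fn build_catalog(inner: Arc<dyn SimpleCatalog>, enable_url_table: bool) -> Arc<dyn SimpleCatalog> {
    if enable_url_table {
        let wrapped: Arc<dyn SimpleCatalog> = Arc::new(DynamicCatalog {
            inner,
            factory: Arc::new(PathFactory),
        });
        wrapped
    } else {
        inner
    }
}

fn main() {
    let mut tables = HashMap::new();
    tables.insert("t".to_string(), "memory table t".to_string());
    let inner: Arc<dyn SimpleCatalog> = Arc::new(MemoryCatalog { tables });

    let plain = build_catalog(Arc::clone(&inner), false);
    let dynamic = build_catalog(inner, true);

    // Without the flag, file paths are not resolved.
    assert_eq!(plain.table("data/example.csv"), None);
    // With the flag, registered tables still take precedence...
    assert_eq!(dynamic.table("t").as_deref(), Some("memory table t"));
    // ...and unknown names that look like paths go to the factory.
    assert_eq!(
        dynamic.table("data/example.csv").as_deref(),
        Some("listing table for data/example.csv")
    );
}
```

The difficulty discussed in the rest of the thread is that the real factory also needs access to the session state, which this sketch deliberately leaves out.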

goldmedal (Contributor)

take

goldmedal (Contributor)

@alamb I found a problem with this. The dynamic file querying implementation relies on `SessionStore`, which holds a weak reference to the `RwLock<SessionState>`, so we need to register that weak reference after the session state has been built:

```rust
pub fn enable_url_table(&self) -> Self {
    let state_ref = self.state();
    let factory = Arc::new(DynamicListTableFactory::new(SessionStore::new()));
    let catalog_list = Arc::new(DynamicFileCatalog::new(
        Arc::clone(state_ref.catalog_list()),
        Arc::clone(&factory) as Arc<dyn UrlTableFactory>,
    ));
    let new_state = SessionStateBuilder::new_from_existing(self.state())
        .with_catalog_list(catalog_list)
        .build();
    // The RwLock only exists once the SessionContext has been created...
    let ctx = SessionContext::new_with_state(new_state);
    // ...so the weak reference can only be registered afterwards.
    factory.session_store().with_state(ctx.state_weak_ref());
    ctx
}
```

However, the `RwLock` is normally created only when the `SessionContext` is constructed:

```rust
pub fn new_with_state(state: SessionState) -> Self {
    Self {
        session_id: state.session_id().to_string(),
        session_start_time: Utc::now(),
        // The lock is created here, not by SessionStateBuilder.
        state: Arc::new(RwLock::new(state)),
    }
}
```
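The ordering problem can be reproduced with plain `std` types. In this sketch `State`, `Store` and `Context` are hypothetical stand-ins for `SessionState`, `SessionStore` and `SessionContext`, not the real types. The store can only point at the state once the context has wrapped it in an `Arc<RwLock<_>>`, and because it holds a `Weak`, it does not keep the context alive.

```rust
use std::sync::{Arc, Mutex, RwLock, Weak};

// Hypothetical stand-in for SessionState.
struct State {
    session_id: String,
}

/// Hypothetical stand-in for SessionStore: holds a weak reference that can
/// only be filled in after the state has been moved into an Arc<RwLock<_>>.
#[derive(Default)]
struct Store {
    state: Mutex<Option<Weak<RwLock<State>>>>,
}

impl Store {
    fn with_state(&self, state: Weak<RwLock<State>>) {
        *self.state.lock().unwrap() = Some(state);
    }

    /// Upgrades the weak reference; `None` if it was never registered
    /// or if the owning context has been dropped.
    fn get(&self) -> Option<Arc<RwLock<State>>> {
        self.state.lock().unwrap().as_ref().and_then(Weak::upgrade)
    }
}

/// Hypothetical stand-in for SessionContext: the lock is created here.
struct Context {
    state: Arc<RwLock<State>>,
}

impl Context {
    fn new_with_state(state: State) -> Self {
        Self { state: Arc::new(RwLock::new(state)) }
    }

    fn state_weak_ref(&self) -> Weak<RwLock<State>> {
        Arc::downgrade(&self.state)
    }
}

fn main() {
    let store = Store::default();
    let state = State { session_id: "s1".to_string() };

    // After "build" but before the context exists, there is nothing to point at.
    assert!(store.get().is_none());

    let ctx = Context::new_with_state(state);
    store.with_state(ctx.state_weak_ref());
    let id = store.get().unwrap().read().unwrap().session_id.clone();
    assert_eq!(id, "s1");

    // The weak reference does not keep the context's state alive.
    drop(ctx);
    assert!(store.get().is_none());
}
```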

I don't think your proposed usage works 🤔, because `build()` returns a plain `SessionState`, before any `RwLock` exists.
Maybe we could provide another way to build the session state, like:

```rust
let session_state_ref: Arc<RwLock<SessionState>> = SessionStateBuilder::new()
    .with_default_features()
    .with_config(cfg)
    .enable_url_table_and_build();
let ctx = SessionContext::from(session_state_ref);
```

WDYT?
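A minimal sketch of what `enable_url_table_and_build()` could do, assuming the builder (rather than the context) creates the lock. `StateBuilder`, `State`, `Store` and `Context` are hypothetical std-only types, not DataFusion's. Because the builder owns the `Arc<RwLock<_>>`, it can register the weak back-reference before returning.

```rust
use std::sync::{Arc, Mutex, RwLock, Weak};

/// Hypothetical store holding a weak back-reference to the shared state.
#[derive(Default)]
struct Store {
    state: Mutex<Weak<RwLock<State>>>,
}

/// Hypothetical session state.
struct State {
    config: String,
    // Present only when URL tables are enabled; in DataFusion the store
    // would live inside the wrapped catalog list instead.
    url_store: Option<Arc<Store>>,
}

#[derive(Default)]
struct StateBuilder {
    config: String,
}

impl StateBuilder {
    fn with_config(mut self, config: &str) -> Self {
        self.config = config.to_string();
        self
    }

    /// Returns the shared state instead of a plain State.
    fn enable_url_table_and_build(self) -> Arc<RwLock<State>> {
        let store = Arc::new(Store::default());
        let shared = Arc::new(RwLock::new(State {
            config: self.config,
            url_store: Some(Arc::clone(&store)),
        }));
        // Possible here only because the builder created the lock.
        *store.state.lock().unwrap() = Arc::downgrade(&shared);
        shared
    }
}

/// Hypothetical context built from the already-shared state.
struct Context {
    state: Arc<RwLock<State>>,
}

impl From<Arc<RwLock<State>>> for Context {
    fn from(state: Arc<RwLock<State>>) -> Self {
        Self { state }
    }
}

fn main() {
    let shared = StateBuilder::default()
        .with_config("cfg")
        .enable_url_table_and_build();
    let ctx = Context::from(shared);

    let state = ctx.state.read().unwrap();
    assert_eq!(state.config, "cfg");
    // The store's weak reference points back at the context's own state.
    let back = state
        .url_store
        .as_ref()
        .expect("URL table support was enabled")
        .state
        .lock()
        .unwrap()
        .upgrade()
        .expect("state is still alive");
    assert!(Arc::ptr_eq(&back, &ctx.state));
}
```

The cost is the return type: code that expects the builder to hand back a plain `SessionState` would need to change.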

alamb (Contributor Author) commented Sep 15, 2024

I see. That is a good question 🤔

We could potentially change `build()` to return an `Arc`, but I don't know what implications that would have.

Maybe we could change the `DynamicTableProvider` to keep a reference to some part of the `SessionState` (rather than an `Arc` to the whole thing 🤔), but that might be messy.

goldmedal (Contributor)

> We could potentially change `build()` to return an `Arc`, but I don't know what implications that would have.

I suspect it would be a large breaking change for `SessionStateBuilder` and `SessionContext`. The main change would be to:

```rust
/// Creates a new `SessionContext` using the provided [`SessionState`]
pub fn new_with_state(state: SessionState) -> Self {
    Self {
        session_id: state.session_id().to_string(),
        session_start_time: Utc::now(),
        state: Arc::new(RwLock::new(state)),
    }
}
```

`SessionStateBuilder` would have to produce the `Arc<RwLock<SessionState>>`, and `SessionContext` would be created from it. I'm not sure how many downstream projects would be affected.

> Maybe we could change the `DynamicTableProvider` to keep a reference to some part of the `SessionState` (rather than an `Arc` to the whole thing 🤔), but that might be messy.

I think the main challenge is URL resolution. `DynamicListTableFactory` relies on `ListingTableConfig` to resolve the URL path and build the table:

```rust
match ListingTableConfig::new(table_url.clone())
    .infer(state)
    .await
```

It's not easy to extract only the parts this requires.

alamb (Contributor Author) commented Sep 17, 2024

Yeah, I don't have any great ideas at the moment. I'll keep thinking.

alamb (Contributor Author) commented Sep 20, 2024

I filed #12550 to track this and other ideas for making the APIs easier to use.

alamb (Contributor Author) commented Sep 21, 2024

I wonder if we could change `ListingTable::infer` and anything else that uses `SessionState` directly to take a trait instance `&dyn Session`: https://docs.rs/datafusion/latest/datafusion/catalog/trait.Session.html

That might then permit avoiding the need for an entire `SessionContext` 🤔

goldmedal (Contributor)

> I wonder if we could change `ListingTable::infer` and anything else that uses `SessionState` directly to take a trait instance `&dyn Session`: https://docs.rs/datafusion/latest/datafusion/catalog/trait.Session.html
>
> That might then permit avoiding the need for an entire `SessionContext` 🤔

After some research, I found that the main uses of `SessionState` in `ListingTableConfig::infer` are `runtime_env` and `config_options`.

If we want to use `dyn Session`, we must move `SessionState::get_file_format_factory` onto the `Session` trait. That may also involve `FileFormat` and `FileFormatFactory`, which this API requires.

```rust
/// Retrieves a [FileFormatFactory] based on file extension which has been registered
/// via SessionContext::register_file_format. Extensions are not case sensitive.
pub fn get_file_format_factory(
    &self,
    ext: &str,
) -> Option<Arc<dyn FileFormatFactory>> {
    self.file_formats.get(&ext.to_lowercase()).cloned()
}
```
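If `get_file_format_factory` moved onto the `Session` trait, it could be a default method built on a smaller accessor. A hedged sketch: the trimmed-down `Session` and `FileFormatFactory` traits, the `file_formats()` accessor, `MinimalSession`, `CsvFactory` and `format_for_path` are all hypothetical (the real traits have many more methods). Any implementation, including a minimal one, gets the same case-insensitive lookup.

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Hypothetical, trimmed-down factory trait.
trait FileFormatFactory {
    fn name(&self) -> &str;
}

struct CsvFactory;

impl FileFormatFactory for CsvFactory {
    fn name(&self) -> &str {
        "csv"
    }
}

// Hypothetical, trimmed-down Session trait.
trait Session {
    fn file_formats(&self) -> &HashMap<String, Arc<dyn FileFormatFactory>>;

    /// Default method mirroring SessionState::get_file_format_factory:
    /// keys are stored lowercase, so lookup is case-insensitive.
    fn get_file_format_factory(&self, ext: &str) -> Option<Arc<dyn FileFormatFactory>> {
        self.file_formats().get(&ext.to_lowercase()).cloned()
    }
}

/// A minimal session that only carries registered formats.
struct MinimalSession {
    file_formats: HashMap<String, Arc<dyn FileFormatFactory>>,
}

impl Session for MinimalSession {
    fn file_formats(&self) -> &HashMap<String, Arc<dyn FileFormatFactory>> {
        &self.file_formats
    }
}

/// Code written against `&dyn Session` accepts any implementation.
fn format_for_path(session: &dyn Session, path: &str) -> Option<String> {
    let ext = path.rsplit('.').next()?;
    session.get_file_format_factory(ext).map(|f| f.name().to_string())
}

fn main() {
    let mut file_formats: HashMap<String, Arc<dyn FileFormatFactory>> = HashMap::new();
    file_formats.insert("csv".to_string(), Arc::new(CsvFactory));
    let session = MinimalSession { file_formats };

    assert_eq!(format_for_path(&session, "data/FILE.CSV").as_deref(), Some("csv"));
    assert_eq!(format_for_path(&session, "data/file.json"), None);
}
```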

The `infer_schema` API on `FileFormat` also takes a `SessionState`, but I found that only the Parquet and CSV formats actually use it, and they only need the config options. I expect it can easily be replaced with `&dyn Session`.

```rust
/// Infer the common schema of the provided objects. The objects will usually
/// be analysed up to a given number of records or files (as specified in the
/// format config) then give the estimated common schema. This might fail if
/// the files have schemas that cannot be merged.
async fn infer_schema(
    &self,
    state: &SessionState,
    store: &Arc<dyn ObjectStore>,
    objects: &[ObjectMeta],
) -> Result<SchemaRef>;
```
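A sketch of how a CSV-style `infer_schema` might look if it took `&dyn Session` and read only config options. Everything here is hypothetical and simplified: `ConfigOptions`, `StaticSession` and `CsvFormat` are illustrative names, the method is synchronous, it inspects a single first line instead of object-store metadata, and `has_header` stands in for whatever options the real format reads.

```rust
// Hypothetical config with a single option.
#[derive(Default)]
struct ConfigOptions {
    has_header: bool,
}

// Hypothetical Session trait exposing only what the format needs.
trait Session {
    fn config_options(&self) -> &ConfigOptions;
}

struct StaticSession {
    config: ConfigOptions,
}

impl Session for StaticSession {
    fn config_options(&self) -> &ConfigOptions {
        &self.config
    }
}

trait FileFormat {
    /// Returns inferred column names for a file whose first line is `first_line`.
    fn infer_schema(&self, state: &dyn Session, first_line: &str) -> Vec<String>;
}

struct CsvFormat;

impl FileFormat for CsvFormat {
    fn infer_schema(&self, state: &dyn Session, first_line: &str) -> Vec<String> {
        let fields = first_line.split(',');
        if state.config_options().has_header {
            // Use the header values as column names.
            fields.map(|f| f.trim().to_string()).collect()
        } else {
            // Without a header, fall back to positional names.
            fields.enumerate().map(|(i, _)| format!("column_{}", i + 1)).collect()
        }
    }
}

fn main() {
    let with_header = StaticSession { config: ConfigOptions { has_header: true } };
    let without_header = StaticSession { config: ConfigOptions::default() };
    let line = "id, name";

    assert_eq!(CsvFormat.infer_schema(&with_header, line), vec!["id", "name"]);
    assert_eq!(
        CsvFormat.infer_schema(&without_header, line),
        vec!["column_1", "column_2"]
    );
}
```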

`runtime_env`

It's used by `ListingTableUrl::list_all_files`:

```rust
let list = match self.is_collection() {
    true => match ctx.runtime_env().cache_manager.get_list_files_cache() {
        None => store.list(Some(&self.prefix)),
        Some(cache) => {
            if let Some(res) = cache.get(&self.prefix) {
                debug!("Hit list all files cache");
                futures::stream::iter(res.as_ref().clone().into_iter().map(Ok))
                    .boxed()
            } else {
                let list_res = store.list(Some(&self.prefix));
                let vec = list_res.try_collect::<Vec<ObjectMeta>>().await?;
                cache.put(&self.prefix, Arc::new(vec.clone()));
                futures::stream::iter(vec.into_iter().map(Ok)).boxed()
            }
        }
    },
    false => futures::stream::once(store.head(&self.prefix)).boxed(),
};
```
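A synchronous, std-only model of the caching logic above. `ListFilesCache`, `Store` and this `list_all_files` signature are hypothetical; `cache` plays the role of `runtime_env().cache_manager.get_list_files_cache()`. The cache is optional, a hit returns a copy of the cached listing, and a miss lists the store once and stores the result under the prefix.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical prefix -> listing cache.
#[derive(Default)]
struct ListFilesCache {
    entries: Mutex<HashMap<String, Arc<Vec<String>>>>,
}

impl ListFilesCache {
    fn get(&self, prefix: &str) -> Option<Arc<Vec<String>>> {
        self.entries.lock().unwrap().get(prefix).cloned()
    }

    fn put(&self, prefix: &str, files: Arc<Vec<String>>) {
        self.entries.lock().unwrap().insert(prefix.to_string(), files);
    }
}

/// Hypothetical object store that counts how often it is listed.
struct Store {
    files: Vec<String>,
    list_calls: Mutex<usize>,
}

impl Store {
    fn list(&self, prefix: &str) -> Vec<String> {
        *self.list_calls.lock().unwrap() += 1;
        self.files.iter().filter(|f| f.starts_with(prefix)).cloned().collect()
    }
}

fn list_all_files(store: &Store, cache: Option<&ListFilesCache>, prefix: &str) -> Vec<String> {
    match cache {
        // No cache configured: always list the store.
        None => store.list(prefix),
        Some(cache) => {
            if let Some(hit) = cache.get(prefix) {
                (*hit).clone()
            } else {
                let files = store.list(prefix);
                cache.put(prefix, Arc::new(files.clone()));
                files
            }
        }
    }
}

fn main() {
    let store = Store {
        files: vec!["t/a.parquet".into(), "t/b.parquet".into(), "u/c.csv".into()],
        list_calls: Mutex::new(0),
    };
    let cache = ListFilesCache::default();

    let first = list_all_files(&store, Some(&cache), "t/");
    let second = list_all_files(&store, Some(&cache), "t/");
    assert_eq!(first, vec!["t/a.parquet", "t/b.parquet"]);
    assert_eq!(first, second);
    // The second call was served from the cache.
    assert_eq!(*store.list_calls.lock().unwrap(), 1);
}
```

This is the part of the state that would have to be reachable through `runtime_env` on a `dyn Session`.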

Some Conclusions

If we can use `dyn Session` instead, the implementation only needs the following elements:

  • file_formats
  • config_options
  • runtime_env

Building the dynamic catalog would then look like:

```rust
let runtime = Arc::new(RuntimeEnv::default());
// DynamicSession would be a new (proposed) implementation of `Session`.
let factory = Arc::new(DynamicListTableFactory::new(DynamicSession::new(
    file_formats,
    config_options,
    runtime,
)));
let catalog_list = Arc::new(DynamicFileCatalog::new(
    Arc::clone(state_ref.catalog_list()),
    Arc::clone(&factory) as Arc<dyn UrlTableFactory>,
));
```

However, the `file_formats` and configs would then be static: we couldn't register additional formats or change configs at runtime (to allow that, they would need to be something like a shared reference 🤔). On the other hand, we could remove `SessionStore` and avoid the problem of registering the current state after building the `SessionState`.
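The static-versus-shared trade-off can be shown with a hypothetical one-field `ConfigOptions`. `SnapshotSession` copies the config when the dynamic catalog is built, while `SharedSession` holds an `Arc<RwLock<_>>` to the same config the context owns, so only the latter sees a later change. These are illustrative types, not DataFusion's.

```rust
use std::sync::{Arc, RwLock};

// Hypothetical config with a single option.
#[derive(Clone)]
struct ConfigOptions {
    batch_size: usize,
}

/// Copies the config at build time; later changes are invisible.
struct SnapshotSession {
    config: ConfigOptions,
}

/// Shares the config with the owning context.
struct SharedSession {
    config: Arc<RwLock<ConfigOptions>>,
}

impl SnapshotSession {
    fn batch_size(&self) -> usize {
        self.config.batch_size
    }
}

impl SharedSession {
    fn batch_size(&self) -> usize {
        self.config.read().unwrap().batch_size
    }
}

fn main() {
    // Config owned by the (hypothetical) session context.
    let ctx_config = Arc::new(RwLock::new(ConfigOptions { batch_size: 8192 }));

    let snapshot = SnapshotSession { config: ctx_config.read().unwrap().clone() };
    let shared = SharedSession { config: Arc::clone(&ctx_config) };

    // Change the config after the dynamic catalog has been built.
    ctx_config.write().unwrap().batch_size = 1024;

    assert_eq!(snapshot.batch_size(), 8192);
    assert_eq!(shared.batch_size(), 1024);
}
```

The shared variant avoids the stale-config problem without needing a reference to the whole `SessionState`, at the cost of the context having to keep its config behind a lock.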

alamb (Contributor Author) commented Sep 22, 2024

> If we want to use `dyn Session`, we must move `SessionState::get_file_format_factory` onto the `Session` trait. That may also involve `FileFormat` and `FileFormatFactory`, which this API requires.

This seems reasonable to me.

> However, the `file_formats` and configs would then be static: we couldn't register additional formats or change configs at runtime (to allow that, they would need to be something like a shared reference 🤔). On the other hand, we could remove `SessionStore` and avoid the problem of registering the current state after building the `SessionState`.

I agree that doesn't sound good. 🤔

I vaguely remember that something similar is needed for `information_schema` (where it also has to be created after the other providers, or something like that)...

Anyhow, thank you for this thorough review. I d
