feat: per-process ingest connections #1058
define processor_resource
adds mechanism for passing in session_handle without needing picklable content
remove debug logging add tests
test_unstructured_ingest/unit/doc_processor/test_generalized.py
I really wish there were a simpler implementation for this; it took me several readings to figure out what all was going on. Just for my own edification:
unstructured/ingest/processor.py
cast(BaseConnector, self.doc_connector).config,
)
if isinstance(self.doc_connector, ConnectorSessionHandleMixin)
else None
i don't think this handle should actually exist in the parent process. but i think i see the issue that the init process needs connection info, at the very least, following this approach.
i'm starting to think the cleanest way to do this is for the subprocess itself to create the SessionHandle lazily for the first IngestDoc it processes, since it will have the connector config at that time.
or, it could pass the connector config of the first IngestDoc in initargs for the subprocess's initializer, and then the connector can choose to create its own SessionHandle if applicable.
coming back to this fresh, I think I was over-engineering in case we ever needed to handle subprocess work async, but starting to feel like it's safe to just assume this is always serial within that subprocess. I like the idea of just lazily creating it, will head with that.
also, I'm now thinking the config should just own the definition for how to create the session handle. this would also be cleaner if we move that logic inside the subprocess since we don't pass the connector itself through (I don't believe).
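A rough sketch of the lazy approach with the config owning the creation logic. `ExampleConfig`, `ExampleSessionHandle`, `create_session_handle`, and `get_session_handle` are hypothetical names chosen for illustration. It assumes, as discussed above, that docs are processed serially within a subprocess, so no locking is used.

```python
from dataclasses import dataclass
from typing import Optional


class ExampleSessionHandle:
    """Hypothetical stand-in for an open connection or session."""

    def __init__(self, endpoint: str) -> None:
        self.endpoint = endpoint


@dataclass
class ExampleConfig:
    """Hypothetical config that knows how to build its own session handle."""
    endpoint: str

    def create_session_handle(self) -> ExampleSessionHandle:
        return ExampleSessionHandle(self.endpoint)


# Lives in whichever process imports this module; starts empty.
_session_handle: Optional[ExampleSessionHandle] = None


def get_session_handle(config: ExampleConfig) -> ExampleSessionHandle:
    """Create the handle on first use, then reuse it for later docs."""
    global _session_handle
    if _session_handle is None:
        _session_handle = config.create_session_handle()
    return _session_handle


def process_doc(doc_id: str, config: ExampleConfig) -> str:
    handle = get_session_handle(config)
    return f"{doc_id} via {handle.endpoint}"
```

The first doc a subprocess sees pays the connection cost; later docs in that subprocess get the cached handle.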
self.config.service = create_service_account_object(self.config.service_account_key)
if self.session_handle is None:
raise ValueError("Google Drive session handle was not set.")
self.session_handle = cast(GoogleDriveSessionHandle, self.session_handle)
Since this is already set above:
session_handle: Optional[GoogleDriveSessionHandle] = None
I don't think the cast is necessary.
iirc linting gets mad if you don't explicitly cast since it was optional
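One possible way to avoid the cast, sketched with hypothetical `ExampleSessionHandle` and `ExampleIngestDoc` classes rather than the actual Google Drive code: bind the optional attribute to a local name and check that name for `None`. Type checkers such as mypy narrow a local variable after an `is None` check that raises; whether the linter configured in this repo accepts it is an assumption that would need checking.

```python
from typing import Optional


class ExampleSessionHandle:
    """Hypothetical handle carrying a service object."""

    def __init__(self) -> None:
        self.service = "service-object"


class ExampleIngestDoc:
    """Hypothetical doc with an optional, per-process session handle."""

    session_handle: Optional[ExampleSessionHandle] = None

    def get_service(self) -> str:
        # Bind to a local so the None check narrows the type without cast().
        handle = self.session_handle
        if handle is None:
            raise ValueError("Session handle was not set.")
        return handle.service
```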
Co-authored-by: cragwolfe <[email protected]>
yea, I agree. I think @cragwolfe's note will simplify this a bit, and I have some additional thoughts that might help. re:
LGTM!
looking around the corner, we'll need a way to serialize / de-serialize IngestDocs for other use cases ("over the wire"), but i don't think this PR further complicates that issue.
Currently on ingest, a connection to the data source is created once per document (per IngestDoc). With many documents on a connector that hits an external service, new connections will likely be refused after N connections have been created (depending on the service's limits). Instead, where the data source API supports it, we should create connections only once per process.
This defines a pattern for sharing a connection across all docs in a process. Not every connector has the concept of a connection/session that can be kept open and shared across documents, so the session handle flow is opt-in: a connector adds ConnectorSessionHandleMixin, which has a method for creating the shared resources. Each connector has bespoke data to share, so the pattern lets the connector define a SessionHandle subclass to hold that data. Because this data is often not picklable, passing it explicitly between processes is tricky; this pattern instead uses an in-memory variable to store and access the session handle.
Unit tests were added for the unit-level functionality. The session handle flow was implemented for the Google Drive connector to validate it end to end.