Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: per-process ingest connections #1058

Merged
merged 38 commits into from
Aug 17, 2023
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
34137ca
more service to instance variable on connector
ryannikolaidis Aug 2, 2023
ffdc081
tidy
ryannikolaidis Aug 3, 2023
97caff9
add session handler
ryannikolaidis Aug 4, 2023
17420fc
in progress
ryannikolaidis Aug 6, 2023
5edd0a0
remove processor resource
ryannikolaidis Aug 6, 2023
05e824d
remove debug log
ryannikolaidis Aug 7, 2023
69ad5d3
remove unneeded
ryannikolaidis Aug 7, 2023
d74e74f
clean up
ryannikolaidis Aug 7, 2023
71e5201
actually set the session_handle
ryannikolaidis Aug 8, 2023
1fbe77c
actually get this working and lint free
ryannikolaidis Aug 8, 2023
27e6c00
actually resolve lint
ryannikolaidis Aug 9, 2023
d57eebc
update comments
ryannikolaidis Aug 10, 2023
e767e65
tidy
ryannikolaidis Aug 10, 2023
0be0182
end of doc line
ryannikolaidis Aug 10, 2023
48c17dc
use context
ryannikolaidis Aug 10, 2023
058a85d
fix tests
ryannikolaidis Aug 10, 2023
352acb2
Merge branch 'main' into ryan/reuse-connections
ryannikolaidis Aug 10, 2023
fd2f27b
tidy
ryannikolaidis Aug 10, 2023
c4074d3
bump note
ryannikolaidis Aug 11, 2023
d11d4ea
Merge branch 'main' into ryan/reuse-connections
ryannikolaidis Aug 11, 2023
626a659
bump version
ryannikolaidis Aug 11, 2023
4071a35
bump docstring
ryannikolaidis Aug 11, 2023
7391ea1
lint
ryannikolaidis Aug 11, 2023
cdb50e2
debugging test failure
ryannikolaidis Aug 11, 2023
dbebe08
debug
ryannikolaidis Aug 11, 2023
aa7de05
more debug
ryannikolaidis Aug 11, 2023
7e6164d
re-enable
ryannikolaidis Aug 11, 2023
3d7f7e1
Merge branch 'main' into ryan/reuse-connections
ryannikolaidis Aug 11, 2023
b38433a
bump test
ryannikolaidis Aug 11, 2023
cf3de88
bump version
ryannikolaidis Aug 11, 2023
9b097bb
version bump
ryannikolaidis Aug 11, 2023
6513520
Update unstructured/ingest/interfaces.py
ryannikolaidis Aug 14, 2023
1f64a06
Merge branch 'main' into ryan/reuse-connections
ryannikolaidis Aug 15, 2023
3ffeaae
manage only in subprocess
ryannikolaidis Aug 16, 2023
29b97f5
bump comment
ryannikolaidis Aug 16, 2023
18433e3
bump comment
ryannikolaidis Aug 16, 2023
cc7149b
Merge branch 'main' into ryan/reuse-connections
ryannikolaidis Aug 17, 2023
171f4a6
version bump
ryannikolaidis Aug 17, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
## 0.9.3-dev0
## 0.9.3-dev1

### Enhancements

* Add `unique_element_ids` kwarg to partition functions. If `True`, will use a UUID
for element IDs instead of a SHA-256 hash.
* Add functionality to switch `html` text parser based on whether the `html` text contains emoji
* Add functionality to check if a string contains any emoji characters
* Adds ability to reuse connections per process in unstructured-ingest

### Features

Expand Down
44 changes: 44 additions & 0 deletions test_unstructured_ingest/unit/doc_processor/test_generalized.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from dataclasses import dataclass

import pytest

from unstructured.ingest.doc_processor.generalized import (
process_document,
session_handle_var,
)
from unstructured.ingest.interfaces import BaseIngestDoc, IngestDocSessionHandleMixin


@dataclass
class IngestDocWithSessionHandle(IngestDocSessionHandleMixin, BaseIngestDoc):
pass

@pytest.fixture(autouse=True)
def _reset_session_handle():
session_handle_var.set(None)

def test_process_document_with_session_handle(mocker):
"""Test that the process_document function calls the doc_processor_fn with the correct
arguments, assigns the session handle, and returns the correct results."""
mock_session_handle = mocker.MagicMock()
session_handle_var.set(mock_session_handle)
mock_doc = mocker.MagicMock(spec=(IngestDocWithSessionHandle))

result = process_document(mock_doc)

mock_doc.get_file.assert_called_once_with()
mock_doc.write_result.assert_called_with()
mock_doc.cleanup_file.assert_called_once_with()
assert result == mock_doc.process_file.return_value
assert mock_doc.session_handle == mock_session_handle


def test_process_document_no_session_handle(mocker):
"""Test that the process_document function calls does not assign session handle the IngestDoc
does not have the session handle mixin."""
session_handle_var.set(mocker.MagicMock())
mock_doc = mocker.MagicMock(spec=(BaseIngestDoc))

process_document(mock_doc)

assert not hasattr(mock_doc, "session_handle")
5 changes: 0 additions & 5 deletions test_unstructured_ingest/unit/test_interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,6 @@ def test_partition_file():
assert data_source_metadata["date_processed"] == TEST_DATE_PROCESSSED


@freeze_time(TEST_DATE_PROCESSSED)
def test_process_file_fields_include_default(mocker, partition_test_results):
"""Validate when metadata_include and metadata_exclude are not set, all fields:
("element_id", "text", "type", "metadata") are included"""
Expand All @@ -162,10 +161,6 @@ def test_process_file_fields_include_default(mocker, partition_test_results):
isd_elems = test_ingest_doc.process_file()
assert len(isd_elems)
assert mock_partition.call_count == 1
assert (
mock_partition.call_args.kwargs["data_source_metadata"].date_processed
== TEST_DATE_PROCESSSED
)
Comment on lines -165 to -168
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed this. it wasn't critical to the intent of this test, but also needing to freeze time here in combination with other tests touching generalized (and by extension calling get_model) was triggering a bizarre failure with importing transformers.models.open_llama.tokenization_open_llama? More info here.

for elem in isd_elems:
assert {"element_id", "text", "type", "metadata"} == set(elem.keys())
data_source_metadata = elem["metadata"]["data_source"]
Expand Down
34 changes: 34 additions & 0 deletions test_unstructured_ingest/unit/test_process.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import pytest

from unstructured.ingest.doc_processor.generalized import session_handle_var
from unstructured.ingest.processor import Processor


@pytest.fixture(autouse=True)
def _reset_session_handle():
session_handle_var.set(None)

@pytest.mark.parametrize("test_verbose", [True, False])
def test_processor_init_with_session_handle(mocker, test_verbose):
"""Test that the init function calls to ingest_log_streaming_init and assigns the session handle
when the a function is passed in."""
mock_ingest_log_streaming_init = mocker.patch(
"unstructured.ingest.processor.ingest_log_streaming_init",
)
mock_create_session_handle_fn = mocker.MagicMock()
Processor.process_init(test_verbose, mock_create_session_handle_fn)
mock_ingest_log_streaming_init.assert_called_once_with(test_verbose)
mock_create_session_handle_fn.assert_called_once_with()
assert (
session_handle_var.get() == mock_create_session_handle_fn.return_value
)

def test_processor_init_no_session_handle(mocker):
"""Test that the init function calls to ingest_log_streaming_init and does not assign the session handle
when the a function is not passed in."""
mock_ingest_log_streaming_init = mocker.patch(
"unstructured.ingest.processor.ingest_log_streaming_init",
)
Processor.process_init(True)
mock_ingest_log_streaming_init.assert_called_once_with(True)
assert session_handle_var.get() is None
2 changes: 1 addition & 1 deletion unstructured/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.9.3-dev0" # pragma: no cover
__version__ = "0.9.3-dev1" # pragma: no cover
41 changes: 31 additions & 10 deletions unstructured/ingest/connector/google_drive.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,36 @@
from dataclasses import dataclass
from mimetypes import guess_extension
from pathlib import Path
from typing import Dict, Optional
from typing import TYPE_CHECKING, Dict, Optional, cast

from unstructured.file_utils.filetype import EXT_TO_FILETYPE
from unstructured.file_utils.google_filetype import GOOGLE_DRIVE_EXPORT_TYPES
from unstructured.ingest.interfaces import (
BaseConnector,
BaseConnectorConfig,
BaseIngestDoc,
BaseSessionHandle,
ConnectorCleanupMixin,
ConnectorSessionHandleMixin,
IngestDocCleanupMixin,
IngestDocSessionHandleMixin,
StandardConnectorConfig,
)
from unstructured.ingest.logger import logger
from unstructured.utils import requires_dependencies

if TYPE_CHECKING:
from googleapiclient.discovery import Resource as GoogleAPIResource

FILE_FORMAT = "{id}-{name}{ext}"
DIRECTORY_FORMAT = "{id}-{name}"


@dataclass
class GoogleDriveSessionHandle(BaseSessionHandle):
service: "GoogleAPIResource"


@requires_dependencies(["googleapiclient"], extras="google-drive")
def create_service_account_object(key_path, id=None):
"""
Expand Down Expand Up @@ -81,13 +92,13 @@ def __post_init__(self):
f"Extension not supported. "
f"Value MUST be one of {', '.join([k for k in EXT_TO_FILETYPE if k is not None])}.",
)
self.service = create_service_account_object(self.service_account_key, self.drive_id)


@dataclass
class GoogleDriveIngestDoc(IngestDocCleanupMixin, BaseIngestDoc):
class GoogleDriveIngestDoc(IngestDocSessionHandleMixin, IngestDocCleanupMixin, BaseIngestDoc):
config: SimpleGoogleDriveConfig
file_meta: Dict
session_handle: Optional[GoogleDriveSessionHandle] = None

@property
def filename(self):
Expand All @@ -103,7 +114,9 @@ def get_file(self):
from googleapiclient.errors import HttpError
from googleapiclient.http import MediaIoBaseDownload

self.config.service = create_service_account_object(self.config.service_account_key)
if self.session_handle is None:
raise ValueError("Google Drive session handle was not set.")
self.session_handle = cast(GoogleDriveSessionHandle, self.session_handle)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is already set above:

session_handle: Optional[GoogleDriveSessionHandle] = None

I don't think the cast is necessary.

Copy link
Contributor Author

@ryannikolaidis ryannikolaidis Aug 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

iirc linting gets mad if you don't explicitly cast since it was optional


if self.file_meta.get("mimeType", "").startswith("application/vnd.google-apps"):
export_mime = GOOGLE_DRIVE_EXPORT_TYPES.get(
Expand All @@ -117,12 +130,12 @@ def get_file(self):
)
return

request = self.config.service.files().export_media(
request = self.session_handle.service.files().export_media(
fileId=self.file_meta.get("id"),
mimeType=export_mime,
)
else:
request = self.config.service.files().get_media(fileId=self.file_meta.get("id"))
request = self.session_handle.service.files().get_media(fileId=self.file_meta.get("id"))
file = io.BytesIO()
downloader = MediaIoBaseDownload(file, request)
downloaded = False
Expand Down Expand Up @@ -160,22 +173,32 @@ def write_result(self):
logger.info(f"Wrote {self._output_filename}")


class GoogleDriveConnector(ConnectorCleanupMixin, BaseConnector):
class GoogleDriveConnector(ConnectorSessionHandleMixin, ConnectorCleanupMixin, BaseConnector):
"""Objects of this class support fetching documents from Google Drive"""

config: SimpleGoogleDriveConfig

def __init__(self, standard_config: StandardConnectorConfig, config: SimpleGoogleDriveConfig):
super().__init__(standard_config, config)

@classmethod
def create_session_handle(
cls,
config: BaseConnectorConfig,
) -> GoogleDriveSessionHandle:
config = cast(SimpleGoogleDriveConfig, config)
service = create_service_account_object(config.service_account_key)
return GoogleDriveSessionHandle(service=service)

def _list_objects(self, drive_id, recursive=False):
files = []
service = create_service_account_object(self.config.service_account_key)

def traverse(drive_id, download_dir, output_dir, recursive=False):
page_token = None
while True:
response = (
self.config.service.files()
service.files()
.list(
spaces="drive",
fields="nextPageToken, files(id, name, mimeType)",
Expand Down Expand Up @@ -244,6 +267,4 @@ def initialize(self):

def get_ingest_docs(self):
files = self._list_objects(self.config.drive_id, self.config.recursive)
# Setting to None because service object can't be pickled for multiprocessing.
self.config.service = None
return [GoogleDriveIngestDoc(self.standard_config, self.config, file) for file in files]
22 changes: 20 additions & 2 deletions unstructured/ingest/doc_processor/generalized.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,27 @@
"""Process arbitrary files with the Unstructured library"""

import os
from typing import Any, Dict, List, Optional
from contextvars import ContextVar
from typing import Any, Dict, List, Optional, cast

from unstructured_inference.models.base import get_model

from unstructured.ingest.interfaces import BaseIngestDoc as IngestDoc
from unstructured.ingest.interfaces import (
BaseSessionHandle,
IngestDocSessionHandleMixin,
)
from unstructured.ingest.logger import logger

# This is a context variable that can be set by the pool process to be used by the
# doc processor to assign the session handle to the doc. Note: because
# the session handle is not picklable, it cannot be passed directly to the pool or
# the doc processor.
session_handle_var: ContextVar[Optional[BaseSessionHandle]] = ContextVar(
"session_handle",
default=None,
)


def initialize():
"""Download default model or model specified by UNSTRUCTURED_HI_RES_MODEL_NAME environment
Expand All @@ -32,14 +46,18 @@ def process_document(doc: "IngestDoc", **partition_kwargs) -> Optional[List[Dict
"""
isd_elems_no_filename = None
try:
# assign the session handle for the data source on the doc
session_handle = session_handle_var.get()
if session_handle is not None and isinstance(doc, IngestDocSessionHandleMixin):
cast(IngestDocSessionHandleMixin, doc).session_handle = session_handle
# does the work necessary to load file into filesystem
# in the future, get_file_handle() could also be supported
doc.get_file()

isd_elems_no_filename = doc.process_file(**partition_kwargs)

# Note, this may be a no-op if the IngestDoc doesn't do anything to persist
# the results. Instead, the MainProcess (caller) may work with the aggregate
# the results. Instead, the Processor (caller) may work with the aggregate
# results across all docs in memory.
doc.write_result()
except Exception:
Expand Down
18 changes: 18 additions & 0 deletions unstructured/ingest/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@
from unstructured.staging.base import convert_to_dict


@dataclass
class BaseSessionHandle(ABC):
"""Abstract definition on which to define resources that are shared across each process/thread.
ryannikolaidis marked this conversation as resolved.
Show resolved Hide resolved
e.g., a connection for making a request for fetching documents."""


@dataclass
class ProcessorConfigs:
"""Common set of config required when running data connectors."""
Expand Down Expand Up @@ -330,3 +336,15 @@ def cleanup_file(self):
):
logger.debug(f"Cleaning up {self}")
os.unlink(self.filename)


class IngestDocSessionHandleMixin:
session_handle: Optional[BaseSessionHandle] = None


class ConnectorSessionHandleMixin:
@classmethod
@abstractmethod
def create_session_handle(cls, config: BaseConnectorConfig) -> BaseSessionHandle:
"""Creates a session handle that will be assigned on each IngestDoc to share
session related resources across all document handling for a given process."""
32 changes: 29 additions & 3 deletions unstructured/ingest/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,16 @@
import multiprocessing as mp
from contextlib import suppress
from functools import partial
from typing import cast

from unstructured.ingest.doc_processor.generalized import initialize, process_document
from unstructured.ingest.doc_processor.generalized import (
initialize,
process_document,
session_handle_var,
)
from unstructured.ingest.interfaces import (
BaseConnector,
ConnectorSessionHandleMixin,
ProcessorConfigs,
)
from unstructured.ingest.logger import ingest_log_streaming_init, logger
Expand Down Expand Up @@ -41,6 +47,13 @@ def initialize(self):
def cleanup(self):
self.doc_connector.cleanup()

@classmethod
def process_init(cls, verbose, create_session_handle_fn=None):
ingest_log_streaming_init(verbose)
# set the session handle for the doc processor if the connector supports it
if create_session_handle_fn is not None:
session_handle_var.set(create_session_handle_fn())

def _filter_docs_with_outputs(self, docs):
num_docs_all = len(docs)
docs = [doc for doc in docs if not doc.has_output()]
Expand Down Expand Up @@ -74,15 +87,28 @@ def run(self):
if not docs:
return

# get a create_session_handle function if the connector supports it
create_session_handle_fn = (
partial(
cast(ConnectorSessionHandleMixin, self.doc_connector).create_session_handle,
cast(BaseConnector, self.doc_connector).config,
)
if isinstance(self.doc_connector, ConnectorSessionHandleMixin)
else None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i don't think this handle should actually exist in the parent process. but i think i see the issue that the init process needs connection info, at the very least, following this approach.

i'm starting to think the cleanest way to do this is for the subprocess itself to create the SessionHandle lazily for the first IngestDoc it processes, since it will have the connector config at that time.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or, it could pass the connector config of the first IngestDoc in initargs for the sub process's initializer, and then connector can choose to create its own SessionHandle if applicable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

coming back to this fresh, I think I was over-engineering in case we ever needed to handle subprocess work async, but starting to feel like it's safe to just assume this is always serial within that subprocess. I like the idea of just lazily creating it, will head with that.

also, I'm now thinking the config should just own the definition for how to create the session handle. this would also be cleaner if we move that logic inside the subprocess since we don't pass the connector itself through (I don't believe).

)

# Debugging tip: use the below line and comment out the mp.Pool loop
# block to remain in single process
# self.doc_processor_fn(docs[0])
logger.info(f"Processing {len(docs)} docs")
try:
with mp.Pool(
processes=self.num_processes,
initializer=ingest_log_streaming_init,
initargs=(logging.DEBUG if self.verbose else logging.INFO,),
initializer=self.process_init,
initargs=(
logging.DEBUG if self.verbose else logging.INFO,
create_session_handle_fn,
),
) as pool:
pool.map(self.doc_processor_fn, docs)
finally:
Expand Down