-
Notifications
You must be signed in to change notification settings - Fork 816
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: Sample ingest project with S3 connector (#218)
- Loading branch information
Showing
26 changed files
with
2,253 additions
and
24 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
[run] | ||
omit = | ||
unstructured/ingest/* |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
# Batch Processing Documents | ||
|
||
Several classes are provided in the Unstructured library | ||
to enable effecient batch processing of documents. | ||
|
||
## The Abstractions | ||
|
||
```mermaid | ||
sequenceDiagram | ||
participant MainProcess | ||
participant DocReader (connector) | ||
participant DocProcessor | ||
participant StructuredDocWriter (conncector) | ||
MainProcess->>DocReader (connector): Initialize / Authorize | ||
DocReader (connector)->>MainProcess: All doc metadata (no file content) | ||
loop Single doc at a time (allows for multiprocessing) | ||
MainProcess->>DocProcessor: Raw document metadata (no file content) | ||
DocProcessor->>DocReader (connector): Request document | ||
DocReader (connector)->>DocProcessor: Single document payload | ||
Note over DocProcessor: Process through Unstructured | ||
DocProcessor->>StructuredDocWriter (conncector): Write Structured Data | ||
Note over StructuredDocWriter (conncector): <br /> Optionally store version info, filename, etc | ||
DocProcessor->>MainProcess: Structured Data (only JSON in V0) | ||
end | ||
Note over MainProcess: Optional - process structured data from all docs | ||
``` | ||
|
||
## Sample Connector: S3 | ||
|
||
See the sample project [examples/ingest/s3-small-batch/main.py](examples/ingest/s3-small-batch/main.py), which processes all the documents under a given s3 URL with 2 parallel processes, writing the structured json output to `structured-outputs/`. | ||
|
||
You can try it out with | ||
|
||
PYTHONPATH=. python examples/ingest/s3-small-batch/main.py | ||
|
||
The abstractions in the above diagram are honored in this project (though ABC's are not yet written), with the exception of the StructuredDocWriter which may be added more formally at a later time. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
import multiprocessing as mp | ||
import os | ||
from unstructured.ingest.connector.s3_connector import S3Connector, SimpleS3Config | ||
from unstructured.ingest.doc_processor.generalized import process_document | ||
|
||
class MainProcess: | ||
|
||
def __init__(self, doc_connector, doc_processor_fn, num_processes): | ||
# initialize the reader and writer | ||
self.doc_connector = doc_connector | ||
self.doc_processor_fn = doc_processor_fn | ||
self.num_processes = num_processes | ||
|
||
|
||
def initialize(self): | ||
"""Slower initialization things: check connections, load things into memory, etc.""" | ||
self.doc_connector.initialize() | ||
|
||
def cleanup(self): | ||
self.doc_connector.cleanup() | ||
|
||
def run(self): | ||
self.initialize() | ||
|
||
self.doc_connector.fetch_docs() | ||
|
||
# fetch the list of lazy downloading IngestDoc obj's | ||
docs = self.doc_connector.fetch_docs() | ||
|
||
# Debugging tip: use the below line and comment out the mp.Pool loop | ||
# block to remain in single process | ||
#self.doc_processor_fn(docs[0]) | ||
|
||
with mp.Pool(processes=self.num_processes) as pool: | ||
results = pool.map(self.doc_processor_fn, docs) | ||
|
||
self.cleanup() | ||
|
||
@staticmethod | ||
def main(): | ||
doc_connector = S3Connector( | ||
config=SimpleS3Config( | ||
s3_url="s3://utic-dev-tech-fixtures/small-pdf-set/", | ||
output_dir="structured-output", | ||
# set to False to use your AWS creds (not needed for this public s3 url) | ||
anonymous=True, | ||
), | ||
) | ||
MainProcess(doc_connector=doc_connector, | ||
doc_processor_fn=process_document, | ||
num_processes=2).run() | ||
|
||
if __name__ == '__main__': | ||
MainProcess.main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.