Skip to content

Commit 5e195ba

Browse files
authored
Wrote text ingestors, still incomplete tests (#37)
* Wrote text ingestors, still incomplete tests * Added tests for text * removed print * Resolved conflicts
1 parent 9c0cbe8 commit 5e195ba

File tree

7 files changed

+152
-4
lines changed

7 files changed

+152
-4
lines changed

querent/ingestors/ingestor_errors.py

+9
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
from enum import Enum
2+
3+
4+
class IngestorError(Enum):
5+
EOF = "End of File"
6+
ETCD = "ETCD Error"
7+
NETWORK = "Network Error"
8+
TIMEOUT = "Timeout"
9+
UNKNOWN = "Unknown Error"

querent/ingestors/ingestor_manager.py

+10
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,24 @@
1+
"""
2+
Ingestor manager, for managing all the factories with backend
3+
"""
14
from typing import Optional
25
from querent.config.ingestor_config import IngestorBackend
36
from querent.ingestors.base_ingestor import BaseIngestor
47
from querent.ingestors.ingestor_factory import IngestorFactory, UnsupportedIngestor
58
from querent.ingestors.pdfs.pdf_ingestor_v1 import PdfIngestorFactory
9+
from querent.ingestors.texts.text_ingestor import TextIngestorFactory
610
from querent.ingestors.audio.audio_ingestors import AudioIngestorFactory
711
from querent.ingestors.json.json_ingestor import JsonIngestorFactory
812
from querent.ingestors.images.image_ingestor import ImageIngestorFactory
913

1014

1115
class IngestorFactoryManager:
16+
"""Factory manager"""
17+
1218
def __init__(self):
1319
self.ingestor_factories = {
1420
IngestorBackend.PDF.value: PdfIngestorFactory(),
21+
IngestorBackend.TEXT.value: TextIngestorFactory(),
1522
IngestorBackend.MP3.value: AudioIngestorFactory(),
1623
IngestorBackend.WAV.value: AudioIngestorFactory(),
1724
IngestorBackend.JSON.value: JsonIngestorFactory(),
@@ -22,14 +29,17 @@ def __init__(self):
2229
}
2330

2431
async def get_factory(self, file_extension: str) -> IngestorFactory:
32+
"""get_factory to match factory based on file extension"""
2533
return self.ingestor_factories.get(
2634
file_extension.lower(), UnsupportedIngestor("Unsupported file extension")
2735
)
2836

2937
async def get_ingestor(self, file_extension: str) -> Optional[BaseIngestor]:
38+
"""get_ingestor to get factory for that extension"""
3039
factory = self.get_factory(file_extension)
3140
return factory.create(file_extension)
3241

3342
async def supports(self, file_extension: str) -> bool:
43+
"""check if extension supports factory"""
3444
factory = self.get_factory(file_extension)
3545
return factory.supports(file_extension)
+76
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
from typing import List, AsyncGenerator
2+
from querent.common.types.collected_bytes import CollectedBytes
3+
from querent.ingestors.base_ingestor import BaseIngestor
4+
from querent.ingestors.ingestor_factory import IngestorFactory
5+
from querent.processors.async_processor import AsyncProcessor
6+
from querent.config.ingestor_config import IngestorBackend
7+
8+
9+
class TextIngestorFactory(IngestorFactory):
10+
SUPPORTED_EXTENSIONS = {"txt"}
11+
12+
async def supports(self, file_extension: str) -> bool:
13+
return file_extension.lower() in self.SUPPORTED_EXTENSIONS
14+
15+
async def create(
16+
self, file_extension: str, processors: List[AsyncProcessor]
17+
) -> BaseIngestor:
18+
if not self.supports(file_extension):
19+
return None
20+
21+
return TextIngestor(processors)
22+
23+
24+
class TextIngestor(BaseIngestor):
25+
def __init__(self, processors: List[AsyncProcessor]):
26+
super().__init__(IngestorBackend.TEXT)
27+
self.processors = processors
28+
29+
async def ingest(
30+
self, poll_function: AsyncGenerator[CollectedBytes, None]
31+
) -> AsyncGenerator[List[str], None]:
32+
try:
33+
collected_bytes = b""
34+
current_file = None
35+
36+
async for chunk_bytes in poll_function:
37+
if chunk_bytes.is_error():
38+
continue
39+
40+
if chunk_bytes.file != current_file:
41+
if current_file:
42+
text = await self.extract_and_process_text(
43+
CollectedBytes(file=current_file, data=collected_bytes)
44+
)
45+
yield text
46+
47+
collected_bytes = b""
48+
current_file = chunk_bytes.file
49+
50+
collected_bytes += chunk_bytes.data
51+
52+
if current_file:
53+
text = await self.extract_and_process_text(
54+
CollectedBytes(file=current_file, data=collected_bytes)
55+
)
56+
yield text
57+
58+
except Exception as e:
59+
print(e)
60+
yield []
61+
62+
async def extract_and_process_text(
63+
self, collected_bytes: CollectedBytes
64+
) -> List[str]:
65+
text = await self.extract_text_from_file(collected_bytes)
66+
return await self.process_data(text=text)
67+
68+
async def extract_text_from_file(self, collected_bytes: CollectedBytes) -> str:
69+
text = collected_bytes.data.decode("utf-8")
70+
return text
71+
72+
async def process_data(self, text: str) -> List[str]:
73+
processed_data = text
74+
for processor in self.processors:
75+
processed_data = await processor.process(processed_data)
76+
return processed_data

tests/data/text/asyncgenerator.txt

+12
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
Asynchronous generator functions are part of Python version 3.6, they were introduced by PEP-525. Asynchronous generator
2+
functions are much like regular asynchronous functions except that they contain the yield keyword in the function body.
3+
Which in turn, makes them much like regular generators, except for that you can use the await keyword in there as well.
4+
5+
When calling an asynchronous generator function, the result that is returned is an asynchronous generator object. In
6+
contrast to calling regular asynchronous functions which return a coroutine object.
7+
Since the asynchronous generator is, no surprise, asynchronous you are allowed to use the await keyword inside the
8+
asynchronous generator.
9+
10+
You can use this, for example, to send out HTTP requests in the asynchronous generator and yielding the response.
11+
12+
Besides asynchronous iterables you can use asynchronous generators with the async for-loop as well.

tests/test_audio_ingestor.py

+5-3
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
1-
import asyncio
1+
"""Test cases for audio ingestors"""
22
from pathlib import Path
3+
import pytest
4+
import asyncio
5+
36
from querent.collectors.fs.fs_collector import FSCollectorFactory
47
from querent.config.collector_config import FSCollectorConfig
58
from querent.common.uri import Uri
69
from querent.ingestors.ingestor_manager import IngestorFactoryManager
7-
import pytest
810

911

1012
@pytest.mark.asyncio
@@ -30,7 +32,7 @@ async def poll_and_print():
3032
if len(ingested) == 0:
3133
counter += 1
3234

33-
assert counter == 0
35+
assert counter == 0
3436

3537
await poll_and_print()
3638

tests/test_pdf_ingestor.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ async def poll_and_print():
3232
assert ingested is not None
3333
if ingested is not "" or ingested is not None:
3434
counter += 1
35-
assert counter == 19 # 19 pages in the PDF
35+
assert counter == 19 # 19 pages in the PDF
3636

3737
await poll_and_print() # Notice the use of await here
3838

tests/test_text_ingestor.py

+39
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
import asyncio
2+
from pathlib import Path
3+
from querent.collectors.fs.fs_collector import FSCollectorFactory
4+
from querent.config.collector_config import FSCollectorConfig
5+
from querent.common.uri import Uri
6+
from querent.ingestors.ingestor_manager import IngestorFactoryManager
7+
import pytest
8+
9+
10+
@pytest.mark.asyncio
11+
async def test_collect_and_ingest_txt():
12+
# Set up the collector
13+
collector_factory = FSCollectorFactory()
14+
uri = Uri("file://" + str(Path("./tests/data/text/").resolve()))
15+
config = FSCollectorConfig(root_path=uri.path)
16+
collector = collector_factory.resolve(uri, config)
17+
18+
# Set up the ingestor
19+
ingestor_factory_manager = IngestorFactoryManager()
20+
ingestor_factory = await ingestor_factory_manager.get_factory("txt")
21+
ingestor = await ingestor_factory.create("txt", [])
22+
23+
# Collect and ingest the PDF
24+
ingested_call = ingestor.ingest(collector.poll())
25+
counter = 0
26+
27+
async def poll_and_print():
28+
counter = 0
29+
async for ingested in ingested_call:
30+
assert ingested is not None
31+
if len(ingested) == 0:
32+
counter += 1
33+
assert counter == 0
34+
35+
await poll_and_print() # Notice the use of await here
36+
37+
38+
if __name__ == "__main__":
39+
asyncio.run(test_collect_and_ingest_txt())

0 commit comments

Comments
 (0)