Skip to content

Commit b0259a2

Browse files
authored
Xlsx ingestor (#51)
* Added xlsx ingestor * conflicts resolve * removed unused import * added dependencies
1 parent 31f2c50 commit b0259a2

File tree

7 files changed

+129
-4
lines changed

7 files changed

+129
-4
lines changed

querent/ingestors/ingestor_manager.py

+2
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from querent.ingestors.images.image_ingestor import ImageIngestorFactory
1313
from querent.ingestors.doc.doc_ingestor import DocIngestorFactory
1414
from querent.ingestors.csv.csv_ingestor import CsvIngestorFactory
15+
from querent.ingestors.xlsx.xlsx_ingestor import XlsxIngestorFactory
1516
from querent.ingestors.ppt.ppt_ingestor import PptIngestorFactory
1617

1718

@@ -30,6 +31,7 @@ def __init__(self):
3031
IngestorBackend.DOCX.value: DocIngestorFactory(),
3132
IngestorBackend.DOC.value: DocIngestorFactory(),
3233
IngestorBackend.CSV.value: CsvIngestorFactory(),
34+
IngestorBackend.XLSX.value: XlsxIngestorFactory(),
3335
IngestorBackend.PPT.value: PptIngestorFactory(),
3436
IngestorBackend.PPTX.value: PptIngestorFactory(),
3537
# Ingestor.TEXT.value: TextIngestor(),
+86
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
"""Ingestor file for xlsx files"""
2+
from typing import List, AsyncGenerator
3+
import io
4+
import pandas as pd
5+
6+
from querent.ingestors.ingestor_factory import IngestorFactory
7+
from querent.ingestors.base_ingestor import BaseIngestor
8+
from querent.processors.async_processor import AsyncProcessor
9+
from querent.config.ingestor_config import IngestorBackend
10+
from querent.common.types.collected_bytes import CollectedBytes
11+
12+
13+
class XlsxIngestorFactory(IngestorFactory):
14+
"""Ingestor factory for xlsx files"""
15+
16+
SUPPORTED_EXTENSIONS = {"xlsx"}
17+
18+
async def supports(self, file_extension: str) -> bool:
19+
return file_extension.lower() in self.SUPPORTED_EXTENSIONS
20+
21+
async def create(
22+
self, file_extension: str, processors: List[AsyncProcessor]
23+
) -> BaseIngestor:
24+
if not await self.supports(file_extension):
25+
return None
26+
return XlsxIngestor(processors)
27+
28+
29+
class XlsxIngestor(BaseIngestor):
30+
"""Ingestor for xlsx files"""
31+
32+
def __init__(self, processors: List[AsyncProcessor]):
33+
super().__init__(IngestorBackend.XLSX)
34+
self.processors = processors
35+
36+
async def ingest(
37+
self, poll_function: AsyncGenerator[CollectedBytes, None]
38+
) -> AsyncGenerator[str, None]:
39+
current_file = None
40+
collected_bytes = b""
41+
try:
42+
async for chunk_bytes in poll_function:
43+
if chunk_bytes.is_error():
44+
# TODO handle error
45+
continue
46+
if current_file is None:
47+
current_file = chunk_bytes.file
48+
elif current_file != chunk_bytes.file:
49+
# we have a new file, process the old one
50+
async for frames in self.extract_and_process_xlsx(
51+
CollectedBytes(file=current_file, data=collected_bytes)
52+
):
53+
yield frames
54+
collected_bytes = b""
55+
current_file = chunk_bytes.file
56+
collected_bytes += chunk_bytes.data
57+
except Exception as e:
58+
# TODO handle exception
59+
yield ""
60+
finally:
61+
# process the last file
62+
async for frames in self.extract_and_process_xlsx(
63+
CollectedBytes(file=current_file, data=collected_bytes)
64+
):
65+
yield frames
66+
67+
async def extract_and_process_xlsx(
68+
self, collected_bytes: CollectedBytes
69+
) -> AsyncGenerator[str, None]:
70+
"""function to extract and process xlsx file bytes"""
71+
df = await self.extract_text_from_xlsx(collected_bytes)
72+
yield df
73+
74+
async def extract_text_from_xlsx(
75+
self, collected_bytes: CollectedBytes
76+
) -> pd.DataFrame:
77+
"""function to extract all the rows in the file"""
78+
excel_buffer = io.BytesIO(collected_bytes.data)
79+
dataframe = pd.read_excel(excel_buffer)
80+
return dataframe
81+
82+
async def process_data(self, text: str) -> List[str]:
83+
processed_data = text
84+
for processor in self.processors:
85+
processed_data = await processor.process(processed_data)
86+
return processed_data

requirements.txt

+3-1
Original file line numberDiff line numberDiff line change
@@ -161,5 +161,7 @@ SpeechRecognition
161161
pytesseract
162162
pillow
163163
pytextract
164+
pandas
164165
python-pptx
165-
tika
166+
tika
167+
openpyxl

tests/data/xlsx/book1.xlsx

5.13 KB
Binary file not shown.

tests/test_json_ingestor.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,9 @@ async def poll_and_print():
2525
counter = 0
2626
async for ingested in ingested_call:
2727
assert ingested is not None
28-
if len(ingested) == 0:
28+
if len(ingested) != 0:
2929
counter += 1
30-
assert counter == 0
30+
assert counter == 2
3131

3232
await poll_and_print()
3333

tests/test_webscrapper.py

-1
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,5 @@ def test_scrapping_data():
3232
async def poll_and_print():
3333
async for result in collector.poll():
3434
assert not result.is_error()
35-
print(result.unwrap())
3635

3736
asyncio.run(poll_and_print())

tests/test_xlsx_ingestor.py

+36
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import asyncio
2+
from pathlib import Path
3+
from querent.collectors.fs.fs_collector import FSCollectorFactory
4+
from querent.config.collector_config import FSCollectorConfig
5+
from querent.common.uri import Uri
6+
from querent.ingestors.ingestor_manager import IngestorFactoryManager
7+
import pytest
8+
9+
10+
@pytest.mark.asyncio
11+
async def test_collect_and_ingest_xlsx():
12+
collector_factory = FSCollectorFactory()
13+
uri = Uri("file://" + str(Path("./tests/data/xlsx/").resolve()))
14+
config = FSCollectorConfig(root_path=uri.path)
15+
collector = collector_factory.resolve(uri, config)
16+
17+
ingestor_factory_manager = IngestorFactoryManager()
18+
ingestor_factory = await ingestor_factory_manager.get_factory("xlsx")
19+
ingestor = await ingestor_factory.create("xlsx", [])
20+
21+
ingested_call = ingestor.ingest(collector.poll())
22+
counter = 0
23+
24+
async def poll_and_print():
25+
counter = 0
26+
async for ingested in ingested_call:
27+
assert ingested is not None
28+
for i in range(0, ingested.shape[0]):
29+
counter += 1
30+
assert counter == 3
31+
32+
await poll_and_print()
33+
34+
35+
if __name__ == "__main__":
36+
asyncio.run(test_collect_and_ingest_xlsx())

0 commit comments

Comments
 (0)