Skip to content

Commit 2da8fc5

Browse files
authored
Added csv ingestor (#43)
1 parent 7b73554 commit 2da8fc5

File tree

5 files changed

+135
-0
lines changed

5 files changed

+135
-0
lines changed

querent/ingestors/csv/csv_ingestor.py

+88
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
"""CSV Ingestor"""
2+
from typing import List, AsyncGenerator
3+
import csv
4+
import io
5+
6+
from querent.processors.async_processor import AsyncProcessor
7+
from querent.ingestors.ingestor_factory import IngestorFactory
8+
from querent.ingestors.base_ingestor import BaseIngestor
9+
from querent.config.ingestor_config import IngestorBackend
10+
from querent.common.types.collected_bytes import CollectedBytes
11+
12+
13+
class CsvIngestorFactory(IngestorFactory):
14+
"""Ingestor factory for CSV"""
15+
16+
SUPPORTED_EXTENSIONS = {"csv"}
17+
18+
async def supports(self, file_extension: str) -> bool:
19+
return file_extension.lower() in self.SUPPORTED_EXTENSIONS
20+
21+
async def create(
22+
self, file_extension: str, processors: List[AsyncProcessor]
23+
) -> BaseIngestor:
24+
if not self.supports(file_extension):
25+
return None
26+
return CsvIngestor(processors)
27+
28+
29+
class CsvIngestor(BaseIngestor):
30+
"""Ingestor for CSV"""
31+
32+
def __init__(self, processors: List[AsyncProcessor]):
33+
super().__init__(IngestorBackend.CSV)
34+
self.processors = processors
35+
36+
async def ingest(
37+
self, poll_function: AsyncGenerator[CollectedBytes, None]
38+
) -> AsyncGenerator[str, None]:
39+
current_file = None
40+
collected_bytes = b""
41+
try:
42+
async for chunk_bytes in poll_function:
43+
if chunk_bytes.is_error():
44+
# TODO handle error
45+
continue
46+
if current_file is None:
47+
current_file = chunk_bytes.file
48+
elif current_file != chunk_bytes.file:
49+
# we have a new file, process the old one
50+
async for text in self.extract_and_process_csv(
51+
CollectedBytes(file=current_file, data=collected_bytes)
52+
):
53+
yield text
54+
collected_bytes = b""
55+
current_file = chunk_bytes.file
56+
collected_bytes += chunk_bytes.data
57+
except Exception as e:
58+
# TODO handle exception
59+
print(e)
60+
yield ""
61+
finally:
62+
# process the last file
63+
async for text in self.extract_and_process_csv(
64+
CollectedBytes(file=current_file, data=collected_bytes)
65+
):
66+
yield text
67+
68+
async def extract_and_process_csv(
69+
self, collected_bytes: CollectedBytes
70+
) -> AsyncGenerator[str, None]:
71+
text = await self.extract_text_from_csv(collected_bytes)
72+
# print(text)
73+
processed_text = await self.process_data(text)
74+
yield processed_text
75+
76+
async def extract_text_from_csv(
77+
self, collected_bytes: CollectedBytes
78+
) -> csv.reader:
79+
text_data = collected_bytes.data.decode("utf-8")
80+
print(text_data)
81+
text = csv.reader(io.StringIO(text_data))
82+
return text
83+
84+
async def process_data(self, text: str) -> List[str]:
85+
processed_data = text
86+
for processor in self.processors:
87+
processed_data = await processor.process(processed_data)
88+
return processed_data

querent/ingestors/ingestor_manager.py

+2
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from querent.ingestors.audio.audio_ingestors import AudioIngestorFactory
1111
from querent.ingestors.json.json_ingestor import JsonIngestorFactory
1212
from querent.ingestors.images.image_ingestor import ImageIngestorFactory
13+
from querent.ingestors.csv.csv_ingestor import CsvIngestorFactory
1314

1415

1516
class IngestorFactoryManager:
@@ -24,6 +25,7 @@ def __init__(self):
2425
IngestorBackend.JSON.value: JsonIngestorFactory(),
2526
IngestorBackend.JPG.value: ImageIngestorFactory(),
2627
IngestorBackend.PNG.value: ImageIngestorFactory(),
28+
IngestorBackend.CSV.value: CsvIngestorFactory(),
2729
# Ingestor.TEXT.value: TextIngestor(),
2830
# Add more mappings as needed
2931
}

tests/data/csv/demo_data.csv

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
Name,State,City
2+
Ansh,Punjab,Anandpur
3+
Ayush,Odisha,Cuttack

tests/data/csv/demo_data1.csv

+4
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
FirstName,LastName,Country
2+
John,Doe,Usa
3+
Leo,Messi,Argentina
4+
Cristiano,Ronaldo,Portugal

tests/test_csv_ingestor.py

+38
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
import asyncio
2+
import pytest
3+
from pathlib import Path
4+
5+
from querent.collectors.fs.fs_collector import FSCollectorFactory
6+
from querent.config.collector_config import FSCollectorConfig
7+
from querent.common.uri import Uri
8+
from querent.ingestors.ingestor_manager import IngestorFactoryManager
9+
10+
11+
@pytest.mark.asyncio
12+
async def test_collect_and_ingest_csv_data():
13+
collector_factory = FSCollectorFactory()
14+
uri = Uri("file://" + str(Path("./tests/data/csv/").resolve()))
15+
config = FSCollectorConfig(root_path=uri.path)
16+
collector = collector_factory.resolve(uri, config)
17+
18+
ingestor_factory_manager = IngestorFactoryManager()
19+
ingestor_factory = await ingestor_factory_manager.get_factory("csv")
20+
ingestor = await ingestor_factory.create("csv", [])
21+
22+
ingested_call = ingestor.ingest(collector.poll())
23+
counter = 0
24+
25+
async def poll_and_print():
26+
counter = 0
27+
async for ingested in ingested_call:
28+
assert ingested is not None
29+
# we have an iterable in ingested
30+
for row in ingested:
31+
counter += 1
32+
assert counter == 7
33+
34+
await poll_and_print()
35+
36+
37+
if __name__ == "__main__":
38+
asyncio.run(test_collect_and_ingest_csv_data())

0 commit comments

Comments
 (0)