Skip to content

Added csv ingestor #43

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 88 additions & 0 deletions querent/ingestors/csv/csv_ingestor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
"""CSV Ingestor"""
from typing import List, AsyncGenerator
import csv
import io

from querent.processors.async_processor import AsyncProcessor
from querent.ingestors.ingestor_factory import IngestorFactory
from querent.ingestors.base_ingestor import BaseIngestor
from querent.config.ingestor_config import IngestorBackend
from querent.common.types.collected_bytes import CollectedBytes


class CsvIngestorFactory(IngestorFactory):
"""Ingestor factory for CSV"""

SUPPORTED_EXTENSIONS = {"csv"}

async def supports(self, file_extension: str) -> bool:
return file_extension.lower() in self.SUPPORTED_EXTENSIONS

async def create(
self, file_extension: str, processors: List[AsyncProcessor]
) -> BaseIngestor:
if not self.supports(file_extension):
return None
return CsvIngestor(processors)


class CsvIngestor(BaseIngestor):
"""Ingestor for CSV"""

def __init__(self, processors: List[AsyncProcessor]):
super().__init__(IngestorBackend.CSV)
self.processors = processors

async def ingest(
self, poll_function: AsyncGenerator[CollectedBytes, None]
) -> AsyncGenerator[str, None]:
current_file = None
collected_bytes = b""
try:
async for chunk_bytes in poll_function:
if chunk_bytes.is_error():
# TODO handle error
continue
if current_file is None:
current_file = chunk_bytes.file
elif current_file != chunk_bytes.file:
# we have a new file, process the old one
async for text in self.extract_and_process_csv(
CollectedBytes(file=current_file, data=collected_bytes)
):
yield text
collected_bytes = b""
current_file = chunk_bytes.file
collected_bytes += chunk_bytes.data
except Exception as e:
# TODO handle exception
print(e)
yield ""
finally:
# process the last file
async for text in self.extract_and_process_csv(
CollectedBytes(file=current_file, data=collected_bytes)
):
yield text

async def extract_and_process_csv(
self, collected_bytes: CollectedBytes
) -> AsyncGenerator[str, None]:
text = await self.extract_text_from_csv(collected_bytes)
# print(text)
processed_text = await self.process_data(text)
yield processed_text

async def extract_text_from_csv(
self, collected_bytes: CollectedBytes
) -> csv.reader:
text_data = collected_bytes.data.decode("utf-8")
print(text_data)
text = csv.reader(io.StringIO(text_data))
return text

async def process_data(self, text: str) -> List[str]:
processed_data = text
for processor in self.processors:
processed_data = await processor.process(processed_data)
return processed_data
2 changes: 2 additions & 0 deletions querent/ingestors/ingestor_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from querent.ingestors.audio.audio_ingestors import AudioIngestorFactory
from querent.ingestors.json.json_ingestor import JsonIngestorFactory
from querent.ingestors.images.image_ingestor import ImageIngestorFactory
from querent.ingestors.csv.csv_ingestor import CsvIngestorFactory


class IngestorFactoryManager:
Expand All @@ -17,6 +18,7 @@ def __init__(self):
IngestorBackend.JSON.value: JsonIngestorFactory(),
IngestorBackend.JPG.value: ImageIngestorFactory(),
IngestorBackend.PNG.value: ImageIngestorFactory(),
IngestorBackend.CSV.value: CsvIngestorFactory(),
# Ingestor.TEXT.value: TextIngestor(),
# Add more mappings as needed
}
Expand Down
3 changes: 3 additions & 0 deletions tests/data/csv/demo_data.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Name,State,City
Ansh,Punjab,Anandpur
Ayush,Odisha,Cuttack
4 changes: 4 additions & 0 deletions tests/data/csv/demo_data1.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
FirstName,LastName,Country
John,Doe,Usa
Leo,Messi,Argentina
Cristiano,Ronaldo,Portugal
38 changes: 38 additions & 0 deletions tests/test_csv_ingestor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import asyncio
import pytest
from pathlib import Path

from querent.collectors.fs.fs_collector import FSCollectorFactory
from querent.config.collector_config import FSCollectorConfig
from querent.common.uri import Uri
from querent.ingestors.ingestor_manager import IngestorFactoryManager


@pytest.mark.asyncio
async def test_collect_and_ingest_csv_data():
collector_factory = FSCollectorFactory()
uri = Uri("file://" + str(Path("./tests/data/csv/").resolve()))
config = FSCollectorConfig(root_path=uri.path)
collector = collector_factory.resolve(uri, config)

ingestor_factory_manager = IngestorFactoryManager()
ingestor_factory = await ingestor_factory_manager.get_factory("csv")
ingestor = await ingestor_factory.create("csv", [])

ingested_call = ingestor.ingest(collector.poll())
counter = 0

async def poll_and_print():
counter = 0
async for ingested in ingested_call:
assert ingested is not None
# we have an iterable in ingested
for row in ingested:
counter += 1
assert counter == 7

await poll_and_print()


if __name__ == "__main__":
asyncio.run(test_collect_and_ingest_csv_data())