Skip to content

Create a common type for ingested tokens #70

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions querent/common/types/ingested_tokens.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from typing import List, Optional, Union


class IngestedTokens:
    """Common result type for tokens produced by an ingestor.

    All ingestors that return tokens/words return this type so that the
    incoming data from different ingestors is homogeneous.

    Attributes:
        file: Source file path as passed by the caller (may be ``None``).
        data: The ingested tokens, or ``None`` when only an error is carried.
        error: Error message, or ``None`` when ingestion succeeded.
        extension: Text after the last ``.`` of the file's base name, or
            ``""`` when there is no file or no extension.
        file_id: Text before the first ``.`` of the file's base name, or
            ``""`` when there is no file.
    """

    def __init__(
        self,
        file: Optional[str],
        data: Optional[List[str]],
        error: Optional[str] = None,
    ) -> None:
        self.data = data
        self.error = error
        self.file = file
        # Always define the derived attributes so the getters never raise
        # AttributeError when no file was supplied.
        self.extension = ""
        self.file_id = ""
        if self.file:
            # Work on the base name only, so a dot inside a directory name
            # cannot leak into the extension.
            base_name = str(file).split("/")[-1]
            name_parts = base_name.split(".")
            self.file_id = name_parts[0]
            if len(name_parts) > 1:
                self.extension = name_parts[-1]

    def __str__(self) -> str:
        if self.error:
            return f"Error: {self.error}"
        return f"Data: {self.data}"

    def is_error(self) -> bool:
        """Return True when an error value (anything but ``None``) is set."""
        return self.error is not None

    def get_file_path(self) -> Optional[str]:
        """Return the file path exactly as it was passed to the constructor."""
        return self.file

    def get_extension(self) -> str:
        """Return the file extension without the dot, or ``""`` if none."""
        return self.extension

    def get_file_id(self) -> str:
        """Return the base name up to its first dot, or ``""`` if no file."""
        return self.file_id

    @classmethod
    def success(
        cls, data: List[str], file: Optional[str] = None
    ) -> "IngestedTokens":
        """Build a successful result carrying ``data``.

        ``file`` is optional so existing single-argument calls keep working.
        """
        return cls(file=file, data=data)

    @classmethod
    def error(cls, error: str, file: Optional[str] = None) -> "IngestedTokens":
        """Build a failed result carrying ``error`` and no data.

        Note that on instances the ``error`` attribute set by ``__init__``
        shadows this classmethod, so call it on the class.
        """
        return cls(file=file, data=None, error=error)

    def unwrap(self) -> Optional[List[str]]:
        """Return the data.

        Raises:
            ValueError: If a non-empty error is set; the error is the message.
        """
        if self.error:
            raise ValueError(self.error)
        return self.data

    def unwrap_or(self, default: List[str]) -> Optional[List[str]]:
        """Return the data, or ``default`` when a non-empty error is set."""
        return self.data if not self.error else default

    def __eq__(self, other: Union[List[str], "IngestedTokens"]) -> bool:
        # Two results are equal when data and error match (the file is not
        # compared); any other object is compared against the data directly.
        if isinstance(other, IngestedTokens):
            return self.data == other.data and self.error == other.error
        return self.data == other

    def __hash__(self) -> int:
        # ``data`` is normally a list, which is unhashable; hash an
        # equivalent tuple so instances can be used in sets and dict keys.
        data_key = tuple(self.data) if isinstance(self.data, list) else self.data
        return hash((data_key, self.error))
17 changes: 12 additions & 5 deletions querent/ingestors/base_ingestor.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,22 @@
from typing import List
from abc import abstractmethod
from typing import AsyncGenerator, List
from querent.common.types.collected_bytes import CollectedBytes
from querent.common.types.ingested_tokens import IngestedTokens
from querent.processors.async_processor import AsyncProcessor


class BaseIngestor:
    """Base class for ingestors.

    Holds the list of processors and declares the two coroutines that
    concrete ingestors are expected to provide: ``ingest`` and
    ``process_data``.

    NOTE(review): the methods are marked ``@abstractmethod`` but this class
    does not derive from ``abc.ABC``, so the marker is not enforced at
    instantiation time -- confirm whether that is intended.
    """

    def __init__(self, processors: List[AsyncProcessor]):
        self.processors = processors

    @abstractmethod
    async def ingest(
        self, poll_function: AsyncGenerator[CollectedBytes, None]
    ) -> AsyncGenerator[IngestedTokens, None]:
        """Consume collected bytes and produce ``IngestedTokens``."""
        # Your common ingestion logic here
        pass

    @abstractmethod
    async def process_data(self, text):
        """Process extracted text; the return shape is up to the subclass."""
        # Your common data processing logic here
        pass
13 changes: 10 additions & 3 deletions querent/ingestors/pdfs/pdf_ingestor_v1.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import AsyncGenerator, List
import fitz # PyMuPDF
from querent.common.types.collected_bytes import CollectedBytes
from querent.common.types.ingested_tokens import IngestedTokens
from querent.config.ingestor_config import IngestorBackend
from querent.ingestors.base_ingestor import BaseIngestor
from querent.ingestors.ingestor_factory import IngestorFactory
Expand Down Expand Up @@ -28,7 +29,7 @@ def __init__(self, processors: List[AsyncProcessor]):

async def ingest(
self, poll_function: AsyncGenerator[CollectedBytes, None]
) -> AsyncGenerator[str, None]:
) -> AsyncGenerator[IngestedTokens, None]:
current_file = None
collected_bytes = b""
try:
Expand Down Expand Up @@ -61,12 +62,18 @@ async def ingest(

async def extract_and_process_pdf(
    self, collected_bytes: CollectedBytes
) -> AsyncGenerator[IngestedTokens, None]:
    """Yield one ``IngestedTokens`` per non-empty page of a PDF.

    Opens ``collected_bytes.data`` as a PDF, extracts each page's text, runs
    it through ``self.process_data`` and wraps the result together with the
    source file and any error carried by ``collected_bytes``. Pages with no
    text are skipped.
    """
    # The context manager closes the document once iteration finishes or the
    # generator is closed early.
    with fitz.open(stream=collected_bytes.data, filetype="pdf") as pdf:
        for page in pdf:
            text = page.get_text()
            if not text:
                continue
            processed_text = await self.process_data(text)
            yield IngestedTokens(
                file=collected_bytes.file,
                data=processed_text,
                error=collected_bytes.error,
            )

async def process_data(self, text: str) -> List[str]:
processed_data = text
Expand Down