Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Error handling #71

Merged
merged 9 commits into from
Sep 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions querent/collectors/aws/aws_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,11 @@ async def poll(self) -> AsyncGenerator[CollectedBytes, None]:
file = self.download_object_as_byte_stream(obj["Key"])
async for chunk in self.read_chunks(file):
yield CollectedBytes(file=obj["Key"], data=chunk, error=None)
except Exception as e:
# Handle exceptions gracefully, e.g., log the error
print(f"An error occurred: {e}")

except PermissionError as exc:
print(f"Unable to open this file {file}, getting error as {exc}")
except OSError as exc:
print(f"Getting OS Error on file {file}, as {exc}")
finally:
await self.disconnect() # Disconnect when done

Expand Down
16 changes: 13 additions & 3 deletions querent/collectors/fs/fs_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from querent.common.uri import Uri
from querent.config.collector_config import CollectorBackend, FSCollectorConfig
import aiofiles
from querent.common import common_errors


class FSCollector(Collector):
Expand All @@ -24,9 +25,18 @@ async def disconnect(self):

async def poll(self) -> AsyncGenerator[CollectedBytes, None]:
async for file_path in self.walk_files(self.root_dir):
async with aiofiles.open(file_path, "rb") as file:
async for chunk in self.read_chunks(file):
yield CollectedBytes(file=file_path, data=chunk, error=None)
try:
async with aiofiles.open(file_path, "rb") as file:
async for chunk in self.read_chunks(file):
yield CollectedBytes(file=file_path, data=chunk, error=None)
except PermissionError as exc:
raise common_errors.PermissionError(
f"Unable to open this file {file_path}, getting error as {exc}"
) from exc
except OSError as exc:
raise common_errors.OSError(
f"Getting OS Error on file {file_path}, as {exc}"
) from exc

async def read_chunks(self, file):
while True:
Expand Down
23 changes: 19 additions & 4 deletions querent/collectors/gcs/gcs_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from querent.collectors.collector_base import Collector
from querent.collectors.collector_factory import CollectorFactory
from querent.common.uri import Uri
from querent.common import common_errors
from google.cloud import storage
from dotenv import load_dotenv

Expand All @@ -23,7 +24,12 @@ def __init__(self, config: GcsCollectConfig):

async def connect(self):
if not self.client:
self.client = storage.Client.from_service_account_info(self.credentials)
try:
self.client = storage.Client.from_service_account_info(self.credentials)
except ConnectionError as exc:
raise common_errors.ConnectionError(
"Please pass the credentials file"
) from exc

async def disconnect(self):
if self.client is not None:
Expand Down Expand Up @@ -58,9 +64,18 @@ async def stream_blob(self, blob):
if not chunk:
break
yield chunk
except Exception as e:
# Handle exceptions gracefully, e.g., log the error
print(f"An error occurred while streaming blob: {e}")
except PermissionError as exc:
raise common_errors.PermissionError(
f"Unable to open this file {blob_file}, getting error as {exc}"
) from exc
except OSError as exc:
raise common_errors.OSError(
f"Getting OS Error on file {blob_file}, as {exc}"
) from exc
except Exception as exc:
raise common_errors.UnknownError(
f"Getting OS Error on file {blob_file}, as {exc}"
) from exc


class GCSCollectorFactory(CollectorFactory):
Expand Down
148 changes: 148 additions & 0 deletions querent/common/common_errors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
from enum import Enum


class IngestorError(Enum):
    # Symbolic error codes. Each exception class in this module passes one of
    # these members to IngestorErrorBase, which stores it as ``error_code``.
    # The member value is a short human-readable label for the category.

    # Codes with no dedicated exception class in the visible part of this
    # module (presumably raised elsewhere -- TODO confirm).
    EOF = "End of File"
    ETCD = "ETCD Error"
    NETWORK = "Network Error"
    TIMEOUT = "Timeout"

    # Codes paired with an exception class defined below.
    UNKNOWN = "Unknown Error"  # UnknownError
    FILE_NOT_FOUND = "File Not Found"  # FileNotFoundError
    IOERROR = "IOError"  # IOError
    UIE = "UnidentifiedImageError"  # UnidentifiedImageError
    WRONGPPTFILE = "Wrong PPt file"  # WrongPptFileError
    INVALIDXMLERROR = "Invalid Xml Error"  # InvalidXmlError
    BADZIPFILE = "BadZipFile"  # BadZipFile
    UNICODEDECODEERROR = "UnicodeDecodeError"  # UnicodeDecodeError
    LOOKUPERROR = "LookupError"  # LookupError
    TYPEERROR = "TypeError"  # TypeError
    UNKNOWNVALUEERROR = "UnknownValueError"  # UnknownValueError
    REQUESTERROR = "RequestError"  # RequestError
    INDEXERROR = "IndexError"  # IndexErrorException
    CSVERROR = "CsvError"  # CsvError
    RUNTIMEERROR = "RuntimeError"  # RuntimeError
    JSONDECODEERROR = "JsonDecodeError"  # JsonDecodeError
    DOCUMENTERROR = "DocumentError"  # DocumentError
    SHELLERROR = "ShellError"  # ShellError
    PERMISSIONERROR = "PermissionError"  # PermissionError
    OSERROR = "OSError"  # OSError
    CONNECTIONERROR = "ConnectionError"  # ConnectionError


class IngestorErrorBase(Exception):
    """Base class for the exception types defined in this module.

    Args:
        error_code: Identifier of the failure category. The subclasses in
            this module pass an ``IngestorError`` member.
        message: Optional human-readable description. When omitted, the
            error code's ``value`` (or the code itself if it has no
            ``value``) is used, so ``str(exc)`` never renders as ``"None"``.

    Attributes:
        error_code: The code passed at construction, unchanged.
        message: The description handed to ``Exception``.
    """

    def __init__(self, error_code, message=None) -> None:
        if message is None:
            # Fall back to the code's label instead of the literal "None".
            message = str(getattr(error_code, "value", error_code))
        # Keep ``args == (message,)`` so subclasses whose constructor takes
        # only ``message`` can still be rebuilt from ``args`` (copy/pickle).
        super().__init__(message)
        self.error_code = error_code
        self.message = message


class FileNotFoundError(IngestorErrorBase):
    """Ingestor error carrying the ``IngestorError.FILE_NOT_FOUND`` code.

    Note: within this module the name shadows the built-in
    ``FileNotFoundError``. It derives from ``IngestorErrorBase``, not from
    the built-in, so ``except`` clauses on the built-in do not catch it.

    Args:
        message: Optional description forwarded to ``IngestorErrorBase``.
    """

    def __init__(self, message=None):
        code = IngestorError.FILE_NOT_FOUND
        super().__init__(code, message)


class IOError(IngestorErrorBase):
    """Ingestor error carrying the ``IngestorError.IOERROR`` code.

    Note: within this module the name shadows the built-in ``IOError``. It
    derives from ``IngestorErrorBase``, not from the built-in.

    Args:
        message: Optional description forwarded to ``IngestorErrorBase``.
    """

    def __init__(self, message=None) -> None:
        super().__init__(error_code=IngestorError.IOERROR, message=message)


class UnidentifiedImageError(IngestorErrorBase):
    """Ingestor error carrying the ``IngestorError.UIE`` code.

    Args:
        message: Optional description forwarded to ``IngestorErrorBase``.
    """

    def __init__(self, message=None) -> None:
        super().__init__(error_code=IngestorError.UIE, message=message)


class WrongPptFileError(IngestorErrorBase):
    """Signals that a non-PPT file was handed to the PPT ingestor.

    Carries the ``IngestorError.WRONGPPTFILE`` code.

    Args:
        message: Optional description forwarded to ``IngestorErrorBase``.
    """

    def __init__(self, message=None) -> None:
        super().__init__(error_code=IngestorError.WRONGPPTFILE, message=message)


class InvalidXmlError(IngestorErrorBase):
    """Ingestor error carrying the ``IngestorError.INVALIDXMLERROR`` code.

    Args:
        message: Optional description forwarded to ``IngestorErrorBase``.
    """

    def __init__(self, message=None) -> None:
        super().__init__(error_code=IngestorError.INVALIDXMLERROR, message=message)


class BadZipFile(IngestorErrorBase):
    """Ingestor error carrying the ``IngestorError.BADZIPFILE`` code.

    Args:
        message: Optional description forwarded to ``IngestorErrorBase``.
    """

    def __init__(self, message=None) -> None:
        super().__init__(error_code=IngestorError.BADZIPFILE, message=message)


class UnicodeDecodeError(IngestorErrorBase):
    """Ingestor error carrying the ``IngestorError.UNICODEDECODEERROR`` code.

    Note: within this module the name shadows the built-in
    ``UnicodeDecodeError``. It derives from ``IngestorErrorBase``, not from
    the built-in, and takes only an optional message.

    Args:
        message: Optional description forwarded to ``IngestorErrorBase``.
    """

    def __init__(self, message=None) -> None:
        super().__init__(
            error_code=IngestorError.UNICODEDECODEERROR, message=message
        )


class LookupError(IngestorErrorBase):
    """Ingestor error carrying the ``IngestorError.LOOKUPERROR`` code.

    Note: within this module the name shadows the built-in ``LookupError``.
    It derives from ``IngestorErrorBase``, not from the built-in.

    Args:
        message: Optional description forwarded to ``IngestorErrorBase``.
    """

    def __init__(self, message=None) -> None:
        super().__init__(error_code=IngestorError.LOOKUPERROR, message=message)


class TypeError(IngestorErrorBase):
    """Ingestor error carrying the ``IngestorError.TYPEERROR`` code.

    Note: within this module the name shadows the built-in ``TypeError``.
    It derives from ``IngestorErrorBase``, not from the built-in.

    Args:
        message: Optional description forwarded to ``IngestorErrorBase``.
    """

    def __init__(self, message=None) -> None:
        super().__init__(error_code=IngestorError.TYPEERROR, message=message)


class UnknownValueError(IngestorErrorBase):
    """Ingestor error carrying the ``IngestorError.UNKNOWNVALUEERROR`` code.

    Args:
        message: Optional description forwarded to ``IngestorErrorBase``.
    """

    def __init__(self, message=None) -> None:
        super().__init__(
            error_code=IngestorError.UNKNOWNVALUEERROR, message=message
        )


class RequestError(IngestorErrorBase):
    """Ingestor error carrying the ``IngestorError.REQUESTERROR`` code.

    Args:
        message: Optional description forwarded to ``IngestorErrorBase``.
    """

    def __init__(self, message=None) -> None:
        super().__init__(error_code=IngestorError.REQUESTERROR, message=message)


class IndexErrorException(IngestorErrorBase):
    """Ingestor error carrying the ``IngestorError.INDEXERROR`` code.

    Args:
        message: Optional description forwarded to ``IngestorErrorBase``.
    """

    def __init__(self, message=None) -> None:
        super().__init__(error_code=IngestorError.INDEXERROR, message=message)


class UnknownError(IngestorErrorBase):
    """Ingestor error carrying the ``IngestorError.UNKNOWN`` code.

    Args:
        message: Optional description forwarded to ``IngestorErrorBase``.
    """

    def __init__(self, message=None) -> None:
        super().__init__(error_code=IngestorError.UNKNOWN, message=message)


class CsvError(IngestorErrorBase):
    """Ingestor error carrying the ``IngestorError.CSVERROR`` code.

    Args:
        message: Optional description forwarded to ``IngestorErrorBase``.
    """

    def __init__(self, message=None) -> None:
        super().__init__(error_code=IngestorError.CSVERROR, message=message)


class RuntimeError(IngestorErrorBase):
    """Ingestor error carrying the ``IngestorError.RUNTIMEERROR`` code.

    Note: within this module the name shadows the built-in ``RuntimeError``.
    It derives from ``IngestorErrorBase``, not from the built-in.

    Args:
        message: Optional description forwarded to ``IngestorErrorBase``.
    """

    def __init__(self, message=None) -> None:
        super().__init__(error_code=IngestorError.RUNTIMEERROR, message=message)


class JsonDecodeError(IngestorErrorBase):
    """Ingestor error carrying the ``IngestorError.JSONDECODEERROR`` code.

    Args:
        message: Optional description forwarded to ``IngestorErrorBase``.
    """

    def __init__(self, message=None) -> None:
        super().__init__(error_code=IngestorError.JSONDECODEERROR, message=message)


class DocumentError(IngestorErrorBase):
    """Ingestor error carrying the ``IngestorError.DOCUMENTERROR`` code.

    Args:
        message: Optional description forwarded to ``IngestorErrorBase``.
    """

    def __init__(self, message=None) -> None:
        super().__init__(error_code=IngestorError.DOCUMENTERROR, message=message)


class ShellError(IngestorErrorBase):
    """Ingestor error carrying the ``IngestorError.SHELLERROR`` code.

    Args:
        message: Optional description forwarded to ``IngestorErrorBase``.
    """

    def __init__(self, message=None) -> None:
        super().__init__(error_code=IngestorError.SHELLERROR, message=message)


class PermissionError(IngestorErrorBase):
    """Ingestor error carrying the ``IngestorError.PERMISSIONERROR`` code.

    Note: within this module the name shadows the built-in
    ``PermissionError``. It derives from ``IngestorErrorBase``, not from the
    built-in, so ``except PermissionError`` on the built-in does not catch it.

    Args:
        message: Optional description forwarded to ``IngestorErrorBase``.
    """

    def __init__(self, message=None) -> None:
        super().__init__(error_code=IngestorError.PERMISSIONERROR, message=message)


class OSError(IngestorErrorBase):
    """Ingestor error carrying the ``IngestorError.OSERROR`` code.

    Note: within this module the name shadows the built-in ``OSError``. It
    derives from ``IngestorErrorBase``, not from the built-in, so
    ``except OSError`` on the built-in does not catch it.

    Args:
        message: Optional description forwarded to ``IngestorErrorBase``.
    """

    def __init__(self, message=None) -> None:
        super().__init__(error_code=IngestorError.OSERROR, message=message)


class ConnectionError(IngestorErrorBase):
    """Ingestor error carrying the ``IngestorError.CONNECTIONERROR`` code.

    Note: within this module the name shadows the built-in
    ``ConnectionError``. It derives from ``IngestorErrorBase``, not from the
    built-in.

    Args:
        message: Optional description forwarded to ``IngestorErrorBase``.
    """

    def __init__(self, message=None) -> None:
        super().__init__(error_code=IngestorError.CONNECTIONERROR, message=message)
61 changes: 38 additions & 23 deletions querent/ingestors/audio/audio_ingestors.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@
from querent.ingestors.base_ingestor import BaseIngestor
from querent.config.ingestor_config import IngestorBackend
from querent.common.types.collected_bytes import CollectedBytes
from querent.common.common_errors import (
UnknownValueError,
RequestError,
IndexErrorException,
UnknownError,
)
from querent.common.types.ingested_tokens import IngestedTokens


Expand Down Expand Up @@ -63,32 +69,41 @@ async def ingest(
async def extract_and_process_audio(
    self, collected_bytes: CollectedBytes
) -> AsyncGenerator[IngestedTokens, None]:
    """Transcribe the audio in ``collected_bytes`` and yield the text.

    The payload is decoded with ``AudioSegment.from_file`` using
    ``collected_bytes.extension`` as the format, re-exported to an in-memory
    WAV buffer, and passed to ``recognize_google`` with ``language="en-US"``.

    Args:
        collected_bytes: Source of ``data`` (raw audio bytes), ``extension``
            (decoder format) and ``file`` (used in error messages).

    Yields:
        The recognized text as returned by ``recognize_google``.
        NOTE(review): the annotation promises ``IngestedTokens`` but the
        yielded value is the raw recognizer result -- presumably the caller
        wraps it; confirm against ``ingest``.

    Raises:
        UnknownValueError: on ``sr.UnknownValueError``.
        RequestError: on ``sr.RequestError``.
        IndexErrorException: on ``IndexError``.
        UnknownError: on any other ``Exception``.
    """
    try:
        audio_segment = AudioSegment.from_file(
            io.BytesIO(collected_bytes.data), format=collected_bytes.extension
        )
        temp_wave = io.BytesIO()
        audio_segment.export(temp_wave, format="wav")
        # Initialize the recognizer
        recognizer = sr.Recognizer()

        # Rewind: export() left the buffer position at the end of the data.
        temp_wave.seek(0)
        with sr.AudioFile(temp_wave) as source:
            recognizer.adjust_for_ambient_noise(source, duration=1)
            audio_data = recognizer.record(source)

        # Recognize the text using the recognizer
        recognized_text = recognizer.recognize_google(audio_data, language="en-US")
        yield recognized_text
    except sr.UnknownValueError as exc:
        raise UnknownValueError(
            f"The following file gave Unknown Value Error {collected_bytes.file}"
        ) from exc
    except sr.RequestError as exc:
        raise RequestError(
            f"The following file gave Request Error {collected_bytes.file}"
        ) from exc
    except IndexError as exc:
        # Message previously said "Request Error" (copy-paste slip).
        raise IndexErrorException(
            f"The following file gave Index Error {collected_bytes.file}"
        ) from exc
    except Exception as exc:
        raise UnknownError(
            f"Received unknown error {exc} from the file {collected_bytes.file}"
        ) from exc

async def process_data(self, text: str) -> str:
processed_data = text
Expand Down
24 changes: 21 additions & 3 deletions querent/ingestors/csv/csv_ingestor.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from querent.ingestors.base_ingestor import BaseIngestor
from querent.config.ingestor_config import IngestorBackend
from querent.common.types.collected_bytes import CollectedBytes
from querent.common import common_errors
from querent.common.types.ingested_tokens import IngestedTokens


Expand All @@ -19,7 +20,7 @@ async def supports(self, file_extension: str) -> bool:
async def create(
self, file_extension: str, processors: List[AsyncProcessor]
) -> BaseIngestor:
if not self.supports(file_extension):
if not await self.supports(file_extension):
return None
return CsvIngestor(processors)

Expand Down Expand Up @@ -69,8 +70,25 @@ async def extract_and_process_csv(
yield processed_row

async def extract_text_from_csv(self, collected_bytes: CollectedBytes) -> str:
    """Decode the collected CSV payload as UTF-8 text.

    Args:
        collected_bytes: Source of ``data`` (the bytes to decode) and
            ``file`` (used in error messages).

    Returns:
        ``collected_bytes.data`` decoded with the ``utf-8`` codec.

    Raises:
        common_errors.UnicodeDecodeError: on the built-in
            ``UnicodeDecodeError``.
        common_errors.LookupError: on the built-in ``LookupError``.
        common_errors.TypeError: on the built-in ``TypeError``.
        common_errors.UnknownError: on any other ``Exception``.
    """
    # The bare names in the ``except`` clauses are the built-ins; the
    # project-specific wrappers are reached through ``common_errors``.
    try:
        text_data = collected_bytes.data.decode("utf-8")
        return text_data
    except UnicodeDecodeError as exc:
        raise common_errors.UnicodeDecodeError(
            f"Getting UnicodeDecodeError on this file {collected_bytes.file}"
        ) from exc
    except LookupError as exc:
        raise common_errors.LookupError(
            f"Getting LookupError on this file {collected_bytes.file}"
        ) from exc
    except TypeError as exc:
        raise common_errors.TypeError(
            f"Getting TypeError on this file {collected_bytes.file}"
        ) from exc
    except Exception as exc:
        # Chain explicitly, matching the handlers above.
        raise common_errors.UnknownError(
            f"Getting error message:- {exc} on file {collected_bytes.file}"
        ) from exc

async def process_data(self, text: str) -> str:
processed_data = text
Expand Down
Loading