Skip to content

Commit 6a5ec5d

Browse files
Merge pull request #15 from Querent-ai/collector_interfaces
add some collector interfaces
2 parents 337a703 + 42ad378 commit 6a5ec5d

18 files changed

+331
-62
lines changed
File renamed without changes.

querent/collectors/collector_base.py

+17
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
from abc import ABC, abstractmethod
2+
from typing import AsyncGenerator
3+
4+
from querent.collectors.collector_result import CollectorResult
5+
6+
class Collector(ABC):
7+
@abstractmethod
8+
async def connect(self):
9+
pass
10+
11+
@abstractmethod
12+
async def poll(self) -> AsyncGenerator[CollectorResult, None]:
13+
pass
14+
15+
@abstractmethod
16+
async def disconnect(self):
17+
pass
+34
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
from typing import List, Dict
2+
from pathlib import Path
3+
from enum import Enum
4+
5+
class CollectorErrorKind(Enum):
6+
NotFound = "not_found"
7+
Unauthorized = "unauthorized"
8+
incompatible = "incompatible"
9+
NotSupported = "not_supported"
10+
11+
class CollectorResolverError(Exception):
12+
def __init__(self, kind: CollectorErrorKind, message: str):
13+
super().__init__(message)
14+
self.kind = kind
15+
16+
17+
class CollectorError(Exception):
18+
def __init__(self, kind: CollectorErrorKind, source: any):
19+
self.kind = kind
20+
self.source = source
21+
22+
23+
class NotFoundError(CollectorError):
24+
def __init__(self, message: str):
25+
super().__init__(CollectorErrorKind.NotFound, message)
26+
27+
class UnauthorizedError(CollectorError):
28+
def __init__(self, message: str):
29+
super().__init__(CollectorErrorKind.unauthorized, message)
30+
31+
32+
class IncompatibleError(CollectorError):
33+
def __init__(self, message: str):
34+
super().__init__(CollectorErrorKind.incompatible, message)
+27
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
from abc import ABC, abstractmethod
2+
from typing import Optional
3+
from enum import Enum
4+
from querent.collectors.collector_base import Collector
5+
from querent.collectors.collector_errors import CollectorResolverError, CollectorErrorKind
6+
from querent.config.collector_config import CollectorBackend
7+
8+
9+
class CollectorFactory(ABC):
10+
@abstractmethod
11+
def backend(self) -> CollectorBackend:
12+
pass
13+
14+
@abstractmethod
15+
async def resolve(self, uri: str) -> Optional[CollectorBackend]:
16+
pass
17+
18+
19+
class UnsupportedCollector(CollectorFactory):
20+
def __init__(self, backend: CollectorBackend, message: str):
21+
self.backend = backend
22+
self.message = message
23+
24+
async def resolve(self, uri: str) -> Optional[Collector]:
25+
raise CollectorResolverError(
26+
CollectorErrorKind.NotSupported, self.backend, self.message
27+
)
+32
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
from typing import Optional
2+
from querent.collectors.fs.fs_collector import FSCollectorFactory
3+
from querent.config.collector_config import CollectorBackend
4+
from querent.collectors.collector_base import Collector
5+
from querent.collectors.collector_errors import CollectorResolverError, CollectorErrorKind
6+
from querent.common.uri import Protocol, Uri
7+
8+
class CollectorResolver:
9+
def __init__(self):
10+
self.collector_factories = {
11+
CollectorBackend.LocalFile: FSCollectorFactory(),
12+
# Add other collector factories as needed
13+
}
14+
15+
def resolve(self, uri: Uri) -> Optional[Collector]:
16+
backend = self._determine_backend(uri.protocol)
17+
18+
if backend in self.collector_factories:
19+
factory = self.collector_factories[backend]
20+
return factory.resolve(uri)
21+
else:
22+
raise CollectorResolverError(
23+
CollectorErrorKind.NotSupported, backend, "Unsupported backend"
24+
)
25+
26+
def _determine_backend(self, protocol: Protocol) -> CollectorBackend:
27+
if protocol.is_file_storage():
28+
return CollectorBackend.LocalFile
29+
else:
30+
raise CollectorResolverError(
31+
CollectorErrorKind.NotSupported, "Unknown backend", "Unknown backend"
32+
)
+56
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
from typing import Any, Union
2+
3+
class CollectorResult:
4+
def __init__(self, value: Any, error: str = None) -> None:
5+
self.value = value
6+
self.error = error
7+
8+
def __str__(self):
9+
if self.error:
10+
return f"Error: {self.error}"
11+
return f"Value: {self.value}"
12+
13+
def is_error(self) -> bool:
14+
return self.error is not None
15+
16+
@classmethod
17+
def success(cls, value: Any) -> "CollectorResult":
18+
return cls(value)
19+
20+
@classmethod
21+
def error(cls, error: str) -> "CollectorResult":
22+
return cls(None, error)
23+
24+
def unwrap(self) -> Any:
25+
if self.error:
26+
raise ValueError(self.error)
27+
return self.value
28+
29+
def unwrap_or(self, default: Any) -> Any:
30+
return self.value if not self.error else default
31+
32+
def __eq__(self, other: Union[Any, "CollectorResult"]) -> bool:
33+
if isinstance(other, CollectorResult):
34+
return self.value == other.value and self.error == other.error
35+
return self.value == other
36+
37+
def __hash__(self) -> int:
38+
return hash((self.value, self.error))
39+
40+
41+
# Usage example
42+
if __name__ == "__main__":
43+
success_result = CollectorResult.success("Success value")
44+
print(success_result)
45+
46+
error_result = CollectorResult.error("An error occurred")
47+
print(error_result)
48+
49+
unwrapped_value = success_result.unwrap()
50+
print("Unwrapped:", unwrapped_value)
51+
52+
default_value = success_result.unwrap_or("Default value")
53+
print("Unwrapped with default:", default_value)
54+
55+
print("Equality:", success_result == "Success value")
56+
print("Equality:", success_result == CollectorResult.success("Success value"))
File renamed without changes.

querent/collectors/fs/fs_collector.py

+54
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
import asyncio
2+
from pathlib import Path
3+
from typing import AsyncGenerator
4+
from querent.collectors.collector_base import Collector
5+
from querent.collectors.collector_factory import CollectorFactory
6+
from querent.collectors.collector_result import CollectorResult
7+
from querent.common.uri import Uri
8+
from querent.config.collector_config import CollectorBackend, FSCollectorConfig
9+
import aiofiles
10+
11+
12+
class FSCollector(Collector):
13+
def __init__(self, config: FSCollectorConfig):
14+
self.root_dir = Path(config.root_path)
15+
self.chunk_size = config.chunk_size
16+
17+
async def connect(self):
18+
# Add your setup logic here if needed
19+
pass
20+
21+
async def disconnect(self):
22+
# Add your cleanup logic here if needed
23+
pass
24+
25+
async def poll(self) -> AsyncGenerator[CollectorResult, None]:
26+
async for file_path in self.walk_files(self.root_dir):
27+
async with aiofiles.open(file_path, "rb") as file:
28+
async for chunk in self.read_chunks(file):
29+
yield CollectorResult({"file_path": file_path, "chunk": chunk})
30+
31+
async def read_chunks(self, file):
32+
while True:
33+
chunk = await file.read(self.chunk_size)
34+
if not chunk:
35+
break
36+
yield chunk
37+
await file.close()
38+
39+
async def walk_files(self, root: Path) -> AsyncGenerator[Path, None]:
40+
for item in root.iterdir():
41+
if item.is_file():
42+
yield item
43+
44+
45+
class FSCollectorFactory(CollectorFactory):
46+
def __init__(self):
47+
pass
48+
49+
def backend(self) -> CollectorBackend:
50+
return CollectorBackend.LocalFile
51+
52+
def resolve(self, uri: Uri) -> Collector:
53+
config = FSCollectorConfig(root_path=uri.path)
54+
return FSCollector(config)

querent/config/collector_config.py

+53
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
from enum import Enum
2+
from typing import Optional
3+
from pydantic import BaseModel
4+
5+
6+
class CollectorBackend(str, Enum):
7+
LocalFile = "localfile"
8+
S3 = "s3"
9+
Gcs = "gs"
10+
11+
12+
class CollectConfig(BaseModel):
13+
backend: CollectorBackend
14+
15+
class Config:
16+
use_enum_values = True
17+
18+
19+
class FSCollectorConfig(BaseModel):
20+
root_path: str
21+
chunk_size: int = 1024
22+
23+
24+
class S3CollectConfig(BaseModel):
25+
bucket: str
26+
region: str
27+
access_key: str
28+
secret_key: str
29+
30+
31+
class GcsCollectConfig(BaseModel):
32+
bucket: str
33+
region: str
34+
access_key: str
35+
secret_key: str
36+
37+
38+
class CollectConfigWrapper(BaseModel):
39+
backend: CollectorBackend
40+
config: Optional[BaseModel] = None
41+
42+
@classmethod
43+
def from_collect_config(cls, collect_config: CollectConfig):
44+
if collect_config.backend == CollectorBackend.LocalFile:
45+
return cls(
46+
backend=CollectorBackend.LocalFile, config=FSCollectorConfig()
47+
)
48+
elif collect_config.backend == CollectorBackend.S3:
49+
return cls(backend=CollectorBackend.S3, config=S3CollectConfig())
50+
elif collect_config.backend == CollectorBackend.Gcs:
51+
return cls(backend=CollectorBackend.Gcs, config=GcsCollectConfig())
52+
else:
53+
raise ValueError(f"Unsupported collector backend: {collect_config.backend}")

querent/config/connector_config.py

Whitespace-only changes.

querent/connectors/connecror.errors.py

-35
This file was deleted.

querent/connectors/connector_base.py

Whitespace-only changes.

querent/connectors/connector_factory.py

Whitespace-only changes.

querent/connectors/connector_resolver.py

-27
This file was deleted.

requirements.txt

+1
Original file line numberDiff line numberDiff line change
@@ -154,3 +154,4 @@ html2text==2020.1.16
154154
duckduckgo-search==3.8.3
155155
google-generativeai==0.1.0
156156
asyncio==3.4.3
157+
aiofiles

0 commit comments

Comments
 (0)