Skip to content

Commit

Permalink
Merge branch 'main' into issue-10550/streams-without-partition-router…
Browse files Browse the repository at this point in the history
…s-nor-cursor-to-concurrent
  • Loading branch information
maxi297 committed Nov 18, 2024
2 parents 56be6d6 + 72117aa commit a48323f
Show file tree
Hide file tree
Showing 10 changed files with 239 additions and 173 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/connector-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ jobs:
cdk_extra: n/a
- connector: source-shopify
cdk_extra: n/a
- connector: source-chargebee
cdk_extra: n/a
# Currently not passing CI (unrelated)
# - connector: source-zendesk-support
# cdk_extra: n/a
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@ dist
.mypy_cache
.venv
.pytest_cache
.idea
**/__pycache__
57 changes: 33 additions & 24 deletions airbyte_cdk/sources/declarative/manifest_declarative_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
import json
import logging
import pkgutil
import re
from copy import deepcopy
from importlib import metadata
from typing import Any, Dict, Iterator, List, Mapping, Optional, Tuple
from typing import Any, Dict, Iterator, List, Mapping, Optional
from packaging.version import Version, InvalidVersion

import yaml
from airbyte_cdk.models import (
Expand Down Expand Up @@ -245,45 +245,54 @@ def _validate_source(self) -> None:
"Validation against json schema defined in declarative_component_schema.yaml schema failed"
) from e

cdk_version = metadata.version("airbyte_cdk")
cdk_major, cdk_minor, cdk_patch = self._get_version_parts(cdk_version, "airbyte-cdk")
manifest_version = self._source_config.get("version")
if manifest_version is None:
cdk_version_str = metadata.version("airbyte_cdk")
cdk_version = self._parse_version(cdk_version_str, "airbyte-cdk")
manifest_version_str = self._source_config.get("version")
if manifest_version_str is None:
raise RuntimeError(
"Manifest version is not defined in the manifest. This is unexpected since it should be a required field. Please contact support."
)
manifest_major, manifest_minor, manifest_patch = self._get_version_parts(
manifest_version, "manifest"
)
manifest_version = self._parse_version(manifest_version_str, "manifest")

if cdk_version.startswith("0.0.0"):
if (cdk_version.major, cdk_version.minor, cdk_version.micro) == (0, 0, 0):
# Skipping version compatibility check on unreleased dev branch
pass
elif cdk_major < manifest_major or (
cdk_major == manifest_major and cdk_minor < manifest_minor
elif (cdk_version.major, cdk_version.minor) < (
manifest_version.major,
manifest_version.minor,
):
raise ValidationError(
f"The manifest version {manifest_version} is greater than the airbyte-cdk package version ({cdk_version}). Your "
f"The manifest version {manifest_version!s} is greater than the airbyte-cdk package version ({cdk_version!s}). Your "
f"manifest may contain features that are not in the current CDK version."
)
elif manifest_major == 0 and manifest_minor < 29:
elif (manifest_version.major, manifest_version.minor) < (0, 29):
raise ValidationError(
f"The low-code framework was promoted to Beta in airbyte-cdk version 0.29.0 and contains many breaking changes to the "
f"language. The manifest version {manifest_version} is incompatible with the airbyte-cdk package version "
f"{cdk_version} which contains these breaking changes."
f"language. The manifest version {manifest_version!s} is incompatible with the airbyte-cdk package version "
f"{cdk_version!s} which contains these breaking changes."
)

@staticmethod
def _get_version_parts(version: str, version_type: str) -> Tuple[int, int, int]:
"""
Takes a semantic version represented as a string and splits it into a tuple of its major, minor, and patch versions.
def _parse_version(
version: str,
version_type: str,
) -> Version:
"""Takes a version represented as a string and parses it into a Version object.
Any prerelease part is kept on the returned object instead of being split off into a tuple.
Returns:
Version: the parsed version object
"""
version_parts = re.split(r"\.", version)
if len(version_parts) != 3 or not all([part.isdigit() for part in version_parts]):
try:
parsed_version = Version(version)
except InvalidVersion as ex:
raise ValidationError(
f"The {version_type} version {version} specified is not a valid version format (ex. 1.2.3)"
)
return tuple(int(part) for part in version_parts) # type: ignore # We already verified there were 3 parts and they are all digits
f"The {version_type} version '{version}' is not a valid version format."
) from ex
else:
# No exception
return parsed_version

def _stream_configs(self, manifest: Mapping[str, Any]) -> List[Dict[str, Any]]:
# This has a warning flag for static, but after we finish part 4 we'll replace manifest with self._source_config
Expand Down
88 changes: 69 additions & 19 deletions airbyte_cdk/sources/file_based/file_types/unstructured_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,25 @@
from airbyte_cdk.utils import is_cloud_environment
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
from unstructured.file_utils.filetype import (
EXT_TO_FILETYPE,
FILETYPE_TO_MIMETYPE,
STR_TO_FILETYPE,
FileType,
detect_filetype,
)
import nltk

unstructured_partition_pdf = None
unstructured_partition_docx = None
unstructured_partition_pptx = None

try:
nltk.data.find("tokenizers/punkt.zip")
nltk.data.find("tokenizers/punkt_tab.zip")
except LookupError:
nltk.download("punkt")
nltk.download("punkt_tab")


def optional_decode(contents: Union[str, bytes]) -> str:
if isinstance(contents, bytes):
Expand Down Expand Up @@ -108,9 +117,11 @@ async def infer_schema(
format = _extract_format(config)
with stream_reader.open_file(file, self.file_read_mode, None, logger) as file_handle:
filetype = self._get_filetype(file_handle, file)

if filetype not in self._supported_file_types() and not format.skip_unprocessable_files:
raise self._create_parse_error(file, self._get_file_type_error_message(filetype))
raise self._create_parse_error(
file,
self._get_file_type_error_message(filetype),
)

return {
"content": {
Expand Down Expand Up @@ -159,6 +170,10 @@ def parse_records(
logger.warn(f"File {file.uri} cannot be parsed. Skipping it.")
else:
raise e
except Exception as e:
exception_str = str(e)
logger.error(f"File {file.uri} caused an error during parsing: {exception_str}.")
raise e

def _read_file(
self,
Expand All @@ -176,20 +191,32 @@ def _read_file(
# check whether unstructured library is actually available for better error message and to ensure proper typing (can't be None after this point)
raise Exception("unstructured library is not available")

filetype = self._get_filetype(file_handle, remote_file)
filetype: FileType | None = self._get_filetype(file_handle, remote_file)

if filetype == FileType.MD or filetype == FileType.TXT:
if filetype is None or filetype not in self._supported_file_types():
raise self._create_parse_error(
remote_file,
self._get_file_type_error_message(filetype),
)
if filetype in {FileType.MD, FileType.TXT}:
file_content: bytes = file_handle.read()
decoded_content: str = optional_decode(file_content)
return decoded_content
if filetype not in self._supported_file_types():
raise self._create_parse_error(remote_file, self._get_file_type_error_message(filetype))
if format.processing.mode == "local":
return self._read_file_locally(file_handle, filetype, format.strategy, remote_file)
return self._read_file_locally(
file_handle,
filetype,
format.strategy,
remote_file,
)
elif format.processing.mode == "api":
try:
result: str = self._read_file_remotely_with_retries(
file_handle, format.processing, filetype, format.strategy, remote_file
file_handle,
format.processing,
filetype,
format.strategy,
remote_file,
)
except Exception as e:
# If a parser error happens while the file is being processed remotely, the file is corrupted. The parse_records method handles this case, so just rethrow.
Expand Down Expand Up @@ -336,7 +363,11 @@ def _read_file_locally(

return self._render_markdown([element.to_dict() for element in elements])

def _create_parse_error(self, remote_file: RemoteFile, message: str) -> RecordParseError:
def _create_parse_error(
self,
remote_file: RemoteFile,
message: str,
) -> RecordParseError:
return RecordParseError(
FileBasedSourceError.ERROR_PARSING_RECORD, filename=remote_file.uri, message=message
)
Expand All @@ -360,32 +391,51 @@ def _get_filetype(self, file: IOBase, remote_file: RemoteFile) -> Optional[FileT
# detect_filetype is either using the file name or file content
# if possible, try to leverage the file name to detect the file type
# if the file name is not available, use the file content
file_type = detect_filetype(
filename=remote_file.uri,
)
if file_type is not None and not file_type == FileType.UNK:
file_type: FileType | None = None
try:
file_type = detect_filetype(
filename=remote_file.uri,
)
except Exception:
# Path doesn't exist locally. Try something else...
pass

if file_type and file_type != FileType.UNK:
return file_type

type_based_on_content = detect_filetype(file=file)
file.seek(0) # detect_filetype is reading to read the file content, so we need to reset

# detect_filetype reads the file content, so rewind the handle before it is used again
file.seek(0)
if type_based_on_content and type_based_on_content != FileType.UNK:
return type_based_on_content

return type_based_on_content
extension = "." + remote_file.uri.split(".")[-1].lower()
if extension in EXT_TO_FILETYPE:
return EXT_TO_FILETYPE[extension]

return None

def _supported_file_types(self) -> List[Any]:
    """Return the list of file types this parser accepts."""
    supported = [
        FileType.MD,
        FileType.PDF,
        FileType.DOCX,
        FileType.PPTX,
        FileType.TXT,
    ]
    return supported

def _get_file_type_error_message(self, file_type: FileType) -> str:
def _get_file_type_error_message(
self,
file_type: FileType | None,
) -> str:
supported_file_types = ", ".join([str(type) for type in self._supported_file_types()])
return f"File type {file_type} is not supported. Supported file types are {supported_file_types}"
return f"File type {file_type or 'None'!s} is not supported. Supported file types are {supported_file_types}"

def _render_markdown(self, elements: List[Any]) -> str:
return "\n\n".join((self._convert_to_markdown(el) for el in elements))

def _convert_to_markdown(self, el: Dict[str, Any]) -> str:
if dpath.get(el, "type") == "Title":
heading_str = "#" * (dpath.get(el, "metadata/category_depth", default=1) or 1)
category_depth = dpath.get(el, "metadata/category_depth", default=1) or 1
if not isinstance(category_depth, int):
category_depth = (
int(category_depth) if isinstance(category_depth, (str, float)) else 1
)
heading_str = "#" * category_depth
return f"{heading_str} {dpath.get(el, 'text')}"
elif dpath.get(el, "type") == "ListItem":
return f"- {dpath.get(el, 'text')}"
Expand Down
Loading

0 comments on commit a48323f

Please sign in to comment.