Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add Composite Raw Decoder #179

Merged
merged 27 commits into from
Dec 27, 2024
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
5b7724d
Composite Decoder: add Composite Decoder
artem1205 Dec 17, 2024
990584e
Composite Decoder: add Models
artem1205 Dec 18, 2024
50782f7
Composite Decoder: ref to use BufferedIOBase
artem1205 Dec 19, 2024
17add9e
Composite Decoder: remove inner_parser from parser definition
artem1205 Dec 19, 2024
655ce35
Composite Decoder: ref models
artem1205 Dec 19, 2024
513aa43
Composite Decoder: clean
artem1205 Dec 19, 2024
eada5bf
Composite Decoder: ref todo
artem1205 Dec 19, 2024
1984ef1
Composite Decoder: remove args & kwargs
artem1205 Dec 19, 2024
8d3f82a
Composite Decoder: fmt mypy
artem1205 Dec 19, 2024
f2c9b0a
Composite Decoder: add to model factory
artem1205 Dec 23, 2024
961356c
Composite Decoder: add to model factory
artem1205 Dec 23, 2024
5fa937c
Composite Raw Decoder: add unittest for parsers
artem1205 Dec 23, 2024
1b85c26
Composite Raw Decoder: fix CompositeRawDecoder creation
artem1205 Dec 23, 2024
a181608
Composite Raw Decoder: ref: CompositeRawDecoder & jsonlinedecoder
artem1205 Dec 23, 2024
5276ed1
Composite Raw Decoder: fmt
artem1205 Dec 23, 2024
adf9d3d
Composite Raw Decoder: fix mypy
artem1205 Dec 23, 2024
7604b99
Composite Raw Decoder: fix mypy
artem1205 Dec 24, 2024
bbd5e3a
Merge remote-tracking branch 'origin/main' into artem1205/add-composi…
artem1205 Dec 24, 2024
f9a97db
CDK: fix conflicts
artem1205 Dec 24, 2024
7fd73ad
CDK: add type for JsonLineParser
artem1205 Dec 26, 2024
693a82d
CDK: rename
artem1205 Dec 26, 2024
ca8f31e
CDK: run prettier
artem1205 Dec 26, 2024
46e80d3
CDK: ref
artem1205 Dec 26, 2024
61a3919
CDK: ref to csv.DictReader
artem1205 Dec 26, 2024
2ff8edf
CDK: fix mypy
artem1205 Dec 26, 2024
9641d4d
CDK: apply coderabbit suggestions
artem1205 Dec 26, 2024
2b6cfb5
CDK: fix
artem1205 Dec 26, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/pypi_publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ name: Packaging and Publishing
on:
push:
tags:
- 'v*'
- "v*"
workflow_dispatch:
inputs:
version:
Expand Down
55 changes: 55 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2766,9 +2766,64 @@ definitions:
- "$ref": "#/definitions/IterableDecoder"
- "$ref": "#/definitions/XmlDecoder"
- "$ref": "#/definitions/GzipJsonDecoder"
- "$ref": "#/definitions/CompositeRawDecoder"
$parameters:
type: object
additionalProperties: true
CompositeRawDecoder:
description: "(This is experimental, use at your own risk)"
type: object
required:
- type
- parser
properties:
type:
type: string
enum: [CompositeRawDecoder]
parser:
anyOf:
- "$ref": "#/definitions/GzipParser"
- "$ref": "#/definitions/JsonLineParser"
- "$ref": "#/definitions/CsvParser"
# PARSERS
GzipParser:
type: object
required:
- type
- inner_parser
properties:
type:
type: string
enum: [GzipParser]
inner_parser:
anyOf:
- "$ref": "#/definitions/JsonLineParser"
- "$ref": "#/definitions/CsvParser"
JsonLineParser:
type: object
required:
- type
properties:
type:
type: string
enum: [JsonLineParser]
encoding:
artem1205 marked this conversation as resolved.
Show resolved Hide resolved
type: string
default: utf-8
CsvParser:
type: object
required:
- type
properties:
type:
type: string
enum: [CsvParser]
encoding:
type: string
default: utf-8
delimiter:
type: string
default: ","
AsyncJobStatusMap:
description: Matches the api job status to Async Job Status.
type: object
Expand Down
2 changes: 2 additions & 0 deletions airbyte_cdk/sources/declarative/decoders/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from airbyte_cdk.sources.declarative.decoders.composite_raw_decoder import CompositeRawDecoder
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
from airbyte_cdk.sources.declarative.decoders.json_decoder import (
GzipJsonDecoder,
Expand All @@ -17,6 +18,7 @@

__all__ = [
"Decoder",
"CompositeRawDecoder",
"JsonDecoder",
"JsonlDecoder",
"IterableDecoder",
Expand Down
97 changes: 97 additions & 0 deletions airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import csv
import gzip
import json
import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass
from io import BufferedIOBase, TextIOWrapper
from typing import Any, Generator, MutableMapping, Optional

import requests

from airbyte_cdk.sources.declarative.decoders.decoder import Decoder

logger = logging.getLogger("airbyte")


@dataclass
class Parser(ABC):
@abstractmethod
def parse(
self,
data: BufferedIOBase,
) -> Generator[MutableMapping[str, Any], None, None]:
"""
Parse data and yield dictionaries.
"""
pass


@dataclass
class GzipParser(Parser):
inner_parser: Parser

def parse(
self,
data: BufferedIOBase,
) -> Generator[MutableMapping[str, Any], None, None]:
"""
Decompress gzipped bytes and pass decompressed data to the inner parser.
"""
gzipobj = gzip.GzipFile(fileobj=data, mode="rb")
yield from self.inner_parser.parse(gzipobj)


@dataclass
class JsonLineParser(Parser):
encoding: Optional[str] = "utf-8"

def parse(
self,
data: BufferedIOBase,
) -> Generator[MutableMapping[str, Any], None, None]:
for line in data:
try:
yield json.loads(line.decode(encoding=self.encoding or "utf-8"))
except json.JSONDecodeError:
logger.warning(f"Cannot decode/parse line {line!r} as JSON")


@dataclass
class CsvParser(Parser):
# TODO: migrate implementation to re-use file-base classes
encoding: Optional[str] = "utf-8"
delimiter: Optional[str] = ","

def parse(
self,
data: BufferedIOBase,
) -> Generator[MutableMapping[str, Any], None, None]:
"""
Parse CSV data from decompressed bytes.
"""
text_data = TextIOWrapper(data, encoding=self.encoding) # type: ignore
reader = csv.DictReader(text_data, delimiter=self.delimiter or ",")
yield from reader


@dataclass
class CompositeRawDecoder(Decoder):
"""
Decoder strategy to transform a requests.Response into a Generator[MutableMapping[str, Any], None, None]
passed response.raw to parser(s).
Note: response.raw is not decoded/decompressed by default.
parsers should be instantiated recursively.
Example:
composite_raw_decoder = CompositeRawDecoder(parser=GzipParser(inner_parser=JsonLineParser(encoding="iso-8859-1")))
"""

parser: Parser

def is_stream_response(self) -> bool:
return True

def decode(
self, response: requests.Response
) -> Generator[MutableMapping[str, Any], None, None]:
yield from self.parser.parse(data=response.raw) # type: ignore[arg-type]
Original file line number Diff line number Diff line change
Expand Up @@ -1125,6 +1125,17 @@ class LegacySessionTokenAuthenticator(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class JsonLineParser(BaseModel):
type: Literal["JsonLineParser"]
encoding: Optional[str] = "utf-8"


class CsvParser(BaseModel):
type: Literal["CsvParser"]
encoding: Optional[str] = "utf-8"
delimiter: Optional[str] = ","


class AsyncJobStatusMap(BaseModel):
type: Optional[Literal["AsyncJobStatusMap"]] = None
running: List[str]
Expand Down Expand Up @@ -1208,6 +1219,8 @@ class ComponentMappingDefinition(BaseModel):
"{{ components_values['updates'] }}",
"{{ components_values['MetaData']['LastUpdatedTime'] }}",
"{{ config['segment_id'] }}",
"{{ stream_slice['parent_id'] }}",
"{{ stream_slice['extra_fields']['name'] }}",
],
title="Value",
)
Expand Down Expand Up @@ -1504,6 +1517,11 @@ class RecordSelector(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class GzipParser(BaseModel):
type: Literal["GzipParser"]
inner_parser: Union[JsonLineParser, CsvParser]


class Spec(BaseModel):
type: Literal["Spec"]
connection_specification: Dict[str, Any] = Field(
Expand Down Expand Up @@ -1534,6 +1552,11 @@ class CompositeErrorHandler(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class CompositeRawDecoder(BaseModel):
type: Literal["CompositeRawDecoder"]
parser: Union[GzipParser, JsonLineParser, CsvParser]


class DeclarativeSource1(BaseModel):
class Config:
extra = Extra.forbid
Expand Down Expand Up @@ -1936,6 +1959,7 @@ class SimpleRetriever(BaseModel):
IterableDecoder,
XmlDecoder,
GzipJsonDecoder,
CompositeRawDecoder,
]
] = Field(
None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@
PaginationDecoderDecorator,
XmlDecoder,
)
from airbyte_cdk.sources.declarative.decoders.composite_raw_decoder import (
CompositeRawDecoder,
CsvParser,
GzipParser,
JsonLineParser,
)
from airbyte_cdk.sources.declarative.extractors import (
DpathExtractor,
RecordFilter,
Expand Down Expand Up @@ -125,6 +131,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
CompositeErrorHandler as CompositeErrorHandlerModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
CompositeRawDecoder as CompositeRawDecoderModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
ConcurrencyLevel as ConcurrencyLevelModel,
)
Expand All @@ -134,6 +143,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
ConstantBackoffStrategy as ConstantBackoffStrategyModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
CsvParser as CsvParserModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
CursorPagination as CursorPaginationModel,
)
Expand Down Expand Up @@ -203,6 +215,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
GzipJsonDecoder as GzipJsonDecoderModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
GzipParser as GzipParserModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
HttpComponentsResolver as HttpComponentsResolverModel,
)
Expand All @@ -227,6 +242,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
JsonlDecoder as JsonlDecoderModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
JsonLineParser as JsonLineParserModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
JwtAuthenticator as JwtAuthenticatorModel,
)
Expand Down Expand Up @@ -455,6 +473,7 @@ def _init_mappings(self) -> None:
BearerAuthenticatorModel: self.create_bearer_authenticator,
CheckStreamModel: self.create_check_stream,
CompositeErrorHandlerModel: self.create_composite_error_handler,
CompositeRawDecoderModel: self.create_composite_raw_decoder,
ConcurrencyLevelModel: self.create_concurrency_level,
ConstantBackoffStrategyModel: self.create_constant_backoff_strategy,
CursorPaginationModel: self.create_cursor_pagination,
Expand Down Expand Up @@ -485,7 +504,9 @@ def _init_mappings(self) -> None:
InlineSchemaLoaderModel: self.create_inline_schema_loader,
JsonDecoderModel: self.create_json_decoder,
JsonlDecoderModel: self.create_jsonl_decoder,
JsonLineParserModel: self.create_json_line_parser,
GzipJsonDecoderModel: self.create_gzipjson_decoder,
GzipParserModel: self.create_gzip_parser,
KeysToLowerModel: self.create_keys_to_lower_transformation,
KeysToSnakeCaseModel: self.create_keys_to_snake_transformation,
FlattenFieldsModel: self.create_flatten_fields,
Expand Down Expand Up @@ -1701,6 +1722,12 @@ def create_jsonl_decoder(
) -> JsonlDecoder:
return JsonlDecoder(parameters={})

@staticmethod
def create_json_line_parser(
model: JsonLineParserModel, config: Config, **kwargs: Any
) -> JsonLineParser:
return JsonLineParser(encoding=model.encoding)

@staticmethod
def create_iterable_decoder(
model: IterableDecoderModel, config: Config, **kwargs: Any
Expand All @@ -1717,6 +1744,22 @@ def create_gzipjson_decoder(
) -> GzipJsonDecoder:
return GzipJsonDecoder(parameters={}, encoding=model.encoding)

def create_gzip_parser(
self, model: GzipParserModel, config: Config, **kwargs: Any
) -> GzipParser:
inner_parser = self._create_component_from_model(model=model.inner_parser, config=config)
return GzipParser(inner_parser=inner_parser)

@staticmethod
def create_csv_parser(model: CsvParserModel, config: Config, **kwargs: Any) -> CsvParser:
return CsvParser(encoding=model.encoding, delimiter=model.delimiter)

def create_composite_raw_decoder(
self, model: CompositeRawDecoderModel, config: Config, **kwargs: Any
) -> CompositeRawDecoder:
parser = self._create_component_from_model(model=model.parser, config=config)
return CompositeRawDecoder(parser=parser)

@staticmethod
def create_json_file_schema_loader(
model: JsonFileSchemaLoaderModel, config: Config, **kwargs: Any
Expand Down
Loading
Loading