Commit 5a42564

Merge branch 'lazebnyi/add-dynamic-schema-loader' into daryna/test-dynamic-streams-changes

darynaishchenko committed Dec 5, 2024
2 parents 890eec1 + 05e4f74
Showing 63 changed files with 2,551 additions and 995 deletions.
46 changes: 25 additions & 21 deletions .github/workflows/connector-tests.yml
@@ -74,29 +74,34 @@ jobs:
cdk_extra: n/a
- connector: source-chargebee
cdk_extra: n/a
# Currently not passing CI (unrelated)
# - connector: source-zendesk-support
# cdk_extra: n/a
- connector: source-s3
cdk_extra: file-based
- connector: destination-pinecone
cdk_extra: vector-db-based
- connector: destination-motherduck
cdk_extra: sql
# ZenDesk currently failing (as of 2024-12-02)
# TODO: Re-enable once fixed
# - connector: source-zendesk-support
# cdk_extra: n/a
# TODO: These are manifest connectors and won't work as expected until we
# add `--use-local-cdk` support for manifest connectors.
- connector: source-the-guardian-api
cdk_extra: n/a
- connector: source-pokeapi
cdk_extra: n/a
# - connector: source-the-guardian-api
# cdk_extra: n/a
# - connector: source-pokeapi
# cdk_extra: n/a

name: "Check: '${{matrix.connector}}' (skip=${{needs.cdk_changes.outputs[matrix.cdk_extra] == 'false'}})"
steps:
- name: Abort if extra not changed (${{matrix.cdk_extra}})
id: no_changes
if: ${{ matrix.cdk_extra != 'n/a' && needs.cdk_changes.outputs[matrix.cdk_extra] == 'false' }}
if: ${{ needs.cdk_changes.outputs['src'] == 'false' || matrix.cdk_extra != 'n/a' && needs.cdk_changes.outputs[matrix.cdk_extra] == 'false' }}
run: |
echo "Aborting job as specified extra not changed: ${{matrix.cdk_extra}} = ${{ needs.cdk_changes.outputs[matrix.cdk_extra] }}"
echo "Aborting job."
echo "Source code changed: ${{ needs.cdk_changes.outputs['src'] }}"
if [ "${{ matrix.cdk_extra }}" != "n/a" ]; then
echo "Extra not changed: ${{ matrix.cdk_extra }} = ${{ needs.cdk_changes.outputs[matrix.cdk_extra] }}"
fi
echo "> Skipped '${{matrix.connector}}' (no relevant changes)" >> $GITHUB_STEP_SUMMARY
echo "status=cancelled" >> $GITHUB_OUTPUT
exit 0
@@ -112,8 +117,7 @@ jobs:
if: steps.no_changes.outputs.status != 'cancelled'
with:
repository: airbytehq/airbyte
# TODO: Revert to `master` after Airbyte CI released:
ref: aj/airbyte-ci/update-python-local-cdk-code
ref: master
path: airbyte
- name: Test Connector
if: steps.no_changes.outputs.status != 'cancelled'
@@ -133,16 +137,7 @@ jobs:
--skip-step qa_checks \
--skip-step connector_live_tests
# Upload the job output to the artifacts
- name: Upload Job Output
id: upload_job_output
if: always() && steps.no_changes.outputs.status != 'cancelled'
uses: actions/upload-artifact@v4
with:
name: ${{matrix.connector}}-job-output
path: airbyte/airbyte-ci/connectors/pipelines/pipeline_reports

- name: Evaluate Job Output
- name: Evaluate Test Output
if: always() && steps.no_changes.outputs.status != 'cancelled'
run: |
# save job output json file as ci step output
@@ -162,3 +157,12 @@ jobs:
echo "::error::Test failed for connector '${{ matrix.connector }}' on step '${failed_step}'. Check the logs for more details."
exit 1
fi
# Upload the job output to the artifacts
- name: Upload Job Output
id: upload_job_output
if: always() && steps.no_changes.outputs.status != 'cancelled'
uses: actions/upload-artifact@v4
with:
name: ${{matrix.connector}}-job-output
path: airbyte/airbyte-ci/connectors/pipelines/pipeline_reports
25 changes: 17 additions & 8 deletions .github/workflows/pytest_matrix.yml
@@ -17,11 +17,6 @@ on:
- 'poetry.lock'
- 'pyproject.toml'
pull_request:
paths:
- 'airbyte_cdk/**'
- 'unit_tests/**'
- 'poetry.lock'
- 'pyproject.toml'

jobs:
pytest:
@@ -52,21 +47,35 @@ jobs:
# Common steps:
- name: Checkout code
uses: actions/checkout@v4
- id: changes
uses: dorny/paths-filter@v3
with:
filters: |
src:
- 'airbyte_cdk/**'
- 'unit_tests/**'
- 'bin/**'
- 'poetry.lock'
- 'pyproject.toml'
- name: Set up Poetry
uses: Gr1N/setup-poetry@v9
if: steps.changes.outputs.src == 'true'
with:
poetry-version: "1.7.1"
- name: Set up Python
uses: actions/setup-python@v5
if: steps.changes.outputs.src == 'true'
with:
python-version: ${{ matrix.python-version }}
cache: "poetry"
- name: Install dependencies
if: steps.changes.outputs.src == 'true'
run: poetry install --all-extras

# Job-specific step(s):
- name: Run Pytest
timeout-minutes: 60
if: steps.changes.outputs.src == 'true'
env:
GCP_GSM_CREDENTIALS: ${{ secrets.GCP_GSM_CREDENTIALS }}
run: >
@@ -75,17 +84,17 @@ jobs:
-m "not linting and not super_slow and not flaky"
- name: Print Coverage Report
if: always()
if: always() && steps.changes.outputs.src == 'true'
run: poetry run coverage report

- name: Create Coverage Artifacts
if: always()
if: always() && steps.changes.outputs.src == 'true'
run: |
poetry run coverage html -d htmlcov
poetry run coverage xml -o htmlcov/coverage.xml
- name: Upload coverage to GitHub Artifacts
if: always()
if: always() && steps.changes.outputs.src == 'true'
uses: actions/upload-artifact@v4
with:
name: py${{ matrix.python-version }}-${{ matrix.os }}-test-coverage
1 change: 1 addition & 0 deletions .gitignore
@@ -13,4 +13,5 @@ dist
.venv
.pytest_cache
.idea
.vscode
**/__pycache__
17 changes: 12 additions & 5 deletions Dockerfile
@@ -10,12 +10,19 @@ COPY dist/*.whl ./dist/
RUN poetry config virtualenvs.create false \
&& poetry install --only main --no-interaction --no-ansi || true

# Copy source code
COPY airbyte_cdk ./airbyte_cdk

# Build and install the package
RUN pip install dist/*.whl

# Recreate the original structure
RUN mkdir -p source_declarative_manifest \
&& echo 'from source_declarative_manifest.run import run\n\nif __name__ == "__main__":\n run()' > main.py \
&& touch source_declarative_manifest/__init__.py \
&& cp /usr/local/lib/python3.10/site-packages/airbyte_cdk/cli/source_declarative_manifest/_run.py source_declarative_manifest/run.py \
&& cp /usr/local/lib/python3.10/site-packages/airbyte_cdk/cli/source_declarative_manifest/spec.json source_declarative_manifest/

# Remove unnecessary build files
RUN rm -rf dist/ pyproject.toml poetry.lock README.md

# Set the entrypoint
ENV AIRBYTE_ENTRYPOINT="source-declarative-manifest"
ENTRYPOINT ["source-declarative-manifest"]
ENV AIRBYTE_ENTRYPOINT="python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
2 changes: 2 additions & 0 deletions airbyte_cdk/models/__init__.py
@@ -39,7 +39,9 @@
FailureType,
Level,
OAuthConfigSpecification,
OauthConnectorInputSpecification,
OrchestratorType,
State,
Status,
StreamDescriptor,
SyncMode,
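
A minimal usage sketch of the two newly re-exported names (assuming, per the updated re-export list above, that they are importable from the package root):

# Illustrative import of the models added to airbyte_cdk.models.__init__
from airbyte_cdk.models import OauthConnectorInputSpecification, State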
airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py
@@ -17,8 +17,8 @@
from airbyte_cdk.sources.streams.concurrent.partition_enqueuer import PartitionEnqueuer
from airbyte_cdk.sources.streams.concurrent.partition_reader import PartitionReader
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
from airbyte_cdk.sources.streams.concurrent.partitions.record import Record
from airbyte_cdk.sources.streams.concurrent.partitions.types import PartitionCompleteSentinel
from airbyte_cdk.sources.types import Record
from airbyte_cdk.sources.utils.record_helper import stream_data_to_airbyte_message
from airbyte_cdk.sources.utils.slice_logger import SliceLogger
from airbyte_cdk.utils import AirbyteTracedException
@@ -147,11 +147,11 @@ def on_record(self, record: Record) -> Iterable[AirbyteMessage]:
# AbstractStreams are expected to return data as they are expected.
# Any transformation on the data should be done before reaching this point
message = stream_data_to_airbyte_message(
stream_name=record.partition.stream_name(),
stream_name=record.stream_name,
data_or_message=record.data,
is_file_transfer_message=record.is_file_transfer_message,
)
stream = self._stream_name_to_instance[record.partition.stream_name()]
stream = self._stream_name_to_instance[record.stream_name]

if message.type == MessageType.RECORD:
if self._record_counter[stream.name] == 0:
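
In short, the hunks above replace the partition-scoped Record with airbyte_cdk.sources.types.Record, which carries the stream name itself. A minimal before/after sketch (illustrative only, based on the lines shown in the diff):

# Before: the stream name was resolved through the record's partition
stream_name = record.partition.stream_name()

# After: Record from airbyte_cdk.sources.types exposes the stream name directly
stream_name = record.stream_name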
2 changes: 1 addition & 1 deletion airbyte_cdk/sources/concurrent_source/concurrent_source.py
@@ -18,11 +18,11 @@
from airbyte_cdk.sources.streams.concurrent.partition_enqueuer import PartitionEnqueuer
from airbyte_cdk.sources.streams.concurrent.partition_reader import PartitionReader
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
from airbyte_cdk.sources.streams.concurrent.partitions.record import Record
from airbyte_cdk.sources.streams.concurrent.partitions.types import (
PartitionCompleteSentinel,
QueueItem,
)
from airbyte_cdk.sources.types import Record
from airbyte_cdk.sources.utils.slice_logger import DebugSliceLogger, SliceLogger


(Remaining changed files not shown.)
