[Concurrent Low-Code] Allow low-code streams using AsyncRetriever to be run within the Concurrent CDK #168

Closed
1 task
brianjlai opened this issue Dec 12, 2024 · 5 comments

Comments

@brianjlai
Contributor

Context

After allowing certain types of low-code streams to be processed within the concurrent framework, we ran into an issue where streams that use the AsyncRetriever component would fail with errors during processing. One such example is:

{
  "type": "TRACE",
  "trace": {
    "type": "ERROR",
    "emitted_at": 1733873724427,
    "error": {
      "message": "Invalid state within AsyncJobRetriever. Please contact Airbyte Support",
      "internal_message": "AsyncPartitionRepository is expected to be accessed only after `stream_slices`",
      "stack_trace": "Traceback (most recent call last):\n  File \"/Users/brian.lai/dev/airbyte-python-cdk/airbyte_cdk/sources/streams/concurrent/partition_reader.py\", line 40, in process_partition\n    for record in partition.read():\n  File \"/Users/brian.lai/dev/airbyte-python-cdk/airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py\", line 59, in read\n    for stream_data in self._retriever.read_records(self._json_schema, self._stream_slice):\n  File \"/Users/brian.lai/dev/airbyte-python-cdk/airbyte_cdk/sources/declarative/retrievers/async_retriever.py\", line 119, in read_records\n    records: Iterable[Mapping[str, Any]] = self._job_orchestrator.fetch_records(partition)\n  File \"/Users/brian.lai/dev/airbyte-python-cdk/airbyte_cdk/sources/declarative/retrievers/async_retriever.py\", line 60, in _job_orchestrator\n    raise AirbyteTracedException(\nairbyte_cdk.utils.traced_exception.AirbyteTracedException: AsyncPartitionRepository is expected to be accessed only after `stream_slices`\n",
      "failure_type": "system_error",
      "stream_descriptor": {
        "name": "contacts"
      }
    }
  }
}

As a temporary fix, we changed concurrent_declarative_source.py to only run streams that use the SimpleRetriever concurrently.

Slack thread with more context:
https://airbytehq-team.slack.com/archives/C063B9A434H/p1733264441227669?thread_ts=1733257895.550789&cid=C063B9A434H

Problem / Solution

The root of the issue is twofold:

Issue 1:

Within concurrent_declarative_source.py, when we instantiate our StreamSlicerPartitionGenerator, we pass in declarative_stream.retriever.stream_slicer. This does not work for an AsyncRetriever, because there the stream_slicer is the underlying partition router that supplies partitions when creating async jobs. In the current implementation, the AsyncRetriever generates slices itself inside stream_slices() instead of delegating to the stream_slicer, unlike our existing SimpleRetriever.

For example in simple_retriever.py:

def stream_slices(self) -> Iterable[Optional[StreamSlice]]:
    return self.stream_slicer.stream_slices()

In async_retriever.py:

def stream_slices(self) -> Iterable[Optional[StreamSlice]]:
    slices = self.stream_slicer.stream_slices()
    self.__job_orchestrator = self._job_orchestrator_factory(slices)

    for completed_partition in self._job_orchestrator.create_and_get_completed_partitions():
        yield StreamSlice(
            partition=dict(completed_partition.stream_slice.partition)
            | {"partition": completed_partition},
            cursor_slice=completed_partition.stream_slice.cursor_slice,
        )

What we should do is:

  • Create a new StreamSlicer low-code component called AsyncJobStreamSlicer that adheres to the StreamSlicer interface
  • The implementation of stream_slices() should be the current implementation shown above. It should be instantiated with a parent stream slicer
  • AsyncRetriever should have a stream slicer defined as a field and use it when stream_slices() is called
  • Within the model_to_component_factory, we instantiate the new async stream_slicer
  • There should be no impact to the low-code interface
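
A minimal sketch of the proposed AsyncJobStreamSlicer, to make the bullets above concrete. It does not import the CDK: StreamSlice, CompletedPartition, ListStreamSlicer, and FakeJobOrchestrator are simplified, hypothetical stand-ins, and the constructor arguments are assumptions rather than the final component signature.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Iterable, List, Mapping, Optional


@dataclass
class StreamSlice:
    """Simplified stand-in for the CDK's StreamSlice (assumption)."""
    partition: Mapping[str, Any]
    cursor_slice: Mapping[str, Any] = field(default_factory=dict)


@dataclass
class CompletedPartition:
    """Stand-in for what the orchestrator yields once a job has finished."""
    stream_slice: StreamSlice


class ListStreamSlicer:
    """Stand-in parent slicer (e.g. a partition router) with fixed slices."""

    def __init__(self, slices: List[StreamSlice]) -> None:
        self._slices = slices

    def stream_slices(self) -> Iterable[StreamSlice]:
        return iter(self._slices)


class FakeJobOrchestrator:
    """Stand-in orchestrator that treats every job as already completed."""

    def __init__(self, slices: Iterable[StreamSlice]) -> None:
        self._slices = list(slices)

    def create_and_get_completed_partitions(self) -> Iterable[CompletedPartition]:
        for stream_slice in self._slices:
            yield CompletedPartition(stream_slice=stream_slice)


class AsyncJobStreamSlicer:
    """Wraps a parent slicer and yields one slice per completed async job."""

    def __init__(
        self,
        parent_slicer: ListStreamSlicer,
        orchestrator_factory: Callable[[Iterable[StreamSlice]], FakeJobOrchestrator],
    ) -> None:
        self._parent_slicer = parent_slicer
        self._orchestrator_factory = orchestrator_factory
        self._job_orchestrator: Optional[FakeJobOrchestrator] = None

    def stream_slices(self) -> Iterable[StreamSlice]:
        # Same logic as the current AsyncRetriever.stream_slices(), moved here.
        slices = self._parent_slicer.stream_slices()
        self._job_orchestrator = self._orchestrator_factory(slices)
        for completed in self._job_orchestrator.create_and_get_completed_partitions():
            yield StreamSlice(
                partition=dict(completed.stream_slice.partition) | {"partition": completed},
                cursor_slice=completed.stream_slice.cursor_slice,
            )
```

With this shape, the retriever's stream_slices() can simply return self.stream_slicer.stream_slices(), the same way SimpleRetriever does, and the partition generator can be handed the slicer directly.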

Issue 2:

We instantiate a new AsyncRetriever every time we create a new partition, because the factory method we supply to the StreamSlicerPartitionGenerator creates new instances of the declarative stream and its retriever. The partition therefore invokes AsyncRetriever.read_records() on a fresh retriever that has not been set up properly: AsyncRetriever.stream_slices() is not stateless, and it is responsible for creating the state that read_records() later depends on. Also, because each partition gets its own retriever, they do not share an AsyncJobOrchestrator or AsyncJobRepository, which is needed to properly manage the internal state of the retriever.

This is what produces the "AsyncPartitionRepository is expected to be accessed only after `stream_slices`" error shown in the Context section above.

What we actually want is to reuse the same AsyncRetriever for each partition. This would allow us to use a properly instantiated AsyncRetriever which has already called stream_slices(). From within concurrent_declarative_source.py, we can instead pass the original AsyncRetriever instance, which has the proper state as well as the shared orchestrator and job repository.
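
To illustrate the difference between a fresh retriever per partition and a shared one, here is a small sketch. FakeAsyncRetriever and read_all are hypothetical stand-ins and not CDK classes; the only behavior they borrow from the description above is that read_records() depends on state created by stream_slices().

```python
from typing import Callable, Dict, List, Optional


class FakeAsyncRetriever:
    """Stand-in whose read_records() only works after stream_slices() ran."""

    def __init__(self) -> None:
        self._jobs: Optional[Dict[str, List[str]]] = None

    def stream_slices(self) -> List[str]:
        # Side effect: the job state is created here, not in __init__.
        self._jobs = {"a": ["r1"], "b": ["r2"]}
        return list(self._jobs)

    def read_records(self, stream_slice: str) -> List[str]:
        if self._jobs is None:
            raise RuntimeError("read_records() called before stream_slices()")
        return self._jobs[stream_slice]


def read_all(
    slicing_retriever: FakeAsyncRetriever,
    retriever_for_partition: Callable[[], FakeAsyncRetriever],
) -> List[str]:
    """Generate slices once, then read each slice with the supplied retriever."""
    records: List[str] = []
    for stream_slice in slicing_retriever.stream_slices():
        records.extend(retriever_for_partition().read_records(stream_slice))
    return records


# Shared instance: the retriever that generated the slices also reads them.
shared = FakeAsyncRetriever()
shared_records = read_all(shared, lambda: shared)

# Fresh instance per partition: fails, because the new retriever has no state.
try:
    read_all(FakeAsyncRetriever(), FakeAsyncRetriever)
    fresh_failed = False
except RuntimeError:
    fresh_failed = True
```

The second call mirrors the current factory-based setup, where each partition gets a retriever that never saw stream_slices().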

However, there is one major problem with this approach: the AsyncRetriever is potentially not thread safe in several ways. The biggest one is that the DefaultPaginator relies on internal state: the _token field is overwritten each time we read a page. If multiple partitions use the same AsyncRetriever, we can lose records between partitions. If we make this thread safe, we should be able to share the same retriever.
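
The sketch below shows how a shared pagination token can drop records. It is not the CDK's DefaultPaginator: SharedPaginator, PAGES, and the reader functions are hypothetical, and thread interleaving is simulated with generators so the outcome is deterministic. Each yield marks a point where another partition could run.

```python
from typing import Dict, Iterator, List, Optional

# Hypothetical data: partition "a" has one page, partition "b" has two.
PAGES: Dict[str, List[List[str]]] = {"a": [["a1"]], "b": [["b1"], ["b2"]]}


class SharedPaginator:
    """Stand-in paginator that keeps the next page index on the instance."""

    def __init__(self) -> None:
        self.token: Optional[int] = 0


def read_with_shared_token(paginator: SharedPaginator, partition: str) -> Iterator[List[str]]:
    paginator.token = 0  # reset at the start of the partition
    while paginator.token is not None:
        index = paginator.token
        page = PAGES[partition][index]
        has_more = index + 1 < len(PAGES[partition])
        paginator.token = index + 1 if has_more else None
        yield page  # another partition may overwrite paginator.token here


def read_with_local_token(partition: str) -> Iterator[List[str]]:
    token: Optional[int] = 0  # state lives in the call, not on a shared object
    while token is not None:
        page = PAGES[partition][token]
        token = token + 1 if token + 1 < len(PAGES[partition]) else None
        yield page


def interleave(*readers: Iterator[List[str]]) -> List[str]:
    """Alternate between readers one page at a time, like a round-robin scheduler."""
    records: List[str] = []
    active = list(readers)
    while active:
        for reader in list(active):
            try:
                records.extend(next(reader))
            except StopIteration:
                active.remove(reader)
    return records


paginator = SharedPaginator()
shared_result = interleave(
    read_with_shared_token(paginator, "b"),
    read_with_shared_token(paginator, "a"),
)
local_result = interleave(read_with_local_token("b"), read_with_local_token("a"))
```

In this interleaving, partition "a" sets the shared token to None after its only page, so partition "b" stops before reading its second page. Keeping the token local to each read avoids that.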

Note:

There are potentially other places where we are not thread safe, and we could do additional analysis to find them. However, rather than drag the work out and try to fix everything, we are going to make a calculated bet that thread-safety problems in the async retriever have a relatively low blast radius. We may at some point have to revisit making all of our low-code components thread safe.

Acceptance Criteria

  • [ ]
  • There should be no breaking changes to the low-code interface
@brianjlai
Contributor Author

The first part of this was merged into the CDK: #170

The second part I'll put up for review in the next day or two as I clean up the code and finish fixing the crazy amount of tests this breaks.

@klsoper

klsoper commented Jan 6, 2025

@brianjlai any update here?

@brianjlai
Contributor Author

@klsoper yep!

I have the last PR to get our async streams to concurrent here: #185

It's been tested on some of our existing connectors, but we didn't want to release this during the holidays and want to wait for @maxi297 to get back to do a final sign off since the PR affects quite a few things. We're still on track.

@klsoper

klsoper commented Jan 10, 2025

@brianjlai looks like this was merged, can we close this up?

@brianjlai
Contributor Author

Yep, it's merged in CDK version 6.17.0 if you want to communicate that back to your user. As long as the user's custom connector is using that version or higher, they should have the fix.

One note though: I've had a few issues w/ our CI/CD pipeline in the past two days, but it should be released to most custom connectors. If this is for one of our connectors, then we'll need to check that the version is up to date. Some connectors aren't auto-upgraded.


2 participants