feat: add pipelines structure #1046
Merged: daniel-sanche merged 134 commits into pipeline_queries_approved from pipeline_queries_1_stubs on Jun 17, 2025.
Changes from all commits (134 commits)
All 134 commits were authored by daniel-sanche:

- 1087d64 added skeletons for pipeline expressions
- 053f55b added quick implementation for expressions
- e56459c added default implementation for pipeline stages
- babafb2 ran black
- 149c61f got code to run
- 320cea1 moved helpers
- cd2963d added basic pipelines.py file
- 3cbc2d5 added yaml test file
- 0c2241d wrote basic parser for pipeline yaml
- 9c8982c encoded extra java system tests
- 863bd1d reconstruct pipeline expr objects
- 7cd2c11 got yaml to run
- c328934 add data loading code
- de0a862 updated protos
- 45c83aa added pyyaml to system test dependencies
- fc57f04 added Expr methods
- 0224f94 trying to improve accumulator api
- cdee937 improved aggregate/accumulators
- 5d35c54 fixed naming in yaml
- 11373e3 create objects in parsing code
- d2b4153 use order enum
- bfdfba3 standardize how I deal with map stages
- afa823a fix broken super.__init__ calls
- 3348127 added repr for custom classes
- e3c995d added repr to pipeline
- 2d604e1 fixed vector formatting
- 3bed807 only treat capitalized strings as possible exprs
- 9b9eaa8 Where uses positional args in yaml
- e35accf support union stage
- 72301a1 fixed testReplace
- 07d25dc fixed sample
- 38fb7fa iterating on sample options
- 0c3852f fixed import
- 1d95e54 removing kwargs
- a3daa07 removing kwargs
- 8e45b68 use only positional arguments in yaml
- 6b2a605 added _to_pb to stages
- 6d2aa90 added _pb to pipeline expressions
- e51d9dc added unimplemented stage._to_pbs
- 8f6dea5 fixed proto formatting
- 654e5f7 got protos to build with no errors
- 69d4b76 added headers
- 6d966a8 fixed some types
- 33fee52 fixed expression typing
- cb8539d added abstract to expr
- 380dce9 fixed types in pipeline_stages
- 831886f align typing
- 79df205 added stubs for execute_pipeline
- e760c44 fixed union stage proto
- ea1e2ba propagate client in test parsers
- b6b082d enable mypy for pipeline code
- 658964c added query.pipeline
- 7e12b74 got queries to execute
- e463418 fixed some encodings
- 4f527c1 broke pipelines into separate async/sync/base files
- a323e5b added docstrings to expressions
- 7e70022 improved ordering
- 98887a0 added docstrings to pipeline classes
- 549c590 added docstrings
- 376015c fixed sample_options
- e3de1b8 catch expected errors in test
- f725bc7 added alias to unnest
- 128ab1c fixed replace
- 64a8bda added pipeline to client
- ec06080 improved query.pipeline logic
- 0703532 removed imports
- 246d1c8 pass stages through
- b72d44a fixed collection docstrings
- 810ccd2 fixed collection setup
- 78b5833 fixed some collection errors
- 36e0228 implemented FilterCondition._from_pb
- 43f4cf4 renamed function
- 51e9b25 added tests for query.pipeline
- 35cb0bb added verify pipeline to system tests
- 6283d1a fixed bug in filter conversion
- 080bf42 return pipeline copy
- 6cd5c63 updated results format
- 7143247 added proto assertions to e2e tests
- 05c4f23 added tests for FieldFilter._from_filter_pb
- 7512a1a fixed repr
- 901c0e1 fixed typing issues
- 6a5f72e fixed client fixture
- 072669f fixed mapget in e2e yaml
- 05a7498 fixed docstring
- 9f8d4c8 fixed append bug
- 863a09d compare result data in tests
- e9f11c6 fixed incorrect test case ordering
- d2c33f2 yield document snapshots
- db1ecaa added async test
- b6d74da cleaning up e2e yaml
- 70efea7 fixed scope in tests
- b36afa8 added _client to base pipeline
- 7a26f7e fixing test yaml
- fee6c90 renamed execute_async to execute
- da18289 broke up pipeline tests into separate functions
- fc80062 improved faulty test yaml
- c049e21 removed unready stages and expressions
- 41b91d4 fixed regex_match test
- 7f69229 ran blacken
- 0b4c294 turn Stage into an ABC
- 255b698 removed extra stages and expressions
- bd9c2c4 stripped down to stubs
- d8dc10f added PipelineStages
- 1f2390a removed collection.pipeline
- 39f261a fixed docstrings
- 48bf9f7 added pipeline_result
- b57424d chore: updated gapic layer for execute_query
- 8fde414 updated gapics
- 9cac154 Merge branch 'pipeline_queries_approved' into pipeline_queries_1_stubs
- 93003c0 removed unneeded code
- 79d016a fixed lint
- 907a551 added client tests
- 8e20c11 added pipeline tests
- 98c7ea5 added tests for execute
- af4fb20 broke out shared logic into base_pipeline
- cd38fc2 added pipeline stages tests
- 0ac319e removed unneeded stages
- 7981a34 added tests for pipeline expressions
- 62b6510 added pipeline_result tests
- 035c6e6 added tests for pipeline source
- 5dd1246 added transaction to execute call
- 6434023 ran blacken
- a8beea4 fixed lint
- 2d286bb fixed mypy
- d2babd2 Merge branch 'pipeline_queries_approved' into pipeline_queries_1_stubs
- b46bdc1 fixed test issues
- 22b558c Merge branch 'pipeline_queries_approved' into pipeline_queries_1_stubs
- 3432322 fixed lint
- 64cd4fb fixed comment
- e74e04d added separate stream/execute methods
- a818f52 removed converter reference
- 8a9c3ec made stages private
- 06a2084 added generic_stage method to base_pipeline
- 13389b8 fixed mypy
New file, 81 lines (the pipeline stage definitions; the filename is not shown in this capture):

```python
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations
from typing import Optional
from abc import ABC
from abc import abstractmethod

from google.cloud.firestore_v1.types.document import Pipeline as Pipeline_pb
from google.cloud.firestore_v1.types.document import Value
from google.cloud.firestore_v1.pipeline_expressions import Expr


class Stage(ABC):
    """Base class for all pipeline stages.

    Each stage represents a specific operation (e.g., filtering, sorting,
    transforming) within a Firestore pipeline. Subclasses define the specific
    arguments and behavior for each operation.
    """

    def __init__(self, custom_name: Optional[str] = None):
        self.name = custom_name or type(self).__name__.lower()

    def _to_pb(self) -> Pipeline_pb.Stage:
        return Pipeline_pb.Stage(
            name=self.name, args=self._pb_args(), options=self._pb_options()
        )

    @abstractmethod
    def _pb_args(self) -> list[Value]:
        """Return an ordered list of arguments the given stage expects."""
        raise NotImplementedError

    def _pb_options(self) -> dict[str, Value]:
        """Return optional named arguments that certain functions may support."""
        return {}

    def __repr__(self):
        items = ("%s=%r" % (k, v) for k, v in self.__dict__.items() if k != "name")
        return f"{self.__class__.__name__}({', '.join(items)})"


class Collection(Stage):
    """Specifies a collection as the initial data source."""

    def __init__(self, path: str):
        super().__init__()
        if not path.startswith("/"):
            path = f"/{path}"
        self.path = path

    def _pb_args(self):
        return [Value(reference_value=self.path)]


class GenericStage(Stage):
    """Represents a generic, named stage with parameters."""

    def __init__(self, name: str, *params: Expr | Value):
        super().__init__(name)
        self.params: list[Value] = [
            p._to_pb() if isinstance(p, Expr) else p for p in params
        ]

    def _pb_args(self):
        return self.params

    def __repr__(self):
        return f"{self.__class__.__name__}(name='{self.name}')"
```
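To make the subclassing contract of the `Stage` ABC concrete, here is a minimal, dependency-free sketch of the same pattern. The `Limit` stage and the dict-based `_encode` method are illustrative stand-ins for the real proto-backed `_to_pb`, not part of this PR:

```python
from abc import ABC, abstractmethod
from typing import Any, Optional


class Stage(ABC):
    """Simplified stand-in for the PR's Stage ABC (no proto dependencies)."""

    def __init__(self, custom_name: Optional[str] = None):
        # Stage name defaults to the lowercased class name, e.g. Limit -> "limit".
        self.name = custom_name or type(self).__name__.lower()

    @abstractmethod
    def _pb_args(self) -> list[Any]:
        """Ordered positional arguments for this stage."""
        raise NotImplementedError

    def _encode(self) -> dict:
        # Plays the role of _to_pb(): combine the stage name with the
        # subclass-provided argument list.
        return {"name": self.name, "args": self._pb_args()}


class Limit(Stage):
    """Hypothetical stage that caps the number of results."""

    def __init__(self, count: int):
        super().__init__()
        self.count = count

    def _pb_args(self) -> list[Any]:
        return [self.count]


print(Limit(10)._encode())  # {'name': 'limit', 'args': [10]}
```

Each concrete stage only has to supply its ordered arguments; naming and envelope construction stay in the base class.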
New file, 96 lines (the async pipeline; the filename is not shown in this capture):

```python
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations
from typing import AsyncIterable, TYPE_CHECKING
from google.cloud.firestore_v1 import _pipeline_stages as stages
from google.cloud.firestore_v1.base_pipeline import _BasePipeline

if TYPE_CHECKING:  # pragma: NO COVER
    from google.cloud.firestore_v1.async_client import AsyncClient
    from google.cloud.firestore_v1.pipeline_result import PipelineResult
    from google.cloud.firestore_v1.async_transaction import AsyncTransaction


class AsyncPipeline(_BasePipeline):
    """
    Pipelines allow for complex data transformations and queries involving
    multiple stages like filtering, projection, aggregation, and vector search.

    This class extends `_BasePipeline` and provides methods to execute the
    defined pipeline stages using an asynchronous `AsyncClient`.

    Usage Example:
        >>> from google.cloud.firestore_v1.pipeline_expressions import Field
        >>>
        >>> async def run_pipeline():
        ...     client = AsyncClient(...)
        ...     pipeline = (
        ...         client.pipeline()
        ...         .collection("books")
        ...         .where(Field.of("published").gt(1980))
        ...         .select("title", "author")
        ...     )
        ...     async for result in pipeline.stream():
        ...         print(result)

    Use `client.pipeline()` to create instances of this class.
    """

    def __init__(self, client: AsyncClient, *stages: stages.Stage):
        """
        Initializes an asynchronous Pipeline.

        Args:
            client: The asynchronous `AsyncClient` instance to use for execution.
            *stages: Initial stages for the pipeline.
        """
        super().__init__(client, *stages)

    async def execute(
        self,
        transaction: "AsyncTransaction" | None = None,
    ) -> list[PipelineResult]:
        """
        Executes this pipeline and returns the results as a list.

        Args:
            transaction
                (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
                An existing transaction that this query will run in.
                If a ``transaction`` is used and it already has write operations
                added, this method cannot be used (i.e. read-after-write is not
                allowed).
        """
        return [result async for result in self.stream(transaction=transaction)]

    async def stream(
        self,
        transaction: "AsyncTransaction" | None = None,
    ) -> AsyncIterable[PipelineResult]:
        """
        Process this pipeline as a stream, providing results through an iterable.

        Args:
            transaction
                (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
                An existing transaction that this query will run in.
                If a ``transaction`` is used and it already has write operations
                added, this method cannot be used (i.e. read-after-write is not
                allowed).
        """
        request = self._prep_execute_request(transaction)
        async for response in await self._client._firestore_api.execute_pipeline(
            request
        ):
            for result in self._execute_response_helper(response):
                yield result
```
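The relationship between `execute` and `stream` above boils down to "execute drains the stream into a list". A standalone sketch of that relationship, with a fake in-memory result source standing in for the `execute_pipeline` RPC:

```python
import asyncio
from typing import AsyncIterator, List


async def stream() -> AsyncIterator[int]:
    # Stand-in for AsyncPipeline.stream(): yield results one at a time as
    # they arrive, rather than holding everything in memory.
    for value in [1, 2, 3]:
        await asyncio.sleep(0)  # simulate waiting on the next RPC response
        yield value


async def execute() -> List[int]:
    # Stand-in for AsyncPipeline.execute(): buffer the whole stream into
    # a list before returning.
    return [item async for item in stream()]


print(asyncio.run(execute()))  # [1, 2, 3]
```

Callers that can process results incrementally use the streaming form; callers that want everything at once pay the memory cost of the buffered form.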
New file, 151 lines (the shared pipeline base class; the filename is not shown in this capture). Note: the original diff's `__init__` accepted only a client, which would break the `super().__init__(client, *stages)` call in `AsyncPipeline`; the signature below accepts stages as well so the two files are consistent:

```python
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations
from typing import Iterable, Sequence, TYPE_CHECKING
from google.cloud.firestore_v1 import _pipeline_stages as stages
from google.cloud.firestore_v1.types.pipeline import (
    StructuredPipeline as StructuredPipeline_pb,
)
from google.cloud.firestore_v1.types.firestore import ExecutePipelineRequest
from google.cloud.firestore_v1.pipeline_result import PipelineResult
from google.cloud.firestore_v1.pipeline_expressions import Expr
from google.cloud.firestore_v1 import _helpers

if TYPE_CHECKING:  # pragma: NO COVER
    from google.cloud.firestore_v1.client import Client
    from google.cloud.firestore_v1.async_client import AsyncClient
    from google.cloud.firestore_v1.types.firestore import ExecutePipelineResponse
    from google.cloud.firestore_v1.transaction import BaseTransaction


class _BasePipeline:
    """
    Base class for building Firestore data transformation and query pipelines.

    This class is not intended to be instantiated directly.
    Use `client.pipeline()` to create pipeline instances.
    """

    def __init__(self, client: Client | AsyncClient, *init_stages: stages.Stage):
        """
        Initializes a new pipeline.

        Pipelines should not be instantiated directly. Instead,
        call client.pipeline() to create an instance.

        Args:
            client: The client associated with the pipeline.
            *init_stages: Initial stages for the pipeline.
        """
        self._client = client
        self.stages: Sequence[stages.Stage] = tuple(init_stages)

    @classmethod
    def _create_with_stages(
        cls, client: Client | AsyncClient, *stages
    ) -> _BasePipeline:
        """
        Initializes a new pipeline with the given stages.

        Pipeline classes should not be instantiated directly.

        Args:
            client: The client associated with the pipeline.
            *stages: Initial stages for the pipeline.
        """
        new_instance = cls(client)
        new_instance.stages = tuple(stages)
        return new_instance

    def __repr__(self):
        cls_str = type(self).__name__
        if not self.stages:
            return f"{cls_str}()"
        elif len(self.stages) == 1:
            return f"{cls_str}({self.stages[0]!r})"
        else:
            stages_str = ",\n  ".join([repr(s) for s in self.stages])
            return f"{cls_str}(\n  {stages_str}\n)"

    def _to_pb(self) -> StructuredPipeline_pb:
        return StructuredPipeline_pb(
            pipeline={"stages": [s._to_pb() for s in self.stages]}
        )

    def _append(self, new_stage):
        """
        Create a new Pipeline object with a new stage appended.
        """
        return self.__class__._create_with_stages(self._client, *self.stages, new_stage)

    def _prep_execute_request(
        self, transaction: BaseTransaction | None
    ) -> ExecutePipelineRequest:
        """
        Shared logic for creating an ExecutePipelineRequest.
        """
        database_name = (
            f"projects/{self._client.project}/databases/{self._client._database}"
        )
        transaction_id = (
            _helpers.get_transaction_id(transaction)
            if transaction is not None
            else None
        )
        request = ExecutePipelineRequest(
            database=database_name,
            transaction=transaction_id,
            structured_pipeline=self._to_pb(),
        )
        return request

    def _execute_response_helper(
        self, response: ExecutePipelineResponse
    ) -> Iterable[PipelineResult]:
        """
        Shared logic for unpacking an ExecutePipelineResponse into PipelineResults.
        """
        for doc in response.results:
            ref = self._client.document(doc.name) if doc.name else None
            yield PipelineResult(
                self._client,
                doc.fields,
                ref,
                response._pb.execution_time,
                doc._pb.create_time if doc.create_time else None,
                doc._pb.update_time if doc.update_time else None,
            )

    def generic_stage(self, name: str, *params: Expr) -> "_BasePipeline":
        """
        Adds a generic, named stage to the pipeline with specified parameters.

        This method provides a flexible way to extend the pipeline's functionality
        by adding custom stages. Each generic stage is defined by a unique `name`
        and a set of `params` that control its behavior.

        Example:
            >>> # Assume we don't have a built-in "where" stage
            >>> pipeline = client.pipeline().collection("books")
            >>> pipeline = pipeline.generic_stage("where", Field.of("published").lt(900))
            >>> pipeline = pipeline.select("title", "author")

        Args:
            name: The name of the generic stage.
            *params: A sequence of `Expr` objects representing the parameters
                for the stage.

        Returns:
            A new Pipeline object with this stage appended to the stage list.
        """
        return self._append(stages.GenericStage(name, *params))
```
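`_append` in the base class implements a copy-on-append builder: every stage addition produces a new pipeline object and leaves the original untouched. A minimal illustration of that design, with stages reduced to plain strings:

```python
class MiniPipeline:
    """Sketch of the copy-on-append pattern used by _BasePipeline._append."""

    def __init__(self, *stages: str):
        self.stages: tuple[str, ...] = tuple(stages)

    def _append(self, new_stage: str) -> "MiniPipeline":
        # Return a NEW pipeline with the stage added; self is never mutated,
        # so a partially built pipeline can be shared and branched safely.
        return MiniPipeline(*self.stages, new_stage)


base = MiniPipeline("collection")
filtered = base._append("where")
print(base.stages)      # ('collection',)
print(filtered.stages)  # ('collection', 'where')
```

This is why the commit log above includes "return pipeline copy": chaining stage methods never aliases state between pipelines.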
I'm not familiar with how Firestore for Python works, but within the SDKs my team works on, execute would buffer all of the PipelineResults into memory and make these available as an array. The execute method itself would be asynchronous.
However, our server SDKs, java-firestore and nodejs-firestore, will offer a streaming API, pipeline.stream(). Much like the AsyncIterable, this would allow users to stream PipelineResults as they are received and not buffer everything into memory.
Also, FWIW, we will be adding a wrapper object, PipelineSnapshot, which contains all results and other metadata (like explain info and timestamps). I will be sending your team an updated nodejs-firestore PR with our latest changes as soon as I can.
Ok, that makes sense, good feedback.
I will create separate methods for `stream() -> AsyncIterable[PipelineResult]` and `execute_pipeline() -> list[PipelineResult]`, and then we can convert that list into a `PipelineSnapshot` in the future.
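The PipelineSnapshot wrapper mentioned in this thread is not part of this PR. As a rough sketch of the shape being discussed (every field name here is an assumption, not the final API), it could be as simple as:

```python
from dataclasses import dataclass
from typing import Any, List, Optional


@dataclass
class PipelineSnapshot:
    """Hypothetical wrapper: buffered results plus execution metadata."""

    results: List[Any]
    execution_time: Optional[str] = None  # assumed timestamp field
    explain_info: Optional[dict] = None  # assumed explain metadata


snapshot = PipelineSnapshot(results=[{"title": "Dune"}])
print(len(snapshot.results))  # 1
```

The buffered `execute` path would then return this object instead of a bare list, leaving room for metadata without a breaking change.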