Conversation

@daniel-sanche
Contributor

This is the first PR for pipelines support, adding the base structure for the new feature:

  • added PipelineSource class to create a new pipeline pointed at a resource
    • added client.pipeline() to create a new PipelineSource object
  • added pipeline_expressions file to hold set of expressions
    • Currently just holding base Expr and Constant
  • added pipeline_stages file to hold set of stages.
    • Currently just holding base Stage, Collection, and GenericStage
  • added BasePipeline, Pipeline, and AsyncPipeline to chain together sequences of stages
    • implemented pipeline.execute() in sync and async classes
  • added PipelineResult to expose resulting data back to the user

This PR contains unit tests. System tests will come later, when we have more stages.
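The pieces listed above fit together roughly like this. A condensed sketch: the class names come from the PR description, but every body here is hypothetical, not the actual implementation:

```python
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Iterable


class Expr:
    """Base class for pipeline expressions."""


@dataclass
class Constant(Expr):
    """Wraps a literal value used inside an expression."""
    value: Any


class Stage:
    """Base class for pipeline stages."""


@dataclass
class Collection(Stage):
    """Source stage: read documents from a collection path."""
    path: str


@dataclass
class GenericStage(Stage):
    """Escape hatch for stages without a dedicated class yet."""
    name: str
    params: tuple = ()


@dataclass
class PipelineResult:
    """One document produced by pipeline execution."""
    data: dict


class Pipeline:
    def __init__(self, *stages: Stage):
        self.stages = list(stages)

    def generic_stage(self, name: str, *params: Any) -> Pipeline:
        # builder methods return a new Pipeline with the stage appended
        return Pipeline(*self.stages, GenericStage(name, params))

    def execute(self) -> Iterable[PipelineResult]:
        # the real method would send an ExecutePipeline RPC; omitted here
        return iter([])


class PipelineSource:
    """Returned by client.pipeline(); selects the initial source stage."""

    def collection(self, path: str) -> Pipeline:
        return Pipeline(Collection(path))
```

Each builder call returns a fresh `Pipeline`, so partial pipelines can be reused without mutation.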

@daniel-sanche daniel-sanche requested a review from a team as a code owner May 12, 2025 23:30
@product-auto-label product-auto-label bot added size: xl Pull request size is extra large. api: firestore Issues related to the googleapis/python-firestore API. labels May 12, 2025
@daniel-sanche daniel-sanche force-pushed the pipeline_queries_approved branch from b57424d to 3f9b65f Compare May 13, 2025 18:25
@product-auto-label product-auto-label bot added size: l Pull request size is large. and removed size: xl Pull request size is extra large. labels Jun 9, 2025
@daniel-sanche daniel-sanche force-pushed the pipeline_queries_1_stubs branch from 1f2bfa4 to 22b558c Compare June 9, 2025 21:17
@product-auto-label product-auto-label bot added size: xl Pull request size is extra large. and removed size: l Pull request size is large. labels Jun 11, 2025
@bhshkh

bhshkh commented Jun 12, 2025

LGTM

@MarkDuckworth MarkDuckworth left a comment

Looks good. I left some comments for your consideration; they may not be actionable, just FYIs.

async def execute(
self,
transaction: "AsyncTransaction" | None = None,
) -> AsyncIterable[PipelineResult]:

I'm not familiar with how Firestore for Python works, but within the SDKs my team works on, execute would buffer all of the PipelineResults into memory and make them available as an array. The execute method itself would be asynchronous.

However, our server SDKs, java-firestore and nodejs-firestore, will offer a streaming API, pipeline.stream(). Much like the AsyncIterable, this would allow users to stream PipelineResults as they are received rather than buffering them all into memory.

Also, FWIW, we will be adding a wrapper object PipelineSnapshot which contains all results and other metadata (like explain info and timestamps). I will be sending your team an updated nodejs-firestore PR with our latest changes as soon as I can.
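The wrapper Mark describes might look roughly like this in Python; the field names below are guesses based on his description ("explain info and timestamps"), not the real nodejs-firestore shape:

```python
import datetime
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class PipelineSnapshot:
    """Hypothetical wrapper bundling results with execution metadata."""
    results: List[Any]
    execution_time: Optional[datetime.datetime] = None
    explain_metrics: Dict[str, Any] = field(default_factory=dict)
```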

Contributor Author

Ok, that makes sense, good feedback.

I will create separate methods, stream() -> AsyncIterable[PipelineResult] and execute_pipeline() -> list[PipelineResult], and then we can convert that list into a PipelineSnapshot in the future
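That split might be sketched like this, with stand-in data in place of real RPC responses (method and class bodies are illustrative, not the SDK's implementation):

```python
import asyncio
from typing import AsyncIterator, List


class PipelineResult:
    def __init__(self, data: dict):
        self.data = data


class AsyncPipeline:
    async def stream(self) -> AsyncIterator[PipelineResult]:
        # yield results one at a time as server responses arrive;
        # stand-in data here instead of a real RPC stream
        for data in [{"id": 1}, {"id": 2}]:
            yield PipelineResult(data)

    async def execute(self) -> List[PipelineResult]:
        # buffer the full stream into memory and return it as a list
        return [result async for result in self.stream()]
```

stream() keeps memory bounded for large result sets, while the buffered variant is more convenient when results are small and will all be consumed anyway.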

def execute(
self,
transaction: "Transaction" | None = None,
) -> Iterable[PipelineResult]:

I see the synchronous iterable here. Makes sense. But I'm still wondering if there is an in-between implementation, where the execute is asynchronous but the results are available as a synchronous iterable.

Also, same comment about returning a PipelineSnapshot

Contributor Author

I don't think Python really provides a way to do this, because synchronous iterables are inherently blocking. But I'm interested to hear if you have an idea in mind.
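One in-between pattern does exist, at the cost of a worker thread: start the fetch in the background and hand back a plain generator that blocks per item rather than for the whole result set. A hedged sketch of the general pattern, not anything the SDK implements:

```python
import queue
import threading
from typing import Callable, Iterable, Iterator

_SENTINEL = object()  # marks end of stream on the queue


def background_execute(fetch: Callable[[], Iterable]) -> Iterator:
    """Run fetch() on a worker thread; return a sync iterator over its items."""
    q: "queue.Queue" = queue.Queue()

    def worker() -> None:
        for item in fetch():
            q.put(item)
        q.put(_SENTINEL)

    threading.Thread(target=worker, daemon=True).start()

    def results() -> Iterator:
        while True:
            item = q.get()  # blocks only until the next item is ready
            if item is _SENTINEL:
                return
            yield item

    return results()
```

The call returns immediately; each `next()` on the iterator blocks only for the next item, so the caller overlaps its own work with the fetch.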

GeoPoint,
Vector,
list,
Dict[str, Any],

might be a candidate for Dict[str, CONSTANT_TYPE]

Contributor Author

Unfortunately I don't think Python's type system is powerful enough for this kind of recursive TypeVar. But I'll take another look.
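For what it's worth, recent mypy releases do accept recursive type aliases (as opposed to recursive TypeVars), so something close to Mark's suggestion may type-check; whether it is worth the tooling risk is a separate question. A sketch, paired with a runtime validator since typing aliases can't be used with isinstance:

```python
from typing import Dict, List, Union

# Recursive alias; accepted by recent mypy releases, though older
# tools and some runtime checkers may not support it.
CONSTANT_TYPE = Union[
    str, int, float, bool, None,
    List["CONSTANT_TYPE"],
    Dict[str, "CONSTANT_TYPE"],
]


def is_constant(value: object) -> bool:
    """Runtime mirror of the alias above, checked recursively."""
    if value is None or isinstance(value, (str, int, float, bool)):
        return True
    if isinstance(value, list):
        return all(is_constant(v) for v in value)
    if isinstance(value, dict):
        return all(isinstance(k, str) and is_constant(v) for k, v in value.items())
    return False
```

Note this simplified alias omits Firestore-specific leaf types like GeoPoint and Vector, which the real union would need to include.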

Retrieves all fields in the result.

If a converter was provided to this `PipelineResult`, the result of the
converter's `from_firestore` method is returned.

I can't tell if converters are or will be supported in Python pipelines

Contributor Author

Good catch, I ended up cutting it from this PR. I'll remove the docstring

Let me know if you think converters would be important to have in Python for launch

return f"{self.__class__.__name__}({', '.join(items)})"


class Collection(Stage):

FWIW, in most SDKs we tried to make adding a stage a method on Pipeline; only in the Java SDKs do we expose the Stage classes. I guess it's too early to tell how you are planning to implement the API for adding stages to a Pipeline, and I guess you will do whatever is idiomatic for Python.

Contributor Author

Python currently exposes them both ways, but using the methods on Pipeline is more idiomatic and user-friendly. Maybe I'll mark this class as private and keep the pipeline methods as the official public way to add a stage.

@daniel-sanche daniel-sanche merged commit 17e71b9 into pipeline_queries_approved Jun 17, 2025
7 checks passed
@daniel-sanche daniel-sanche deleted the pipeline_queries_1_stubs branch June 17, 2025 22:54
daniel-sanche added a commit that referenced this pull request Jun 23, 2025
commit ad3e3df
Author: Daniel Sanche <[email protected]>
Date:   Fri Jun 20 17:44:37 2025 -0700

    added .pipeline() to aggregation query

commit 4ce1b91
Author: Daniel Sanche <[email protected]>
Date:   Fri Jun 20 15:17:20 2025 -0700

    added exprs and stages needed for aggregation pipelines

commit e7d8e52
Merge: 0d15355 17e71b9
Author: Daniel Sanche <[email protected]>
Date:   Tue Jun 17 16:57:45 2025 -0700

    Merge branch 'pipeline_queries_approved' into pipeline_queries_2_query_parity

commit 17e71b9
Author: Daniel Sanche <[email protected]>
Date:   Tue Jun 17 15:54:31 2025 -0700

    feat: add pipelines structure (#1046)

commit 0d15355
Author: Daniel Sanche <[email protected]>
Date:   Tue Jun 17 15:52:38 2025 -0700

    fixed tests

commit bc25930
Merge: 6351ae7 13389b8
Author: Daniel Sanche <[email protected]>
Date:   Tue Jun 17 15:38:05 2025 -0700

    Merge branch 'pipeline_queries_1_stubs' into pipeline_queries_2_query_parity

commit 6351ae7
Author: Daniel Sanche <[email protected]>
Date:   Tue Jun 17 15:35:15 2025 -0700

    merged PR #1

commit 13389b8
Author: Daniel Sanche <[email protected]>
Date:   Mon Jun 16 15:15:26 2025 -0700

    fixed mypy

commit 06a2084
Author: Daniel Sanche <[email protected]>
Date:   Mon Jun 16 14:57:54 2025 -0700

    added generic_stage method to base_pipeline

commit 8a9c3ec
Author: Daniel Sanche <[email protected]>
Date:   Mon Jun 16 14:55:50 2025 -0700

    made stages private

commit a818f52
Author: Daniel Sanche <[email protected]>
Date:   Mon Jun 16 14:28:20 2025 -0700

    removed converter reference

commit e74e04d
Author: Daniel Sanche <[email protected]>
Date:   Mon Jun 16 14:22:13 2025 -0700

    added separate stream/execute methods

commit 64cd4fb
Author: Daniel Sanche <[email protected]>
Date:   Thu Jun 12 11:22:25 2025 -0700

    fixed comment

commit 3432322
Author: Daniel Sanche <[email protected]>
Date:   Tue Jun 10 17:03:09 2025 -0700

    fixed lint