From fa88c7b390c9d4aa6d387f1e60a07d899515cf9a Mon Sep 17 00:00:00 2001
From: Jacky Wang
Date: Tue, 4 Nov 2025 22:26:24 -0800
Subject: [PATCH 1/8] proto

---
 .../sql/connect/proto/pipelines_pb2.pyi       | 92 ++++++++++++++++++-
 .../protobuf/spark/connect/pipelines.proto    | 14 +++
 2 files changed, 105 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/sql/connect/proto/pipelines_pb2.pyi b/python/pyspark/sql/connect/proto/pipelines_pb2.pyi
index 60d131037c99d..e0062b04b5ac8 100644
--- a/python/pyspark/sql/connect/proto/pipelines_pb2.pyi
+++ b/python/pyspark/sql/connect/proto/pipelines_pb2.pyi
@@ -553,6 +553,80 @@ class PipelineCommand(google.protobuf.message.Message):
                 self, oneof_group: typing_extensions.Literal["_relation", b"_relation"]
             ) -> typing_extensions.Literal["relation"] | None: ...
 
+        class StandaloneFlowDetails(google.protobuf.message.Message):
+            """A standalone flow that writes to the target dataset with additional options."""
+
+            DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+            RELATION_FIELD_NUMBER: builtins.int
+            OUTPUTMODE_FIELD_NUMBER: builtins.int
+            ONCE_FIELD_NUMBER: builtins.int
+            @property
+            def relation(self) -> pyspark.sql.connect.proto.relations_pb2.Relation:
+                """An unresolved relation that defines the dataset's flow. Empty if the query function
+                that defines the flow cannot be analyzed at the time of flow definition.
+                """
+            outputMode: builtins.str
+            """The output mode for the standalone flow."""
+            once: builtins.bool
+            """If true, defines the flow as a one-time flow, such as for backfill. Setting it to true changes the flow in two ways:
+            - The flow is run one time by default. If the pipeline is run with a full refresh, the flow will run again.
+            - The flow function must return a batch DataFrame, not a streaming DataFrame.
+            """
+            def __init__(
+                self,
+                *,
+                relation: pyspark.sql.connect.proto.relations_pb2.Relation | None = ...,
+                outputMode: builtins.str | None = ...,
+                once: builtins.bool | None = ...,
+            ) -> None: ...
+            def HasField(
+                self,
+                field_name: typing_extensions.Literal[
+                    "_once",
+                    b"_once",
+                    "_outputMode",
+                    b"_outputMode",
+                    "_relation",
+                    b"_relation",
+                    "once",
+                    b"once",
+                    "outputMode",
+                    b"outputMode",
+                    "relation",
+                    b"relation",
+                ],
+            ) -> builtins.bool: ...
+            def ClearField(
+                self,
+                field_name: typing_extensions.Literal[
+                    "_once",
+                    b"_once",
+                    "_outputMode",
+                    b"_outputMode",
+                    "_relation",
+                    b"_relation",
+                    "once",
+                    b"once",
+                    "outputMode",
+                    b"outputMode",
+                    "relation",
+                    b"relation",
+                ],
+            ) -> None: ...
+            @typing.overload
+            def WhichOneof(
+                self, oneof_group: typing_extensions.Literal["_once", b"_once"]
+            ) -> typing_extensions.Literal["once"] | None: ...
+            @typing.overload
+            def WhichOneof(
+                self, oneof_group: typing_extensions.Literal["_outputMode", b"_outputMode"]
+            ) -> typing_extensions.Literal["outputMode"] | None: ...
+            @typing.overload
+            def WhichOneof(
+                self, oneof_group: typing_extensions.Literal["_relation", b"_relation"]
+            ) -> typing_extensions.Literal["relation"] | None: ...
+
         class Response(google.protobuf.message.Message):
             DESCRIPTOR: google.protobuf.descriptor.Descriptor
 
@@ -587,6 +661,7 @@ class PipelineCommand(google.protobuf.message.Message):
         CLIENT_ID_FIELD_NUMBER: builtins.int
         SOURCE_CODE_LOCATION_FIELD_NUMBER: builtins.int
         RELATION_FLOW_DETAILS_FIELD_NUMBER: builtins.int
+        STANDALONE_FLOW_DETAILS_FIELD_NUMBER: builtins.int
        EXTENSION_FIELD_NUMBER: builtins.int
         dataflow_graph_id: builtins.str
         """The graph to attach this flow to."""
@@ -611,6 +686,10 @@ class PipelineCommand(google.protobuf.message.Message):
             self,
         ) -> global___PipelineCommand.DefineFlow.WriteRelationFlowDetails: ...
         @property
+        def standalone_flow_details(
+            self,
+        ) -> global___PipelineCommand.DefineFlow.StandaloneFlowDetails: ...
+        @property
         def extension(self) -> google.protobuf.any_pb2.Any: ...
         def __init__(
             self,
@@ -623,6 +702,8 @@ class PipelineCommand(google.protobuf.message.Message):
             source_code_location: global___SourceCodeLocation | None = ...,
             relation_flow_details: global___PipelineCommand.DefineFlow.WriteRelationFlowDetails
             | None = ...,
+            standalone_flow_details: global___PipelineCommand.DefineFlow.StandaloneFlowDetails
+            | None = ...,
             extension: google.protobuf.any_pb2.Any | None = ...,
         ) -> None: ...
         def HasField(
@@ -652,6 +733,8 @@ class PipelineCommand(google.protobuf.message.Message):
                 b"relation_flow_details",
                 "source_code_location",
                 b"source_code_location",
+                "standalone_flow_details",
+                b"standalone_flow_details",
                 "target_dataset_name",
                 b"target_dataset_name",
             ],
@@ -685,6 +768,8 @@ class PipelineCommand(google.protobuf.message.Message):
                 b"source_code_location",
                 "sql_conf",
                 b"sql_conf",
+                "standalone_flow_details",
+                b"standalone_flow_details",
                 "target_dataset_name",
                 b"target_dataset_name",
             ],
@@ -717,7 +802,12 @@ class PipelineCommand(google.protobuf.message.Message):
         @typing.overload
         def WhichOneof(
             self, oneof_group: typing_extensions.Literal["details", b"details"]
-        ) -> typing_extensions.Literal["relation_flow_details", "extension"] | None: ...
+        ) -> (
+            typing_extensions.Literal[
+                "relation_flow_details", "standalone_flow_details", "extension"
+            ]
+            | None
+        ): ...
 
     class StartRun(google.protobuf.message.Message):
         """Resolves all datasets and flows and starts a pipeline update. Should be called after all
diff --git a/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto b/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto
index 0fa36f8a15143..c08e385c97b4b 100644
--- a/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto
+++ b/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto
@@ -142,6 +142,7 @@ message PipelineCommand {
 
     oneof details {
       WriteRelationFlowDetails relation_flow_details = 7;
+      StandaloneFlowDetails standalone_flow_details = 8;
       google.protobuf.Any extension = 999;
     }
 
@@ -152,6 +153,19 @@ message PipelineCommand {
       optional spark.connect.Relation relation = 1;
     }
 
+    // A standalone flow that writes to the target dataset with additional options.
+    message StandaloneFlowDetails {
+      // An unresolved relation that defines the dataset's flow. Empty if the query function
+      // that defines the flow cannot be analyzed at the time of flow definition.
+      optional spark.connect.Relation relation = 1;
+      // The output mode for the standalone flow.
+      optional string outputMode = 2;
+      // If true, defines the flow as a one-time flow, such as for backfill. Setting it to true changes the flow in two ways:
+      // - The flow is run one time by default. If the pipeline is run with a full refresh, the flow will run again.
+      // - The flow function must return a batch DataFrame, not a streaming DataFrame.
+      optional bool once = 3;
+    }
+
     message Response {
       // Fully qualified flow name that uniquely identifies a flow in the Dataflow graph.
       optional string flow_name = 1;

From 2f2e58ee6c18954a20b6e1eb9a5376d58319def5 Mon Sep 17 00:00:00 2001
From: Jacky Wang
Date: Tue, 4 Nov 2025 23:14:41 -0800
Subject: [PATCH 2/8] comment

---
 .../pyspark/sql/connect/proto/pipelines_pb2.pyi  | 16 ++++++++++++++++
 .../main/protobuf/spark/connect/pipelines.proto  |  5 +++++
 2 files changed, 21 insertions(+)

diff --git a/python/pyspark/sql/connect/proto/pipelines_pb2.pyi b/python/pyspark/sql/connect/proto/pipelines_pb2.pyi
index e0062b04b5ac8..e66589759ee3f 100644
--- a/python/pyspark/sql/connect/proto/pipelines_pb2.pyi
+++ b/python/pyspark/sql/connect/proto/pipelines_pb2.pyi
@@ -561,6 +561,7 @@ class PipelineCommand(google.protobuf.message.Message):
             RELATION_FIELD_NUMBER: builtins.int
             OUTPUTMODE_FIELD_NUMBER: builtins.int
             ONCE_FIELD_NUMBER: builtins.int
+            COMMENT_FIELD_NUMBER: builtins.int
             @property
             def relation(self) -> pyspark.sql.connect.proto.relations_pb2.Relation:
                 """An unresolved relation that defines the dataset's flow. Empty if the query function
@@ -573,22 +574,29 @@ class PipelineCommand(google.protobuf.message.Message):
             - The flow is run one time by default. If the pipeline is run with a full refresh, the flow will run again.
             - The flow function must return a batch DataFrame, not a streaming DataFrame.
             """
+            comment: builtins.str
+            """An optional comment for the flow."""
             def __init__(
                 self,
                 *,
                 relation: pyspark.sql.connect.proto.relations_pb2.Relation | None = ...,
                 outputMode: builtins.str | None = ...,
                 once: builtins.bool | None = ...,
+                comment: builtins.str | None = ...,
             ) -> None: ...
             def HasField(
                 self,
                 field_name: typing_extensions.Literal[
+                    "_comment",
+                    b"_comment",
                     "_once",
                     b"_once",
                     "_outputMode",
                     b"_outputMode",
                     "_relation",
                     b"_relation",
+                    "comment",
+                    b"comment",
                     "once",
                     b"once",
                     "outputMode",
@@ -600,12 +608,16 @@ class PipelineCommand(google.protobuf.message.Message):
             def ClearField(
                 self,
                 field_name: typing_extensions.Literal[
+                    "_comment",
+                    b"_comment",
                     "_once",
                     b"_once",
                     "_outputMode",
                     b"_outputMode",
                     "_relation",
                     b"_relation",
+                    "comment",
+                    b"comment",
                     "once",
                     b"once",
                     "outputMode",
@@ -615,6 +627,10 @@ class PipelineCommand(google.protobuf.message.Message):
                 ],
             ) -> None: ...
             @typing.overload
+            def WhichOneof(
+                self, oneof_group: typing_extensions.Literal["_comment", b"_comment"]
+            ) -> typing_extensions.Literal["comment"] | None: ...
+            @typing.overload
             def WhichOneof(
                 self, oneof_group: typing_extensions.Literal["_once", b"_once"]
             ) -> typing_extensions.Literal["once"] | None: ...
diff --git a/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto b/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto
index c08e385c97b4b..f98fd29201d0c 100644
--- a/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto
+++ b/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto
@@ -158,12 +158,17 @@ message PipelineCommand {
       // An unresolved relation that defines the dataset's flow. Empty if the query function
       // that defines the flow cannot be analyzed at the time of flow definition.
       optional spark.connect.Relation relation = 1;
+
       // The output mode for the standalone flow.
       optional string outputMode = 2;
+
       // If true, defines the flow as a one-time flow, such as for backfill. Setting it to true changes the flow in two ways:
       // - The flow is run one time by default. If the pipeline is run with a full refresh, the flow will run again.
       // - The flow function must return a batch DataFrame, not a streaming DataFrame.
       optional bool once = 3;
+
+      // An optional comment for the flow.
+      optional string comment = 4;
     }
 
     message Response {

From 0e28d09b73887b6f9fa1711061e9d7822a264c09 Mon Sep 17 00:00:00 2001
From: Jacky Wang
Date: Wed, 5 Nov 2025 10:43:27 -0800
Subject: [PATCH 3/8] Revert "comment"

This reverts commit 707fc814a43f191598d33c86e5d0cf3dd294e20d.

---
 .../pyspark/sql/connect/proto/pipelines_pb2.pyi  | 16 ----------------
 .../main/protobuf/spark/connect/pipelines.proto  |  5 -----
 2 files changed, 21 deletions(-)

diff --git a/python/pyspark/sql/connect/proto/pipelines_pb2.pyi b/python/pyspark/sql/connect/proto/pipelines_pb2.pyi
index e66589759ee3f..e0062b04b5ac8 100644
--- a/python/pyspark/sql/connect/proto/pipelines_pb2.pyi
+++ b/python/pyspark/sql/connect/proto/pipelines_pb2.pyi
@@ -561,7 +561,6 @@ class PipelineCommand(google.protobuf.message.Message):
             RELATION_FIELD_NUMBER: builtins.int
             OUTPUTMODE_FIELD_NUMBER: builtins.int
             ONCE_FIELD_NUMBER: builtins.int
-            COMMENT_FIELD_NUMBER: builtins.int
             @property
             def relation(self) -> pyspark.sql.connect.proto.relations_pb2.Relation:
                 """An unresolved relation that defines the dataset's flow. Empty if the query function
@@ -574,29 +573,22 @@ class PipelineCommand(google.protobuf.message.Message):
             - The flow is run one time by default. If the pipeline is run with a full refresh, the flow will run again.
             - The flow function must return a batch DataFrame, not a streaming DataFrame.
             """
-            comment: builtins.str
-            """An optional comment for the flow."""
             def __init__(
                 self,
                 *,
                 relation: pyspark.sql.connect.proto.relations_pb2.Relation | None = ...,
                 outputMode: builtins.str | None = ...,
                 once: builtins.bool | None = ...,
-                comment: builtins.str | None = ...,
             ) -> None: ...
             def HasField(
                 self,
                 field_name: typing_extensions.Literal[
-                    "_comment",
-                    b"_comment",
                     "_once",
                     b"_once",
                     "_outputMode",
                     b"_outputMode",
                     "_relation",
                     b"_relation",
-                    "comment",
-                    b"comment",
                     "once",
                     b"once",
                     "outputMode",
@@ -608,16 +600,12 @@ class PipelineCommand(google.protobuf.message.Message):
             def ClearField(
                 self,
                 field_name: typing_extensions.Literal[
-                    "_comment",
-                    b"_comment",
                     "_once",
                     b"_once",
                     "_outputMode",
                     b"_outputMode",
                     "_relation",
                     b"_relation",
-                    "comment",
-                    b"comment",
                     "once",
                     b"once",
                     "outputMode",
@@ -627,10 +615,6 @@ class PipelineCommand(google.protobuf.message.Message):
                 ],
             ) -> None: ...
             @typing.overload
-            def WhichOneof(
-                self, oneof_group: typing_extensions.Literal["_comment", b"_comment"]
-            ) -> typing_extensions.Literal["comment"] | None: ...
-            @typing.overload
             def WhichOneof(
                 self, oneof_group: typing_extensions.Literal["_once", b"_once"]
             ) -> typing_extensions.Literal["once"] | None: ...
diff --git a/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto b/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto
index f98fd29201d0c..c08e385c97b4b 100644
--- a/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto
+++ b/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto
@@ -158,17 +158,12 @@ message PipelineCommand {
       // An unresolved relation that defines the dataset's flow. Empty if the query function
       // that defines the flow cannot be analyzed at the time of flow definition.
       optional spark.connect.Relation relation = 1;
-
       // The output mode for the standalone flow.
       optional string outputMode = 2;
-
       // If true, defines the flow as a one-time flow, such as for backfill. Setting it to true changes the flow in two ways:
       // - The flow is run one time by default. If the pipeline is run with a full refresh, the flow will run again.
       // - The flow function must return a batch DataFrame, not a streaming DataFrame.
       optional bool once = 3;
-
-      // An optional comment for the flow.
-      optional string comment = 4;
     }
 
     message Response {

From 5907bbf1d5d5edde7ca24aece1f79739178e3c82 Mon Sep 17 00:00:00 2001
From: Jacky Wang
Date: Wed, 5 Nov 2025 10:43:45 -0800
Subject: [PATCH 4/8] Revert "proto"

This reverts commit d126dcc896a0b0fe1c4b6aec93c245b285f867e6.

---
 .../sql/connect/proto/pipelines_pb2.pyi       | 92 +------------------
 .../protobuf/spark/connect/pipelines.proto    | 14 ---
 2 files changed, 1 insertion(+), 105 deletions(-)

diff --git a/python/pyspark/sql/connect/proto/pipelines_pb2.pyi b/python/pyspark/sql/connect/proto/pipelines_pb2.pyi
index e0062b04b5ac8..60d131037c99d 100644
--- a/python/pyspark/sql/connect/proto/pipelines_pb2.pyi
+++ b/python/pyspark/sql/connect/proto/pipelines_pb2.pyi
@@ -553,80 +553,6 @@ class PipelineCommand(google.protobuf.message.Message):
                 self, oneof_group: typing_extensions.Literal["_relation", b"_relation"]
             ) -> typing_extensions.Literal["relation"] | None: ...
 
-        class StandaloneFlowDetails(google.protobuf.message.Message):
-            """A standalone flow that writes to the target dataset with additional options."""
-
-            DESCRIPTOR: google.protobuf.descriptor.Descriptor
-
-            RELATION_FIELD_NUMBER: builtins.int
-            OUTPUTMODE_FIELD_NUMBER: builtins.int
-            ONCE_FIELD_NUMBER: builtins.int
-            @property
-            def relation(self) -> pyspark.sql.connect.proto.relations_pb2.Relation:
-                """An unresolved relation that defines the dataset's flow. Empty if the query function
-                that defines the flow cannot be analyzed at the time of flow definition.
-                """
-            outputMode: builtins.str
-            """The output mode for the standalone flow."""
-            once: builtins.bool
-            """If true, defines the flow as a one-time flow, such as for backfill. Setting it to true changes the flow in two ways:
-            - The flow is run one time by default. If the pipeline is run with a full refresh, the flow will run again.
-            - The flow function must return a batch DataFrame, not a streaming DataFrame.
-            """
-            def __init__(
-                self,
-                *,
-                relation: pyspark.sql.connect.proto.relations_pb2.Relation | None = ...,
-                outputMode: builtins.str | None = ...,
-                once: builtins.bool | None = ...,
-            ) -> None: ...
-            def HasField(
-                self,
-                field_name: typing_extensions.Literal[
-                    "_once",
-                    b"_once",
-                    "_outputMode",
-                    b"_outputMode",
-                    "_relation",
-                    b"_relation",
-                    "once",
-                    b"once",
-                    "outputMode",
-                    b"outputMode",
-                    "relation",
-                    b"relation",
-                ],
-            ) -> builtins.bool: ...
-            def ClearField(
-                self,
-                field_name: typing_extensions.Literal[
-                    "_once",
-                    b"_once",
-                    "_outputMode",
-                    b"_outputMode",
-                    "_relation",
-                    b"_relation",
-                    "once",
-                    b"once",
-                    "outputMode",
-                    b"outputMode",
-                    "relation",
-                    b"relation",
-                ],
-            ) -> None: ...
-            @typing.overload
-            def WhichOneof(
-                self, oneof_group: typing_extensions.Literal["_once", b"_once"]
-            ) -> typing_extensions.Literal["once"] | None: ...
-            @typing.overload
-            def WhichOneof(
-                self, oneof_group: typing_extensions.Literal["_outputMode", b"_outputMode"]
-            ) -> typing_extensions.Literal["outputMode"] | None: ...
-            @typing.overload
-            def WhichOneof(
-                self, oneof_group: typing_extensions.Literal["_relation", b"_relation"]
-            ) -> typing_extensions.Literal["relation"] | None: ...
-
         class Response(google.protobuf.message.Message):
             DESCRIPTOR: google.protobuf.descriptor.Descriptor
 
@@ -661,7 +587,6 @@ class PipelineCommand(google.protobuf.message.Message):
         CLIENT_ID_FIELD_NUMBER: builtins.int
         SOURCE_CODE_LOCATION_FIELD_NUMBER: builtins.int
         RELATION_FLOW_DETAILS_FIELD_NUMBER: builtins.int
-        STANDALONE_FLOW_DETAILS_FIELD_NUMBER: builtins.int
         EXTENSION_FIELD_NUMBER: builtins.int
         dataflow_graph_id: builtins.str
         """The graph to attach this flow to."""
@@ -686,10 +611,6 @@ class PipelineCommand(google.protobuf.message.Message):
             self,
         ) -> global___PipelineCommand.DefineFlow.WriteRelationFlowDetails: ...
         @property
-        def standalone_flow_details(
-            self,
-        ) -> global___PipelineCommand.DefineFlow.StandaloneFlowDetails: ...
-        @property
         def extension(self) -> google.protobuf.any_pb2.Any: ...
         def __init__(
             self,
@@ -702,8 +623,6 @@ class PipelineCommand(google.protobuf.message.Message):
             source_code_location: global___SourceCodeLocation | None = ...,
             relation_flow_details: global___PipelineCommand.DefineFlow.WriteRelationFlowDetails
             | None = ...,
-            standalone_flow_details: global___PipelineCommand.DefineFlow.StandaloneFlowDetails
-            | None = ...,
             extension: google.protobuf.any_pb2.Any | None = ...,
         ) -> None: ...
         def HasField(
@@ -733,8 +652,6 @@ class PipelineCommand(google.protobuf.message.Message):
                 b"relation_flow_details",
                 "source_code_location",
                 b"source_code_location",
-                "standalone_flow_details",
-                b"standalone_flow_details",
                 "target_dataset_name",
                 b"target_dataset_name",
             ],
@@ -768,8 +685,6 @@ class PipelineCommand(google.protobuf.message.Message):
                 b"source_code_location",
                 "sql_conf",
                 b"sql_conf",
-                "standalone_flow_details",
-                b"standalone_flow_details",
                 "target_dataset_name",
                 b"target_dataset_name",
             ],
@@ -802,12 +717,7 @@ class PipelineCommand(google.protobuf.message.Message):
         @typing.overload
         def WhichOneof(
             self, oneof_group: typing_extensions.Literal["details", b"details"]
-        ) -> (
-            typing_extensions.Literal[
-                "relation_flow_details", "standalone_flow_details", "extension"
-            ]
-            | None
-        ): ...
+        ) -> typing_extensions.Literal["relation_flow_details", "extension"] | None: ...
 
     class StartRun(google.protobuf.message.Message):
         """Resolves all datasets and flows and starts a pipeline update. Should be called after all
diff --git a/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto b/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto
index c08e385c97b4b..0fa36f8a15143 100644
--- a/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto
+++ b/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto
@@ -142,7 +142,6 @@ message PipelineCommand {
 
     oneof details {
       WriteRelationFlowDetails relation_flow_details = 7;
-      StandaloneFlowDetails standalone_flow_details = 8;
       google.protobuf.Any extension = 999;
     }
 
@@ -153,19 +152,6 @@ message PipelineCommand {
       optional spark.connect.Relation relation = 1;
     }
 
-    // A standalone flow that writes to the target dataset with additional options.
-    message StandaloneFlowDetails {
-      // An unresolved relation that defines the dataset's flow. Empty if the query function
-      // that defines the flow cannot be analyzed at the time of flow definition.
-      optional spark.connect.Relation relation = 1;
-      // The output mode for the standalone flow.
-      optional string outputMode = 2;
-      // If true, defines the flow as a one-time flow, such as for backfill. Setting it to true changes the flow in two ways:
-      // - The flow is run one time by default. If the pipeline is run with a full refresh, the flow will run again.
-      // - The flow function must return a batch DataFrame, not a streaming DataFrame.
-      optional bool once = 3;
-    }
-
     message Response {
       // Fully qualified flow name that uniquely identifies a flow in the Dataflow graph.
       optional string flow_name = 1;

From e0d208cab07de7d8229c9c0d81798839ecfd6bcb Mon Sep 17 00:00:00 2001
From: Jacky Wang
Date: Wed, 5 Nov 2025 10:44:55 -0800
Subject: [PATCH 5/8] once

---
 .../sql/connect/proto/pipelines_pb2.pyi       | 19 +++++++++++++++++++
 .../protobuf/spark/connect/pipelines.proto    |  5 +++++
 2 files changed, 24 insertions(+)

diff --git a/python/pyspark/sql/connect/proto/pipelines_pb2.pyi b/python/pyspark/sql/connect/proto/pipelines_pb2.pyi
index 60d131037c99d..f87f7f2e48f77 100644
--- a/python/pyspark/sql/connect/proto/pipelines_pb2.pyi
+++ b/python/pyspark/sql/connect/proto/pipelines_pb2.pyi
@@ -588,6 +588,7 @@ class PipelineCommand(google.protobuf.message.Message):
         SOURCE_CODE_LOCATION_FIELD_NUMBER: builtins.int
         RELATION_FLOW_DETAILS_FIELD_NUMBER: builtins.int
         EXTENSION_FIELD_NUMBER: builtins.int
+        ONCE_FIELD_NUMBER: builtins.int
         dataflow_graph_id: builtins.str
         """The graph to attach this flow to."""
         flow_name: builtins.str
@@ -612,6 +613,11 @@ class PipelineCommand(google.protobuf.message.Message):
         ) -> global___PipelineCommand.DefineFlow.WriteRelationFlowDetails: ...
         @property
         def extension(self) -> google.protobuf.any_pb2.Any: ...
+        once: builtins.bool
+        """If true, defines the flow as a one-time flow, such as for backfill. Setting it to true changes the flow in two ways:
+        - The flow is run one time by default. If the pipeline is run with a full refresh, the flow will run again.
+        - The flow function must return a batch DataFrame, not a streaming DataFrame.
+        """
         def __init__(
             self,
@@ -624,6 +630,7 @@ class PipelineCommand(google.protobuf.message.Message):
             relation_flow_details: global___PipelineCommand.DefineFlow.WriteRelationFlowDetails
             | None = ...,
             extension: google.protobuf.any_pb2.Any | None = ...,
+            once: builtins.bool | None = ...,
         ) -> None: ...
         def HasField(
@@ -634,6 +641,8 @@ class PipelineCommand(google.protobuf.message.Message):
                 b"_dataflow_graph_id",
                 "_flow_name",
                 b"_flow_name",
+                "_once",
+                b"_once",
                 "_source_code_location",
                 b"_source_code_location",
                 "_target_dataset_name",
@@ -648,6 +657,8 @@ class PipelineCommand(google.protobuf.message.Message):
                 b"extension",
                 "flow_name",
                 b"flow_name",
+                "once",
+                b"once",
                 "relation_flow_details",
                 b"relation_flow_details",
                 "source_code_location",
@@ -665,6 +676,8 @@ class PipelineCommand(google.protobuf.message.Message):
                 b"_dataflow_graph_id",
                 "_flow_name",
                 b"_flow_name",
+                "_once",
+                b"_once",
                 "_source_code_location",
                 b"_source_code_location",
                 "_target_dataset_name",
@@ -679,6 +692,8 @@ class PipelineCommand(google.protobuf.message.Message):
                 b"extension",
                 "flow_name",
                 b"flow_name",
+                "once",
+                b"once",
                 "relation_flow_details",
                 b"relation_flow_details",
                 "source_code_location",
@@ -703,6 +718,10 @@ class PipelineCommand(google.protobuf.message.Message):
             self, oneof_group: typing_extensions.Literal["_flow_name", b"_flow_name"]
         ) -> typing_extensions.Literal["flow_name"] | None: ...
         @typing.overload
+        def WhichOneof(
+            self, oneof_group: typing_extensions.Literal["_once", b"_once"]
+        ) -> typing_extensions.Literal["once"] | None: ...
+        @typing.overload
         def WhichOneof(
             self,
             oneof_group: typing_extensions.Literal[
diff --git a/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto b/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto
index 0fa36f8a15143..bd3ff14167c07 100644
--- a/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto
+++ b/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto
@@ -152,6 +152,11 @@ message PipelineCommand {
       optional spark.connect.Relation relation = 1;
     }
 
+    // If true, define the flow as a one-time flow, such as for backfill. Set to true changes the flow in two ways:
+    // - The flow is run one time by default. If the pipeline is ran with a full refresh, the flow will run again.
+    // - The flow function must be a batch DataFrame, not a streaming DataFrame.
+    optional bool once = 8;
+
     message Response {
      // Fully qualified flow name that uniquely identifies a flow in the Dataflow graph.
      optional string flow_name = 1;

From d1fdb7507a6e62677085f3c2694b5e670add761d Mon Sep 17 00:00:00 2001
From: Jacky Wang
Date: Wed, 5 Nov 2025 12:46:22 -0800
Subject: [PATCH 6/8] validation

---
 .../main/resources/error/error-conditions.json     |  6 ++++++
 .../connect/pipelines/PipelinesHandler.scala       |  5 +++++
 .../SparkDeclarativePipelinesServerSuite.scala     | 18 ++++++++++++++++++
 3 files changed, 29 insertions(+)

diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json
index 3518765efd091..42e6e3892276b 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -1412,6 +1412,12 @@
     ],
     "sqlState" : "42623"
   },
+  "DEFINE_FLOW_ONCE_OPTION_NOT_SUPPORTED" : {
+    "message" : [
+      "Defining the flow <flowName> as a one-time flow with the 'once' option is not supported."
+    ],
+    "sqlState" : "0A000"
+  },
   "DESCRIBE_JSON_NOT_EXTENDED" : {
     "message" : [
       "DESCRIBE TABLE ... AS JSON only supported when [EXTENDED|FORMATTED] is specified.",
diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala
index 0929b07be5237..1a3b0d2231c62 100644
--- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala
+++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala
@@ -267,6 +267,11 @@ private[connect] object PipelinesHandler extends Logging {
       flow: proto.PipelineCommand.DefineFlow,
       transformRelationFunc: Relation => LogicalPlan,
       sessionHolder: SessionHolder): TableIdentifier = {
+    if (flow.hasOnce) {
+      throw new AnalysisException(
+        "DEFINE_FLOW_ONCE_OPTION_NOT_SUPPORTED",
+        Map("flowName" -> flow.getFlowName))
+    }
     val dataflowGraphId = flow.getDataflowGraphId
     val graphElementRegistry =
       sessionHolder.dataflowGraphRegistry.getDataflowGraphOrThrow(dataflowGraphId)
diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/SparkDeclarativePipelinesServerSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/SparkDeclarativePipelinesServerSuite.scala
index 9dba27c4525c2..ab60462e87351 100644
--- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/SparkDeclarativePipelinesServerSuite.scala
+++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/SparkDeclarativePipelinesServerSuite.scala
@@ -71,6 +71,24 @@ class SparkDeclarativePipelinesServerSuite
 
   }
 
+  gridTest("Define flow 'once' argument not supported")(Seq(true, false)) { onceValue =>
+    val ex = intercept[Exception] {
+      withRawBlockingStub { implicit stub =>
+        val graphId = createDataflowGraph
+        sendPlan(
+          buildPlanFromPipelineCommand(
+            PipelineCommand
+              .newBuilder()
+              .setDefineFlow(DefineFlow
+                .newBuilder()
+                .setDataflowGraphId(graphId)
+                .setOnce(onceValue))
+              .build()))
+      }
+    }
+    assert(ex.getMessage.contains("DEFINE_FLOW_ONCE_OPTION_NOT_SUPPORTED"))
+  }
+
   test(
     "Cross dependency between SQL dataset and non-SQL dataset is valid and can be registered") {

From aeb119e3e58b547509182665208f8cfa6915d7be Mon Sep 17 00:00:00 2001
From: Jacky Wang
Date: Wed, 5 Nov 2025 12:48:35 -0800
Subject: [PATCH 7/8] nit

---
 python/pyspark/sql/connect/proto/pipelines_pb2.pyi          | 6 ++++--
 .../common/src/main/protobuf/spark/connect/pipelines.proto  | 6 ++++--
 2 files changed, 8 insertions(+), 4 deletions(-)

diff --git a/python/pyspark/sql/connect/proto/pipelines_pb2.pyi b/python/pyspark/sql/connect/proto/pipelines_pb2.pyi
index f87f7f2e48f77..e0768a1f6baeb 100644
--- a/python/pyspark/sql/connect/proto/pipelines_pb2.pyi
+++ b/python/pyspark/sql/connect/proto/pipelines_pb2.pyi
@@ -614,8 +614,10 @@ class PipelineCommand(google.protobuf.message.Message):
         @property
         def extension(self) -> google.protobuf.any_pb2.Any: ...
         once: builtins.bool
-        """If true, defines the flow as a one-time flow, such as for backfill. Setting it to true changes the flow in two ways:
-        - The flow is run one time by default. If the pipeline is run with a full refresh, the flow will run again.
+        """If true, defines the flow as a one-time flow, such as for backfill.
+        Setting it to true changes the flow in two ways:
+        - The flow is run one time by default. If the pipeline is run with a full refresh,
+          the flow will run again.
         - The flow function must return a batch DataFrame, not a streaming DataFrame.
""" def __init__( diff --git a/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto b/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto index bd3ff14167c07..a92e24fda9154 100644 --- a/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto +++ b/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto @@ -152,8 +152,10 @@ message PipelineCommand { optional spark.connect.Relation relation = 1; } - // If true, define the flow as a one-time flow, such as for backfill. Set to true changes the flow in two ways: - // - The flow is run one time by default. If the pipeline is ran with a full refresh, the flow will run again. + // If true, define the flow as a one-time flow, such as for backfill. + // Set to true changes the flow in two ways: + // - The flow is run one time by default. If the pipeline is ran with a full refresh, + // the flow will run again. // - The flow function must be a batch DataFrame, not a streaming DataFrame. optional bool once = 8; From 47dce38753d2b6bb0535389574d176fb0ef1ddb2 Mon Sep 17 00:00:00 2001 From: Jacky Wang Date: Thu, 6 Nov 2025 17:50:39 -0800 Subject: [PATCH 8/8] merge conflict --- .../sql/connect/proto/pipelines_pb2.py | 70 +++++++++---------- 1 file changed, 35 insertions(+), 35 deletions(-) diff --git a/python/pyspark/sql/connect/proto/pipelines_pb2.py b/python/pyspark/sql/connect/proto/pipelines_pb2.py index 139de83dc1aaf..0eb77c84b5b57 100644 --- a/python/pyspark/sql/connect/proto/pipelines_pb2.py +++ b/python/pyspark/sql/connect/proto/pipelines_pb2.py @@ -42,7 +42,7 @@ DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( - b'\n\x1dspark/connect/pipelines.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1aspark/connect/common.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"\xcb"\n\x0fPipelineCommand\x12h\n\x15\x63reate_dataflow_graph\x18\x01 \x01(\x0b\x32\x32.spark.connect.PipelineCommand.CreateDataflowGraphH\x00R\x13\x63reateDataflowGraph\x12R\n\rdefine_output\x18\x02 \x01(\x0b\x32+.spark.connect.PipelineCommand.DefineOutputH\x00R\x0c\x64\x65\x66ineOutput\x12L\n\x0b\x64\x65\x66ine_flow\x18\x03 \x01(\x0b\x32).spark.connect.PipelineCommand.DefineFlowH\x00R\ndefineFlow\x12\x62\n\x13\x64rop_dataflow_graph\x18\x04 \x01(\x0b\x32\x30.spark.connect.PipelineCommand.DropDataflowGraphH\x00R\x11\x64ropDataflowGraph\x12\x46\n\tstart_run\x18\x05 \x01(\x0b\x32\'.spark.connect.PipelineCommand.StartRunH\x00R\x08startRun\x12r\n\x19\x64\x65\x66ine_sql_graph_elements\x18\x06 \x01(\x0b\x32\x35.spark.connect.PipelineCommand.DefineSqlGraphElementsH\x00R\x16\x64\x65\x66ineSqlGraphElements\x12\xa1\x01\n*get_query_function_execution_signal_stream\x18\x07 \x01(\x0b\x32\x44.spark.connect.PipelineCommand.GetQueryFunctionExecutionSignalStreamH\x00R%getQueryFunctionExecutionSignalStream\x12\x88\x01\n!define_flow_query_function_result\x18\x08 \x01(\x0b\x32<.spark.connect.PipelineCommand.DefineFlowQueryFunctionResultH\x00R\x1d\x64\x65\x66ineFlowQueryFunctionResult\x12\x35\n\textension\x18\xe7\x07 \x01(\x0b\x32\x14.google.protobuf.AnyH\x00R\textension\x1a\xb4\x02\n\x13\x43reateDataflowGraph\x12,\n\x0f\x64\x65\x66\x61ult_catalog\x18\x01 \x01(\tH\x00R\x0e\x64\x65\x66\x61ultCatalog\x88\x01\x01\x12.\n\x10\x64\x65\x66\x61ult_database\x18\x02 \x01(\tH\x01R\x0f\x64\x65\x66\x61ultDatabase\x88\x01\x01\x12Z\n\x08sql_conf\x18\x05 
\x03(\x0b\x32?.spark.connect.PipelineCommand.CreateDataflowGraph.SqlConfEntryR\x07sqlConf\x1a:\n\x0cSqlConfEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x42\x12\n\x10_default_catalogB\x13\n\x11_default_database\x1aZ\n\x11\x44ropDataflowGraph\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x42\x14\n\x12_dataflow_graph_id\x1a\x92\n\n\x0c\x44\x65\x66ineOutput\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x01R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12$\n\x0boutput_name\x18\x02 \x01(\tH\x02R\noutputName\x88\x01\x01\x12?\n\x0boutput_type\x18\x03 \x01(\x0e\x32\x19.spark.connect.OutputTypeH\x03R\noutputType\x88\x01\x01\x12\x1d\n\x07\x63omment\x18\x04 \x01(\tH\x04R\x07\x63omment\x88\x01\x01\x12X\n\x14source_code_location\x18\x05 \x01(\x0b\x32!.spark.connect.SourceCodeLocationH\x05R\x12sourceCodeLocation\x88\x01\x01\x12_\n\rtable_details\x18\x06 \x01(\x0b\x32\x38.spark.connect.PipelineCommand.DefineOutput.TableDetailsH\x00R\x0ctableDetails\x12\\\n\x0csink_details\x18\x07 \x01(\x0b\x32\x37.spark.connect.PipelineCommand.DefineOutput.SinkDetailsH\x00R\x0bsinkDetails\x12\x35\n\textension\x18\xe7\x07 \x01(\x0b\x32\x14.google.protobuf.AnyH\x00R\textension\x1a\xc0\x03\n\x0cTableDetails\x12x\n\x10table_properties\x18\x01 \x03(\x0b\x32M.spark.connect.PipelineCommand.DefineOutput.TableDetails.TablePropertiesEntryR\x0ftableProperties\x12%\n\x0epartition_cols\x18\x02 \x03(\tR\rpartitionCols\x12\x1b\n\x06\x66ormat\x18\x03 \x01(\tH\x01R\x06\x66ormat\x88\x01\x01\x12\x43\n\x10schema_data_type\x18\x04 \x01(\x0b\x32\x17.spark.connect.DataTypeH\x00R\x0eschemaDataType\x12%\n\rschema_string\x18\x05 \x01(\tH\x00R\x0cschemaString\x12-\n\x12\x63lustering_columns\x18\x06 \x03(\tR\x11\x63lusteringColumns\x1a\x42\n\x14TablePropertiesEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x42\x08\n\x06schemaB\t\n\x07_format\x1a\xd1\x01\n\x0bSinkDetails\x12^\n\x07options\x18\x01 \x03(\x0b\x32\x44.spark.connect.PipelineCommand.DefineOutput.SinkDetails.OptionsEntryR\x07options\x12\x1b\n\x06\x66ormat\x18\x02 \x01(\tH\x00R\x06\x66ormat\x88\x01\x01\x1a:\n\x0cOptionsEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x42\t\n\x07_formatB\t\n\x07\x64\x65tailsB\x14\n\x12_dataflow_graph_idB\x0e\n\x0c_output_nameB\x0e\n\x0c_output_typeB\n\n\x08_commentB\x17\n\x15_source_code_location\x1a\xdd\x06\n\nDefineFlow\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x01R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12 \n\tflow_name\x18\x02 \x01(\tH\x02R\x08\x66lowName\x88\x01\x01\x12\x33\n\x13target_dataset_name\x18\x03 \x01(\tH\x03R\x11targetDatasetName\x88\x01\x01\x12Q\n\x08sql_conf\x18\x04 \x03(\x0b\x32\x36.spark.connect.PipelineCommand.DefineFlow.SqlConfEntryR\x07sqlConf\x12 \n\tclient_id\x18\x05 \x01(\tH\x04R\x08\x63lientId\x88\x01\x01\x12X\n\x14source_code_location\x18\x06 \x01(\x0b\x32!.spark.connect.SourceCodeLocationH\x05R\x12sourceCodeLocation\x88\x01\x01\x12x\n\x15relation_flow_details\x18\x07 \x01(\x0b\x32\x42.spark.connect.PipelineCommand.DefineFlow.WriteRelationFlowDetailsH\x00R\x13relationFlowDetails\x12\x35\n\textension\x18\xe7\x07 \x01(\x0b\x32\x14.google.protobuf.AnyH\x00R\textension\x1a:\n\x0cSqlConfEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x1a\x61\n\x18WriteRelationFlowDetails\x12\x38\n\x08relation\x18\x01 
\x01(\x0b\x32\x17.spark.connect.RelationH\x00R\x08relation\x88\x01\x01\x42\x0b\n\t_relation\x1a:\n\x08Response\x12 \n\tflow_name\x18\x01 \x01(\tH\x00R\x08\x66lowName\x88\x01\x01\x42\x0c\n\n_flow_nameB\t\n\x07\x64\x65tailsB\x14\n\x12_dataflow_graph_idB\x0c\n\n_flow_nameB\x16\n\x14_target_dataset_nameB\x0c\n\n_client_idB\x17\n\x15_source_code_location\x1a\xc2\x02\n\x08StartRun\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12\x34\n\x16\x66ull_refresh_selection\x18\x02 \x03(\tR\x14\x66ullRefreshSelection\x12-\n\x10\x66ull_refresh_all\x18\x03 \x01(\x08H\x01R\x0e\x66ullRefreshAll\x88\x01\x01\x12+\n\x11refresh_selection\x18\x04 \x03(\tR\x10refreshSelection\x12\x15\n\x03\x64ry\x18\x05 \x01(\x08H\x02R\x03\x64ry\x88\x01\x01\x12\x1d\n\x07storage\x18\x06 \x01(\tH\x03R\x07storage\x88\x01\x01\x42\x14\n\x12_dataflow_graph_idB\x13\n\x11_full_refresh_allB\x06\n\x04_dryB\n\n\x08_storage\x1a\xc7\x01\n\x16\x44\x65\x66ineSqlGraphElements\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12\'\n\rsql_file_path\x18\x02 \x01(\tH\x01R\x0bsqlFilePath\x88\x01\x01\x12\x1e\n\x08sql_text\x18\x03 \x01(\tH\x02R\x07sqlText\x88\x01\x01\x42\x14\n\x12_dataflow_graph_idB\x10\n\x0e_sql_file_pathB\x0b\n\t_sql_text\x1a\x9e\x01\n%GetQueryFunctionExecutionSignalStream\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12 \n\tclient_id\x18\x02 \x01(\tH\x01R\x08\x63lientId\x88\x01\x01\x42\x14\n\x12_dataflow_graph_idB\x0c\n\n_client_id\x1a\xdd\x01\n\x1d\x44\x65\x66ineFlowQueryFunctionResult\x12 \n\tflow_name\x18\x01 \x01(\tH\x00R\x08\x66lowName\x88\x01\x01\x12/\n\x11\x64\x61taflow_graph_id\x18\x02 \x01(\tH\x01R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12\x38\n\x08relation\x18\x03 \x01(\x0b\x32\x17.spark.connect.RelationH\x02R\x08relation\x88\x01\x01\x42\x0c\n\n_flow_nameB\x14\n\x12_dataflow_graph_idB\x0b\n\t_relationB\x0e\n\x0c\x63ommand_type"\xf0\x05\n\x15PipelineCommandResult\x12\x81\x01\n\x1c\x63reate_dataflow_graph_result\x18\x01 \x01(\x0b\x32>.spark.connect.PipelineCommandResult.CreateDataflowGraphResultH\x00R\x19\x63reateDataflowGraphResult\x12k\n\x14\x64\x65\x66ine_output_result\x18\x02 \x01(\x0b\x32\x37.spark.connect.PipelineCommandResult.DefineOutputResultH\x00R\x12\x64\x65\x66ineOutputResult\x12\x65\n\x12\x64\x65\x66ine_flow_result\x18\x03 \x01(\x0b\x32\x35.spark.connect.PipelineCommandResult.DefineFlowResultH\x00R\x10\x64\x65\x66ineFlowResult\x1a\x62\n\x19\x43reateDataflowGraphResult\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x42\x14\n\x12_dataflow_graph_id\x1a\x85\x01\n\x12\x44\x65\x66ineOutputResult\x12W\n\x13resolved_identifier\x18\x01 \x01(\x0b\x32!.spark.connect.ResolvedIdentifierH\x00R\x12resolvedIdentifier\x88\x01\x01\x42\x16\n\x14_resolved_identifier\x1a\x83\x01\n\x10\x44\x65\x66ineFlowResult\x12W\n\x13resolved_identifier\x18\x01 \x01(\x0b\x32!.spark.connect.ResolvedIdentifierH\x00R\x12resolvedIdentifier\x88\x01\x01\x42\x16\n\x14_resolved_identifierB\r\n\x0bresult_type"I\n\x13PipelineEventResult\x12\x32\n\x05\x65vent\x18\x01 \x01(\x0b\x32\x1c.spark.connect.PipelineEventR\x05\x65vent"t\n\rPipelineEvent\x12\x38\n\ttimestamp\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.TimestampR\ttimestamp\x12\x1d\n\x07message\x18\x02 \x01(\tH\x00R\x07message\x88\x01\x01\x42\n\n\x08_message"\xf1\x01\n\x12SourceCodeLocation\x12 \n\tfile_name\x18\x01 \x01(\tH\x00R\x08\x66ileName\x88\x01\x01\x12$\n\x0bline_number\x18\x02 
\x01(\x05H\x01R\nlineNumber\x88\x01\x01\x12,\n\x0f\x64\x65\x66inition_path\x18\x03 \x01(\tH\x02R\x0e\x64\x65\x66initionPath\x88\x01\x01\x12\x33\n\textension\x18\xe7\x07 \x03(\x0b\x32\x14.google.protobuf.AnyR\textensionB\x0c\n\n_file_nameB\x0e\n\x0c_line_numberB\x12\n\x10_definition_path"E\n$PipelineQueryFunctionExecutionSignal\x12\x1d\n\nflow_names\x18\x01 \x03(\tR\tflowNames"\xd7\x01\n\x17PipelineAnalysisContext\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12,\n\x0f\x64\x65\x66inition_path\x18\x02 \x01(\tH\x01R\x0e\x64\x65\x66initionPath\x88\x01\x01\x12\x33\n\textension\x18\xe7\x07 \x03(\x0b\x32\x14.google.protobuf.AnyR\textensionB\x14\n\x12_dataflow_graph_idB\x12\n\x10_definition_path*i\n\nOutputType\x12\x1b\n\x17OUTPUT_TYPE_UNSPECIFIED\x10\x00\x12\x15\n\x11MATERIALIZED_VIEW\x10\x01\x12\t\n\x05TABLE\x10\x02\x12\x12\n\x0eTEMPORARY_VIEW\x10\x03\x12\x08\n\x04SINK\x10\x04\x42\x36\n\x1eorg.apache.spark.connect.protoP\x01Z\x12internal/generatedb\x06proto3' + b'\n\x1dspark/connect/pipelines.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1aspark/connect/common.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"\xed"\n\x0fPipelineCommand\x12h\n\x15\x63reate_dataflow_graph\x18\x01 \x01(\x0b\x32\x32.spark.connect.PipelineCommand.CreateDataflowGraphH\x00R\x13\x63reateDataflowGraph\x12R\n\rdefine_output\x18\x02 \x01(\x0b\x32+.spark.connect.PipelineCommand.DefineOutputH\x00R\x0c\x64\x65\x66ineOutput\x12L\n\x0b\x64\x65\x66ine_flow\x18\x03 \x01(\x0b\x32).spark.connect.PipelineCommand.DefineFlowH\x00R\ndefineFlow\x12\x62\n\x13\x64rop_dataflow_graph\x18\x04 \x01(\x0b\x32\x30.spark.connect.PipelineCommand.DropDataflowGraphH\x00R\x11\x64ropDataflowGraph\x12\x46\n\tstart_run\x18\x05 \x01(\x0b\x32\'.spark.connect.PipelineCommand.StartRunH\x00R\x08startRun\x12r\n\x19\x64\x65\x66ine_sql_graph_elements\x18\x06 \x01(\x0b\x32\x35.spark.connect.PipelineCommand.DefineSqlGraphElementsH\x00R\x16\x64\x65\x66ineSqlGraphElements\x12\xa1\x01\n*get_query_function_execution_signal_stream\x18\x07 \x01(\x0b\x32\x44.spark.connect.PipelineCommand.GetQueryFunctionExecutionSignalStreamH\x00R%getQueryFunctionExecutionSignalStream\x12\x88\x01\n!define_flow_query_function_result\x18\x08 \x01(\x0b\x32<.spark.connect.PipelineCommand.DefineFlowQueryFunctionResultH\x00R\x1d\x64\x65\x66ineFlowQueryFunctionResult\x12\x35\n\textension\x18\xe7\x07 \x01(\x0b\x32\x14.google.protobuf.AnyH\x00R\textension\x1a\xb4\x02\n\x13\x43reateDataflowGraph\x12,\n\x0f\x64\x65\x66\x61ult_catalog\x18\x01 \x01(\tH\x00R\x0e\x64\x65\x66\x61ultCatalog\x88\x01\x01\x12.\n\x10\x64\x65\x66\x61ult_database\x18\x02 \x01(\tH\x01R\x0f\x64\x65\x66\x61ultDatabase\x88\x01\x01\x12Z\n\x08sql_conf\x18\x05 \x03(\x0b\x32?.spark.connect.PipelineCommand.CreateDataflowGraph.SqlConfEntryR\x07sqlConf\x1a:\n\x0cSqlConfEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x42\x12\n\x10_default_catalogB\x13\n\x11_default_database\x1aZ\n\x11\x44ropDataflowGraph\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x42\x14\n\x12_dataflow_graph_id\x1a\x92\n\n\x0c\x44\x65\x66ineOutput\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x01R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12$\n\x0boutput_name\x18\x02 \x01(\tH\x02R\noutputName\x88\x01\x01\x12?\n\x0boutput_type\x18\x03 \x01(\x0e\x32\x19.spark.connect.OutputTypeH\x03R\noutputType\x88\x01\x01\x12\x1d\n\x07\x63omment\x18\x04 
\x01(\tH\x04R\x07\x63omment\x88\x01\x01\x12X\n\x14source_code_location\x18\x05 \x01(\x0b\x32!.spark.connect.SourceCodeLocationH\x05R\x12sourceCodeLocation\x88\x01\x01\x12_\n\rtable_details\x18\x06 \x01(\x0b\x32\x38.spark.connect.PipelineCommand.DefineOutput.TableDetailsH\x00R\x0ctableDetails\x12\\\n\x0csink_details\x18\x07 \x01(\x0b\x32\x37.spark.connect.PipelineCommand.DefineOutput.SinkDetailsH\x00R\x0bsinkDetails\x12\x35\n\textension\x18\xe7\x07 \x01(\x0b\x32\x14.google.protobuf.AnyH\x00R\textension\x1a\xc0\x03\n\x0cTableDetails\x12x\n\x10table_properties\x18\x01 \x03(\x0b\x32M.spark.connect.PipelineCommand.DefineOutput.TableDetails.TablePropertiesEntryR\x0ftableProperties\x12%\n\x0epartition_cols\x18\x02 \x03(\tR\rpartitionCols\x12\x1b\n\x06\x66ormat\x18\x03 \x01(\tH\x01R\x06\x66ormat\x88\x01\x01\x12\x43\n\x10schema_data_type\x18\x04 \x01(\x0b\x32\x17.spark.connect.DataTypeH\x00R\x0eschemaDataType\x12%\n\rschema_string\x18\x05 \x01(\tH\x00R\x0cschemaString\x12-\n\x12\x63lustering_columns\x18\x06 \x03(\tR\x11\x63lusteringColumns\x1a\x42\n\x14TablePropertiesEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x42\x08\n\x06schemaB\t\n\x07_format\x1a\xd1\x01\n\x0bSinkDetails\x12^\n\x07options\x18\x01 \x03(\x0b\x32\x44.spark.connect.PipelineCommand.DefineOutput.SinkDetails.OptionsEntryR\x07options\x12\x1b\n\x06\x66ormat\x18\x02 \x01(\tH\x00R\x06\x66ormat\x88\x01\x01\x1a:\n\x0cOptionsEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x42\t\n\x07_formatB\t\n\x07\x64\x65tailsB\x14\n\x12_dataflow_graph_idB\x0e\n\x0c_output_nameB\x0e\n\x0c_output_typeB\n\n\x08_commentB\x17\n\x15_source_code_location\x1a\xff\x06\n\nDefineFlow\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x01R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12 \n\tflow_name\x18\x02 \x01(\tH\x02R\x08\x66lowName\x88\x01\x01\x12\x33\n\x13target_dataset_name\x18\x03 \x01(\tH\x03R\x11targetDatasetName\x88\x01\x01\x12Q\n\x08sql_conf\x18\x04 \x03(\x0b\x32\x36.spark.connect.PipelineCommand.DefineFlow.SqlConfEntryR\x07sqlConf\x12 \n\tclient_id\x18\x05 \x01(\tH\x04R\x08\x63lientId\x88\x01\x01\x12X\n\x14source_code_location\x18\x06 \x01(\x0b\x32!.spark.connect.SourceCodeLocationH\x05R\x12sourceCodeLocation\x88\x01\x01\x12x\n\x15relation_flow_details\x18\x07 \x01(\x0b\x32\x42.spark.connect.PipelineCommand.DefineFlow.WriteRelationFlowDetailsH\x00R\x13relationFlowDetails\x12\x35\n\textension\x18\xe7\x07 \x01(\x0b\x32\x14.google.protobuf.AnyH\x00R\textension\x12\x17\n\x04once\x18\x08 \x01(\x08H\x06R\x04once\x88\x01\x01\x1a:\n\x0cSqlConfEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x1a\x61\n\x18WriteRelationFlowDetails\x12\x38\n\x08relation\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationH\x00R\x08relation\x88\x01\x01\x42\x0b\n\t_relation\x1a:\n\x08Response\x12 \n\tflow_name\x18\x01 \x01(\tH\x00R\x08\x66lowName\x88\x01\x01\x42\x0c\n\n_flow_nameB\t\n\x07\x64\x65tailsB\x14\n\x12_dataflow_graph_idB\x0c\n\n_flow_nameB\x16\n\x14_target_dataset_nameB\x0c\n\n_client_idB\x17\n\x15_source_code_locationB\x07\n\x05_once\x1a\xc2\x02\n\x08StartRun\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12\x34\n\x16\x66ull_refresh_selection\x18\x02 \x03(\tR\x14\x66ullRefreshSelection\x12-\n\x10\x66ull_refresh_all\x18\x03 \x01(\x08H\x01R\x0e\x66ullRefreshAll\x88\x01\x01\x12+\n\x11refresh_selection\x18\x04 \x03(\tR\x10refreshSelection\x12\x15\n\x03\x64ry\x18\x05 
\x01(\x08H\x02R\x03\x64ry\x88\x01\x01\x12\x1d\n\x07storage\x18\x06 \x01(\tH\x03R\x07storage\x88\x01\x01\x42\x14\n\x12_dataflow_graph_idB\x13\n\x11_full_refresh_allB\x06\n\x04_dryB\n\n\x08_storage\x1a\xc7\x01\n\x16\x44\x65\x66ineSqlGraphElements\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12\'\n\rsql_file_path\x18\x02 \x01(\tH\x01R\x0bsqlFilePath\x88\x01\x01\x12\x1e\n\x08sql_text\x18\x03 \x01(\tH\x02R\x07sqlText\x88\x01\x01\x42\x14\n\x12_dataflow_graph_idB\x10\n\x0e_sql_file_pathB\x0b\n\t_sql_text\x1a\x9e\x01\n%GetQueryFunctionExecutionSignalStream\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12 \n\tclient_id\x18\x02 \x01(\tH\x01R\x08\x63lientId\x88\x01\x01\x42\x14\n\x12_dataflow_graph_idB\x0c\n\n_client_id\x1a\xdd\x01\n\x1d\x44\x65\x66ineFlowQueryFunctionResult\x12 \n\tflow_name\x18\x01 \x01(\tH\x00R\x08\x66lowName\x88\x01\x01\x12/\n\x11\x64\x61taflow_graph_id\x18\x02 \x01(\tH\x01R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12\x38\n\x08relation\x18\x03 \x01(\x0b\x32\x17.spark.connect.RelationH\x02R\x08relation\x88\x01\x01\x42\x0c\n\n_flow_nameB\x14\n\x12_dataflow_graph_idB\x0b\n\t_relationB\x0e\n\x0c\x63ommand_type"\xf0\x05\n\x15PipelineCommandResult\x12\x81\x01\n\x1c\x63reate_dataflow_graph_result\x18\x01 \x01(\x0b\x32>.spark.connect.PipelineCommandResult.CreateDataflowGraphResultH\x00R\x19\x63reateDataflowGraphResult\x12k\n\x14\x64\x65\x66ine_output_result\x18\x02 \x01(\x0b\x32\x37.spark.connect.PipelineCommandResult.DefineOutputResultH\x00R\x12\x64\x65\x66ineOutputResult\x12\x65\n\x12\x64\x65\x66ine_flow_result\x18\x03 \x01(\x0b\x32\x35.spark.connect.PipelineCommandResult.DefineFlowResultH\x00R\x10\x64\x65\x66ineFlowResult\x1a\x62\n\x19\x43reateDataflowGraphResult\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x42\x14\n\x12_dataflow_graph_id\x1a\x85\x01\n\x12\x44\x65\x66ineOutputResult\x12W\n\x13resolved_identifier\x18\x01 \x01(\x0b\x32!.spark.connect.ResolvedIdentifierH\x00R\x12resolvedIdentifier\x88\x01\x01\x42\x16\n\x14_resolved_identifier\x1a\x83\x01\n\x10\x44\x65\x66ineFlowResult\x12W\n\x13resolved_identifier\x18\x01 \x01(\x0b\x32!.spark.connect.ResolvedIdentifierH\x00R\x12resolvedIdentifier\x88\x01\x01\x42\x16\n\x14_resolved_identifierB\r\n\x0bresult_type"I\n\x13PipelineEventResult\x12\x32\n\x05\x65vent\x18\x01 \x01(\x0b\x32\x1c.spark.connect.PipelineEventR\x05\x65vent"t\n\rPipelineEvent\x12\x38\n\ttimestamp\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.TimestampR\ttimestamp\x12\x1d\n\x07message\x18\x02 \x01(\tH\x00R\x07message\x88\x01\x01\x42\n\n\x08_message"\xf1\x01\n\x12SourceCodeLocation\x12 \n\tfile_name\x18\x01 \x01(\tH\x00R\x08\x66ileName\x88\x01\x01\x12$\n\x0bline_number\x18\x02 \x01(\x05H\x01R\nlineNumber\x88\x01\x01\x12,\n\x0f\x64\x65\x66inition_path\x18\x03 \x01(\tH\x02R\x0e\x64\x65\x66initionPath\x88\x01\x01\x12\x33\n\textension\x18\xe7\x07 \x03(\x0b\x32\x14.google.protobuf.AnyR\textensionB\x0c\n\n_file_nameB\x0e\n\x0c_line_numberB\x12\n\x10_definition_path"E\n$PipelineQueryFunctionExecutionSignal\x12\x1d\n\nflow_names\x18\x01 \x03(\tR\tflowNames"\xd7\x01\n\x17PipelineAnalysisContext\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12,\n\x0f\x64\x65\x66inition_path\x18\x02 \x01(\tH\x01R\x0e\x64\x65\x66initionPath\x88\x01\x01\x12\x33\n\textension\x18\xe7\x07 
\x03(\x0b\x32\x14.google.protobuf.AnyR\textensionB\x14\n\x12_dataflow_graph_idB\x12\n\x10_definition_path*i\n\nOutputType\x12\x1b\n\x17OUTPUT_TYPE_UNSPECIFIED\x10\x00\x12\x15\n\x11MATERIALIZED_VIEW\x10\x01\x12\t\n\x05TABLE\x10\x02\x12\x12\n\x0eTEMPORARY_VIEW\x10\x03\x12\x08\n\x04SINK\x10\x04\x42\x36\n\x1eorg.apache.spark.connect.protoP\x01Z\x12internal/generatedb\x06proto3' ) _globals = globals() @@ -69,10 +69,10 @@ ]._serialized_options = b"8\001" _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._loaded_options = None _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._serialized_options = b"8\001" - _globals["_OUTPUTTYPE"]._serialized_start = 6105 - _globals["_OUTPUTTYPE"]._serialized_end = 6210 + _globals["_OUTPUTTYPE"]._serialized_start = 6139 + _globals["_OUTPUTTYPE"]._serialized_end = 6244 _globals["_PIPELINECOMMAND"]._serialized_start = 195 - _globals["_PIPELINECOMMAND"]._serialized_end = 4622 + _globals["_PIPELINECOMMAND"]._serialized_end = 4656 _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH"]._serialized_start = 1129 _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH"]._serialized_end = 1437 _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH_SQLCONFENTRY"]._serialized_start = 1338 @@ -94,37 +94,37 @@ _globals["_PIPELINECOMMAND_DEFINEOUTPUT_SINKDETAILS_OPTIONSENTRY"]._serialized_start = 2659 _globals["_PIPELINECOMMAND_DEFINEOUTPUT_SINKDETAILS_OPTIONSENTRY"]._serialized_end = 2717 _globals["_PIPELINECOMMAND_DEFINEFLOW"]._serialized_start = 2833 - _globals["_PIPELINECOMMAND_DEFINEFLOW"]._serialized_end = 3694 + _globals["_PIPELINECOMMAND_DEFINEFLOW"]._serialized_end = 3728 _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._serialized_start = 1338 _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._serialized_end = 1396 - _globals["_PIPELINECOMMAND_DEFINEFLOW_WRITERELATIONFLOWDETAILS"]._serialized_start = 3427 - _globals["_PIPELINECOMMAND_DEFINEFLOW_WRITERELATIONFLOWDETAILS"]._serialized_end = 3524 - _globals["_PIPELINECOMMAND_DEFINEFLOW_RESPONSE"]._serialized_start = 3526 - _globals["_PIPELINECOMMAND_DEFINEFLOW_RESPONSE"]._serialized_end = 3584 - _globals["_PIPELINECOMMAND_STARTRUN"]._serialized_start = 3697 - _globals["_PIPELINECOMMAND_STARTRUN"]._serialized_end = 4019 - _globals["_PIPELINECOMMAND_DEFINESQLGRAPHELEMENTS"]._serialized_start = 4022 - _globals["_PIPELINECOMMAND_DEFINESQLGRAPHELEMENTS"]._serialized_end = 4221 - _globals["_PIPELINECOMMAND_GETQUERYFUNCTIONEXECUTIONSIGNALSTREAM"]._serialized_start = 4224 - _globals["_PIPELINECOMMAND_GETQUERYFUNCTIONEXECUTIONSIGNALSTREAM"]._serialized_end = 4382 - _globals["_PIPELINECOMMAND_DEFINEFLOWQUERYFUNCTIONRESULT"]._serialized_start = 4385 - _globals["_PIPELINECOMMAND_DEFINEFLOWQUERYFUNCTIONRESULT"]._serialized_end = 4606 - _globals["_PIPELINECOMMANDRESULT"]._serialized_start = 4625 - _globals["_PIPELINECOMMANDRESULT"]._serialized_end = 5377 - _globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_start = 4994 - _globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_end = 5092 - _globals["_PIPELINECOMMANDRESULT_DEFINEOUTPUTRESULT"]._serialized_start = 5095 - _globals["_PIPELINECOMMANDRESULT_DEFINEOUTPUTRESULT"]._serialized_end = 5228 - _globals["_PIPELINECOMMANDRESULT_DEFINEFLOWRESULT"]._serialized_start = 5231 - _globals["_PIPELINECOMMANDRESULT_DEFINEFLOWRESULT"]._serialized_end = 5362 - _globals["_PIPELINEEVENTRESULT"]._serialized_start = 5379 - _globals["_PIPELINEEVENTRESULT"]._serialized_end = 5452 - _globals["_PIPELINEEVENT"]._serialized_start = 5454 - 
_globals["_PIPELINEEVENT"]._serialized_end = 5570 - _globals["_SOURCECODELOCATION"]._serialized_start = 5573 - _globals["_SOURCECODELOCATION"]._serialized_end = 5814 - _globals["_PIPELINEQUERYFUNCTIONEXECUTIONSIGNAL"]._serialized_start = 5816 - _globals["_PIPELINEQUERYFUNCTIONEXECUTIONSIGNAL"]._serialized_end = 5885 - _globals["_PIPELINEANALYSISCONTEXT"]._serialized_start = 5888 - _globals["_PIPELINEANALYSISCONTEXT"]._serialized_end = 6103 + _globals["_PIPELINECOMMAND_DEFINEFLOW_WRITERELATIONFLOWDETAILS"]._serialized_start = 3452 + _globals["_PIPELINECOMMAND_DEFINEFLOW_WRITERELATIONFLOWDETAILS"]._serialized_end = 3549 + _globals["_PIPELINECOMMAND_DEFINEFLOW_RESPONSE"]._serialized_start = 3551 + _globals["_PIPELINECOMMAND_DEFINEFLOW_RESPONSE"]._serialized_end = 3609 + _globals["_PIPELINECOMMAND_STARTRUN"]._serialized_start = 3731 + _globals["_PIPELINECOMMAND_STARTRUN"]._serialized_end = 4053 + _globals["_PIPELINECOMMAND_DEFINESQLGRAPHELEMENTS"]._serialized_start = 4056 + _globals["_PIPELINECOMMAND_DEFINESQLGRAPHELEMENTS"]._serialized_end = 4255 + _globals["_PIPELINECOMMAND_GETQUERYFUNCTIONEXECUTIONSIGNALSTREAM"]._serialized_start = 4258 + _globals["_PIPELINECOMMAND_GETQUERYFUNCTIONEXECUTIONSIGNALSTREAM"]._serialized_end = 4416 + _globals["_PIPELINECOMMAND_DEFINEFLOWQUERYFUNCTIONRESULT"]._serialized_start = 4419 + _globals["_PIPELINECOMMAND_DEFINEFLOWQUERYFUNCTIONRESULT"]._serialized_end = 4640 + _globals["_PIPELINECOMMANDRESULT"]._serialized_start = 4659 + _globals["_PIPELINECOMMANDRESULT"]._serialized_end = 5411 + _globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_start = 5028 + _globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_end = 5126 + _globals["_PIPELINECOMMANDRESULT_DEFINEOUTPUTRESULT"]._serialized_start = 5129 + _globals["_PIPELINECOMMANDRESULT_DEFINEOUTPUTRESULT"]._serialized_end = 5262 + _globals["_PIPELINECOMMANDRESULT_DEFINEFLOWRESULT"]._serialized_start = 5265 + _globals["_PIPELINECOMMANDRESULT_DEFINEFLOWRESULT"]._serialized_end = 5396 + _globals["_PIPELINEEVENTRESULT"]._serialized_start = 5413 + _globals["_PIPELINEEVENTRESULT"]._serialized_end = 5486 + _globals["_PIPELINEEVENT"]._serialized_start = 5488 + _globals["_PIPELINEEVENT"]._serialized_end = 5604 + _globals["_SOURCECODELOCATION"]._serialized_start = 5607 + _globals["_SOURCECODELOCATION"]._serialized_end = 5848 + _globals["_PIPELINEQUERYFUNCTIONEXECUTIONSIGNAL"]._serialized_start = 5850 + _globals["_PIPELINEQUERYFUNCTIONEXECUTIONSIGNAL"]._serialized_end = 5919 + _globals["_PIPELINEANALYSISCONTEXT"]._serialized_start = 5922 + _globals["_PIPELINEANALYSISCONTEXT"]._serialized_end = 6137 # @@protoc_insertion_point(module_scope)