diff --git a/python/pyspark/pipelines/api.py b/python/pyspark/pipelines/api.py
index bcc72a4a04e1..64fb279b8d6c 100644
--- a/python/pyspark/pipelines/api.py
+++ b/python/pyspark/pipelines/api.py
@@ -36,7 +36,6 @@ def append_flow(
     target: str,
     name: Optional[str] = None,
     spark_conf: Optional[Dict[str, str]] = None,
-    once: bool = False,
 ) -> Callable[[QueryFunction], None]:
     """
     Return a decorator on a query function to define a flow in a pipeline.
@@ -46,8 +45,6 @@ def append_flow(
     :param spark_conf: A dict whose keys are the conf names and values are the conf values. \
         These confs will be set when the flow is executed; they can override confs set for the \
         destination, for the pipeline, or on the cluster.
-    :param once: If True, indicates this flow should run only once. (It will be rerun upon a full \
-        refresh operation.)
     """
     if name is not None and type(name) is not str:
         raise PySparkTypeError(
@@ -67,7 +64,6 @@ def outer(func: QueryFunction) -> None:
             target=target,
             spark_conf=spark_conf,
             source_code_location=source_code_location,
-            once=once,
             func=func,
         )
         get_active_graph_element_registry().register_flow(flow)
@@ -177,7 +173,6 @@ def outer(
                 target=resolved_name,
                 spark_conf=spark_conf or {},
                 source_code_location=source_code_location,
-                once=False,
                 func=decorated,
             )
         )
@@ -280,7 +275,6 @@ def outer(
                 target=resolved_name,
                 spark_conf=spark_conf or {},
                 source_code_location=source_code_location,
-                once=False,
                 func=decorated,
             )
         )
@@ -371,7 +365,6 @@ def outer(decorated: QueryFunction) -> None:
                 spark_conf=spark_conf or {},
                 name=resolved_name,
                 source_code_location=source_code_location,
-                once=False,
             )
         )

diff --git a/python/pyspark/pipelines/flow.py b/python/pyspark/pipelines/flow.py
index c2f8599ebf9f..7c499c0b3622 100644
--- a/python/pyspark/pipelines/flow.py
+++ b/python/pyspark/pipelines/flow.py
@@ -33,7 +33,6 @@ class Flow:
     :param spark_conf: A dict where the keys are the Spark configuration property names and the
         values are the property values. These properties will be set on the flow.
     :param source_code_location: The location of the source code that created this flow.
-    :param once: If True, the flow will be executed once per run.
     :param func: The function that defines the flow. This function should return a DataFrame.
""" @@ -41,5 +40,4 @@ class Flow: target: str spark_conf: Dict[str, str] source_code_location: SourceCodeLocation - once: bool func: QueryFunction diff --git a/python/pyspark/pipelines/spark_connect_graph_element_registry.py b/python/pyspark/pipelines/spark_connect_graph_element_registry.py index 013d2205741b..020c7989138d 100644 --- a/python/pyspark/pipelines/spark_connect_graph_element_registry.py +++ b/python/pyspark/pipelines/spark_connect_graph_element_registry.py @@ -95,7 +95,6 @@ def register_flow(self, flow: Flow) -> None: target_dataset_name=flow.target, relation=relation, sql_conf=flow.spark_conf, - once=flow.once, ) command = pb2.Command() command.pipeline_command.define_flow.CopyFrom(inner_command) diff --git a/python/pyspark/pipelines/tests/test_graph_element_registry.py b/python/pyspark/pipelines/tests/test_graph_element_registry.py index 9ebf29d1ed47..fb8504f4f8ed 100644 --- a/python/pyspark/pipelines/tests/test_graph_element_registry.py +++ b/python/pyspark/pipelines/tests/test_graph_element_registry.py @@ -38,11 +38,11 @@ def st(): sdp.create_streaming_table("st2") - @sdp.append_flow(target="st2", once=True) + @sdp.append_flow(target="st2") def flow1(): raise NotImplementedError() - @sdp.append_flow(target="st2", once=False) + @sdp.append_flow(target="st2") def flow2(): raise NotImplementedError() @@ -74,13 +74,11 @@ def flow2(): st2_flow1_obj = registry.flows[2] self.assertEqual(st2_flow1_obj.name, "flow1") self.assertEqual(st2_flow1_obj.target, "st2") - self.assertEqual(st2_flow1_obj.once, True) assert mv_flow_obj.source_code_location.filename.endswith("test_graph_element_registry.py") st2_flow1_obj = registry.flows[3] self.assertEqual(st2_flow1_obj.name, "flow2") self.assertEqual(st2_flow1_obj.target, "st2") - self.assertEqual(st2_flow1_obj.once, False) assert mv_flow_obj.source_code_location.filename.endswith("test_graph_element_registry.py") def test_definition_without_graph_element_registry(self): diff --git a/python/pyspark/sql/connect/proto/pipelines_pb2.py b/python/pyspark/sql/connect/proto/pipelines_pb2.py index 9558696a3964..08b39a39e831 100644 --- a/python/pyspark/sql/connect/proto/pipelines_pb2.py +++ b/python/pyspark/sql/connect/proto/pipelines_pb2.py @@ -40,7 +40,7 @@ DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( - b'\n\x1dspark/connect/pipelines.proto\x12\rspark.connect\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"\xb9\x14\n\x0fPipelineCommand\x12h\n\x15\x63reate_dataflow_graph\x18\x01 \x01(\x0b\x32\x32.spark.connect.PipelineCommand.CreateDataflowGraphH\x00R\x13\x63reateDataflowGraph\x12U\n\x0e\x64\x65\x66ine_dataset\x18\x02 \x01(\x0b\x32,.spark.connect.PipelineCommand.DefineDatasetH\x00R\rdefineDataset\x12L\n\x0b\x64\x65\x66ine_flow\x18\x03 \x01(\x0b\x32).spark.connect.PipelineCommand.DefineFlowH\x00R\ndefineFlow\x12\x62\n\x13\x64rop_dataflow_graph\x18\x04 \x01(\x0b\x32\x30.spark.connect.PipelineCommand.DropDataflowGraphH\x00R\x11\x64ropDataflowGraph\x12\x46\n\tstart_run\x18\x05 \x01(\x0b\x32\'.spark.connect.PipelineCommand.StartRunH\x00R\x08startRun\x12r\n\x19\x64\x65\x66ine_sql_graph_elements\x18\x06 \x01(\x0b\x32\x35.spark.connect.PipelineCommand.DefineSqlGraphElementsH\x00R\x16\x64\x65\x66ineSqlGraphElements\x1a\x87\x03\n\x13\x43reateDataflowGraph\x12,\n\x0f\x64\x65\x66\x61ult_catalog\x18\x01 \x01(\tH\x00R\x0e\x64\x65\x66\x61ultCatalog\x88\x01\x01\x12.\n\x10\x64\x65\x66\x61ult_database\x18\x02 \x01(\tH\x01R\x0f\x64\x65\x66\x61ultDatabase\x88\x01\x01\x12Z\n\x08sql_conf\x18\x05 
\x03(\x0b\x32?.spark.connect.PipelineCommand.CreateDataflowGraph.SqlConfEntryR\x07sqlConf\x1a:\n\x0cSqlConfEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x1aQ\n\x08Response\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x42\x14\n\x12_dataflow_graph_idB\x12\n\x10_default_catalogB\x13\n\x11_default_database\x1aZ\n\x11\x44ropDataflowGraph\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x42\x14\n\x12_dataflow_graph_id\x1a\xd1\x04\n\rDefineDataset\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12&\n\x0c\x64\x61taset_name\x18\x02 \x01(\tH\x01R\x0b\x64\x61tasetName\x88\x01\x01\x12\x42\n\x0c\x64\x61taset_type\x18\x03 \x01(\x0e\x32\x1a.spark.connect.DatasetTypeH\x02R\x0b\x64\x61tasetType\x88\x01\x01\x12\x1d\n\x07\x63omment\x18\x04 \x01(\tH\x03R\x07\x63omment\x88\x01\x01\x12l\n\x10table_properties\x18\x05 \x03(\x0b\x32\x41.spark.connect.PipelineCommand.DefineDataset.TablePropertiesEntryR\x0ftableProperties\x12%\n\x0epartition_cols\x18\x06 \x03(\tR\rpartitionCols\x12\x34\n\x06schema\x18\x07 \x01(\x0b\x32\x17.spark.connect.DataTypeH\x04R\x06schema\x88\x01\x01\x12\x1b\n\x06\x66ormat\x18\x08 \x01(\tH\x05R\x06\x66ormat\x88\x01\x01\x1a\x42\n\x14TablePropertiesEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x42\x14\n\x12_dataflow_graph_idB\x0f\n\r_dataset_nameB\x0f\n\r_dataset_typeB\n\n\x08_commentB\t\n\x07_schemaB\t\n\x07_format\x1a\xc8\x03\n\nDefineFlow\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12 \n\tflow_name\x18\x02 \x01(\tH\x01R\x08\x66lowName\x88\x01\x01\x12\x33\n\x13target_dataset_name\x18\x03 \x01(\tH\x02R\x11targetDatasetName\x88\x01\x01\x12\x38\n\x08relation\x18\x04 \x01(\x0b\x32\x17.spark.connect.RelationH\x03R\x08relation\x88\x01\x01\x12Q\n\x08sql_conf\x18\x05 \x03(\x0b\x32\x36.spark.connect.PipelineCommand.DefineFlow.SqlConfEntryR\x07sqlConf\x12\x17\n\x04once\x18\x06 \x01(\x08H\x04R\x04once\x88\x01\x01\x1a:\n\x0cSqlConfEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x42\x14\n\x12_dataflow_graph_idB\x0c\n\n_flow_nameB\x16\n\x14_target_dataset_nameB\x0b\n\t_relationB\x07\n\x05_once\x1a\x97\x02\n\x08StartRun\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12\x34\n\x16\x66ull_refresh_selection\x18\x02 \x03(\tR\x14\x66ullRefreshSelection\x12-\n\x10\x66ull_refresh_all\x18\x03 \x01(\x08H\x01R\x0e\x66ullRefreshAll\x88\x01\x01\x12+\n\x11refresh_selection\x18\x04 \x03(\tR\x10refreshSelection\x12\x15\n\x03\x64ry\x18\x05 \x01(\x08H\x02R\x03\x64ry\x88\x01\x01\x42\x14\n\x12_dataflow_graph_idB\x13\n\x11_full_refresh_allB\x06\n\x04_dry\x1a\xc7\x01\n\x16\x44\x65\x66ineSqlGraphElements\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12\'\n\rsql_file_path\x18\x02 \x01(\tH\x01R\x0bsqlFilePath\x88\x01\x01\x12\x1e\n\x08sql_text\x18\x03 \x01(\tH\x02R\x07sqlText\x88\x01\x01\x42\x14\n\x12_dataflow_graph_idB\x10\n\x0e_sql_file_pathB\x0b\n\t_sql_textB\x0e\n\x0c\x63ommand_type"\x8e\x02\n\x15PipelineCommandResult\x12\x81\x01\n\x1c\x63reate_dataflow_graph_result\x18\x01 \x01(\x0b\x32>.spark.connect.PipelineCommandResult.CreateDataflowGraphResultH\x00R\x19\x63reateDataflowGraphResult\x1a\x62\n\x19\x43reateDataflowGraphResult\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 
\x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x42\x14\n\x12_dataflow_graph_idB\r\n\x0bresult_type"I\n\x13PipelineEventResult\x12\x32\n\x05\x65vent\x18\x01 \x01(\x0b\x32\x1c.spark.connect.PipelineEventR\x05\x65vent"t\n\rPipelineEvent\x12\x38\n\ttimestamp\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.TimestampR\ttimestamp\x12\x1d\n\x07message\x18\x02 \x01(\tH\x00R\x07message\x88\x01\x01\x42\n\n\x08_message*a\n\x0b\x44\x61tasetType\x12\x1c\n\x18\x44\x41TASET_TYPE_UNSPECIFIED\x10\x00\x12\x15\n\x11MATERIALIZED_VIEW\x10\x01\x12\t\n\x05TABLE\x10\x02\x12\x12\n\x0eTEMPORARY_VIEW\x10\x03\x42\x36\n\x1eorg.apache.spark.connect.protoP\x01Z\x12internal/generatedb\x06proto3' + b'\n\x1dspark/connect/pipelines.proto\x12\rspark.connect\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"\x97\x14\n\x0fPipelineCommand\x12h\n\x15\x63reate_dataflow_graph\x18\x01 \x01(\x0b\x32\x32.spark.connect.PipelineCommand.CreateDataflowGraphH\x00R\x13\x63reateDataflowGraph\x12U\n\x0e\x64\x65\x66ine_dataset\x18\x02 \x01(\x0b\x32,.spark.connect.PipelineCommand.DefineDatasetH\x00R\rdefineDataset\x12L\n\x0b\x64\x65\x66ine_flow\x18\x03 \x01(\x0b\x32).spark.connect.PipelineCommand.DefineFlowH\x00R\ndefineFlow\x12\x62\n\x13\x64rop_dataflow_graph\x18\x04 \x01(\x0b\x32\x30.spark.connect.PipelineCommand.DropDataflowGraphH\x00R\x11\x64ropDataflowGraph\x12\x46\n\tstart_run\x18\x05 \x01(\x0b\x32\'.spark.connect.PipelineCommand.StartRunH\x00R\x08startRun\x12r\n\x19\x64\x65\x66ine_sql_graph_elements\x18\x06 \x01(\x0b\x32\x35.spark.connect.PipelineCommand.DefineSqlGraphElementsH\x00R\x16\x64\x65\x66ineSqlGraphElements\x1a\x87\x03\n\x13\x43reateDataflowGraph\x12,\n\x0f\x64\x65\x66\x61ult_catalog\x18\x01 \x01(\tH\x00R\x0e\x64\x65\x66\x61ultCatalog\x88\x01\x01\x12.\n\x10\x64\x65\x66\x61ult_database\x18\x02 \x01(\tH\x01R\x0f\x64\x65\x66\x61ultDatabase\x88\x01\x01\x12Z\n\x08sql_conf\x18\x05 \x03(\x0b\x32?.spark.connect.PipelineCommand.CreateDataflowGraph.SqlConfEntryR\x07sqlConf\x1a:\n\x0cSqlConfEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x1aQ\n\x08Response\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x42\x14\n\x12_dataflow_graph_idB\x12\n\x10_default_catalogB\x13\n\x11_default_database\x1aZ\n\x11\x44ropDataflowGraph\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x42\x14\n\x12_dataflow_graph_id\x1a\xd1\x04\n\rDefineDataset\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12&\n\x0c\x64\x61taset_name\x18\x02 \x01(\tH\x01R\x0b\x64\x61tasetName\x88\x01\x01\x12\x42\n\x0c\x64\x61taset_type\x18\x03 \x01(\x0e\x32\x1a.spark.connect.DatasetTypeH\x02R\x0b\x64\x61tasetType\x88\x01\x01\x12\x1d\n\x07\x63omment\x18\x04 \x01(\tH\x03R\x07\x63omment\x88\x01\x01\x12l\n\x10table_properties\x18\x05 \x03(\x0b\x32\x41.spark.connect.PipelineCommand.DefineDataset.TablePropertiesEntryR\x0ftableProperties\x12%\n\x0epartition_cols\x18\x06 \x03(\tR\rpartitionCols\x12\x34\n\x06schema\x18\x07 \x01(\x0b\x32\x17.spark.connect.DataTypeH\x04R\x06schema\x88\x01\x01\x12\x1b\n\x06\x66ormat\x18\x08 \x01(\tH\x05R\x06\x66ormat\x88\x01\x01\x1a\x42\n\x14TablePropertiesEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 
\x01(\tR\x05value:\x02\x38\x01\x42\x14\n\x12_dataflow_graph_idB\x0f\n\r_dataset_nameB\x0f\n\r_dataset_typeB\n\n\x08_commentB\t\n\x07_schemaB\t\n\x07_format\x1a\xa6\x03\n\nDefineFlow\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12 \n\tflow_name\x18\x02 \x01(\tH\x01R\x08\x66lowName\x88\x01\x01\x12\x33\n\x13target_dataset_name\x18\x03 \x01(\tH\x02R\x11targetDatasetName\x88\x01\x01\x12\x38\n\x08relation\x18\x04 \x01(\x0b\x32\x17.spark.connect.RelationH\x03R\x08relation\x88\x01\x01\x12Q\n\x08sql_conf\x18\x05 \x03(\x0b\x32\x36.spark.connect.PipelineCommand.DefineFlow.SqlConfEntryR\x07sqlConf\x1a:\n\x0cSqlConfEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x42\x14\n\x12_dataflow_graph_idB\x0c\n\n_flow_nameB\x16\n\x14_target_dataset_nameB\x0b\n\t_relation\x1a\x97\x02\n\x08StartRun\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12\x34\n\x16\x66ull_refresh_selection\x18\x02 \x03(\tR\x14\x66ullRefreshSelection\x12-\n\x10\x66ull_refresh_all\x18\x03 \x01(\x08H\x01R\x0e\x66ullRefreshAll\x88\x01\x01\x12+\n\x11refresh_selection\x18\x04 \x03(\tR\x10refreshSelection\x12\x15\n\x03\x64ry\x18\x05 \x01(\x08H\x02R\x03\x64ry\x88\x01\x01\x42\x14\n\x12_dataflow_graph_idB\x13\n\x11_full_refresh_allB\x06\n\x04_dry\x1a\xc7\x01\n\x16\x44\x65\x66ineSqlGraphElements\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12\'\n\rsql_file_path\x18\x02 \x01(\tH\x01R\x0bsqlFilePath\x88\x01\x01\x12\x1e\n\x08sql_text\x18\x03 \x01(\tH\x02R\x07sqlText\x88\x01\x01\x42\x14\n\x12_dataflow_graph_idB\x10\n\x0e_sql_file_pathB\x0b\n\t_sql_textB\x0e\n\x0c\x63ommand_type"\x8e\x02\n\x15PipelineCommandResult\x12\x81\x01\n\x1c\x63reate_dataflow_graph_result\x18\x01 \x01(\x0b\x32>.spark.connect.PipelineCommandResult.CreateDataflowGraphResultH\x00R\x19\x63reateDataflowGraphResult\x1a\x62\n\x19\x43reateDataflowGraphResult\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x42\x14\n\x12_dataflow_graph_idB\r\n\x0bresult_type"I\n\x13PipelineEventResult\x12\x32\n\x05\x65vent\x18\x01 \x01(\x0b\x32\x1c.spark.connect.PipelineEventR\x05\x65vent"t\n\rPipelineEvent\x12\x38\n\ttimestamp\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.TimestampR\ttimestamp\x12\x1d\n\x07message\x18\x02 \x01(\tH\x00R\x07message\x88\x01\x01\x42\n\n\x08_message*a\n\x0b\x44\x61tasetType\x12\x1c\n\x18\x44\x41TASET_TYPE_UNSPECIFIED\x10\x00\x12\x15\n\x11MATERIALIZED_VIEW\x10\x01\x12\t\n\x05TABLE\x10\x02\x12\x12\n\x0eTEMPORARY_VIEW\x10\x03\x42\x36\n\x1eorg.apache.spark.connect.protoP\x01Z\x12internal/generatedb\x06proto3' ) _globals = globals() @@ -59,10 +59,10 @@ _globals["_PIPELINECOMMAND_DEFINEDATASET_TABLEPROPERTIESENTRY"]._serialized_options = b"8\001" _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._loaded_options = None _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._serialized_options = b"8\001" - _globals["_DATASETTYPE"]._serialized_start = 3225 - _globals["_DATASETTYPE"]._serialized_end = 3322 + _globals["_DATASETTYPE"]._serialized_start = 3191 + _globals["_DATASETTYPE"]._serialized_end = 3288 _globals["_PIPELINECOMMAND"]._serialized_start = 140 - _globals["_PIPELINECOMMAND"]._serialized_end = 2757 + _globals["_PIPELINECOMMAND"]._serialized_end = 2723 _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH"]._serialized_start = 719 _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH"]._serialized_end = 1110 
_globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH_SQLCONFENTRY"]._serialized_start = 928 @@ -76,19 +76,19 @@ _globals["_PIPELINECOMMAND_DEFINEDATASET_TABLEPROPERTIESENTRY"]._serialized_start = 1642 _globals["_PIPELINECOMMAND_DEFINEDATASET_TABLEPROPERTIESENTRY"]._serialized_end = 1708 _globals["_PIPELINECOMMAND_DEFINEFLOW"]._serialized_start = 1801 - _globals["_PIPELINECOMMAND_DEFINEFLOW"]._serialized_end = 2257 + _globals["_PIPELINECOMMAND_DEFINEFLOW"]._serialized_end = 2223 _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._serialized_start = 928 _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._serialized_end = 986 - _globals["_PIPELINECOMMAND_STARTRUN"]._serialized_start = 2260 - _globals["_PIPELINECOMMAND_STARTRUN"]._serialized_end = 2539 - _globals["_PIPELINECOMMAND_DEFINESQLGRAPHELEMENTS"]._serialized_start = 2542 - _globals["_PIPELINECOMMAND_DEFINESQLGRAPHELEMENTS"]._serialized_end = 2741 - _globals["_PIPELINECOMMANDRESULT"]._serialized_start = 2760 - _globals["_PIPELINECOMMANDRESULT"]._serialized_end = 3030 - _globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_start = 2917 - _globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_end = 3015 - _globals["_PIPELINEEVENTRESULT"]._serialized_start = 3032 - _globals["_PIPELINEEVENTRESULT"]._serialized_end = 3105 - _globals["_PIPELINEEVENT"]._serialized_start = 3107 - _globals["_PIPELINEEVENT"]._serialized_end = 3223 + _globals["_PIPELINECOMMAND_STARTRUN"]._serialized_start = 2226 + _globals["_PIPELINECOMMAND_STARTRUN"]._serialized_end = 2505 + _globals["_PIPELINECOMMAND_DEFINESQLGRAPHELEMENTS"]._serialized_start = 2508 + _globals["_PIPELINECOMMAND_DEFINESQLGRAPHELEMENTS"]._serialized_end = 2707 + _globals["_PIPELINECOMMANDRESULT"]._serialized_start = 2726 + _globals["_PIPELINECOMMANDRESULT"]._serialized_end = 2996 + _globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_start = 2883 + _globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_end = 2981 + _globals["_PIPELINEEVENTRESULT"]._serialized_start = 2998 + _globals["_PIPELINEEVENTRESULT"]._serialized_end = 3071 + _globals["_PIPELINEEVENT"]._serialized_start = 3073 + _globals["_PIPELINEEVENT"]._serialized_end = 3189 # @@protoc_insertion_point(module_scope) diff --git a/python/pyspark/sql/connect/proto/pipelines_pb2.pyi b/python/pyspark/sql/connect/proto/pipelines_pb2.pyi index 6cf395a75cdb..6287aabafc6b 100644 --- a/python/pyspark/sql/connect/proto/pipelines_pb2.pyi +++ b/python/pyspark/sql/connect/proto/pipelines_pb2.pyi @@ -420,7 +420,6 @@ class PipelineCommand(google.protobuf.message.Message): TARGET_DATASET_NAME_FIELD_NUMBER: builtins.int RELATION_FIELD_NUMBER: builtins.int SQL_CONF_FIELD_NUMBER: builtins.int - ONCE_FIELD_NUMBER: builtins.int dataflow_graph_id: builtins.str """The graph to attach this flow to.""" flow_name: builtins.str @@ -435,8 +434,6 @@ class PipelineCommand(google.protobuf.message.Message): self, ) -> google.protobuf.internal.containers.ScalarMap[builtins.str, builtins.str]: """SQL configurations set when running this flow.""" - once: builtins.bool - """If true, this flow will only be run once per full refresh.""" def __init__( self, *, @@ -445,7 +442,6 @@ class PipelineCommand(google.protobuf.message.Message): target_dataset_name: builtins.str | None = ..., relation: pyspark.sql.connect.proto.relations_pb2.Relation | None = ..., sql_conf: collections.abc.Mapping[builtins.str, builtins.str] | None = ..., - once: builtins.bool | None = ..., ) -> None: ... 
         def HasField(
             self,
@@ -454,8 +450,6 @@ class PipelineCommand(google.protobuf.message.Message):
                 b"_dataflow_graph_id",
                 "_flow_name",
                 b"_flow_name",
-                "_once",
-                b"_once",
                 "_relation",
                 b"_relation",
                 "_target_dataset_name",
@@ -464,8 +458,6 @@ class PipelineCommand(google.protobuf.message.Message):
                 b"dataflow_graph_id",
                 "flow_name",
                 b"flow_name",
-                "once",
-                b"once",
                 "relation",
                 b"relation",
                 "target_dataset_name",
@@ -479,8 +471,6 @@ class PipelineCommand(google.protobuf.message.Message):
                 b"_dataflow_graph_id",
                 "_flow_name",
                 b"_flow_name",
-                "_once",
-                b"_once",
                 "_relation",
                 b"_relation",
                 "_target_dataset_name",
@@ -489,8 +479,6 @@ class PipelineCommand(google.protobuf.message.Message):
                 b"dataflow_graph_id",
                 "flow_name",
                 b"flow_name",
-                "once",
-                b"once",
                 "relation",
                 b"relation",
                 "sql_conf",
@@ -509,10 +497,6 @@ class PipelineCommand(google.protobuf.message.Message):
             self, oneof_group: typing_extensions.Literal["_flow_name", b"_flow_name"]
         ) -> typing_extensions.Literal["flow_name"] | None: ...
         @typing.overload
-        def WhichOneof(
-            self, oneof_group: typing_extensions.Literal["_once", b"_once"]
-        ) -> typing_extensions.Literal["once"] | None: ...
-        @typing.overload
         def WhichOneof(
             self, oneof_group: typing_extensions.Literal["_relation", b"_relation"]
         ) -> typing_extensions.Literal["relation"] | None: ...
diff --git a/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto b/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto
index 18a1170ceebb..57e1ffc7dbe7 100644
--- a/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto
+++ b/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto
@@ -106,9 +106,6 @@ message PipelineCommand {
     // SQL configurations set when running this flow.
     map<string, string> sql_conf = 5;
-
-    // If true, this flow will only be run once per full refresh.
-    optional bool once = 6;
   }

   // Resolves all datasets and flows and starts a pipeline update. Should be called after all
   // graph elements are registered.
diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala
index 21e9c97bfe5a..6f3876dabeae 100644
--- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala
+++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala
@@ -219,7 +219,7 @@ private[connect] object PipelinesHandler extends Logging {
         func =
           FlowAnalysis.createFlowFunctionFromLogicalPlan(transformRelationFunc(flow.getRelation)),
         sqlConf = flow.getSqlConfMap.asScala.toMap,
-        once = flow.getOnce,
+        once = false,
         queryContext = QueryContext(
           Option(graphElementRegistry.defaultCatalog),
           Option(graphElementRegistry.defaultDatabase)),
diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/TestPipelineDefinition.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/TestPipelineDefinition.scala
index 090caf1bb640..c31aec0b7a5e 100644
--- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/TestPipelineDefinition.scala
+++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/TestPipelineDefinition.scala
@@ -60,7 +60,6 @@ class TestPipelineDefinition(graphId: String) {
           .setTargetDatasetName(name)
          .setRelation(q)
          .putAllSqlConf(sparkConf.asJava)
-          .setOnce(false)
          .build()
      }
    }
@@ -100,7 +99,6 @@ class TestPipelineDefinition(graphId: String) {
          .setTargetDatasetName(name)
          .setRelation(query)
          .putAllSqlConf(sparkConf.asJava)
-          .setOnce(false)
          .build()
      }

@@ -127,7 +125,6 @@ class TestPipelineDefinition(graphId: String) {
          .setTargetDatasetName(destinationName)
          .setRelation(query)
          .putAllSqlConf(sparkConf.asJava)
-          .setOnce(once)
          .build()
   }
 }
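
For reviewers: after this change, the Python `append_flow` decorator accepts only `target`, `name`, and `spark_conf`, and the server hardcodes `once = false` when constructing the flow. A minimal usage sketch under stated assumptions: the table and flow names are hypothetical, `sdp` follows the `from pyspark import pipelines as sdp` alias used in the tests above, and `spark` is an active SparkSession.

```python
from pyspark import pipelines as sdp

# Create the streaming table that the flow appends into.
sdp.create_streaming_table("events")

# Previously: @sdp.append_flow(target="events", once=True)
# Now the decorator carries no run-once flag; per-flow confs can still be
# passed via spark_conf.
@sdp.append_flow(
    target="events",
    name="ingest_flow",
    spark_conf={"spark.sql.shuffle.partitions": "10"},
)
def ingest_flow():
    # Any query function returning a streaming DataFrame works here; the
    # rate source is just a stand-in for a real ingestion query.
    return spark.readStream.format("rate").load()
```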