diff --git a/python/pyspark/pipelines/spark_connect_graph_element_registry.py b/python/pyspark/pipelines/spark_connect_graph_element_registry.py index cfb0c70fd71d..20319695b212 100644 --- a/python/pyspark/pipelines/spark_connect_graph_element_registry.py +++ b/python/pyspark/pipelines/spark_connect_graph_element_registry.py @@ -47,9 +47,6 @@ def __init__(self, spark: SparkSession, dataflow_graph_id: str) -> None: def register_dataset(self, dataset: Dataset) -> None: if isinstance(dataset, Table): - table_properties = dataset.table_properties - partition_cols = dataset.partition_cols - if isinstance(dataset.schema, str): schema_string = dataset.schema schema_data_type = None @@ -60,7 +57,15 @@ def register_dataset(self, dataset: Dataset) -> None: schema_string = None schema_data_type = None - format = dataset.format + table_details = pb2.PipelineCommand.DefineDataset.TableDetails( + table_properties=dataset.table_properties, + partition_cols=dataset.partition_cols, + format=dataset.format, + # Even though schema_string is not required, the generated Python code seems to + # erroneously think it is required. + schema_string=schema_string, # type: ignore[arg-type] + schema_data_type=schema_data_type, + ) if isinstance(dataset, MaterializedView): dataset_type = pb2.DatasetType.MATERIALIZED_VIEW @@ -72,12 +77,8 @@ def register_dataset(self, dataset: Dataset) -> None: messageParameters={"dataset_type": type(dataset).__name__}, ) elif isinstance(dataset, TemporaryView): - table_properties = None - partition_cols = None - schema_string = None - schema_data_type = None - format = None dataset_type = pb2.DatasetType.TEMPORARY_VIEW + table_details = None else: raise PySparkTypeError( errorClass="UNSUPPORTED_PIPELINES_DATASET_TYPE", @@ -89,15 +90,10 @@ def register_dataset(self, dataset: Dataset) -> None: dataset_name=dataset.name, dataset_type=dataset_type, comment=dataset.comment, - table_properties=table_properties, - partition_cols=partition_cols, - format=format, + table_details=table_details, source_code_location=source_code_location_to_proto(dataset.source_code_location), - # Even though schema_string is not required, the generated Python code seems to - # erroneously think it is required. 
- schema_string=schema_string, # type: ignore[arg-type] - schema_data_type=schema_data_type, ) + command = pb2.Command() command.pipeline_command.define_dataset.CopyFrom(inner_command) self._client.execute_command(command) @@ -107,11 +103,15 @@ def register_flow(self, flow: Flow) -> None: df = flow.func() relation = cast(ConnectDataFrame, df)._plan.plan(self._client) + relation_flow_details = pb2.PipelineCommand.DefineFlow.WriteRelationFlowDetails( + relation=relation, + ) + inner_command = pb2.PipelineCommand.DefineFlow( dataflow_graph_id=self._dataflow_graph_id, flow_name=flow.name, target_dataset_name=flow.target, - relation=relation, + relation_flow_details=relation_flow_details, sql_conf=flow.spark_conf, source_code_location=source_code_location_to_proto(flow.source_code_location), ) diff --git a/python/pyspark/sql/connect/proto/pipelines_pb2.py b/python/pyspark/sql/connect/proto/pipelines_pb2.py index f53f91452454..87eb60007b53 100644 --- a/python/pyspark/sql/connect/proto/pipelines_pb2.py +++ b/python/pyspark/sql/connect/proto/pipelines_pb2.py @@ -34,6 +34,7 @@ _sym_db = _symbol_database.Default() +from google.protobuf import any_pb2 as google_dot_protobuf_dot_any__pb2 from google.protobuf import timestamp_pb2 as google_dot_protobuf_dot_timestamp__pb2 from pyspark.sql.connect.proto import common_pb2 as spark_dot_connect_dot_common__pb2 from pyspark.sql.connect.proto import relations_pb2 as spark_dot_connect_dot_relations__pb2 @@ -41,7 +42,7 @@ DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( - b'\n\x1dspark/connect/pipelines.proto\x12\rspark.connect\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1aspark/connect/common.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"\xa6\x1c\n\x0fPipelineCommand\x12h\n\x15\x63reate_dataflow_graph\x18\x01 \x01(\x0b\x32\x32.spark.connect.PipelineCommand.CreateDataflowGraphH\x00R\x13\x63reateDataflowGraph\x12U\n\x0e\x64\x65\x66ine_dataset\x18\x02 \x01(\x0b\x32,.spark.connect.PipelineCommand.DefineDatasetH\x00R\rdefineDataset\x12L\n\x0b\x64\x65\x66ine_flow\x18\x03 \x01(\x0b\x32).spark.connect.PipelineCommand.DefineFlowH\x00R\ndefineFlow\x12\x62\n\x13\x64rop_dataflow_graph\x18\x04 \x01(\x0b\x32\x30.spark.connect.PipelineCommand.DropDataflowGraphH\x00R\x11\x64ropDataflowGraph\x12\x46\n\tstart_run\x18\x05 \x01(\x0b\x32\'.spark.connect.PipelineCommand.StartRunH\x00R\x08startRun\x12r\n\x19\x64\x65\x66ine_sql_graph_elements\x18\x06 \x01(\x0b\x32\x35.spark.connect.PipelineCommand.DefineSqlGraphElementsH\x00R\x16\x64\x65\x66ineSqlGraphElements\x12\xa1\x01\n*get_query_function_execution_signal_stream\x18\x07 \x01(\x0b\x32\x44.spark.connect.PipelineCommand.GetQueryFunctionExecutionSignalStreamH\x00R%getQueryFunctionExecutionSignalStream\x12\x88\x01\n!define_flow_query_function_result\x18\x08 \x01(\x0b\x32<.spark.connect.PipelineCommand.DefineFlowQueryFunctionResultH\x00R\x1d\x64\x65\x66ineFlowQueryFunctionResult\x1a\xb4\x02\n\x13\x43reateDataflowGraph\x12,\n\x0f\x64\x65\x66\x61ult_catalog\x18\x01 \x01(\tH\x00R\x0e\x64\x65\x66\x61ultCatalog\x88\x01\x01\x12.\n\x10\x64\x65\x66\x61ult_database\x18\x02 \x01(\tH\x01R\x0f\x64\x65\x66\x61ultDatabase\x88\x01\x01\x12Z\n\x08sql_conf\x18\x05 \x03(\x0b\x32?.spark.connect.PipelineCommand.CreateDataflowGraph.SqlConfEntryR\x07sqlConf\x1a:\n\x0cSqlConfEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x42\x12\n\x10_default_catalogB\x13\n\x11_default_database\x1aZ\n\x11\x44ropDataflowGraph\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 
\x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x42\x14\n\x12_dataflow_graph_id\x1a\xf9\x05\n\rDefineDataset\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x01R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12&\n\x0c\x64\x61taset_name\x18\x02 \x01(\tH\x02R\x0b\x64\x61tasetName\x88\x01\x01\x12\x42\n\x0c\x64\x61taset_type\x18\x03 \x01(\x0e\x32\x1a.spark.connect.DatasetTypeH\x03R\x0b\x64\x61tasetType\x88\x01\x01\x12\x1d\n\x07\x63omment\x18\x04 \x01(\tH\x04R\x07\x63omment\x88\x01\x01\x12l\n\x10table_properties\x18\x05 \x03(\x0b\x32\x41.spark.connect.PipelineCommand.DefineDataset.TablePropertiesEntryR\x0ftableProperties\x12%\n\x0epartition_cols\x18\x06 \x03(\tR\rpartitionCols\x12\x43\n\x10schema_data_type\x18\x07 \x01(\x0b\x32\x17.spark.connect.DataTypeH\x00R\x0eschemaDataType\x12%\n\rschema_string\x18\x08 \x01(\tH\x00R\x0cschemaString\x12\x1b\n\x06\x66ormat\x18\t \x01(\tH\x05R\x06\x66ormat\x88\x01\x01\x12X\n\x14source_code_location\x18\n \x01(\x0b\x32!.spark.connect.SourceCodeLocationH\x06R\x12sourceCodeLocation\x88\x01\x01\x1a\x42\n\x14TablePropertiesEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x42\x08\n\x06schemaB\x14\n\x12_dataflow_graph_idB\x0f\n\r_dataset_nameB\x0f\n\r_dataset_typeB\n\n\x08_commentB\t\n\x07_formatB\x17\n\x15_source_code_location\x1a\x85\x05\n\nDefineFlow\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12 \n\tflow_name\x18\x02 \x01(\tH\x01R\x08\x66lowName\x88\x01\x01\x12\x33\n\x13target_dataset_name\x18\x03 \x01(\tH\x02R\x11targetDatasetName\x88\x01\x01\x12\x38\n\x08relation\x18\x04 \x01(\x0b\x32\x17.spark.connect.RelationH\x03R\x08relation\x88\x01\x01\x12Q\n\x08sql_conf\x18\x05 \x03(\x0b\x32\x36.spark.connect.PipelineCommand.DefineFlow.SqlConfEntryR\x07sqlConf\x12 \n\tclient_id\x18\x06 \x01(\tH\x04R\x08\x63lientId\x88\x01\x01\x12X\n\x14source_code_location\x18\x07 \x01(\x0b\x32!.spark.connect.SourceCodeLocationH\x05R\x12sourceCodeLocation\x88\x01\x01\x1a:\n\x0cSqlConfEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x1a:\n\x08Response\x12 \n\tflow_name\x18\x01 \x01(\tH\x00R\x08\x66lowName\x88\x01\x01\x42\x0c\n\n_flow_nameB\x14\n\x12_dataflow_graph_idB\x0c\n\n_flow_nameB\x16\n\x14_target_dataset_nameB\x0b\n\t_relationB\x0c\n\n_client_idB\x17\n\x15_source_code_location\x1a\xc2\x02\n\x08StartRun\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12\x34\n\x16\x66ull_refresh_selection\x18\x02 \x03(\tR\x14\x66ullRefreshSelection\x12-\n\x10\x66ull_refresh_all\x18\x03 \x01(\x08H\x01R\x0e\x66ullRefreshAll\x88\x01\x01\x12+\n\x11refresh_selection\x18\x04 \x03(\tR\x10refreshSelection\x12\x15\n\x03\x64ry\x18\x05 \x01(\x08H\x02R\x03\x64ry\x88\x01\x01\x12\x1d\n\x07storage\x18\x06 \x01(\tH\x03R\x07storage\x88\x01\x01\x42\x14\n\x12_dataflow_graph_idB\x13\n\x11_full_refresh_allB\x06\n\x04_dryB\n\n\x08_storage\x1a\xc7\x01\n\x16\x44\x65\x66ineSqlGraphElements\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12\'\n\rsql_file_path\x18\x02 \x01(\tH\x01R\x0bsqlFilePath\x88\x01\x01\x12\x1e\n\x08sql_text\x18\x03 \x01(\tH\x02R\x07sqlText\x88\x01\x01\x42\x14\n\x12_dataflow_graph_idB\x10\n\x0e_sql_file_pathB\x0b\n\t_sql_text\x1a\x9e\x01\n%GetQueryFunctionExecutionSignalStream\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12 \n\tclient_id\x18\x02 
\x01(\tH\x01R\x08\x63lientId\x88\x01\x01\x42\x14\n\x12_dataflow_graph_idB\x0c\n\n_client_id\x1a\xdd\x01\n\x1d\x44\x65\x66ineFlowQueryFunctionResult\x12 \n\tflow_name\x18\x01 \x01(\tH\x00R\x08\x66lowName\x88\x01\x01\x12/\n\x11\x64\x61taflow_graph_id\x18\x02 \x01(\tH\x01R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12\x38\n\x08relation\x18\x03 \x01(\x0b\x32\x17.spark.connect.RelationH\x02R\x08relation\x88\x01\x01\x42\x0c\n\n_flow_nameB\x14\n\x12_dataflow_graph_idB\x0b\n\t_relationB\x0e\n\x0c\x63ommand_type"\xf4\x05\n\x15PipelineCommandResult\x12\x81\x01\n\x1c\x63reate_dataflow_graph_result\x18\x01 \x01(\x0b\x32>.spark.connect.PipelineCommandResult.CreateDataflowGraphResultH\x00R\x19\x63reateDataflowGraphResult\x12n\n\x15\x64\x65\x66ine_dataset_result\x18\x02 \x01(\x0b\x32\x38.spark.connect.PipelineCommandResult.DefineDatasetResultH\x00R\x13\x64\x65\x66ineDatasetResult\x12\x65\n\x12\x64\x65\x66ine_flow_result\x18\x03 \x01(\x0b\x32\x35.spark.connect.PipelineCommandResult.DefineFlowResultH\x00R\x10\x64\x65\x66ineFlowResult\x1a\x62\n\x19\x43reateDataflowGraphResult\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x42\x14\n\x12_dataflow_graph_id\x1a\x86\x01\n\x13\x44\x65\x66ineDatasetResult\x12W\n\x13resolved_identifier\x18\x01 \x01(\x0b\x32!.spark.connect.ResolvedIdentifierH\x00R\x12resolvedIdentifier\x88\x01\x01\x42\x16\n\x14_resolved_identifier\x1a\x83\x01\n\x10\x44\x65\x66ineFlowResult\x12W\n\x13resolved_identifier\x18\x01 \x01(\x0b\x32!.spark.connect.ResolvedIdentifierH\x00R\x12resolvedIdentifier\x88\x01\x01\x42\x16\n\x14_resolved_identifierB\r\n\x0bresult_type"I\n\x13PipelineEventResult\x12\x32\n\x05\x65vent\x18\x01 \x01(\x0b\x32\x1c.spark.connect.PipelineEventR\x05\x65vent"t\n\rPipelineEvent\x12\x38\n\ttimestamp\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.TimestampR\ttimestamp\x12\x1d\n\x07message\x18\x02 \x01(\tH\x00R\x07message\x88\x01\x01\x42\n\n\x08_message"z\n\x12SourceCodeLocation\x12 \n\tfile_name\x18\x01 \x01(\tH\x00R\x08\x66ileName\x88\x01\x01\x12$\n\x0bline_number\x18\x02 \x01(\x05H\x01R\nlineNumber\x88\x01\x01\x42\x0c\n\n_file_nameB\x0e\n\x0c_line_number"E\n$PipelineQueryFunctionExecutionSignal\x12\x1d\n\nflow_names\x18\x01 \x03(\tR\tflowNames*a\n\x0b\x44\x61tasetType\x12\x1c\n\x18\x44\x41TASET_TYPE_UNSPECIFIED\x10\x00\x12\x15\n\x11MATERIALIZED_VIEW\x10\x01\x12\t\n\x05TABLE\x10\x02\x12\x12\n\x0eTEMPORARY_VIEW\x10\x03\x42\x36\n\x1eorg.apache.spark.connect.protoP\x01Z\x12internal/generatedb\x06proto3' + b'\n\x1dspark/connect/pipelines.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1aspark/connect/common.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"\xc0\x1f\n\x0fPipelineCommand\x12h\n\x15\x63reate_dataflow_graph\x18\x01 \x01(\x0b\x32\x32.spark.connect.PipelineCommand.CreateDataflowGraphH\x00R\x13\x63reateDataflowGraph\x12U\n\x0e\x64\x65\x66ine_dataset\x18\x02 \x01(\x0b\x32,.spark.connect.PipelineCommand.DefineDatasetH\x00R\rdefineDataset\x12L\n\x0b\x64\x65\x66ine_flow\x18\x03 \x01(\x0b\x32).spark.connect.PipelineCommand.DefineFlowH\x00R\ndefineFlow\x12\x62\n\x13\x64rop_dataflow_graph\x18\x04 \x01(\x0b\x32\x30.spark.connect.PipelineCommand.DropDataflowGraphH\x00R\x11\x64ropDataflowGraph\x12\x46\n\tstart_run\x18\x05 \x01(\x0b\x32\'.spark.connect.PipelineCommand.StartRunH\x00R\x08startRun\x12r\n\x19\x64\x65\x66ine_sql_graph_elements\x18\x06 
\x01(\x0b\x32\x35.spark.connect.PipelineCommand.DefineSqlGraphElementsH\x00R\x16\x64\x65\x66ineSqlGraphElements\x12\xa1\x01\n*get_query_function_execution_signal_stream\x18\x07 \x01(\x0b\x32\x44.spark.connect.PipelineCommand.GetQueryFunctionExecutionSignalStreamH\x00R%getQueryFunctionExecutionSignalStream\x12\x88\x01\n!define_flow_query_function_result\x18\x08 \x01(\x0b\x32<.spark.connect.PipelineCommand.DefineFlowQueryFunctionResultH\x00R\x1d\x64\x65\x66ineFlowQueryFunctionResult\x1a\xb4\x02\n\x13\x43reateDataflowGraph\x12,\n\x0f\x64\x65\x66\x61ult_catalog\x18\x01 \x01(\tH\x00R\x0e\x64\x65\x66\x61ultCatalog\x88\x01\x01\x12.\n\x10\x64\x65\x66\x61ult_database\x18\x02 \x01(\tH\x01R\x0f\x64\x65\x66\x61ultDatabase\x88\x01\x01\x12Z\n\x08sql_conf\x18\x05 \x03(\x0b\x32?.spark.connect.PipelineCommand.CreateDataflowGraph.SqlConfEntryR\x07sqlConf\x1a:\n\x0cSqlConfEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x42\x12\n\x10_default_catalogB\x13\n\x11_default_database\x1aZ\n\x11\x44ropDataflowGraph\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x42\x14\n\x12_dataflow_graph_id\x1a\xbb\x07\n\rDefineDataset\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x01R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12&\n\x0c\x64\x61taset_name\x18\x02 \x01(\tH\x02R\x0b\x64\x61tasetName\x88\x01\x01\x12\x42\n\x0c\x64\x61taset_type\x18\x03 \x01(\x0e\x32\x1a.spark.connect.DatasetTypeH\x03R\x0b\x64\x61tasetType\x88\x01\x01\x12\x1d\n\x07\x63omment\x18\x04 \x01(\tH\x04R\x07\x63omment\x88\x01\x01\x12X\n\x14source_code_location\x18\x05 \x01(\x0b\x32!.spark.connect.SourceCodeLocationH\x05R\x12sourceCodeLocation\x88\x01\x01\x12`\n\rtable_details\x18\x06 \x01(\x0b\x32\x39.spark.connect.PipelineCommand.DefineDataset.TableDetailsH\x00R\x0ctableDetails\x12\x35\n\textension\x18\xe7\x07 \x01(\x0b\x32\x14.google.protobuf.AnyH\x00R\textension\x1a\x92\x03\n\x0cTableDetails\x12y\n\x10table_properties\x18\x01 \x03(\x0b\x32N.spark.connect.PipelineCommand.DefineDataset.TableDetails.TablePropertiesEntryR\x0ftableProperties\x12%\n\x0epartition_cols\x18\x02 \x03(\tR\rpartitionCols\x12\x1b\n\x06\x66ormat\x18\x03 \x01(\tH\x01R\x06\x66ormat\x88\x01\x01\x12\x43\n\x10schema_data_type\x18\x04 \x01(\x0b\x32\x17.spark.connect.DataTypeH\x00R\x0eschemaDataType\x12%\n\rschema_string\x18\x05 \x01(\tH\x00R\x0cschemaString\x1a\x42\n\x14TablePropertiesEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x42\x08\n\x06schemaB\t\n\x07_formatB\t\n\x07\x64\x65tailsB\x14\n\x12_dataflow_graph_idB\x0f\n\r_dataset_nameB\x0f\n\r_dataset_typeB\n\n\x08_commentB\x17\n\x15_source_code_location\x1a\xdd\x06\n\nDefineFlow\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x01R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12 \n\tflow_name\x18\x02 \x01(\tH\x02R\x08\x66lowName\x88\x01\x01\x12\x33\n\x13target_dataset_name\x18\x03 \x01(\tH\x03R\x11targetDatasetName\x88\x01\x01\x12Q\n\x08sql_conf\x18\x04 \x03(\x0b\x32\x36.spark.connect.PipelineCommand.DefineFlow.SqlConfEntryR\x07sqlConf\x12 \n\tclient_id\x18\x05 \x01(\tH\x04R\x08\x63lientId\x88\x01\x01\x12X\n\x14source_code_location\x18\x06 \x01(\x0b\x32!.spark.connect.SourceCodeLocationH\x05R\x12sourceCodeLocation\x88\x01\x01\x12x\n\x15relation_flow_details\x18\x07 \x01(\x0b\x32\x42.spark.connect.PipelineCommand.DefineFlow.WriteRelationFlowDetailsH\x00R\x13relationFlowDetails\x12\x35\n\textension\x18\xe7\x07 
\x01(\x0b\x32\x14.google.protobuf.AnyH\x00R\textension\x1a:\n\x0cSqlConfEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x1a\x61\n\x18WriteRelationFlowDetails\x12\x38\n\x08relation\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationH\x00R\x08relation\x88\x01\x01\x42\x0b\n\t_relation\x1a:\n\x08Response\x12 \n\tflow_name\x18\x01 \x01(\tH\x00R\x08\x66lowName\x88\x01\x01\x42\x0c\n\n_flow_nameB\t\n\x07\x64\x65tailsB\x14\n\x12_dataflow_graph_idB\x0c\n\n_flow_nameB\x16\n\x14_target_dataset_nameB\x0c\n\n_client_idB\x17\n\x15_source_code_location\x1a\xc2\x02\n\x08StartRun\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12\x34\n\x16\x66ull_refresh_selection\x18\x02 \x03(\tR\x14\x66ullRefreshSelection\x12-\n\x10\x66ull_refresh_all\x18\x03 \x01(\x08H\x01R\x0e\x66ullRefreshAll\x88\x01\x01\x12+\n\x11refresh_selection\x18\x04 \x03(\tR\x10refreshSelection\x12\x15\n\x03\x64ry\x18\x05 \x01(\x08H\x02R\x03\x64ry\x88\x01\x01\x12\x1d\n\x07storage\x18\x06 \x01(\tH\x03R\x07storage\x88\x01\x01\x42\x14\n\x12_dataflow_graph_idB\x13\n\x11_full_refresh_allB\x06\n\x04_dryB\n\n\x08_storage\x1a\xc7\x01\n\x16\x44\x65\x66ineSqlGraphElements\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12\'\n\rsql_file_path\x18\x02 \x01(\tH\x01R\x0bsqlFilePath\x88\x01\x01\x12\x1e\n\x08sql_text\x18\x03 \x01(\tH\x02R\x07sqlText\x88\x01\x01\x42\x14\n\x12_dataflow_graph_idB\x10\n\x0e_sql_file_pathB\x0b\n\t_sql_text\x1a\x9e\x01\n%GetQueryFunctionExecutionSignalStream\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12 \n\tclient_id\x18\x02 \x01(\tH\x01R\x08\x63lientId\x88\x01\x01\x42\x14\n\x12_dataflow_graph_idB\x0c\n\n_client_id\x1a\xdd\x01\n\x1d\x44\x65\x66ineFlowQueryFunctionResult\x12 \n\tflow_name\x18\x01 \x01(\tH\x00R\x08\x66lowName\x88\x01\x01\x12/\n\x11\x64\x61taflow_graph_id\x18\x02 \x01(\tH\x01R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12\x38\n\x08relation\x18\x03 \x01(\x0b\x32\x17.spark.connect.RelationH\x02R\x08relation\x88\x01\x01\x42\x0c\n\n_flow_nameB\x14\n\x12_dataflow_graph_idB\x0b\n\t_relationB\x0e\n\x0c\x63ommand_type"\xf4\x05\n\x15PipelineCommandResult\x12\x81\x01\n\x1c\x63reate_dataflow_graph_result\x18\x01 \x01(\x0b\x32>.spark.connect.PipelineCommandResult.CreateDataflowGraphResultH\x00R\x19\x63reateDataflowGraphResult\x12n\n\x15\x64\x65\x66ine_dataset_result\x18\x02 \x01(\x0b\x32\x38.spark.connect.PipelineCommandResult.DefineDatasetResultH\x00R\x13\x64\x65\x66ineDatasetResult\x12\x65\n\x12\x64\x65\x66ine_flow_result\x18\x03 \x01(\x0b\x32\x35.spark.connect.PipelineCommandResult.DefineFlowResultH\x00R\x10\x64\x65\x66ineFlowResult\x1a\x62\n\x19\x43reateDataflowGraphResult\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x42\x14\n\x12_dataflow_graph_id\x1a\x86\x01\n\x13\x44\x65\x66ineDatasetResult\x12W\n\x13resolved_identifier\x18\x01 \x01(\x0b\x32!.spark.connect.ResolvedIdentifierH\x00R\x12resolvedIdentifier\x88\x01\x01\x42\x16\n\x14_resolved_identifier\x1a\x83\x01\n\x10\x44\x65\x66ineFlowResult\x12W\n\x13resolved_identifier\x18\x01 \x01(\x0b\x32!.spark.connect.ResolvedIdentifierH\x00R\x12resolvedIdentifier\x88\x01\x01\x42\x16\n\x14_resolved_identifierB\r\n\x0bresult_type"I\n\x13PipelineEventResult\x12\x32\n\x05\x65vent\x18\x01 \x01(\x0b\x32\x1c.spark.connect.PipelineEventR\x05\x65vent"t\n\rPipelineEvent\x12\x38\n\ttimestamp\x18\x01 
\x01(\x0b\x32\x1a.google.protobuf.TimestampR\ttimestamp\x12\x1d\n\x07message\x18\x02 \x01(\tH\x00R\x07message\x88\x01\x01\x42\n\n\x08_message"z\n\x12SourceCodeLocation\x12 \n\tfile_name\x18\x01 \x01(\tH\x00R\x08\x66ileName\x88\x01\x01\x12$\n\x0bline_number\x18\x02 \x01(\x05H\x01R\nlineNumber\x88\x01\x01\x42\x0c\n\n_file_nameB\x0e\n\x0c_line_number"E\n$PipelineQueryFunctionExecutionSignal\x12\x1d\n\nflow_names\x18\x01 \x03(\tR\tflowNames*a\n\x0b\x44\x61tasetType\x12\x1c\n\x18\x44\x41TASET_TYPE_UNSPECIFIED\x10\x00\x12\x15\n\x11MATERIALIZED_VIEW\x10\x01\x12\t\n\x05TABLE\x10\x02\x12\x12\n\x0eTEMPORARY_VIEW\x10\x03\x42\x36\n\x1eorg.apache.spark.connect.protoP\x01Z\x12internal/generatedb\x06proto3' ) _globals = globals() @@ -56,52 +57,64 @@ ]._serialized_options = b"\n\036org.apache.spark.connect.protoP\001Z\022internal/generated" _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH_SQLCONFENTRY"]._loaded_options = None _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH_SQLCONFENTRY"]._serialized_options = b"8\001" - _globals["_PIPELINECOMMAND_DEFINEDATASET_TABLEPROPERTIESENTRY"]._loaded_options = None - _globals["_PIPELINECOMMAND_DEFINEDATASET_TABLEPROPERTIESENTRY"]._serialized_options = b"8\001" + _globals[ + "_PIPELINECOMMAND_DEFINEDATASET_TABLEDETAILS_TABLEPROPERTIESENTRY" + ]._loaded_options = None + _globals[ + "_PIPELINECOMMAND_DEFINEDATASET_TABLEDETAILS_TABLEPROPERTIESENTRY" + ]._serialized_options = b"8\001" _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._loaded_options = None _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._serialized_options = b"8\001" - _globals["_DATASETTYPE"]._serialized_start = 4939 - _globals["_DATASETTYPE"]._serialized_end = 5036 - _globals["_PIPELINECOMMAND"]._serialized_start = 168 - _globals["_PIPELINECOMMAND"]._serialized_end = 3790 - _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH"]._serialized_start = 1050 - _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH"]._serialized_end = 1358 - _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH_SQLCONFENTRY"]._serialized_start = 1259 - _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH_SQLCONFENTRY"]._serialized_end = 1317 - _globals["_PIPELINECOMMAND_DROPDATAFLOWGRAPH"]._serialized_start = 1360 - _globals["_PIPELINECOMMAND_DROPDATAFLOWGRAPH"]._serialized_end = 1450 - _globals["_PIPELINECOMMAND_DEFINEDATASET"]._serialized_start = 1453 - _globals["_PIPELINECOMMAND_DEFINEDATASET"]._serialized_end = 2214 - _globals["_PIPELINECOMMAND_DEFINEDATASET_TABLEPROPERTIESENTRY"]._serialized_start = 2034 - _globals["_PIPELINECOMMAND_DEFINEDATASET_TABLEPROPERTIESENTRY"]._serialized_end = 2100 - _globals["_PIPELINECOMMAND_DEFINEFLOW"]._serialized_start = 2217 - _globals["_PIPELINECOMMAND_DEFINEFLOW"]._serialized_end = 2862 - _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._serialized_start = 1259 - _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._serialized_end = 1317 - _globals["_PIPELINECOMMAND_DEFINEFLOW_RESPONSE"]._serialized_start = 2692 - _globals["_PIPELINECOMMAND_DEFINEFLOW_RESPONSE"]._serialized_end = 2750 - _globals["_PIPELINECOMMAND_STARTRUN"]._serialized_start = 2865 - _globals["_PIPELINECOMMAND_STARTRUN"]._serialized_end = 3187 - _globals["_PIPELINECOMMAND_DEFINESQLGRAPHELEMENTS"]._serialized_start = 3190 - _globals["_PIPELINECOMMAND_DEFINESQLGRAPHELEMENTS"]._serialized_end = 3389 - _globals["_PIPELINECOMMAND_GETQUERYFUNCTIONEXECUTIONSIGNALSTREAM"]._serialized_start = 3392 - _globals["_PIPELINECOMMAND_GETQUERYFUNCTIONEXECUTIONSIGNALSTREAM"]._serialized_end = 3550 - 
_globals["_PIPELINECOMMAND_DEFINEFLOWQUERYFUNCTIONRESULT"]._serialized_start = 3553 - _globals["_PIPELINECOMMAND_DEFINEFLOWQUERYFUNCTIONRESULT"]._serialized_end = 3774 - _globals["_PIPELINECOMMANDRESULT"]._serialized_start = 3793 - _globals["_PIPELINECOMMANDRESULT"]._serialized_end = 4549 - _globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_start = 4165 - _globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_end = 4263 - _globals["_PIPELINECOMMANDRESULT_DEFINEDATASETRESULT"]._serialized_start = 4266 - _globals["_PIPELINECOMMANDRESULT_DEFINEDATASETRESULT"]._serialized_end = 4400 - _globals["_PIPELINECOMMANDRESULT_DEFINEFLOWRESULT"]._serialized_start = 4403 - _globals["_PIPELINECOMMANDRESULT_DEFINEFLOWRESULT"]._serialized_end = 4534 - _globals["_PIPELINEEVENTRESULT"]._serialized_start = 4551 - _globals["_PIPELINEEVENTRESULT"]._serialized_end = 4624 - _globals["_PIPELINEEVENT"]._serialized_start = 4626 - _globals["_PIPELINEEVENT"]._serialized_end = 4742 - _globals["_SOURCECODELOCATION"]._serialized_start = 4744 - _globals["_SOURCECODELOCATION"]._serialized_end = 4866 - _globals["_PIPELINEQUERYFUNCTIONEXECUTIONSIGNAL"]._serialized_start = 4868 - _globals["_PIPELINEQUERYFUNCTIONEXECUTIONSIGNAL"]._serialized_end = 4937 + _globals["_DATASETTYPE"]._serialized_start = 5376 + _globals["_DATASETTYPE"]._serialized_end = 5473 + _globals["_PIPELINECOMMAND"]._serialized_start = 195 + _globals["_PIPELINECOMMAND"]._serialized_end = 4227 + _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH"]._serialized_start = 1077 + _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH"]._serialized_end = 1385 + _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH_SQLCONFENTRY"]._serialized_start = 1286 + _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH_SQLCONFENTRY"]._serialized_end = 1344 + _globals["_PIPELINECOMMAND_DROPDATAFLOWGRAPH"]._serialized_start = 1387 + _globals["_PIPELINECOMMAND_DROPDATAFLOWGRAPH"]._serialized_end = 1477 + _globals["_PIPELINECOMMAND_DEFINEDATASET"]._serialized_start = 1480 + _globals["_PIPELINECOMMAND_DEFINEDATASET"]._serialized_end = 2435 + _globals["_PIPELINECOMMAND_DEFINEDATASET_TABLEDETAILS"]._serialized_start = 1929 + _globals["_PIPELINECOMMAND_DEFINEDATASET_TABLEDETAILS"]._serialized_end = 2331 + _globals[ + "_PIPELINECOMMAND_DEFINEDATASET_TABLEDETAILS_TABLEPROPERTIESENTRY" + ]._serialized_start = 2244 + _globals[ + "_PIPELINECOMMAND_DEFINEDATASET_TABLEDETAILS_TABLEPROPERTIESENTRY" + ]._serialized_end = 2310 + _globals["_PIPELINECOMMAND_DEFINEFLOW"]._serialized_start = 2438 + _globals["_PIPELINECOMMAND_DEFINEFLOW"]._serialized_end = 3299 + _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._serialized_start = 1286 + _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._serialized_end = 1344 + _globals["_PIPELINECOMMAND_DEFINEFLOW_WRITERELATIONFLOWDETAILS"]._serialized_start = 3032 + _globals["_PIPELINECOMMAND_DEFINEFLOW_WRITERELATIONFLOWDETAILS"]._serialized_end = 3129 + _globals["_PIPELINECOMMAND_DEFINEFLOW_RESPONSE"]._serialized_start = 3131 + _globals["_PIPELINECOMMAND_DEFINEFLOW_RESPONSE"]._serialized_end = 3189 + _globals["_PIPELINECOMMAND_STARTRUN"]._serialized_start = 3302 + _globals["_PIPELINECOMMAND_STARTRUN"]._serialized_end = 3624 + _globals["_PIPELINECOMMAND_DEFINESQLGRAPHELEMENTS"]._serialized_start = 3627 + _globals["_PIPELINECOMMAND_DEFINESQLGRAPHELEMENTS"]._serialized_end = 3826 + _globals["_PIPELINECOMMAND_GETQUERYFUNCTIONEXECUTIONSIGNALSTREAM"]._serialized_start = 3829 + 
_globals["_PIPELINECOMMAND_GETQUERYFUNCTIONEXECUTIONSIGNALSTREAM"]._serialized_end = 3987 + _globals["_PIPELINECOMMAND_DEFINEFLOWQUERYFUNCTIONRESULT"]._serialized_start = 3990 + _globals["_PIPELINECOMMAND_DEFINEFLOWQUERYFUNCTIONRESULT"]._serialized_end = 4211 + _globals["_PIPELINECOMMANDRESULT"]._serialized_start = 4230 + _globals["_PIPELINECOMMANDRESULT"]._serialized_end = 4986 + _globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_start = 4602 + _globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_end = 4700 + _globals["_PIPELINECOMMANDRESULT_DEFINEDATASETRESULT"]._serialized_start = 4703 + _globals["_PIPELINECOMMANDRESULT_DEFINEDATASETRESULT"]._serialized_end = 4837 + _globals["_PIPELINECOMMANDRESULT_DEFINEFLOWRESULT"]._serialized_start = 4840 + _globals["_PIPELINECOMMANDRESULT_DEFINEFLOWRESULT"]._serialized_end = 4971 + _globals["_PIPELINEEVENTRESULT"]._serialized_start = 4988 + _globals["_PIPELINEEVENTRESULT"]._serialized_end = 5061 + _globals["_PIPELINEEVENT"]._serialized_start = 5063 + _globals["_PIPELINEEVENT"]._serialized_end = 5179 + _globals["_SOURCECODELOCATION"]._serialized_start = 5181 + _globals["_SOURCECODELOCATION"]._serialized_end = 5303 + _globals["_PIPELINEQUERYFUNCTIONEXECUTIONSIGNAL"]._serialized_start = 5305 + _globals["_PIPELINEQUERYFUNCTIONEXECUTIONSIGNAL"]._serialized_end = 5374 # @@protoc_insertion_point(module_scope) diff --git a/python/pyspark/sql/connect/proto/pipelines_pb2.pyi b/python/pyspark/sql/connect/proto/pipelines_pb2.pyi index c42b3eefe2d4..979546b1657b 100644 --- a/python/pyspark/sql/connect/proto/pipelines_pb2.pyi +++ b/python/pyspark/sql/connect/proto/pipelines_pb2.pyi @@ -35,6 +35,7 @@ limitations under the License. """ import builtins import collections.abc +import google.protobuf.any_pb2 import google.protobuf.descriptor import google.protobuf.internal.containers import google.protobuf.internal.enum_type_wrapper @@ -208,33 +209,107 @@ class PipelineCommand(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor - class TablePropertiesEntry(google.protobuf.message.Message): + class TableDetails(google.protobuf.message.Message): + """Dataset metadata that's only applicable to tables and materialized views.""" + DESCRIPTOR: google.protobuf.descriptor.Descriptor - KEY_FIELD_NUMBER: builtins.int - VALUE_FIELD_NUMBER: builtins.int - key: builtins.str - value: builtins.str + class TablePropertiesEntry(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + KEY_FIELD_NUMBER: builtins.int + VALUE_FIELD_NUMBER: builtins.int + key: builtins.str + value: builtins.str + def __init__( + self, + *, + key: builtins.str = ..., + value: builtins.str = ..., + ) -> None: ... + def ClearField( + self, field_name: typing_extensions.Literal["key", b"key", "value", b"value"] + ) -> None: ... 
+ + TABLE_PROPERTIES_FIELD_NUMBER: builtins.int + PARTITION_COLS_FIELD_NUMBER: builtins.int + FORMAT_FIELD_NUMBER: builtins.int + SCHEMA_DATA_TYPE_FIELD_NUMBER: builtins.int + SCHEMA_STRING_FIELD_NUMBER: builtins.int + @property + def table_properties( + self, + ) -> google.protobuf.internal.containers.ScalarMap[builtins.str, builtins.str]: + """Optional table properties.""" + @property + def partition_cols( + self, + ) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]: + """Optional partition columns for the table.""" + format: builtins.str + """The output table format of the dataset.""" + @property + def schema_data_type(self) -> pyspark.sql.connect.proto.types_pb2.DataType: ... + schema_string: builtins.str def __init__( self, *, - key: builtins.str = ..., - value: builtins.str = ..., + table_properties: collections.abc.Mapping[builtins.str, builtins.str] | None = ..., + partition_cols: collections.abc.Iterable[builtins.str] | None = ..., + format: builtins.str | None = ..., + schema_data_type: pyspark.sql.connect.proto.types_pb2.DataType | None = ..., + schema_string: builtins.str = ..., ) -> None: ... + def HasField( + self, + field_name: typing_extensions.Literal[ + "_format", + b"_format", + "format", + b"format", + "schema", + b"schema", + "schema_data_type", + b"schema_data_type", + "schema_string", + b"schema_string", + ], + ) -> builtins.bool: ... def ClearField( - self, field_name: typing_extensions.Literal["key", b"key", "value", b"value"] + self, + field_name: typing_extensions.Literal[ + "_format", + b"_format", + "format", + b"format", + "partition_cols", + b"partition_cols", + "schema", + b"schema", + "schema_data_type", + b"schema_data_type", + "schema_string", + b"schema_string", + "table_properties", + b"table_properties", + ], ) -> None: ... + @typing.overload + def WhichOneof( + self, oneof_group: typing_extensions.Literal["_format", b"_format"] + ) -> typing_extensions.Literal["format"] | None: ... + @typing.overload + def WhichOneof( + self, oneof_group: typing_extensions.Literal["schema", b"schema"] + ) -> typing_extensions.Literal["schema_data_type", "schema_string"] | None: ... DATAFLOW_GRAPH_ID_FIELD_NUMBER: builtins.int DATASET_NAME_FIELD_NUMBER: builtins.int DATASET_TYPE_FIELD_NUMBER: builtins.int COMMENT_FIELD_NUMBER: builtins.int - TABLE_PROPERTIES_FIELD_NUMBER: builtins.int - PARTITION_COLS_FIELD_NUMBER: builtins.int - SCHEMA_DATA_TYPE_FIELD_NUMBER: builtins.int - SCHEMA_STRING_FIELD_NUMBER: builtins.int - FORMAT_FIELD_NUMBER: builtins.int SOURCE_CODE_LOCATION_FIELD_NUMBER: builtins.int + TABLE_DETAILS_FIELD_NUMBER: builtins.int + EXTENSION_FIELD_NUMBER: builtins.int dataflow_graph_id: builtins.str """The graph to attach this dataset to.""" dataset_name: builtins.str @@ -244,27 +319,12 @@ class PipelineCommand(google.protobuf.message.Message): comment: builtins.str """Optional comment for the dataset.""" @property - def table_properties( - self, - ) -> google.protobuf.internal.containers.ScalarMap[builtins.str, builtins.str]: - """Optional table properties. Only applies to dataset_type == TABLE and dataset_type == MATERIALIZED_VIEW.""" - @property - def partition_cols( - self, - ) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]: - """Optional partition columns for the dataset. Only applies to dataset_type == TABLE and - dataset_type == MATERIALIZED_VIEW. - """ - @property - def schema_data_type(self) -> pyspark.sql.connect.proto.types_pb2.DataType: ... 
- schema_string: builtins.str - format: builtins.str - """The output table format of the dataset. Only applies to dataset_type == TABLE and - dataset_type == MATERIALIZED_VIEW. - """ - @property def source_code_location(self) -> global___SourceCodeLocation: """The location in source code that this dataset was defined.""" + @property + def table_details(self) -> global___PipelineCommand.DefineDataset.TableDetails: ... + @property + def extension(self) -> google.protobuf.any_pb2.Any: ... def __init__( self, *, @@ -272,12 +332,9 @@ class PipelineCommand(google.protobuf.message.Message): dataset_name: builtins.str | None = ..., dataset_type: global___DatasetType.ValueType | None = ..., comment: builtins.str | None = ..., - table_properties: collections.abc.Mapping[builtins.str, builtins.str] | None = ..., - partition_cols: collections.abc.Iterable[builtins.str] | None = ..., - schema_data_type: pyspark.sql.connect.proto.types_pb2.DataType | None = ..., - schema_string: builtins.str = ..., - format: builtins.str | None = ..., source_code_location: global___SourceCodeLocation | None = ..., + table_details: global___PipelineCommand.DefineDataset.TableDetails | None = ..., + extension: google.protobuf.any_pb2.Any | None = ..., ) -> None: ... def HasField( self, @@ -290,8 +347,6 @@ class PipelineCommand(google.protobuf.message.Message): b"_dataset_name", "_dataset_type", b"_dataset_type", - "_format", - b"_format", "_source_code_location", b"_source_code_location", "comment", @@ -302,16 +357,14 @@ class PipelineCommand(google.protobuf.message.Message): b"dataset_name", "dataset_type", b"dataset_type", - "format", - b"format", - "schema", - b"schema", - "schema_data_type", - b"schema_data_type", - "schema_string", - b"schema_string", + "details", + b"details", + "extension", + b"extension", "source_code_location", b"source_code_location", + "table_details", + b"table_details", ], ) -> builtins.bool: ... def ClearField( @@ -325,8 +378,6 @@ class PipelineCommand(google.protobuf.message.Message): b"_dataset_name", "_dataset_type", b"_dataset_type", - "_format", - b"_format", "_source_code_location", b"_source_code_location", "comment", @@ -337,20 +388,14 @@ class PipelineCommand(google.protobuf.message.Message): b"dataset_name", "dataset_type", b"dataset_type", - "format", - b"format", - "partition_cols", - b"partition_cols", - "schema", - b"schema", - "schema_data_type", - b"schema_data_type", - "schema_string", - b"schema_string", + "details", + b"details", + "extension", + b"extension", "source_code_location", b"source_code_location", - "table_properties", - b"table_properties", + "table_details", + b"table_details", ], ) -> None: ... @typing.overload @@ -371,10 +416,6 @@ class PipelineCommand(google.protobuf.message.Message): self, oneof_group: typing_extensions.Literal["_dataset_type", b"_dataset_type"] ) -> typing_extensions.Literal["dataset_type"] | None: ... @typing.overload - def WhichOneof( - self, oneof_group: typing_extensions.Literal["_format", b"_format"] - ) -> typing_extensions.Literal["format"] | None: ... - @typing.overload def WhichOneof( self, oneof_group: typing_extensions.Literal[ @@ -383,8 +424,8 @@ class PipelineCommand(google.protobuf.message.Message): ) -> typing_extensions.Literal["source_code_location"] | None: ... @typing.overload def WhichOneof( - self, oneof_group: typing_extensions.Literal["schema", b"schema"] - ) -> typing_extensions.Literal["schema_data_type", "schema_string"] | None: ... 
+ self, oneof_group: typing_extensions.Literal["details", b"details"] + ) -> typing_extensions.Literal["table_details", "extension"] | None: ... class DefineFlow(google.protobuf.message.Message): """Request to define a flow targeting a dataset.""" @@ -408,6 +449,38 @@ class PipelineCommand(google.protobuf.message.Message): self, field_name: typing_extensions.Literal["key", b"key", "value", b"value"] ) -> None: ... + class WriteRelationFlowDetails(google.protobuf.message.Message): + """A flow that is that takes the contents of a relation and writes it to the target dataset.""" + + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + RELATION_FIELD_NUMBER: builtins.int + @property + def relation(self) -> pyspark.sql.connect.proto.relations_pb2.Relation: + """An unresolved relation that defines the dataset's flow. Empty if the query function + that defines the flow cannot be analyzed at the time of flow definition. + """ + def __init__( + self, + *, + relation: pyspark.sql.connect.proto.relations_pb2.Relation | None = ..., + ) -> None: ... + def HasField( + self, + field_name: typing_extensions.Literal[ + "_relation", b"_relation", "relation", b"relation" + ], + ) -> builtins.bool: ... + def ClearField( + self, + field_name: typing_extensions.Literal[ + "_relation", b"_relation", "relation", b"relation" + ], + ) -> None: ... + def WhichOneof( + self, oneof_group: typing_extensions.Literal["_relation", b"_relation"] + ) -> typing_extensions.Literal["relation"] | None: ... + class Response(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor @@ -438,10 +511,11 @@ class PipelineCommand(google.protobuf.message.Message): DATAFLOW_GRAPH_ID_FIELD_NUMBER: builtins.int FLOW_NAME_FIELD_NUMBER: builtins.int TARGET_DATASET_NAME_FIELD_NUMBER: builtins.int - RELATION_FIELD_NUMBER: builtins.int SQL_CONF_FIELD_NUMBER: builtins.int CLIENT_ID_FIELD_NUMBER: builtins.int SOURCE_CODE_LOCATION_FIELD_NUMBER: builtins.int + RELATION_FLOW_DETAILS_FIELD_NUMBER: builtins.int + EXTENSION_FIELD_NUMBER: builtins.int dataflow_graph_id: builtins.str """The graph to attach this flow to.""" flow_name: builtins.str @@ -449,11 +523,6 @@ class PipelineCommand(google.protobuf.message.Message): target_dataset_name: builtins.str """Name of the dataset this flow writes to. Can be partially or fully qualified.""" @property - def relation(self) -> pyspark.sql.connect.proto.relations_pb2.Relation: - """An unresolved relation that defines the dataset's flow. Empty if the query function - that defines the flow cannot be analyzed at the time of flow definition. - """ - @property def sql_conf( self, ) -> google.protobuf.internal.containers.ScalarMap[builtins.str, builtins.str]: @@ -465,16 +534,24 @@ class PipelineCommand(google.protobuf.message.Message): @property def source_code_location(self) -> global___SourceCodeLocation: """The location in source code that this flow was defined.""" + @property + def relation_flow_details( + self, + ) -> global___PipelineCommand.DefineFlow.WriteRelationFlowDetails: ... + @property + def extension(self) -> google.protobuf.any_pb2.Any: ... 
def __init__( self, *, dataflow_graph_id: builtins.str | None = ..., flow_name: builtins.str | None = ..., target_dataset_name: builtins.str | None = ..., - relation: pyspark.sql.connect.proto.relations_pb2.Relation | None = ..., sql_conf: collections.abc.Mapping[builtins.str, builtins.str] | None = ..., client_id: builtins.str | None = ..., source_code_location: global___SourceCodeLocation | None = ..., + relation_flow_details: global___PipelineCommand.DefineFlow.WriteRelationFlowDetails + | None = ..., + extension: google.protobuf.any_pb2.Any | None = ..., ) -> None: ... def HasField( self, @@ -485,8 +562,6 @@ class PipelineCommand(google.protobuf.message.Message): b"_dataflow_graph_id", "_flow_name", b"_flow_name", - "_relation", - b"_relation", "_source_code_location", b"_source_code_location", "_target_dataset_name", @@ -495,10 +570,14 @@ class PipelineCommand(google.protobuf.message.Message): b"client_id", "dataflow_graph_id", b"dataflow_graph_id", + "details", + b"details", + "extension", + b"extension", "flow_name", b"flow_name", - "relation", - b"relation", + "relation_flow_details", + b"relation_flow_details", "source_code_location", b"source_code_location", "target_dataset_name", @@ -514,8 +593,6 @@ class PipelineCommand(google.protobuf.message.Message): b"_dataflow_graph_id", "_flow_name", b"_flow_name", - "_relation", - b"_relation", "_source_code_location", b"_source_code_location", "_target_dataset_name", @@ -524,10 +601,14 @@ class PipelineCommand(google.protobuf.message.Message): b"client_id", "dataflow_graph_id", b"dataflow_graph_id", + "details", + b"details", + "extension", + b"extension", "flow_name", b"flow_name", - "relation", - b"relation", + "relation_flow_details", + b"relation_flow_details", "source_code_location", b"source_code_location", "sql_conf", @@ -550,10 +631,6 @@ class PipelineCommand(google.protobuf.message.Message): self, oneof_group: typing_extensions.Literal["_flow_name", b"_flow_name"] ) -> typing_extensions.Literal["flow_name"] | None: ... @typing.overload - def WhichOneof( - self, oneof_group: typing_extensions.Literal["_relation", b"_relation"] - ) -> typing_extensions.Literal["relation"] | None: ... - @typing.overload def WhichOneof( self, oneof_group: typing_extensions.Literal[ @@ -565,6 +642,10 @@ class PipelineCommand(google.protobuf.message.Message): self, oneof_group: typing_extensions.Literal["_target_dataset_name", b"_target_dataset_name"], ) -> typing_extensions.Literal["target_dataset_name"] | None: ... + @typing.overload + def WhichOneof( + self, oneof_group: typing_extensions.Literal["details", b"details"] + ) -> typing_extensions.Literal["relation_flow_details", "extension"] | None: ... class StartRun(google.protobuf.message.Message): """Resolves all datasets and flows and start a pipeline update. Should be called after all diff --git a/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto b/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto index ef24899a5f74..2ef53cf64051 100644 --- a/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto +++ b/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto @@ -19,6 +19,7 @@ syntax = "proto3"; package spark.connect; +import "google/protobuf/any.proto"; import "google/protobuf/timestamp.proto"; import "spark/connect/common.proto"; import "spark/connect/relations.proto"; @@ -73,25 +74,31 @@ message PipelineCommand { // Optional comment for the dataset. optional string comment = 4; - // Optional table properties. 
Only applies to dataset_type == TABLE and dataset_type == MATERIALIZED_VIEW. - map<string, string> table_properties = 5; - - // Optional partition columns for the dataset. Only applies to dataset_type == TABLE and - // dataset_type == MATERIALIZED_VIEW. - repeated string partition_cols = 6; + // The location in source code that this dataset was defined. + optional SourceCodeLocation source_code_location = 5; - // Schema for the dataset. If unset, this will be inferred from incoming flows. - oneof schema { - spark.connect.DataType schema_data_type = 7; - string schema_string = 8; + oneof details { + TableDetails table_details = 6; + google.protobuf.Any extension = 999; } - // The output table format of the dataset. Only applies to dataset_type == TABLE and - // dataset_type == MATERIALIZED_VIEW. - optional string format = 9; + // Dataset metadata that's only applicable to tables and materialized views. + message TableDetails { + // Optional table properties. + map<string, string> table_properties = 1; - // The location in source code that this dataset was defined. - optional SourceCodeLocation source_code_location = 10; + // Optional partition columns for the table. + repeated string partition_cols = 2; + + // The output table format of the dataset. + optional string format = 3; + + // Schema for the dataset. If unset, this will be inferred from incoming flows. + oneof schema { + spark.connect.DataType schema_data_type = 4; + string schema_string = 5; + } + } } // Request to define a flow targeting a dataset. @@ -105,19 +112,27 @@ message PipelineCommand { // Name of the dataset this flow writes to. Can be partially or fully qualified. optional string target_dataset_name = 3; - // An unresolved relation that defines the dataset's flow. Empty if the query function - // that defines the flow cannot be analyzed at the time of flow definition. - optional spark.connect.Relation relation = 4; - // SQL configurations set when running this flow. - map<string, string> sql_conf = 5; + map<string, string> sql_conf = 4; // Identifier for the client making the request. The server uses this to determine what flow // evaluation request stream to dispatch evaluation requests to for this flow. - optional string client_id = 6; + optional string client_id = 5; // The location in source code that this flow was defined. - optional SourceCodeLocation source_code_location = 7; + optional SourceCodeLocation source_code_location = 6; + + oneof details { + WriteRelationFlowDetails relation_flow_details = 7; + google.protobuf.Any extension = 999; + } + + // A flow that takes the contents of a relation and writes it to the target dataset. + message WriteRelationFlowDetails { + // An unresolved relation that defines the dataset's flow. Empty if the query function + // that defines the flow cannot be analyzed at the time of flow definition. + optional spark.connect.Relation relation = 1; + } message Response { // Fully qualified flow name that uniquely identify a flow in the Dataflow graph.
diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala index 358821289c85..3474aecae097 100644 --- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala +++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala @@ -184,24 +184,26 @@ private[connect] object PipelinesHandler extends Logging { currentCatalog = Some(graphElementRegistry.defaultCatalog), currentDatabase = Some(graphElementRegistry.defaultDatabase)) .identifier + + val tableDetails = dataset.getTableDetails graphElementRegistry.registerTable( Table( identifier = qualifiedIdentifier, comment = Option(dataset.getComment), - specifiedSchema = dataset.getSchemaCase match { - case proto.PipelineCommand.DefineDataset.SchemaCase.SCHEMA_DATA_TYPE => + specifiedSchema = tableDetails.getSchemaCase match { + case proto.PipelineCommand.DefineDataset.TableDetails.SchemaCase.SCHEMA_DATA_TYPE => Some( DataTypeProtoConverter - .toCatalystType(dataset.getSchemaDataType) + .toCatalystType(tableDetails.getSchemaDataType) .asInstanceOf[StructType]) - case proto.PipelineCommand.DefineDataset.SchemaCase.SCHEMA_STRING => - Some(StructType.fromDDL(dataset.getSchemaString)) - case proto.PipelineCommand.DefineDataset.SchemaCase.SCHEMA_NOT_SET => + case proto.PipelineCommand.DefineDataset.TableDetails.SchemaCase.SCHEMA_STRING => + Some(StructType.fromDDL(tableDetails.getSchemaString)) + case proto.PipelineCommand.DefineDataset.TableDetails.SchemaCase.SCHEMA_NOT_SET => None }, - partitionCols = Option(dataset.getPartitionColsList.asScala.toSeq) + partitionCols = Option(tableDetails.getPartitionColsList.asScala.toSeq) .filter(_.nonEmpty), - properties = dataset.getTablePropertiesMap.asScala.toMap, + properties = tableDetails.getTablePropertiesMap.asScala.toMap, origin = QueryOrigin( filePath = Option.when(dataset.getSourceCodeLocation.hasFileName)( dataset.getSourceCodeLocation.getFileName), @@ -210,7 +212,7 @@ private[connect] object PipelinesHandler extends Logging { objectType = Option(QueryOriginType.Table.toString), objectName = Option(qualifiedIdentifier.unquotedString), language = Option(Python())), - format = Option.when(dataset.hasFormat)(dataset.getFormat), + format = Option.when(tableDetails.hasFormat)(tableDetails.getFormat), normalizedPath = None, isStreamingTable = dataset.getDatasetType == proto.DatasetType.TABLE)) qualifiedIdentifier @@ -286,12 +288,13 @@ private[connect] object PipelinesHandler extends Logging { } } + val relationFlowDetails = flow.getRelationFlowDetails graphElementRegistry.registerFlow( new UnresolvedFlow( identifier = flowIdentifier, destinationIdentifier = destinationIdentifier, - func = - FlowAnalysis.createFlowFunctionFromLogicalPlan(transformRelationFunc(flow.getRelation)), + func = FlowAnalysis.createFlowFunctionFromLogicalPlan( + transformRelationFunc(relationFlowDetails.getRelation)), sqlConf = flow.getSqlConfMap.asScala.toMap, once = false, queryContext = QueryContext(Option(defaultCatalog), Option(defaultDatabase)), diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/SparkDeclarativePipelinesServerSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/SparkDeclarativePipelinesServerSuite.scala index 772b43656b14..0a1a74fabb49 100644 --- 
a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/SparkDeclarativePipelinesServerSuite.scala +++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/SparkDeclarativePipelinesServerSuite.scala @@ -97,19 +97,23 @@ class SparkDeclarativePipelinesServerSuite .setDataflowGraphId(graphId) .setFlowName("mv") .setTargetDatasetName("mv") - .setRelation( - Relation + .setRelationFlowDetails( + DefineFlow.WriteRelationFlowDetails .newBuilder() - .setUnresolvedTableValuedFunction( - UnresolvedTableValuedFunction + .setRelation( + Relation .newBuilder() - .setFunctionName("range") - .addArguments(Expression + .setUnresolvedTableValuedFunction(UnresolvedTableValuedFunction .newBuilder() - .setLiteral(Expression.Literal.newBuilder().setInteger(5).build()) + .setFunctionName("range") + .addArguments(Expression + .newBuilder() + .setLiteral(Expression.Literal.newBuilder().setInteger(5).build()) + .build()) .build()) .build()) - .build())) + .build()) + .build()) .build())) registerGraphElementsFromSql( graphId = graphId, @@ -698,17 +702,21 @@ class SparkDeclarativePipelinesServerSuite .setDataflowGraphId(graphId) .setFlowName(testCase.flowName) .setTargetDatasetName(testCase.flowName) - .setRelation( - Relation + .setRelationFlowDetails( + DefineFlow.WriteRelationFlowDetails .newBuilder() - .setUnresolvedTableValuedFunction( - UnresolvedTableValuedFunction + .setRelation( + Relation .newBuilder() - .setFunctionName("range") - .addArguments(Expression - .newBuilder() - .setLiteral(Expression.Literal.newBuilder().setInteger(5).build()) - .build()) + .setUnresolvedTableValuedFunction( + UnresolvedTableValuedFunction + .newBuilder() + .setFunctionName("range") + .addArguments(Expression + .newBuilder() + .setLiteral(Expression.Literal.newBuilder().setInteger(5).build()) + .build()) + .build()) .build()) .build()) .build() @@ -762,17 +770,21 @@ class SparkDeclarativePipelinesServerSuite .setDataflowGraphId(graphId) .setFlowName(testCase.flowName) .setTargetDatasetName(testCase.flowName) - .setRelation( - Relation + .setRelationFlowDetails( + DefineFlow.WriteRelationFlowDetails .newBuilder() - .setUnresolvedTableValuedFunction( - UnresolvedTableValuedFunction + .setRelation( + Relation .newBuilder() - .setFunctionName("range") - .addArguments(Expression - .newBuilder() - .setLiteral(Expression.Literal.newBuilder().setInteger(5).build()) - .build()) + .setUnresolvedTableValuedFunction( + UnresolvedTableValuedFunction + .newBuilder() + .setFunctionName("range") + .addArguments(Expression + .newBuilder() + .setLiteral(Expression.Literal.newBuilder().setInteger(5).build()) + .build()) + .build()) .build()) .build()) .build() diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/TestPipelineDefinition.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/TestPipelineDefinition.scala index c31aec0b7a5e..36b90016a52a 100644 --- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/TestPipelineDefinition.scala +++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/TestPipelineDefinition.scala @@ -42,23 +42,33 @@ class TestPipelineDefinition(graphId: String) { // specifiedSchema: Option[StructType] = None, partitionCols: Option[Seq[String]] = None, properties: Map[String, String] = Map.empty): Unit = { + val tableDetails = sc.PipelineCommand.DefineDataset.TableDetails + .newBuilder() + .addAllPartitionCols(partitionCols.getOrElse(Seq()).asJava) + 
.putAllTableProperties(properties.asJava) + .build() + tableDefs += sc.PipelineCommand.DefineDataset .newBuilder() .setDataflowGraphId(graphId) .setDatasetName(name) .setDatasetType(datasetType) .setComment(comment.getOrElse("")) - .addAllPartitionCols(partitionCols.getOrElse(Seq()).asJava) - .putAllTableProperties(properties.asJava) + .setTableDetails(tableDetails) .build() query.foreach { q => + val relationFlowDetails = sc.PipelineCommand.DefineFlow.WriteRelationFlowDetails + .newBuilder() + .setRelation(q) + .build() + flowDefs += sc.PipelineCommand.DefineFlow .newBuilder() .setDataflowGraphId(graphId) .setFlowName(name) .setTargetDatasetName(name) - .setRelation(q) + .setRelationFlowDetails(relationFlowDetails) .putAllSqlConf(sparkConf.asJava) .build() } @@ -92,12 +102,17 @@ class TestPipelineDefinition(graphId: String) { .setComment(comment.getOrElse("")) .build() + val relationFlowDetails = sc.PipelineCommand.DefineFlow.WriteRelationFlowDetails + .newBuilder() + .setRelation(query) + .build() + flowDefs += sc.PipelineCommand.DefineFlow .newBuilder() .setDataflowGraphId(graphId) .setFlowName(name) .setTargetDatasetName(name) - .setRelation(query) + .setRelationFlowDetails(relationFlowDetails) .putAllSqlConf(sparkConf.asJava) .build() @@ -118,12 +133,17 @@ class TestPipelineDefinition(graphId: String) { query: sc.Relation, sparkConf: Map[String, String] = Map.empty, once: Boolean = false): Unit = { + val relationFlowDetails = sc.PipelineCommand.DefineFlow.WriteRelationFlowDetails + .newBuilder() + .setRelation(query) + .build() + flowDefs += sc.PipelineCommand.DefineFlow .newBuilder() .setDataflowGraphId(graphId) .setFlowName(name) .setTargetDatasetName(destinationName) - .setRelation(query) + .setRelationFlowDetails(relationFlowDetails) .putAllSqlConf(sparkConf.asJava) .build() }
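
For reference, a minimal client-side sketch (not part of this patch) of how a consumer might branch on the new `details` oneofs introduced above, assuming the regenerated pipelines_pb2 module and a hypothetical helper name:

from pyspark.sql.connect.proto import pipelines_pb2 as pb2

def describe_dataset(dataset: pb2.PipelineCommand.DefineDataset) -> str:
    # WhichOneof returns the name of the populated field in the `details` oneof, or None.
    which = dataset.WhichOneof("details")
    if which == "table_details":
        td = dataset.table_details
        schema = td.schema_string if td.WhichOneof("schema") == "schema_string" else None
        return f"table {dataset.dataset_name}: format={td.format!r}, schema_string={schema!r}"
    if which == "extension":
        return f"dataset {dataset.dataset_name}: extension {dataset.extension.type_url}"
    return f"dataset {dataset.dataset_name}: no details set (e.g. a temporary view)"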