diff --git a/python/pyspark/pipelines/source_code_location.py b/python/pyspark/pipelines/source_code_location.py index 5f23b819abd8..cbf4cbe514a6 100644 --- a/python/pyspark/pipelines/source_code_location.py +++ b/python/pyspark/pipelines/source_code_location.py @@ -30,6 +30,34 @@ def get_caller_source_code_location(stacklevel: int) -> SourceCodeLocation: """ Returns a SourceCodeLocation object representing the location code that invokes this function. + If this function is called from a decorator (ex. @sdp.table), note that the returned line + number is affected by how the decorator was triggered - i.e. whether @sdp.table or @sdp.table() + was called - AND what python version is being used + + Case 1: + |@sdp.table() + |def fn + + @sdp.table() is executed immediately, on line 1. This is true for all python versions. + + Case 2: + |@sdp.table + |def fn + + In python < 3.10, @sdp.table will expand to fn = sdp.table(fn), replacing the line that `fn` is + defined on. This would be line 2. More interestingly, this means: + + |@sdp.table + | + | + |def fn + + Will expand to fn = sdp.table(fn) on line 4, where `fn` is defined. + + However, in python 3.10+, the line number in the stack trace will still be the line that the + decorator was defined on. In other words, case 2 will be treated the same as case 1, and the + line number will be 1. + :param stacklevel: The number of stack frames to go up. 0 means the direct caller of this function, 1 means the caller of the caller, and so on. """ diff --git a/python/pyspark/pipelines/spark_connect_graph_element_registry.py b/python/pyspark/pipelines/spark_connect_graph_element_registry.py index 020c7989138d..8faf7eb9ef58 100644 --- a/python/pyspark/pipelines/spark_connect_graph_element_registry.py +++ b/python/pyspark/pipelines/spark_connect_graph_element_registry.py @@ -29,6 +29,7 @@ ) from pyspark.pipelines.flow import Flow from pyspark.pipelines.graph_element_registry import GraphElementRegistry +from pyspark.pipelines.source_code_location import SourceCodeLocation from typing import Any, cast import pyspark.sql.connect.proto as pb2 @@ -79,6 +80,7 @@ def register_dataset(self, dataset: Dataset) -> None: partition_cols=partition_cols, schema=schema, format=format, + source_code_location=source_code_location_to_proto(dataset.source_code_location), ) command = pb2.Command() command.pipeline_command.define_dataset.CopyFrom(inner_command) @@ -95,6 +97,7 @@ def register_flow(self, flow: Flow) -> None: target_dataset_name=flow.target, relation=relation, sql_conf=flow.spark_conf, + source_code_location=source_code_location_to_proto(flow.source_code_location), ) command = pb2.Command() command.pipeline_command.define_flow.CopyFrom(inner_command) @@ -109,3 +112,11 @@ def register_sql(self, sql_text: str, file_path: Path) -> None: command = pb2.Command() command.pipeline_command.define_sql_graph_elements.CopyFrom(inner_command) self._client.execute_command(command) + + +def source_code_location_to_proto( + source_code_location: SourceCodeLocation, +) -> pb2.SourceCodeLocation: + return pb2.SourceCodeLocation( + file_name=source_code_location.filename, line_number=source_code_location.line_number + ) diff --git a/python/pyspark/sql/connect/proto/pipelines_pb2.py b/python/pyspark/sql/connect/proto/pipelines_pb2.py index 8f4ba32baccb..849d141f9c49 100644 --- a/python/pyspark/sql/connect/proto/pipelines_pb2.py +++ b/python/pyspark/sql/connect/proto/pipelines_pb2.py @@ -41,7 +41,7 @@ DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( - b'\n\x1dspark/connect/pipelines.proto\x12\rspark.connect\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1aspark/connect/common.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"\xe0\x19\n\x0fPipelineCommand\x12h\n\x15\x63reate_dataflow_graph\x18\x01 \x01(\x0b\x32\x32.spark.connect.PipelineCommand.CreateDataflowGraphH\x00R\x13\x63reateDataflowGraph\x12U\n\x0e\x64\x65\x66ine_dataset\x18\x02 \x01(\x0b\x32,.spark.connect.PipelineCommand.DefineDatasetH\x00R\rdefineDataset\x12L\n\x0b\x64\x65\x66ine_flow\x18\x03 \x01(\x0b\x32).spark.connect.PipelineCommand.DefineFlowH\x00R\ndefineFlow\x12\x62\n\x13\x64rop_dataflow_graph\x18\x04 \x01(\x0b\x32\x30.spark.connect.PipelineCommand.DropDataflowGraphH\x00R\x11\x64ropDataflowGraph\x12\x46\n\tstart_run\x18\x05 \x01(\x0b\x32\'.spark.connect.PipelineCommand.StartRunH\x00R\x08startRun\x12r\n\x19\x64\x65\x66ine_sql_graph_elements\x18\x06 \x01(\x0b\x32\x35.spark.connect.PipelineCommand.DefineSqlGraphElementsH\x00R\x16\x64\x65\x66ineSqlGraphElements\x12\xa1\x01\n*get_query_function_execution_signal_stream\x18\x07 \x01(\x0b\x32\x44.spark.connect.PipelineCommand.GetQueryFunctionExecutionSignalStreamH\x00R%getQueryFunctionExecutionSignalStream\x12\x88\x01\n!define_flow_query_function_result\x18\x08 \x01(\x0b\x32<.spark.connect.PipelineCommand.DefineFlowQueryFunctionResultH\x00R\x1d\x64\x65\x66ineFlowQueryFunctionResult\x1a\xb4\x02\n\x13\x43reateDataflowGraph\x12,\n\x0f\x64\x65\x66\x61ult_catalog\x18\x01 \x01(\tH\x00R\x0e\x64\x65\x66\x61ultCatalog\x88\x01\x01\x12.\n\x10\x64\x65\x66\x61ult_database\x18\x02 \x01(\tH\x01R\x0f\x64\x65\x66\x61ultDatabase\x88\x01\x01\x12Z\n\x08sql_conf\x18\x05 \x03(\x0b\x32?.spark.connect.PipelineCommand.CreateDataflowGraph.SqlConfEntryR\x07sqlConf\x1a:\n\x0cSqlConfEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x42\x12\n\x10_default_catalogB\x13\n\x11_default_database\x1aZ\n\x11\x44ropDataflowGraph\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x42\x14\n\x12_dataflow_graph_id\x1a\xd1\x04\n\rDefineDataset\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12&\n\x0c\x64\x61taset_name\x18\x02 \x01(\tH\x01R\x0b\x64\x61tasetName\x88\x01\x01\x12\x42\n\x0c\x64\x61taset_type\x18\x03 \x01(\x0e\x32\x1a.spark.connect.DatasetTypeH\x02R\x0b\x64\x61tasetType\x88\x01\x01\x12\x1d\n\x07\x63omment\x18\x04 \x01(\tH\x03R\x07\x63omment\x88\x01\x01\x12l\n\x10table_properties\x18\x05 \x03(\x0b\x32\x41.spark.connect.PipelineCommand.DefineDataset.TablePropertiesEntryR\x0ftableProperties\x12%\n\x0epartition_cols\x18\x06 \x03(\tR\rpartitionCols\x12\x34\n\x06schema\x18\x07 \x01(\x0b\x32\x17.spark.connect.DataTypeH\x04R\x06schema\x88\x01\x01\x12\x1b\n\x06\x66ormat\x18\x08 \x01(\tH\x05R\x06\x66ormat\x88\x01\x01\x1a\x42\n\x14TablePropertiesEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x42\x14\n\x12_dataflow_graph_idB\x0f\n\r_dataset_nameB\x0f\n\r_dataset_typeB\n\n\x08_commentB\t\n\x07_schemaB\t\n\x07_format\x1a\x92\x04\n\nDefineFlow\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12 \n\tflow_name\x18\x02 \x01(\tH\x01R\x08\x66lowName\x88\x01\x01\x12\x33\n\x13target_dataset_name\x18\x03 \x01(\tH\x02R\x11targetDatasetName\x88\x01\x01\x12\x38\n\x08relation\x18\x04 \x01(\x0b\x32\x17.spark.connect.RelationH\x03R\x08relation\x88\x01\x01\x12Q\n\x08sql_conf\x18\x05 \x03(\x0b\x32\x36.spark.connect.PipelineCommand.DefineFlow.SqlConfEntryR\x07sqlConf\x12 \n\tclient_id\x18\x06 \x01(\tH\x04R\x08\x63lientId\x88\x01\x01\x1a:\n\x0cSqlConfEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x1a:\n\x08Response\x12 \n\tflow_name\x18\x01 \x01(\tH\x00R\x08\x66lowName\x88\x01\x01\x42\x0c\n\n_flow_nameB\x14\n\x12_dataflow_graph_idB\x0c\n\n_flow_nameB\x16\n\x14_target_dataset_nameB\x0b\n\t_relationB\x0c\n\n_client_id\x1a\x97\x02\n\x08StartRun\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12\x34\n\x16\x66ull_refresh_selection\x18\x02 \x03(\tR\x14\x66ullRefreshSelection\x12-\n\x10\x66ull_refresh_all\x18\x03 \x01(\x08H\x01R\x0e\x66ullRefreshAll\x88\x01\x01\x12+\n\x11refresh_selection\x18\x04 \x03(\tR\x10refreshSelection\x12\x15\n\x03\x64ry\x18\x05 \x01(\x08H\x02R\x03\x64ry\x88\x01\x01\x42\x14\n\x12_dataflow_graph_idB\x13\n\x11_full_refresh_allB\x06\n\x04_dry\x1a\xc7\x01\n\x16\x44\x65\x66ineSqlGraphElements\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12\'\n\rsql_file_path\x18\x02 \x01(\tH\x01R\x0bsqlFilePath\x88\x01\x01\x12\x1e\n\x08sql_text\x18\x03 \x01(\tH\x02R\x07sqlText\x88\x01\x01\x42\x14\n\x12_dataflow_graph_idB\x10\n\x0e_sql_file_pathB\x0b\n\t_sql_text\x1a\x9e\x01\n%GetQueryFunctionExecutionSignalStream\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12 \n\tclient_id\x18\x02 \x01(\tH\x01R\x08\x63lientId\x88\x01\x01\x42\x14\n\x12_dataflow_graph_idB\x0c\n\n_client_id\x1a\xdd\x01\n\x1d\x44\x65\x66ineFlowQueryFunctionResult\x12 \n\tflow_name\x18\x01 \x01(\tH\x00R\x08\x66lowName\x88\x01\x01\x12/\n\x11\x64\x61taflow_graph_id\x18\x02 \x01(\tH\x01R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12\x38\n\x08relation\x18\x03 \x01(\x0b\x32\x17.spark.connect.RelationH\x02R\x08relation\x88\x01\x01\x42\x0c\n\n_flow_nameB\x14\n\x12_dataflow_graph_idB\x0b\n\t_relationB\x0e\n\x0c\x63ommand_type"\xf4\x05\n\x15PipelineCommandResult\x12\x81\x01\n\x1c\x63reate_dataflow_graph_result\x18\x01 \x01(\x0b\x32>.spark.connect.PipelineCommandResult.CreateDataflowGraphResultH\x00R\x19\x63reateDataflowGraphResult\x12n\n\x15\x64\x65\x66ine_dataset_result\x18\x02 \x01(\x0b\x32\x38.spark.connect.PipelineCommandResult.DefineDatasetResultH\x00R\x13\x64\x65\x66ineDatasetResult\x12\x65\n\x12\x64\x65\x66ine_flow_result\x18\x03 \x01(\x0b\x32\x35.spark.connect.PipelineCommandResult.DefineFlowResultH\x00R\x10\x64\x65\x66ineFlowResult\x1a\x62\n\x19\x43reateDataflowGraphResult\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x42\x14\n\x12_dataflow_graph_id\x1a\x86\x01\n\x13\x44\x65\x66ineDatasetResult\x12W\n\x13resolved_identifier\x18\x01 \x01(\x0b\x32!.spark.connect.ResolvedIdentifierH\x00R\x12resolvedIdentifier\x88\x01\x01\x42\x16\n\x14_resolved_identifier\x1a\x83\x01\n\x10\x44\x65\x66ineFlowResult\x12W\n\x13resolved_identifier\x18\x01 \x01(\x0b\x32!.spark.connect.ResolvedIdentifierH\x00R\x12resolvedIdentifier\x88\x01\x01\x42\x16\n\x14_resolved_identifierB\r\n\x0bresult_type"I\n\x13PipelineEventResult\x12\x32\n\x05\x65vent\x18\x01 \x01(\x0b\x32\x1c.spark.connect.PipelineEventR\x05\x65vent"t\n\rPipelineEvent\x12\x38\n\ttimestamp\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.TimestampR\ttimestamp\x12\x1d\n\x07message\x18\x02 \x01(\tH\x00R\x07message\x88\x01\x01\x42\n\n\x08_message"E\n$PipelineQueryFunctionExecutionSignal\x12\x1d\n\nflow_names\x18\x01 \x03(\tR\tflowNames*a\n\x0b\x44\x61tasetType\x12\x1c\n\x18\x44\x41TASET_TYPE_UNSPECIFIED\x10\x00\x12\x15\n\x11MATERIALIZED_VIEW\x10\x01\x12\t\n\x05TABLE\x10\x02\x12\x12\n\x0eTEMPORARY_VIEW\x10\x03\x42\x36\n\x1eorg.apache.spark.connect.protoP\x01Z\x12internal/generatedb\x06proto3' + b'\n\x1dspark/connect/pipelines.proto\x12\rspark.connect\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1aspark/connect/common.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"\xc6\x1b\n\x0fPipelineCommand\x12h\n\x15\x63reate_dataflow_graph\x18\x01 \x01(\x0b\x32\x32.spark.connect.PipelineCommand.CreateDataflowGraphH\x00R\x13\x63reateDataflowGraph\x12U\n\x0e\x64\x65\x66ine_dataset\x18\x02 \x01(\x0b\x32,.spark.connect.PipelineCommand.DefineDatasetH\x00R\rdefineDataset\x12L\n\x0b\x64\x65\x66ine_flow\x18\x03 \x01(\x0b\x32).spark.connect.PipelineCommand.DefineFlowH\x00R\ndefineFlow\x12\x62\n\x13\x64rop_dataflow_graph\x18\x04 \x01(\x0b\x32\x30.spark.connect.PipelineCommand.DropDataflowGraphH\x00R\x11\x64ropDataflowGraph\x12\x46\n\tstart_run\x18\x05 \x01(\x0b\x32\'.spark.connect.PipelineCommand.StartRunH\x00R\x08startRun\x12r\n\x19\x64\x65\x66ine_sql_graph_elements\x18\x06 \x01(\x0b\x32\x35.spark.connect.PipelineCommand.DefineSqlGraphElementsH\x00R\x16\x64\x65\x66ineSqlGraphElements\x12\xa1\x01\n*get_query_function_execution_signal_stream\x18\x07 \x01(\x0b\x32\x44.spark.connect.PipelineCommand.GetQueryFunctionExecutionSignalStreamH\x00R%getQueryFunctionExecutionSignalStream\x12\x88\x01\n!define_flow_query_function_result\x18\x08 \x01(\x0b\x32<.spark.connect.PipelineCommand.DefineFlowQueryFunctionResultH\x00R\x1d\x64\x65\x66ineFlowQueryFunctionResult\x1a\xb4\x02\n\x13\x43reateDataflowGraph\x12,\n\x0f\x64\x65\x66\x61ult_catalog\x18\x01 \x01(\tH\x00R\x0e\x64\x65\x66\x61ultCatalog\x88\x01\x01\x12.\n\x10\x64\x65\x66\x61ult_database\x18\x02 \x01(\tH\x01R\x0f\x64\x65\x66\x61ultDatabase\x88\x01\x01\x12Z\n\x08sql_conf\x18\x05 \x03(\x0b\x32?.spark.connect.PipelineCommand.CreateDataflowGraph.SqlConfEntryR\x07sqlConf\x1a:\n\x0cSqlConfEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x42\x12\n\x10_default_catalogB\x13\n\x11_default_database\x1aZ\n\x11\x44ropDataflowGraph\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x42\x14\n\x12_dataflow_graph_id\x1a\xc4\x05\n\rDefineDataset\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12&\n\x0c\x64\x61taset_name\x18\x02 \x01(\tH\x01R\x0b\x64\x61tasetName\x88\x01\x01\x12\x42\n\x0c\x64\x61taset_type\x18\x03 \x01(\x0e\x32\x1a.spark.connect.DatasetTypeH\x02R\x0b\x64\x61tasetType\x88\x01\x01\x12\x1d\n\x07\x63omment\x18\x04 \x01(\tH\x03R\x07\x63omment\x88\x01\x01\x12l\n\x10table_properties\x18\x05 \x03(\x0b\x32\x41.spark.connect.PipelineCommand.DefineDataset.TablePropertiesEntryR\x0ftableProperties\x12%\n\x0epartition_cols\x18\x06 \x03(\tR\rpartitionCols\x12\x34\n\x06schema\x18\x07 \x01(\x0b\x32\x17.spark.connect.DataTypeH\x04R\x06schema\x88\x01\x01\x12\x1b\n\x06\x66ormat\x18\x08 \x01(\tH\x05R\x06\x66ormat\x88\x01\x01\x12X\n\x14source_code_location\x18\t \x01(\x0b\x32!.spark.connect.SourceCodeLocationH\x06R\x12sourceCodeLocation\x88\x01\x01\x1a\x42\n\x14TablePropertiesEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x42\x14\n\x12_dataflow_graph_idB\x0f\n\r_dataset_nameB\x0f\n\r_dataset_typeB\n\n\x08_commentB\t\n\x07_schemaB\t\n\x07_formatB\x17\n\x15_source_code_location\x1a\x85\x05\n\nDefineFlow\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12 \n\tflow_name\x18\x02 \x01(\tH\x01R\x08\x66lowName\x88\x01\x01\x12\x33\n\x13target_dataset_name\x18\x03 \x01(\tH\x02R\x11targetDatasetName\x88\x01\x01\x12\x38\n\x08relation\x18\x04 \x01(\x0b\x32\x17.spark.connect.RelationH\x03R\x08relation\x88\x01\x01\x12Q\n\x08sql_conf\x18\x05 \x03(\x0b\x32\x36.spark.connect.PipelineCommand.DefineFlow.SqlConfEntryR\x07sqlConf\x12 \n\tclient_id\x18\x06 \x01(\tH\x04R\x08\x63lientId\x88\x01\x01\x12X\n\x14source_code_location\x18\x07 \x01(\x0b\x32!.spark.connect.SourceCodeLocationH\x05R\x12sourceCodeLocation\x88\x01\x01\x1a:\n\x0cSqlConfEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x1a:\n\x08Response\x12 \n\tflow_name\x18\x01 \x01(\tH\x00R\x08\x66lowName\x88\x01\x01\x42\x0c\n\n_flow_nameB\x14\n\x12_dataflow_graph_idB\x0c\n\n_flow_nameB\x16\n\x14_target_dataset_nameB\x0b\n\t_relationB\x0c\n\n_client_idB\x17\n\x15_source_code_location\x1a\x97\x02\n\x08StartRun\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12\x34\n\x16\x66ull_refresh_selection\x18\x02 \x03(\tR\x14\x66ullRefreshSelection\x12-\n\x10\x66ull_refresh_all\x18\x03 \x01(\x08H\x01R\x0e\x66ullRefreshAll\x88\x01\x01\x12+\n\x11refresh_selection\x18\x04 \x03(\tR\x10refreshSelection\x12\x15\n\x03\x64ry\x18\x05 \x01(\x08H\x02R\x03\x64ry\x88\x01\x01\x42\x14\n\x12_dataflow_graph_idB\x13\n\x11_full_refresh_allB\x06\n\x04_dry\x1a\xc7\x01\n\x16\x44\x65\x66ineSqlGraphElements\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12\'\n\rsql_file_path\x18\x02 \x01(\tH\x01R\x0bsqlFilePath\x88\x01\x01\x12\x1e\n\x08sql_text\x18\x03 \x01(\tH\x02R\x07sqlText\x88\x01\x01\x42\x14\n\x12_dataflow_graph_idB\x10\n\x0e_sql_file_pathB\x0b\n\t_sql_text\x1a\x9e\x01\n%GetQueryFunctionExecutionSignalStream\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12 \n\tclient_id\x18\x02 \x01(\tH\x01R\x08\x63lientId\x88\x01\x01\x42\x14\n\x12_dataflow_graph_idB\x0c\n\n_client_id\x1a\xdd\x01\n\x1d\x44\x65\x66ineFlowQueryFunctionResult\x12 \n\tflow_name\x18\x01 \x01(\tH\x00R\x08\x66lowName\x88\x01\x01\x12/\n\x11\x64\x61taflow_graph_id\x18\x02 \x01(\tH\x01R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12\x38\n\x08relation\x18\x03 \x01(\x0b\x32\x17.spark.connect.RelationH\x02R\x08relation\x88\x01\x01\x42\x0c\n\n_flow_nameB\x14\n\x12_dataflow_graph_idB\x0b\n\t_relationB\x0e\n\x0c\x63ommand_type"\xf4\x05\n\x15PipelineCommandResult\x12\x81\x01\n\x1c\x63reate_dataflow_graph_result\x18\x01 \x01(\x0b\x32>.spark.connect.PipelineCommandResult.CreateDataflowGraphResultH\x00R\x19\x63reateDataflowGraphResult\x12n\n\x15\x64\x65\x66ine_dataset_result\x18\x02 \x01(\x0b\x32\x38.spark.connect.PipelineCommandResult.DefineDatasetResultH\x00R\x13\x64\x65\x66ineDatasetResult\x12\x65\n\x12\x64\x65\x66ine_flow_result\x18\x03 \x01(\x0b\x32\x35.spark.connect.PipelineCommandResult.DefineFlowResultH\x00R\x10\x64\x65\x66ineFlowResult\x1a\x62\n\x19\x43reateDataflowGraphResult\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x42\x14\n\x12_dataflow_graph_id\x1a\x86\x01\n\x13\x44\x65\x66ineDatasetResult\x12W\n\x13resolved_identifier\x18\x01 \x01(\x0b\x32!.spark.connect.ResolvedIdentifierH\x00R\x12resolvedIdentifier\x88\x01\x01\x42\x16\n\x14_resolved_identifier\x1a\x83\x01\n\x10\x44\x65\x66ineFlowResult\x12W\n\x13resolved_identifier\x18\x01 \x01(\x0b\x32!.spark.connect.ResolvedIdentifierH\x00R\x12resolvedIdentifier\x88\x01\x01\x42\x16\n\x14_resolved_identifierB\r\n\x0bresult_type"I\n\x13PipelineEventResult\x12\x32\n\x05\x65vent\x18\x01 \x01(\x0b\x32\x1c.spark.connect.PipelineEventR\x05\x65vent"t\n\rPipelineEvent\x12\x38\n\ttimestamp\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.TimestampR\ttimestamp\x12\x1d\n\x07message\x18\x02 \x01(\tH\x00R\x07message\x88\x01\x01\x42\n\n\x08_message"z\n\x12SourceCodeLocation\x12 \n\tfile_name\x18\x01 \x01(\tH\x00R\x08\x66ileName\x88\x01\x01\x12$\n\x0bline_number\x18\x02 \x01(\x05H\x01R\nlineNumber\x88\x01\x01\x42\x0c\n\n_file_nameB\x0e\n\x0c_line_number"E\n$PipelineQueryFunctionExecutionSignal\x12\x1d\n\nflow_names\x18\x01 \x03(\tR\tflowNames*a\n\x0b\x44\x61tasetType\x12\x1c\n\x18\x44\x41TASET_TYPE_UNSPECIFIED\x10\x00\x12\x15\n\x11MATERIALIZED_VIEW\x10\x01\x12\t\n\x05TABLE\x10\x02\x12\x12\n\x0eTEMPORARY_VIEW\x10\x03\x42\x36\n\x1eorg.apache.spark.connect.protoP\x01Z\x12internal/generatedb\x06proto3' ) _globals = globals() @@ -60,10 +60,10 @@ _globals["_PIPELINECOMMAND_DEFINEDATASET_TABLEPROPERTIESENTRY"]._serialized_options = b"8\001" _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._loaded_options = None _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._serialized_options = b"8\001" - _globals["_DATASETTYPE"]._serialized_start = 4489 - _globals["_DATASETTYPE"]._serialized_end = 4586 + _globals["_DATASETTYPE"]._serialized_start = 4843 + _globals["_DATASETTYPE"]._serialized_end = 4940 _globals["_PIPELINECOMMAND"]._serialized_start = 168 - _globals["_PIPELINECOMMAND"]._serialized_end = 3464 + _globals["_PIPELINECOMMAND"]._serialized_end = 3694 _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH"]._serialized_start = 1050 _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH"]._serialized_end = 1358 _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH_SQLCONFENTRY"]._serialized_start = 1259 @@ -71,35 +71,37 @@ _globals["_PIPELINECOMMAND_DROPDATAFLOWGRAPH"]._serialized_start = 1360 _globals["_PIPELINECOMMAND_DROPDATAFLOWGRAPH"]._serialized_end = 1450 _globals["_PIPELINECOMMAND_DEFINEDATASET"]._serialized_start = 1453 - _globals["_PIPELINECOMMAND_DEFINEDATASET"]._serialized_end = 2046 - _globals["_PIPELINECOMMAND_DEFINEDATASET_TABLEPROPERTIESENTRY"]._serialized_start = 1890 - _globals["_PIPELINECOMMAND_DEFINEDATASET_TABLEPROPERTIESENTRY"]._serialized_end = 1956 - _globals["_PIPELINECOMMAND_DEFINEFLOW"]._serialized_start = 2049 - _globals["_PIPELINECOMMAND_DEFINEFLOW"]._serialized_end = 2579 + _globals["_PIPELINECOMMAND_DEFINEDATASET"]._serialized_end = 2161 + _globals["_PIPELINECOMMAND_DEFINEDATASET_TABLEPROPERTIESENTRY"]._serialized_start = 1980 + _globals["_PIPELINECOMMAND_DEFINEDATASET_TABLEPROPERTIESENTRY"]._serialized_end = 2046 + _globals["_PIPELINECOMMAND_DEFINEFLOW"]._serialized_start = 2164 + _globals["_PIPELINECOMMAND_DEFINEFLOW"]._serialized_end = 2809 _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._serialized_start = 1259 _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._serialized_end = 1317 - _globals["_PIPELINECOMMAND_DEFINEFLOW_RESPONSE"]._serialized_start = 2434 - _globals["_PIPELINECOMMAND_DEFINEFLOW_RESPONSE"]._serialized_end = 2492 - _globals["_PIPELINECOMMAND_STARTRUN"]._serialized_start = 2582 - _globals["_PIPELINECOMMAND_STARTRUN"]._serialized_end = 2861 - _globals["_PIPELINECOMMAND_DEFINESQLGRAPHELEMENTS"]._serialized_start = 2864 - _globals["_PIPELINECOMMAND_DEFINESQLGRAPHELEMENTS"]._serialized_end = 3063 - _globals["_PIPELINECOMMAND_GETQUERYFUNCTIONEXECUTIONSIGNALSTREAM"]._serialized_start = 3066 - _globals["_PIPELINECOMMAND_GETQUERYFUNCTIONEXECUTIONSIGNALSTREAM"]._serialized_end = 3224 - _globals["_PIPELINECOMMAND_DEFINEFLOWQUERYFUNCTIONRESULT"]._serialized_start = 3227 - _globals["_PIPELINECOMMAND_DEFINEFLOWQUERYFUNCTIONRESULT"]._serialized_end = 3448 - _globals["_PIPELINECOMMANDRESULT"]._serialized_start = 3467 - _globals["_PIPELINECOMMANDRESULT"]._serialized_end = 4223 - _globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_start = 3839 - _globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_end = 3937 - _globals["_PIPELINECOMMANDRESULT_DEFINEDATASETRESULT"]._serialized_start = 3940 - _globals["_PIPELINECOMMANDRESULT_DEFINEDATASETRESULT"]._serialized_end = 4074 - _globals["_PIPELINECOMMANDRESULT_DEFINEFLOWRESULT"]._serialized_start = 4077 - _globals["_PIPELINECOMMANDRESULT_DEFINEFLOWRESULT"]._serialized_end = 4208 - _globals["_PIPELINEEVENTRESULT"]._serialized_start = 4225 - _globals["_PIPELINEEVENTRESULT"]._serialized_end = 4298 - _globals["_PIPELINEEVENT"]._serialized_start = 4300 - _globals["_PIPELINEEVENT"]._serialized_end = 4416 - _globals["_PIPELINEQUERYFUNCTIONEXECUTIONSIGNAL"]._serialized_start = 4418 - _globals["_PIPELINEQUERYFUNCTIONEXECUTIONSIGNAL"]._serialized_end = 4487 + _globals["_PIPELINECOMMAND_DEFINEFLOW_RESPONSE"]._serialized_start = 2639 + _globals["_PIPELINECOMMAND_DEFINEFLOW_RESPONSE"]._serialized_end = 2697 + _globals["_PIPELINECOMMAND_STARTRUN"]._serialized_start = 2812 + _globals["_PIPELINECOMMAND_STARTRUN"]._serialized_end = 3091 + _globals["_PIPELINECOMMAND_DEFINESQLGRAPHELEMENTS"]._serialized_start = 3094 + _globals["_PIPELINECOMMAND_DEFINESQLGRAPHELEMENTS"]._serialized_end = 3293 + _globals["_PIPELINECOMMAND_GETQUERYFUNCTIONEXECUTIONSIGNALSTREAM"]._serialized_start = 3296 + _globals["_PIPELINECOMMAND_GETQUERYFUNCTIONEXECUTIONSIGNALSTREAM"]._serialized_end = 3454 + _globals["_PIPELINECOMMAND_DEFINEFLOWQUERYFUNCTIONRESULT"]._serialized_start = 3457 + _globals["_PIPELINECOMMAND_DEFINEFLOWQUERYFUNCTIONRESULT"]._serialized_end = 3678 + _globals["_PIPELINECOMMANDRESULT"]._serialized_start = 3697 + _globals["_PIPELINECOMMANDRESULT"]._serialized_end = 4453 + _globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_start = 4069 + _globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_end = 4167 + _globals["_PIPELINECOMMANDRESULT_DEFINEDATASETRESULT"]._serialized_start = 4170 + _globals["_PIPELINECOMMANDRESULT_DEFINEDATASETRESULT"]._serialized_end = 4304 + _globals["_PIPELINECOMMANDRESULT_DEFINEFLOWRESULT"]._serialized_start = 4307 + _globals["_PIPELINECOMMANDRESULT_DEFINEFLOWRESULT"]._serialized_end = 4438 + _globals["_PIPELINEEVENTRESULT"]._serialized_start = 4455 + _globals["_PIPELINEEVENTRESULT"]._serialized_end = 4528 + _globals["_PIPELINEEVENT"]._serialized_start = 4530 + _globals["_PIPELINEEVENT"]._serialized_end = 4646 + _globals["_SOURCECODELOCATION"]._serialized_start = 4648 + _globals["_SOURCECODELOCATION"]._serialized_end = 4770 + _globals["_PIPELINEQUERYFUNCTIONEXECUTIONSIGNAL"]._serialized_start = 4772 + _globals["_PIPELINEQUERYFUNCTIONEXECUTIONSIGNAL"]._serialized_end = 4841 # @@protoc_insertion_point(module_scope) diff --git a/python/pyspark/sql/connect/proto/pipelines_pb2.pyi b/python/pyspark/sql/connect/proto/pipelines_pb2.pyi index 097614de8a1f..b5ed1c216a83 100644 --- a/python/pyspark/sql/connect/proto/pipelines_pb2.pyi +++ b/python/pyspark/sql/connect/proto/pipelines_pb2.pyi @@ -233,6 +233,7 @@ class PipelineCommand(google.protobuf.message.Message): PARTITION_COLS_FIELD_NUMBER: builtins.int SCHEMA_FIELD_NUMBER: builtins.int FORMAT_FIELD_NUMBER: builtins.int + SOURCE_CODE_LOCATION_FIELD_NUMBER: builtins.int dataflow_graph_id: builtins.str """The graph to attach this dataset to.""" dataset_name: builtins.str @@ -260,6 +261,9 @@ class PipelineCommand(google.protobuf.message.Message): """The output table format of the dataset. Only applies to dataset_type == TABLE and dataset_type == MATERIALIZED_VIEW. """ + @property + def source_code_location(self) -> global___SourceCodeLocation: + """The location in source code that this dataset was defined.""" def __init__( self, *, @@ -271,6 +275,7 @@ class PipelineCommand(google.protobuf.message.Message): partition_cols: collections.abc.Iterable[builtins.str] | None = ..., schema: pyspark.sql.connect.proto.types_pb2.DataType | None = ..., format: builtins.str | None = ..., + source_code_location: global___SourceCodeLocation | None = ..., ) -> None: ... def HasField( self, @@ -287,6 +292,8 @@ class PipelineCommand(google.protobuf.message.Message): b"_format", "_schema", b"_schema", + "_source_code_location", + b"_source_code_location", "comment", b"comment", "dataflow_graph_id", @@ -299,6 +306,8 @@ class PipelineCommand(google.protobuf.message.Message): b"format", "schema", b"schema", + "source_code_location", + b"source_code_location", ], ) -> builtins.bool: ... def ClearField( @@ -316,6 +325,8 @@ class PipelineCommand(google.protobuf.message.Message): b"_format", "_schema", b"_schema", + "_source_code_location", + b"_source_code_location", "comment", b"comment", "dataflow_graph_id", @@ -330,6 +341,8 @@ class PipelineCommand(google.protobuf.message.Message): b"partition_cols", "schema", b"schema", + "source_code_location", + b"source_code_location", "table_properties", b"table_properties", ], @@ -359,6 +372,13 @@ class PipelineCommand(google.protobuf.message.Message): def WhichOneof( self, oneof_group: typing_extensions.Literal["_schema", b"_schema"] ) -> typing_extensions.Literal["schema"] | None: ... + @typing.overload + def WhichOneof( + self, + oneof_group: typing_extensions.Literal[ + "_source_code_location", b"_source_code_location" + ], + ) -> typing_extensions.Literal["source_code_location"] | None: ... class DefineFlow(google.protobuf.message.Message): """Request to define a flow targeting a dataset.""" @@ -415,6 +435,7 @@ class PipelineCommand(google.protobuf.message.Message): RELATION_FIELD_NUMBER: builtins.int SQL_CONF_FIELD_NUMBER: builtins.int CLIENT_ID_FIELD_NUMBER: builtins.int + SOURCE_CODE_LOCATION_FIELD_NUMBER: builtins.int dataflow_graph_id: builtins.str """The graph to attach this flow to.""" flow_name: builtins.str @@ -435,6 +456,9 @@ class PipelineCommand(google.protobuf.message.Message): """Identifier for the client making the request. The server uses this to determine what flow evaluation request stream to dispatch evaluation requests to for this flow. """ + @property + def source_code_location(self) -> global___SourceCodeLocation: + """The location in source code that this flow was defined.""" def __init__( self, *, @@ -444,6 +468,7 @@ class PipelineCommand(google.protobuf.message.Message): relation: pyspark.sql.connect.proto.relations_pb2.Relation | None = ..., sql_conf: collections.abc.Mapping[builtins.str, builtins.str] | None = ..., client_id: builtins.str | None = ..., + source_code_location: global___SourceCodeLocation | None = ..., ) -> None: ... def HasField( self, @@ -456,6 +481,8 @@ class PipelineCommand(google.protobuf.message.Message): b"_flow_name", "_relation", b"_relation", + "_source_code_location", + b"_source_code_location", "_target_dataset_name", b"_target_dataset_name", "client_id", @@ -466,6 +493,8 @@ class PipelineCommand(google.protobuf.message.Message): b"flow_name", "relation", b"relation", + "source_code_location", + b"source_code_location", "target_dataset_name", b"target_dataset_name", ], @@ -481,6 +510,8 @@ class PipelineCommand(google.protobuf.message.Message): b"_flow_name", "_relation", b"_relation", + "_source_code_location", + b"_source_code_location", "_target_dataset_name", b"_target_dataset_name", "client_id", @@ -491,6 +522,8 @@ class PipelineCommand(google.protobuf.message.Message): b"flow_name", "relation", b"relation", + "source_code_location", + b"source_code_location", "sql_conf", b"sql_conf", "target_dataset_name", @@ -515,6 +548,13 @@ class PipelineCommand(google.protobuf.message.Message): self, oneof_group: typing_extensions.Literal["_relation", b"_relation"] ) -> typing_extensions.Literal["relation"] | None: ... @typing.overload + def WhichOneof( + self, + oneof_group: typing_extensions.Literal[ + "_source_code_location", b"_source_code_location" + ], + ) -> typing_extensions.Literal["source_code_location"] | None: ... + @typing.overload def WhichOneof( self, oneof_group: typing_extensions.Literal["_target_dataset_name", b"_target_dataset_name"], @@ -1134,6 +1174,60 @@ class PipelineEvent(google.protobuf.message.Message): global___PipelineEvent = PipelineEvent +class SourceCodeLocation(google.protobuf.message.Message): + """Source code location information associated with a particular dataset or flow.""" + + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + FILE_NAME_FIELD_NUMBER: builtins.int + LINE_NUMBER_FIELD_NUMBER: builtins.int + file_name: builtins.str + """The file that this pipeline source code was defined in.""" + line_number: builtins.int + """The specific line number that this pipeline source code is located at, if applicable.""" + def __init__( + self, + *, + file_name: builtins.str | None = ..., + line_number: builtins.int | None = ..., + ) -> None: ... + def HasField( + self, + field_name: typing_extensions.Literal[ + "_file_name", + b"_file_name", + "_line_number", + b"_line_number", + "file_name", + b"file_name", + "line_number", + b"line_number", + ], + ) -> builtins.bool: ... + def ClearField( + self, + field_name: typing_extensions.Literal[ + "_file_name", + b"_file_name", + "_line_number", + b"_line_number", + "file_name", + b"file_name", + "line_number", + b"line_number", + ], + ) -> None: ... + @typing.overload + def WhichOneof( + self, oneof_group: typing_extensions.Literal["_file_name", b"_file_name"] + ) -> typing_extensions.Literal["file_name"] | None: ... + @typing.overload + def WhichOneof( + self, oneof_group: typing_extensions.Literal["_line_number", b"_line_number"] + ) -> typing_extensions.Literal["line_number"] | None: ... + +global___SourceCodeLocation = SourceCodeLocation + class PipelineQueryFunctionExecutionSignal(google.protobuf.message.Message): """A signal from the server to the client to execute the query function for one or more flows, and to register their results with the server. diff --git a/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto b/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto index ef1ac5f38073..16d211f9f72d 100644 --- a/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto +++ b/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto @@ -86,6 +86,9 @@ message PipelineCommand { // The output table format of the dataset. Only applies to dataset_type == TABLE and // dataset_type == MATERIALIZED_VIEW. optional string format = 8; + + // The location in source code that this dataset was defined. + optional SourceCodeLocation source_code_location = 9; } // Request to define a flow targeting a dataset. @@ -110,6 +113,9 @@ message PipelineCommand { // evaluation request stream to dispatch evaluation requests to for this flow. optional string client_id = 6; + // The location in source code that this flow was defined. + optional SourceCodeLocation source_code_location = 7; + message Response { // Fully qualified flow name that uniquely identify a flow in the Dataflow graph. optional string flow_name = 1; @@ -217,6 +223,14 @@ message PipelineEvent { optional string message = 2; } +// Source code location information associated with a particular dataset or flow. +message SourceCodeLocation { + // The file that this pipeline source code was defined in. + optional string file_name = 1; + // The specific line number that this pipeline source code is located at, if applicable. + optional int32 line_number = 2; +} + // A signal from the server to the client to execute the query function for one or more flows, and // to register their results with the server. message PipelineQueryFunctionExecutionSignal { diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala index 1b2e039be715..01402c64e8a2 100644 --- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala +++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala @@ -195,7 +195,11 @@ private[connect] object PipelinesHandler extends Logging { partitionCols = Option(dataset.getPartitionColsList.asScala.toSeq) .filter(_.nonEmpty), properties = dataset.getTablePropertiesMap.asScala.toMap, - baseOrigin = QueryOrigin( + origin = QueryOrigin( + filePath = Option.when(dataset.getSourceCodeLocation.hasFileName)( + dataset.getSourceCodeLocation.getFileName), + line = Option.when(dataset.getSourceCodeLocation.hasLineNumber)( + dataset.getSourceCodeLocation.getLineNumber), objectType = Option(QueryOriginType.Table.toString), objectName = Option(qualifiedIdentifier.unquotedString), language = Option(Python())), @@ -212,6 +216,10 @@ private[connect] object PipelinesHandler extends Logging { identifier = viewIdentifier, comment = Option(dataset.getComment), origin = QueryOrigin( + filePath = Option.when(dataset.getSourceCodeLocation.hasFileName)( + dataset.getSourceCodeLocation.getFileName), + line = Option.when(dataset.getSourceCodeLocation.hasLineNumber)( + dataset.getSourceCodeLocation.getLineNumber), objectType = Option(QueryOriginType.View.toString), objectName = Option(viewIdentifier.unquotedString), language = Option(Python())), @@ -281,6 +289,10 @@ private[connect] object PipelinesHandler extends Logging { once = false, queryContext = QueryContext(Option(defaultCatalog), Option(defaultDatabase)), origin = QueryOrigin( + filePath = Option.when(flow.getSourceCodeLocation.hasFileName)( + flow.getSourceCodeLocation.getFileName), + line = Option.when(flow.getSourceCodeLocation.hasLineNumber)( + flow.getSourceCodeLocation.getLineNumber), objectType = Option(QueryOriginType.Flow.toString), objectName = Option(flowIdentifier.unquotedString), language = Option(Python())))) diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala index 897c0209153f..b99615062d45 100644 --- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala +++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala @@ -31,7 +31,10 @@ import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.connect.service.SparkConnectService import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog} -import org.apache.spark.sql.pipelines.graph.{DataflowGraph, PipelineUpdateContextImpl} +import org.apache.spark.sql.pipelines.Language.Python +import org.apache.spark.sql.pipelines.common.FlowStatus +import org.apache.spark.sql.pipelines.graph.{DataflowGraph, PipelineUpdateContextImpl, QueryOrigin, QueryOriginType} +import org.apache.spark.sql.pipelines.logging.EventLevel import org.apache.spark.sql.pipelines.utils.{EventVerificationTestHelpers, TestPipelineUpdateContextMixin} /** @@ -116,6 +119,132 @@ class PythonPipelineSuite assert(graph.tables.size == 1) } + test("failed flow progress event has correct python source code location") { + // Note that pythonText will be inserted into line 26 of the python script that is run. + val unresolvedGraph = buildGraph(pythonText = """ + |@dp.table() + |def table1(): + | df = spark.createDataFrame([(25,), (30,), (45,)], ["age"]) + | return df.select("name") + |""".stripMargin) + + val updateContext = TestPipelineUpdateContext(spark, unresolvedGraph) + updateContext.pipelineExecution.runPipeline() + + assertFlowProgressEvent( + updateContext.eventBuffer, + identifier = graphIdentifier("table1"), + expectedFlowStatus = FlowStatus.FAILED, + cond = flowProgressEvent => + flowProgressEvent.origin.sourceCodeLocation == Option( + QueryOrigin( + language = Option(Python()), + filePath = Option(""), + line = Option(28), + objectName = Option("spark_catalog.default.table1"), + objectType = Option(QueryOriginType.Flow.toString))), + errorChecker = ex => + ex.getMessage.contains( + "A column, variable, or function parameter with name `name` cannot be resolved."), + expectedEventLevel = EventLevel.WARN) + } + + test("flow progress events have correct python source code location") { + val unresolvedGraph = buildGraph(pythonText = """ + |@dp.table( + | comment = 'my table' + |) + |def table1(): + | return spark.readStream.table('mv') + | + |@dp.materialized_view + |def mv2(): + | return spark.range(26, 29) + | + |@dp.materialized_view + |def mv(): + | df = spark.createDataFrame([(25,), (30,), (45,)], ["age"]) + | return df.select("age") + | + |@dp.append_flow( + | target = 'table1' + |) + |def standalone_flow1(): + | return spark.readStream.table('mv2') + |""".stripMargin) + + val updateContext = TestPipelineUpdateContext(spark, unresolvedGraph) + updateContext.pipelineExecution.runPipeline() + updateContext.pipelineExecution.awaitCompletion() + + Seq( + FlowStatus.QUEUED, + FlowStatus.STARTING, + FlowStatus.PLANNING, + FlowStatus.RUNNING, + FlowStatus.COMPLETED).foreach { flowStatus => + assertFlowProgressEvent( + updateContext.eventBuffer, + identifier = graphIdentifier("mv2"), + expectedFlowStatus = flowStatus, + cond = flowProgressEvent => + flowProgressEvent.origin.sourceCodeLocation == Option( + QueryOrigin( + language = Option(Python()), + filePath = Option(""), + line = Option(34), + objectName = Option("spark_catalog.default.mv2"), + objectType = Option(QueryOriginType.Flow.toString))), + expectedEventLevel = EventLevel.INFO) + + assertFlowProgressEvent( + updateContext.eventBuffer, + identifier = graphIdentifier("mv"), + expectedFlowStatus = flowStatus, + cond = flowProgressEvent => + flowProgressEvent.origin.sourceCodeLocation == Option( + QueryOrigin( + language = Option(Python()), + filePath = Option(""), + line = Option(38), + objectName = Option("spark_catalog.default.mv"), + objectType = Option(QueryOriginType.Flow.toString))), + expectedEventLevel = EventLevel.INFO) + } + + // Note that streaming flows do not have a PLANNING phase. + Seq(FlowStatus.QUEUED, FlowStatus.STARTING, FlowStatus.RUNNING, FlowStatus.COMPLETED) + .foreach { flowStatus => + assertFlowProgressEvent( + updateContext.eventBuffer, + identifier = graphIdentifier("table1"), + expectedFlowStatus = flowStatus, + cond = flowProgressEvent => + flowProgressEvent.origin.sourceCodeLocation == Option( + QueryOrigin( + language = Option(Python()), + filePath = Option(""), + line = Option(28), + objectName = Option("spark_catalog.default.table1"), + objectType = Option(QueryOriginType.Flow.toString))), + expectedEventLevel = EventLevel.INFO) + + assertFlowProgressEvent( + updateContext.eventBuffer, + identifier = graphIdentifier("standalone_flow1"), + expectedFlowStatus = flowStatus, + cond = flowProgressEvent => + flowProgressEvent.origin.sourceCodeLocation == Option( + QueryOrigin( + language = Option(Python()), + filePath = Option(""), + line = Option(43), + objectName = Option("spark_catalog.default.standalone_flow1"), + objectType = Option(QueryOriginType.Flow.toString))), + expectedEventLevel = EventLevel.INFO) + } + } + test("basic with inverted topological order") { // This graph is purposefully in the wrong topological order to test the topological sort val graph = buildGraph(""" diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/CoreDataflowNodeProcessor.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/CoreDataflowNodeProcessor.scala index fcab53ae32ac..6d12e2281874 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/CoreDataflowNodeProcessor.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/CoreDataflowNodeProcessor.scala @@ -123,7 +123,8 @@ private class FlowResolver(rawGraph: DataflowGraph) { allInputs = allInputs, availableInputs = availableResolvedInputs.values.toList, configuration = flowToResolve.sqlConf, - queryContext = flowToResolve.queryContext + queryContext = flowToResolve.queryContext, + queryOrigin = flowToResolve.origin ) val result = flowFunctionResult match { @@ -169,7 +170,8 @@ private class FlowResolver(rawGraph: DataflowGraph) { allInputs = allInputs, availableInputs = availableResolvedInputs.values.toList, configuration = newSqlConf, - queryContext = flowToResolve.queryContext + queryContext = flowToResolve.queryContext, + queryOrigin = flowToResolve.origin ) } else { f diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala index 40fb8dbbe5dc..91feee936170 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala @@ -72,13 +72,16 @@ trait FlowFunction extends Logging { * @param availableInputs the list of all [[Input]]s available to this flow * @param configuration the spark configurations that apply to this flow. * @param queryContext The context of the query being evaluated. + * @param queryOrigin The source code location of the flow definition this flow function was + * instantiated from. * @return the inputs actually used, and the DataFrame expression for the flow */ def call( allInputs: Set[TableIdentifier], availableInputs: Seq[Input], configuration: Map[String, String], - queryContext: QueryContext + queryContext: QueryContext, + queryOrigin: QueryOrigin ): FlowFunctionResult } diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowAnalysis.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowAnalysis.scala index 311bdfd6a3d2..18ae45c4f340 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowAnalysis.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowAnalysis.scala @@ -46,7 +46,8 @@ object FlowAnalysis { allInputs: Set[TableIdentifier], availableInputs: Seq[Input], confs: Map[String, String], - queryContext: QueryContext + queryContext: QueryContext, + queryOrigin: QueryOrigin ): FlowFunctionResult = { val ctx = FlowAnalysisContext( allInputs = allInputs, diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/SqlGraphRegistrationContext.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/SqlGraphRegistrationContext.scala index eb6b20b8a4ef..55a03a2d19f9 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/SqlGraphRegistrationContext.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/SqlGraphRegistrationContext.scala @@ -193,7 +193,7 @@ class SqlGraphRegistrationContext( Option.when(cst.columns.nonEmpty)(StructType(cst.columns.map(_.toV1Column))), partitionCols = Option(PartitionHelper.applyPartitioning(cst.partitioning, queryOrigin)), properties = cst.tableSpec.properties, - baseOrigin = queryOrigin.copy( + origin = queryOrigin.copy( objectName = Option(stIdentifier.unquotedString), objectType = Option(QueryOriginType.Table.toString) ), @@ -224,7 +224,7 @@ class SqlGraphRegistrationContext( Option.when(cst.columns.nonEmpty)(StructType(cst.columns.map(_.toV1Column))), partitionCols = Option(PartitionHelper.applyPartitioning(cst.partitioning, queryOrigin)), properties = cst.tableSpec.properties, - baseOrigin = queryOrigin.copy( + origin = queryOrigin.copy( objectName = Option(stIdentifier.unquotedString), objectType = Option(QueryOriginType.Table.toString) ), @@ -274,7 +274,7 @@ class SqlGraphRegistrationContext( Option.when(cmv.columns.nonEmpty)(StructType(cmv.columns.map(_.toV1Column))), partitionCols = Option(PartitionHelper.applyPartitioning(cmv.partitioning, queryOrigin)), properties = cmv.tableSpec.properties, - baseOrigin = queryOrigin.copy( + origin = queryOrigin.copy( objectName = Option(mvIdentifier.unquotedString), objectType = Option(QueryOriginType.Table.toString) ), diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/elements.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/elements.scala index ee78f96d5316..95a57dcc4495 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/elements.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/elements.scala @@ -123,17 +123,12 @@ case class Table( normalizedPath: Option[String], properties: Map[String, String] = Map.empty, comment: Option[String], - baseOrigin: QueryOrigin, + override val origin: QueryOrigin, isStreamingTable: Boolean, format: Option[String] ) extends TableInput with Output { - override val origin: QueryOrigin = baseOrigin.copy( - objectType = Some("table"), - objectName = Some(identifier.unquotedString) - ) - // Load this table's data from underlying storage. override def load(readOptions: InputReadOptions): DataFrame = { try { diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/TestGraphRegistrationContext.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/TestGraphRegistrationContext.scala index 38bd858a688f..4a33dd2c61a8 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/TestGraphRegistrationContext.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/TestGraphRegistrationContext.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.pipelines.graph.{ PersistedView, QueryContext, QueryOrigin, + QueryOriginType, Table, TemporaryView, UnresolvedFlow @@ -129,7 +130,6 @@ class TestGraphRegistrationContext( isStreamingTable: Boolean ): Unit = { // scalastyle:on - val tableIdentifier = GraphIdentifierManager.parseTableIdentifier(name, spark) val qualifiedIdentifier = GraphIdentifierManager .parseAndQualifyTableIdentifier( rawTableIdentifier = GraphIdentifierManager @@ -144,7 +144,12 @@ class TestGraphRegistrationContext( specifiedSchema = specifiedSchema, partitionCols = partitionCols, properties = properties, - baseOrigin = baseOrigin, + origin = baseOrigin.merge( + QueryOrigin( + objectName = Option(qualifiedIdentifier.unquotedString), + objectType = Option(QueryOriginType.Table.toString) + ) + ), format = format.orElse(Some("parquet")), normalizedPath = None, isStreamingTable = isStreamingTable @@ -215,13 +220,20 @@ class TestGraphRegistrationContext( case _ => persistedViewIdentifier } + val viewOrigin: QueryOrigin = origin.merge( + QueryOrigin( + objectName = Option(viewIdentifier.unquotedString), + objectType = Option(QueryOriginType.View.toString) + ) + ) + registerView( viewType match { case LocalTempView => TemporaryView( identifier = viewIdentifier, comment = comment, - origin = origin, + origin = viewOrigin, properties = Map.empty, sqlText = sqlText ) @@ -229,7 +241,7 @@ class TestGraphRegistrationContext( PersistedView( identifier = viewIdentifier, comment = comment, - origin = origin, + origin = viewOrigin, properties = Map.empty, sqlText = sqlText ) @@ -298,7 +310,10 @@ class TestGraphRegistrationContext( ), sqlConf = Map.empty, once = once, - origin = QueryOrigin() + origin = QueryOrigin( + objectName = Option(flowIdentifier.unquotedString), + objectType = Option(QueryOriginType.Flow.toString) + ) ) ) }