From f8aec6f210e173e92355f8d688f2aef70523b84c Mon Sep 17 00:00:00 2001
From: Sandy Ryza
Date: Mon, 14 Jul 2025 21:25:25 -0700
Subject: [PATCH 1/6] dry run

---
 ...declarative-pipelines-programming-guide.md |   9 +-
 python/pyspark/pipelines/cli.py               |  17 ++-
 .../pipelines/spark_connect_pipeline.py       |   4 +-
 .../sql/connect/proto/pipelines_pb2.py        |  30 ++---
 .../sql/connect/proto/pipelines_pb2.pyi       |  19 +++
 .../protobuf/spark/connect/pipelines.proto    |   4 +
 .../connect/pipelines/PipelinesHandler.scala  |   6 +-
 .../pipelines/PipelineEventStreamSuite.scala  | 121 ++++++++++++++----
 .../pipelines/graph/PipelineExecution.scala   |  48 ++++---
 9 files changed, 189 insertions(+), 69 deletions(-)

diff --git a/docs/declarative-pipelines-programming-guide.md b/docs/declarative-pipelines-programming-guide.md
index 929cd07e5daa..5f938f38e1e4 100644
--- a/docs/declarative-pipelines-programming-guide.md
+++ b/docs/declarative-pipelines-programming-guide.md
@@ -94,7 +94,7 @@ The `spark-pipelines init` command, described below, makes it easy to generate a

 ## The `spark-pipelines` Command Line Interface

-The `spark-pipelines` command line interface (CLI) is the primary way to execute a pipeline. It also contains an `init` subcommand for generating a pipeline project.
+The `spark-pipelines` command line interface (CLI) is the primary way to execute a pipeline. It also contains an `init` subcommand for generating a pipeline project and a `dry-run` subcommand for validating a pipeline.

 `spark-pipelines` is built on top of `spark-submit`, meaning that it supports all cluster managers supported by `spark-submit`. It supports all `spark-submit` arguments except for `--class`.

@@ -106,6 +106,13 @@ The `spark-pipelines` command line interface (CLI) is the primary way to execute

 `spark-pipelines run` launches an execution of a pipeline and monitors its progress until it completes. The `--spec` parameter allows selecting the pipeline spec file. If not provided, the CLI will look in the current directory and parent directories for a file named `pipeline.yml` or `pipeline.yaml`.

+### `spark-pipelines dry-run`
+
+`spark-pipelines dry-run` launches an execution of a pipeline that doesn't write or read any data, but catches many kinds of errors that would surface if the pipeline were actually run, for example:
+- Syntax errors – e.g. invalid Python or SQL code
+- Analysis errors – e.g. selecting from a table that doesn't exist or selecting a column that doesn't exist
+- Graph validation errors – e.g. cyclic dependencies
+
 ## Programming with SDP in Python

 SDP Python functions are defined in the `pyspark.pipelines` module. Your pipelines implemented with the Python API must import this module. It's common to alias the module to `sdp` to limit the number of characters you need to type when using its APIs.
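As an illustration of the analysis-error case above, a definitions file along these lines is the kind of pipeline `spark-pipelines dry-run` rejects during graph resolution, without reading or writing any data. This is only a sketch: the file and table names are hypothetical, and it assumes the active Spark session is available to the definitions file when the CLI registers it; the decorators are the `pyspark.pipelines` APIs exercised by the tests later in this series.

```python
# transformations.py -- hypothetical definitions file; a minimal sketch, assuming the
# CLI's Spark Connect session is the active session while definitions are registered.
from pyspark.sql import SparkSession
from pyspark import pipelines as sdp

spark = SparkSession.getActiveSession()


@sdp.materialized_view
def daily_counts():
    # `raw_events` is not defined by this pipeline and (we assume) does not exist in the
    # catalog, so dry-run fails flow resolution here -- no datasets are materialized.
    return spark.read.table("raw_events").groupBy("event_date").count()
```

Running `spark-pipelines dry-run --spec pipeline.yml` against such a project surfaces the failure as an `AnalysisException` up front, rather than partway through an actual run.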
diff --git a/python/pyspark/pipelines/cli.py b/python/pyspark/pipelines/cli.py index 395f7e9b8374..d22c78a7570f 100644 --- a/python/pyspark/pipelines/cli.py +++ b/python/pyspark/pipelines/cli.py @@ -217,7 +217,7 @@ def change_dir(path: Path) -> Generator[None, None, None]: os.chdir(prev) -def run(spec_path: Path) -> None: +def run(spec_path: Path, dry: bool) -> None: """Run the pipeline defined with the given spec.""" log_with_curr_timestamp(f"Loading pipeline spec from {spec_path}...") spec = load_pipeline_spec(spec_path) @@ -242,7 +242,7 @@ def run(spec_path: Path) -> None: register_definitions(spec_path, registry, spec) log_with_curr_timestamp("Starting run...") - result_iter = start_run(spark, dataflow_graph_id) + result_iter = start_run(spark, dataflow_graph_id, dry=dry) try: handle_pipeline_events(result_iter) finally: @@ -257,6 +257,13 @@ def run(spec_path: Path) -> None: run_parser = subparsers.add_parser("run", help="Run a pipeline.") run_parser.add_argument("--spec", help="Path to the pipeline spec.") + # "dry-run" subcommand + run_parser = subparsers.add_parser( + "dry-run", + help="Launch a run that just validates the graph and checks for errors.", + ) + run_parser.add_argument("--spec", help="Path to the pipeline spec.") + # "init" subcommand init_parser = subparsers.add_parser( "init", @@ -270,9 +277,9 @@ def run(spec_path: Path) -> None: ) args = parser.parse_args() - assert args.command in ["run", "init"] + assert args.command in ["run", "dry-run", "init"] - if args.command == "run": + if args.command in ["run", "dry-run"]: if args.spec is not None: spec_path = Path(args.spec) if not spec_path.is_file(): @@ -283,6 +290,6 @@ def run(spec_path: Path) -> None: else: spec_path = find_pipeline_spec(Path.cwd()) - run(spec_path=spec_path) + run(spec_path=spec_path, dry=(args.command == "dry-run")) elif args.command == "init": init(args.name) diff --git a/python/pyspark/pipelines/spark_connect_pipeline.py b/python/pyspark/pipelines/spark_connect_pipeline.py index 12f43a236c28..4288e2c5d5eb 100644 --- a/python/pyspark/pipelines/spark_connect_pipeline.py +++ b/python/pyspark/pipelines/spark_connect_pipeline.py @@ -65,12 +65,12 @@ def handle_pipeline_events(iter: Iterator[Dict[str, Any]]) -> None: log_with_provided_timestamp(event.message, dt) -def start_run(spark: SparkSession, dataflow_graph_id: str) -> Iterator[Dict[str, Any]]: +def start_run(spark: SparkSession, dataflow_graph_id: str, dry: bool) -> Iterator[Dict[str, Any]]: """Start a run of the dataflow graph in the Spark Connect server. :param dataflow_graph_id: The ID of the dataflow graph to start. """ - inner_command = pb2.PipelineCommand.StartRun(dataflow_graph_id=dataflow_graph_id) + inner_command = pb2.PipelineCommand.StartRun(dataflow_graph_id=dataflow_graph_id, dry=dry) command = pb2.Command() command.pipeline_command.start_run.CopyFrom(inner_command) # Cast because mypy seems to think `spark`` is a function, not an object. 
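Taken together, the `dry-run` subcommand drives the same Spark Connect path as `run`, only with `dry=True` on `StartRun`. A condensed sketch of that flow, mirroring `run()` above (the Connect endpoint URL is a placeholder, dataset registration is elided, and `start_run` is shown with its signature as of this patch; a later patch in the series adds refresh-selection arguments):

```python
# Minimal sketch of the dry-run path the CLI drives over Spark Connect.
from pyspark.sql import SparkSession
from pyspark.pipelines.spark_connect_pipeline import (
    create_dataflow_graph,
    handle_pipeline_events,
    start_run,
)

# Placeholder endpoint; any Spark Connect session works here.
spark = SparkSession.builder.remote("sc://localhost").getOrCreate()

graph_id = create_dataflow_graph(spark, None, None, None)
# ... register datasets and flows against graph_id (the CLI does this via
# register_definitions() and SparkConnectGraphElementRegistry) ...

events = start_run(spark, graph_id, dry=True)
handle_pipeline_events(events)  # raises AnalysisException if resolution or validation fails
```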
Likely related to diff --git a/python/pyspark/sql/connect/proto/pipelines_pb2.py b/python/pyspark/sql/connect/proto/pipelines_pb2.py index 413e1fbe12b5..7176f10368da 100644 --- a/python/pyspark/sql/connect/proto/pipelines_pb2.py +++ b/python/pyspark/sql/connect/proto/pipelines_pb2.py @@ -40,7 +40,7 @@ DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( - b'\n\x1dspark/connect/pipelines.proto\x12\rspark.connect\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"\xf2\x12\n\x0fPipelineCommand\x12h\n\x15\x63reate_dataflow_graph\x18\x01 \x01(\x0b\x32\x32.spark.connect.PipelineCommand.CreateDataflowGraphH\x00R\x13\x63reateDataflowGraph\x12U\n\x0e\x64\x65\x66ine_dataset\x18\x02 \x01(\x0b\x32,.spark.connect.PipelineCommand.DefineDatasetH\x00R\rdefineDataset\x12L\n\x0b\x64\x65\x66ine_flow\x18\x03 \x01(\x0b\x32).spark.connect.PipelineCommand.DefineFlowH\x00R\ndefineFlow\x12\x62\n\x13\x64rop_dataflow_graph\x18\x04 \x01(\x0b\x32\x30.spark.connect.PipelineCommand.DropDataflowGraphH\x00R\x11\x64ropDataflowGraph\x12\x46\n\tstart_run\x18\x05 \x01(\x0b\x32\'.spark.connect.PipelineCommand.StartRunH\x00R\x08startRun\x12r\n\x19\x64\x65\x66ine_sql_graph_elements\x18\x06 \x01(\x0b\x32\x35.spark.connect.PipelineCommand.DefineSqlGraphElementsH\x00R\x16\x64\x65\x66ineSqlGraphElements\x1a\x87\x03\n\x13\x43reateDataflowGraph\x12,\n\x0f\x64\x65\x66\x61ult_catalog\x18\x01 \x01(\tH\x00R\x0e\x64\x65\x66\x61ultCatalog\x88\x01\x01\x12.\n\x10\x64\x65\x66\x61ult_database\x18\x02 \x01(\tH\x01R\x0f\x64\x65\x66\x61ultDatabase\x88\x01\x01\x12Z\n\x08sql_conf\x18\x05 \x03(\x0b\x32?.spark.connect.PipelineCommand.CreateDataflowGraph.SqlConfEntryR\x07sqlConf\x1a:\n\x0cSqlConfEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x1aQ\n\x08Response\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x42\x14\n\x12_dataflow_graph_idB\x12\n\x10_default_catalogB\x13\n\x11_default_database\x1aZ\n\x11\x44ropDataflowGraph\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x42\x14\n\x12_dataflow_graph_id\x1a\xd1\x04\n\rDefineDataset\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12&\n\x0c\x64\x61taset_name\x18\x02 \x01(\tH\x01R\x0b\x64\x61tasetName\x88\x01\x01\x12\x42\n\x0c\x64\x61taset_type\x18\x03 \x01(\x0e\x32\x1a.spark.connect.DatasetTypeH\x02R\x0b\x64\x61tasetType\x88\x01\x01\x12\x1d\n\x07\x63omment\x18\x04 \x01(\tH\x03R\x07\x63omment\x88\x01\x01\x12l\n\x10table_properties\x18\x05 \x03(\x0b\x32\x41.spark.connect.PipelineCommand.DefineDataset.TablePropertiesEntryR\x0ftableProperties\x12%\n\x0epartition_cols\x18\x06 \x03(\tR\rpartitionCols\x12\x34\n\x06schema\x18\x07 \x01(\x0b\x32\x17.spark.connect.DataTypeH\x04R\x06schema\x88\x01\x01\x12\x1b\n\x06\x66ormat\x18\x08 \x01(\tH\x05R\x06\x66ormat\x88\x01\x01\x1a\x42\n\x14TablePropertiesEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x42\x14\n\x12_dataflow_graph_idB\x0f\n\r_dataset_nameB\x0f\n\r_dataset_typeB\n\n\x08_commentB\t\n\x07_schemaB\t\n\x07_format\x1a\xc8\x03\n\nDefineFlow\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12 \n\tflow_name\x18\x02 \x01(\tH\x01R\x08\x66lowName\x88\x01\x01\x12\x33\n\x13target_dataset_name\x18\x03 \x01(\tH\x02R\x11targetDatasetName\x88\x01\x01\x12\x38\n\x08relation\x18\x04 
\x01(\x0b\x32\x17.spark.connect.RelationH\x03R\x08relation\x88\x01\x01\x12Q\n\x08sql_conf\x18\x05 \x03(\x0b\x32\x36.spark.connect.PipelineCommand.DefineFlow.SqlConfEntryR\x07sqlConf\x12\x17\n\x04once\x18\x06 \x01(\x08H\x04R\x04once\x88\x01\x01\x1a:\n\x0cSqlConfEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x42\x14\n\x12_dataflow_graph_idB\x0c\n\n_flow_nameB\x16\n\x14_target_dataset_nameB\x0b\n\t_relationB\x07\n\x05_once\x1aQ\n\x08StartRun\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x42\x14\n\x12_dataflow_graph_id\x1a\xc7\x01\n\x16\x44\x65\x66ineSqlGraphElements\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12\'\n\rsql_file_path\x18\x02 \x01(\tH\x01R\x0bsqlFilePath\x88\x01\x01\x12\x1e\n\x08sql_text\x18\x03 \x01(\tH\x02R\x07sqlText\x88\x01\x01\x42\x14\n\x12_dataflow_graph_idB\x10\n\x0e_sql_file_pathB\x0b\n\t_sql_textB\x0e\n\x0c\x63ommand_type"\x8e\x02\n\x15PipelineCommandResult\x12\x81\x01\n\x1c\x63reate_dataflow_graph_result\x18\x01 \x01(\x0b\x32>.spark.connect.PipelineCommandResult.CreateDataflowGraphResultH\x00R\x19\x63reateDataflowGraphResult\x1a\x62\n\x19\x43reateDataflowGraphResult\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x42\x14\n\x12_dataflow_graph_idB\r\n\x0bresult_type"I\n\x13PipelineEventResult\x12\x32\n\x05\x65vent\x18\x01 \x01(\x0b\x32\x1c.spark.connect.PipelineEventR\x05\x65vent"t\n\rPipelineEvent\x12\x38\n\ttimestamp\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.TimestampR\ttimestamp\x12\x1d\n\x07message\x18\x02 \x01(\tH\x00R\x07message\x88\x01\x01\x42\n\n\x08_message*a\n\x0b\x44\x61tasetType\x12\x1c\n\x18\x44\x41TASET_TYPE_UNSPECIFIED\x10\x00\x12\x15\n\x11MATERIALIZED_VIEW\x10\x01\x12\t\n\x05TABLE\x10\x02\x12\x12\n\x0eTEMPORARY_VIEW\x10\x03\x42\x36\n\x1eorg.apache.spark.connect.protoP\x01Z\x12internal/generatedb\x06proto3' + b'\n\x1dspark/connect/pipelines.proto\x12\rspark.connect\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"\x91\x13\n\x0fPipelineCommand\x12h\n\x15\x63reate_dataflow_graph\x18\x01 \x01(\x0b\x32\x32.spark.connect.PipelineCommand.CreateDataflowGraphH\x00R\x13\x63reateDataflowGraph\x12U\n\x0e\x64\x65\x66ine_dataset\x18\x02 \x01(\x0b\x32,.spark.connect.PipelineCommand.DefineDatasetH\x00R\rdefineDataset\x12L\n\x0b\x64\x65\x66ine_flow\x18\x03 \x01(\x0b\x32).spark.connect.PipelineCommand.DefineFlowH\x00R\ndefineFlow\x12\x62\n\x13\x64rop_dataflow_graph\x18\x04 \x01(\x0b\x32\x30.spark.connect.PipelineCommand.DropDataflowGraphH\x00R\x11\x64ropDataflowGraph\x12\x46\n\tstart_run\x18\x05 \x01(\x0b\x32\'.spark.connect.PipelineCommand.StartRunH\x00R\x08startRun\x12r\n\x19\x64\x65\x66ine_sql_graph_elements\x18\x06 \x01(\x0b\x32\x35.spark.connect.PipelineCommand.DefineSqlGraphElementsH\x00R\x16\x64\x65\x66ineSqlGraphElements\x1a\x87\x03\n\x13\x43reateDataflowGraph\x12,\n\x0f\x64\x65\x66\x61ult_catalog\x18\x01 \x01(\tH\x00R\x0e\x64\x65\x66\x61ultCatalog\x88\x01\x01\x12.\n\x10\x64\x65\x66\x61ult_database\x18\x02 \x01(\tH\x01R\x0f\x64\x65\x66\x61ultDatabase\x88\x01\x01\x12Z\n\x08sql_conf\x18\x05 \x03(\x0b\x32?.spark.connect.PipelineCommand.CreateDataflowGraph.SqlConfEntryR\x07sqlConf\x1a:\n\x0cSqlConfEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x1aQ\n\x08Response\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 
\x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x42\x14\n\x12_dataflow_graph_idB\x12\n\x10_default_catalogB\x13\n\x11_default_database\x1aZ\n\x11\x44ropDataflowGraph\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x42\x14\n\x12_dataflow_graph_id\x1a\xd1\x04\n\rDefineDataset\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12&\n\x0c\x64\x61taset_name\x18\x02 \x01(\tH\x01R\x0b\x64\x61tasetName\x88\x01\x01\x12\x42\n\x0c\x64\x61taset_type\x18\x03 \x01(\x0e\x32\x1a.spark.connect.DatasetTypeH\x02R\x0b\x64\x61tasetType\x88\x01\x01\x12\x1d\n\x07\x63omment\x18\x04 \x01(\tH\x03R\x07\x63omment\x88\x01\x01\x12l\n\x10table_properties\x18\x05 \x03(\x0b\x32\x41.spark.connect.PipelineCommand.DefineDataset.TablePropertiesEntryR\x0ftableProperties\x12%\n\x0epartition_cols\x18\x06 \x03(\tR\rpartitionCols\x12\x34\n\x06schema\x18\x07 \x01(\x0b\x32\x17.spark.connect.DataTypeH\x04R\x06schema\x88\x01\x01\x12\x1b\n\x06\x66ormat\x18\x08 \x01(\tH\x05R\x06\x66ormat\x88\x01\x01\x1a\x42\n\x14TablePropertiesEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x42\x14\n\x12_dataflow_graph_idB\x0f\n\r_dataset_nameB\x0f\n\r_dataset_typeB\n\n\x08_commentB\t\n\x07_schemaB\t\n\x07_format\x1a\xc8\x03\n\nDefineFlow\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12 \n\tflow_name\x18\x02 \x01(\tH\x01R\x08\x66lowName\x88\x01\x01\x12\x33\n\x13target_dataset_name\x18\x03 \x01(\tH\x02R\x11targetDatasetName\x88\x01\x01\x12\x38\n\x08relation\x18\x04 \x01(\x0b\x32\x17.spark.connect.RelationH\x03R\x08relation\x88\x01\x01\x12Q\n\x08sql_conf\x18\x05 \x03(\x0b\x32\x36.spark.connect.PipelineCommand.DefineFlow.SqlConfEntryR\x07sqlConf\x12\x17\n\x04once\x18\x06 \x01(\x08H\x04R\x04once\x88\x01\x01\x1a:\n\x0cSqlConfEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x42\x14\n\x12_dataflow_graph_idB\x0c\n\n_flow_nameB\x16\n\x14_target_dataset_nameB\x0b\n\t_relationB\x07\n\x05_once\x1ap\n\x08StartRun\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12\x15\n\x03\x64ry\x18\x02 \x01(\x08H\x01R\x03\x64ry\x88\x01\x01\x42\x14\n\x12_dataflow_graph_idB\x06\n\x04_dry\x1a\xc7\x01\n\x16\x44\x65\x66ineSqlGraphElements\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12\'\n\rsql_file_path\x18\x02 \x01(\tH\x01R\x0bsqlFilePath\x88\x01\x01\x12\x1e\n\x08sql_text\x18\x03 \x01(\tH\x02R\x07sqlText\x88\x01\x01\x42\x14\n\x12_dataflow_graph_idB\x10\n\x0e_sql_file_pathB\x0b\n\t_sql_textB\x0e\n\x0c\x63ommand_type"\x8e\x02\n\x15PipelineCommandResult\x12\x81\x01\n\x1c\x63reate_dataflow_graph_result\x18\x01 \x01(\x0b\x32>.spark.connect.PipelineCommandResult.CreateDataflowGraphResultH\x00R\x19\x63reateDataflowGraphResult\x1a\x62\n\x19\x43reateDataflowGraphResult\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x42\x14\n\x12_dataflow_graph_idB\r\n\x0bresult_type"I\n\x13PipelineEventResult\x12\x32\n\x05\x65vent\x18\x01 \x01(\x0b\x32\x1c.spark.connect.PipelineEventR\x05\x65vent"t\n\rPipelineEvent\x12\x38\n\ttimestamp\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.TimestampR\ttimestamp\x12\x1d\n\x07message\x18\x02 
\x01(\tH\x00R\x07message\x88\x01\x01\x42\n\n\x08_message*a\n\x0b\x44\x61tasetType\x12\x1c\n\x18\x44\x41TASET_TYPE_UNSPECIFIED\x10\x00\x12\x15\n\x11MATERIALIZED_VIEW\x10\x01\x12\t\n\x05TABLE\x10\x02\x12\x12\n\x0eTEMPORARY_VIEW\x10\x03\x42\x36\n\x1eorg.apache.spark.connect.protoP\x01Z\x12internal/generatedb\x06proto3' ) _globals = globals() @@ -59,10 +59,10 @@ _globals["_PIPELINECOMMAND_DEFINEDATASET_TABLEPROPERTIESENTRY"]._serialized_options = b"8\001" _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._loaded_options = None _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._serialized_options = b"8\001" - _globals["_DATASETTYPE"]._serialized_start = 3026 - _globals["_DATASETTYPE"]._serialized_end = 3123 + _globals["_DATASETTYPE"]._serialized_start = 3057 + _globals["_DATASETTYPE"]._serialized_end = 3154 _globals["_PIPELINECOMMAND"]._serialized_start = 140 - _globals["_PIPELINECOMMAND"]._serialized_end = 2558 + _globals["_PIPELINECOMMAND"]._serialized_end = 2589 _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH"]._serialized_start = 719 _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH"]._serialized_end = 1110 _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH_SQLCONFENTRY"]._serialized_start = 928 @@ -80,15 +80,15 @@ _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._serialized_start = 928 _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._serialized_end = 986 _globals["_PIPELINECOMMAND_STARTRUN"]._serialized_start = 2259 - _globals["_PIPELINECOMMAND_STARTRUN"]._serialized_end = 2340 - _globals["_PIPELINECOMMAND_DEFINESQLGRAPHELEMENTS"]._serialized_start = 2343 - _globals["_PIPELINECOMMAND_DEFINESQLGRAPHELEMENTS"]._serialized_end = 2542 - _globals["_PIPELINECOMMANDRESULT"]._serialized_start = 2561 - _globals["_PIPELINECOMMANDRESULT"]._serialized_end = 2831 - _globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_start = 2718 - _globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_end = 2816 - _globals["_PIPELINEEVENTRESULT"]._serialized_start = 2833 - _globals["_PIPELINEEVENTRESULT"]._serialized_end = 2906 - _globals["_PIPELINEEVENT"]._serialized_start = 2908 - _globals["_PIPELINEEVENT"]._serialized_end = 3024 + _globals["_PIPELINECOMMAND_STARTRUN"]._serialized_end = 2371 + _globals["_PIPELINECOMMAND_DEFINESQLGRAPHELEMENTS"]._serialized_start = 2374 + _globals["_PIPELINECOMMAND_DEFINESQLGRAPHELEMENTS"]._serialized_end = 2573 + _globals["_PIPELINECOMMANDRESULT"]._serialized_start = 2592 + _globals["_PIPELINECOMMANDRESULT"]._serialized_end = 2862 + _globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_start = 2749 + _globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_end = 2847 + _globals["_PIPELINEEVENTRESULT"]._serialized_start = 2864 + _globals["_PIPELINEEVENTRESULT"]._serialized_end = 2937 + _globals["_PIPELINEEVENT"]._serialized_start = 2939 + _globals["_PIPELINEEVENT"]._serialized_end = 3055 # @@protoc_insertion_point(module_scope) diff --git a/python/pyspark/sql/connect/proto/pipelines_pb2.pyi b/python/pyspark/sql/connect/proto/pipelines_pb2.pyi index 36fb73f06906..243c10c59167 100644 --- a/python/pyspark/sql/connect/proto/pipelines_pb2.pyi +++ b/python/pyspark/sql/connect/proto/pipelines_pb2.pyi @@ -530,20 +530,30 @@ class PipelineCommand(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor DATAFLOW_GRAPH_ID_FIELD_NUMBER: builtins.int + DRY_FIELD_NUMBER: builtins.int dataflow_graph_id: builtins.str """The graph to start.""" + dry: builtins.bool + """If true, the run will not 
actually execute any flows, but will only validate the graph and + check for any errors. This is useful for testing and validation purposes. + """ def __init__( self, *, dataflow_graph_id: builtins.str | None = ..., + dry: builtins.bool | None = ..., ) -> None: ... def HasField( self, field_name: typing_extensions.Literal[ "_dataflow_graph_id", b"_dataflow_graph_id", + "_dry", + b"_dry", "dataflow_graph_id", b"dataflow_graph_id", + "dry", + b"dry", ], ) -> builtins.bool: ... def ClearField( @@ -551,14 +561,23 @@ class PipelineCommand(google.protobuf.message.Message): field_name: typing_extensions.Literal[ "_dataflow_graph_id", b"_dataflow_graph_id", + "_dry", + b"_dry", "dataflow_graph_id", b"dataflow_graph_id", + "dry", + b"dry", ], ) -> None: ... + @typing.overload def WhichOneof( self, oneof_group: typing_extensions.Literal["_dataflow_graph_id", b"_dataflow_graph_id"], ) -> typing_extensions.Literal["dataflow_graph_id"] | None: ... + @typing.overload + def WhichOneof( + self, oneof_group: typing_extensions.Literal["_dry", b"_dry"] + ) -> typing_extensions.Literal["dry"] | None: ... class DefineSqlGraphElements(google.protobuf.message.Message): """Parses the SQL file and registers all datasets and flows.""" diff --git a/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto b/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto index c5a631264590..0b8bde131fd4 100644 --- a/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto +++ b/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto @@ -116,6 +116,10 @@ message PipelineCommand { message StartRun { // The graph to start. optional string dataflow_graph_id = 1; + + // If true, the run will not actually execute any flows, but will only validate the graph and + // check for any errors. This is useful for testing and validation purposes. + optional bool dry = 2; } // Parses the SQL file and registers all datasets and flows. diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala index 7bb1d7358557..132eeaf5b465 100644 --- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala +++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala @@ -282,7 +282,11 @@ private[connect] object PipelinesHandler extends Logging { val pipelineUpdateContext = new PipelineUpdateContextImpl(graphElementRegistry.toDataflowGraph, eventCallback) sessionHolder.cachePipelineExecution(dataflowGraphId, pipelineUpdateContext) - pipelineUpdateContext.pipelineExecution.runPipeline() + if (cmd.getDry) { + pipelineUpdateContext.pipelineExecution.dryRunPipeline() + } else { + pipelineUpdateContext.pipelineExecution.runPipeline() + } // Rethrow any exceptions that caused the pipeline run to fail so that the exception is // propagated back to the SC client / CLI. 
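On the wire, the new capability is just the optional `dry` field on the existing `StartRun` message, which `PipelinesHandler` reads to choose `dryRunPipeline()` over `runPipeline()`. A sketch of the client-side message construction, mirroring `spark_connect_pipeline.start_run` above; it assumes `pb2` aliases the generated `pyspark.sql.connect.proto` package, and the graph id is a placeholder:

```python
# Sketch: building a StartRun pipeline command with the new `dry` flag set.
import pyspark.sql.connect.proto as pb2

start_run = pb2.PipelineCommand.StartRun(
    dataflow_graph_id="<graph-id-from-create-dataflow-graph>",  # placeholder
    dry=True,  # server validates the graph instead of executing flows
)
command = pb2.Command()
command.pipeline_command.start_run.CopyFrom(start_run)
```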
diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PipelineEventStreamSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PipelineEventStreamSuite.scala index 100aa2e3b63a..4013908a6413 100644 --- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PipelineEventStreamSuite.scala +++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PipelineEventStreamSuite.scala @@ -31,18 +31,21 @@ class PipelineEventStreamSuite extends SparkDeclarativePipelinesServerTest { createTable( name = "a", datasetType = proto.DatasetType.MATERIALIZED_VIEW, - sql = Some("SELECT * FROM RANGE(5)")) + sql = Some("SELECT * FROM RANGE(5)") + ) createTable( name = "b", datasetType = proto.DatasetType.TABLE, - sql = Some("SELECT * FROM STREAM a")) + sql = Some("SELECT * FROM STREAM a") + ) } registerPipelineDatasets(pipeline) val capturedEvents = new ArrayBuffer[PipelineEvent]() withClient { client => val startRunRequest = buildStartRunPlan( - proto.PipelineCommand.StartRun.newBuilder().setDataflowGraphId(graphId).build()) + proto.PipelineCommand.StartRun.newBuilder().setDataflowGraphId(graphId).build() + ) val responseIterator = client.execute(startRunRequest) while (responseIterator.hasNext) { val response = responseIterator.next() @@ -60,57 +63,121 @@ class PipelineEventStreamSuite extends SparkDeclarativePipelinesServerTest { "Flow spark_catalog.default.b is STARTING", "Flow spark_catalog.default.b is RUNNING", "Flow spark_catalog.default.b has COMPLETED", - "Run is COMPLETED") + "Run is COMPLETED" + ) expectedEventMessages.foreach { eventMessage => assert( capturedEvents.exists(e => e.getMessage.contains(eventMessage)), - s"Did not receive expected event: $eventMessage") + s"Did not receive expected event: $eventMessage" + ) } } } } - test("check error events from stream") { + test("flow resolution failure") { + val dryOptions = Seq(true, false) + + dryOptions.foreach { dry => + withRawBlockingStub { implicit stub => + val graphId = createDataflowGraph + val pipeline = new TestPipelineDefinition(graphId) { + createTable( + name = "a", + datasetType = proto.DatasetType.MATERIALIZED_VIEW, + sql = Some("SELECT * FROM unknown_table") + ) + createTable( + name = "b", + datasetType = proto.DatasetType.TABLE, + sql = Some("SELECT * FROM STREAM a") + ) + } + registerPipelineDatasets(pipeline) + + val capturedEvents = new ArrayBuffer[PipelineEvent]() + withClient { client => + val startRunRequest = buildStartRunPlan( + proto.PipelineCommand.StartRun + .newBuilder() + .setDataflowGraphId(graphId) + .setDry(dry) + .build() + ) + val ex = intercept[AnalysisException] { + val responseIterator = client.execute(startRunRequest) + while (responseIterator.hasNext) { + val response = responseIterator.next() + if (response.hasPipelineEventResult) { + capturedEvents.append(response.getPipelineEventResult.getEvent) + } + } + } + // (?s) enables wildcard matching on newline characters + val runFailureErrorMsg = "(?s).*Failed to resolve flows in the pipeline.*".r + assert(runFailureErrorMsg.matches(ex.getMessage)) + val expectedLogPatterns = Set( + "(?s).*Failed to resolve flow.*Failed to read dataset 'spark_catalog.default.a'.*".r, + "(?s).*Failed to resolve flow.*[TABLE_OR_VIEW_NOT_FOUND].*".r + ) + expectedLogPatterns.foreach { logPattern => + assert( + capturedEvents.exists(e => logPattern.matches(e.getMessage)), + s"Did not receive expected event matching pattern: $logPattern" + ) + } + // Ensure that the error causing 
the run failure is not surfaced to the user twice + assert(capturedEvents.forall(e => !runFailureErrorMsg.matches(e.getMessage))) + } + } + } + } + + test("successful dry run") { withRawBlockingStub { implicit stub => val graphId = createDataflowGraph val pipeline = new TestPipelineDefinition(graphId) { createTable( name = "a", datasetType = proto.DatasetType.MATERIALIZED_VIEW, - sql = Some("SELECT * FROM unknown_table")) + sql = Some("SELECT * FROM RANGE(5)") + ) createTable( name = "b", datasetType = proto.DatasetType.TABLE, - sql = Some("SELECT * FROM STREAM a")) + sql = Some("SELECT * FROM STREAM a") + ) } registerPipelineDatasets(pipeline) val capturedEvents = new ArrayBuffer[PipelineEvent]() withClient { client => val startRunRequest = buildStartRunPlan( - proto.PipelineCommand.StartRun.newBuilder().setDataflowGraphId(graphId).build()) - val ex = intercept[AnalysisException] { - val responseIterator = client.execute(startRunRequest) - while (responseIterator.hasNext) { - val response = responseIterator.next() - if (response.hasPipelineEventResult) { - capturedEvents.append(response.getPipelineEventResult.getEvent) - } + proto.PipelineCommand.StartRun + .newBuilder() + .setDataflowGraphId(graphId) + .setDry(true) + .build() + ) + val responseIterator = client.execute(startRunRequest) + while (responseIterator.hasNext) { + val response = responseIterator.next() + if (response.hasPipelineEventResult) { + capturedEvents.append(response.getPipelineEventResult.getEvent) } } - // (?s) enables wildcard matching on newline characters - val runFailureErrorMsg = "(?s).*Failed to resolve flows in the pipeline.*".r - assert(runFailureErrorMsg.matches(ex.getMessage)) - val expectedLogPatterns = Set( - "(?s).*Failed to resolve flow.*Failed to read dataset 'spark_catalog.default.a'.*".r, - "(?s).*Failed to resolve flow.*[TABLE_OR_VIEW_NOT_FOUND].*".r) - expectedLogPatterns.foreach { logPattern => + val expectedEventMessages = Set("Run is COMPLETED") + expectedEventMessages.foreach { eventMessage => assert( - capturedEvents.exists(e => logPattern.matches(e.getMessage)), - s"Did not receive expected event matching pattern: $logPattern") + capturedEvents.exists(e => e.getMessage.contains(eventMessage)), + s"Did not receive expected event: $eventMessage" + ) } - // Ensure that the error causing the run failure is not surfaced to the user twice - assert(capturedEvents.forall(e => !runFailureErrorMsg.matches(e.getMessage))) + } + + // No flows should be started in dry run mode + capturedEvents.foreach { event => + assert(!event.getMessage.contains("is QUEUED")) } } } diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/PipelineExecution.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/PipelineExecution.scala index a2c54a908af1..5bb6e25eaf45 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/PipelineExecution.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/PipelineExecution.scala @@ -21,6 +21,7 @@ import org.apache.spark.sql.pipelines.common.RunState import org.apache.spark.sql.pipelines.logging.{ ConstructPipelineEvent, EventLevel, + PipelineEvent, PipelineEventOrigin, RunProgress } @@ -38,31 +39,19 @@ class PipelineExecution(context: PipelineUpdateContext) { def executionStarted: Boolean = synchronized { graphExecution.nonEmpty } - /** * Starts the pipeline execution by initializing the graph and starting the graph execution * thread. 
This function does not block on the completion of the graph execution thread. */ def startPipeline(): Unit = synchronized { // Initialize the graph. - val initializedGraph = initializeGraph() + val resolvedGraph = resolveGraph() + val initializedGraph = DatasetManager.materializeDatasets(resolvedGraph, context) // Execute the graph. graphExecution = Option( new TriggeredGraphExecution(initializedGraph, context, onCompletion = terminationReason => { - context.eventCallback( - ConstructPipelineEvent( - origin = PipelineEventOrigin( - flowName = None, - datasetName = None, - sourceCodeLocation = None - ), - level = EventLevel.INFO, - message = terminationReason.message, - details = RunProgress(terminationReason.terminalState), - exception = terminationReason.cause - ) - ) + context.eventCallback(constructTerminationEvent(terminationReason)) }) ) graphExecution.foreach(_.start()) @@ -91,15 +80,38 @@ class PipelineExecution(context: PipelineUpdateContext) { } } - private def initializeGraph(): DataflowGraph = { - val resolvedGraph = try { + /** Validates that the pipeline graph can be successfully resolved and validates it. */ + def dryRunPipeline(): Unit = synchronized { + resolveGraph() + context.eventCallback( + constructTerminationEvent(RunCompletion()) + ) + } + + private def constructTerminationEvent( + terminationReason: RunTerminationReason + ): PipelineEvent = { + ConstructPipelineEvent( + origin = PipelineEventOrigin( + flowName = None, + datasetName = None, + sourceCodeLocation = None + ), + level = EventLevel.INFO, + message = terminationReason.message, + details = RunProgress(terminationReason.terminalState), + exception = terminationReason.cause + ) + } + + private def resolveGraph(): DataflowGraph = { + try { context.unresolvedGraph.resolve().validate() } catch { case e: UnresolvedPipelineException => handleInvalidPipeline(e) throw e } - DatasetManager.materializeDatasets(resolvedGraph, context) } /** Waits for the execution to complete. Only used in tests */ From 143fdd4f9789fb2ca07978257b0bdd21f38ff8a6 Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Wed, 16 Jul 2025 20:05:27 -0700 Subject: [PATCH 2/6] add Python end to end test --- .../pipelines/tests/test_spark_connect.py | 83 +++++++++++++++++++ 1 file changed, 83 insertions(+) create mode 100644 python/pyspark/pipelines/tests/test_spark_connect.py diff --git a/python/pyspark/pipelines/tests/test_spark_connect.py b/python/pyspark/pipelines/tests/test_spark_connect.py new file mode 100644 index 000000000000..0d24658cd760 --- /dev/null +++ b/python/pyspark/pipelines/tests/test_spark_connect.py @@ -0,0 +1,83 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +Tests that run Pipelines against a Spark Connect server. 
+""" + +import unittest + +from pyspark.errors.exceptions.connect import AnalysisException +from pyspark.pipelines.graph_element_registry import graph_element_registration_context +from pyspark.pipelines.spark_connect_graph_element_registry import ( + SparkConnectGraphElementRegistry, +) +from pyspark.pipelines.spark_connect_pipeline import ( + create_dataflow_graph, + start_run, + handle_pipeline_events, +) +from pyspark import pipelines as sdp +from pyspark.testing.connectutils import ( + ReusedConnectTestCase, + should_test_connect, + connect_requirement_message, +) + + +@unittest.skipIf(not should_test_connect, connect_requirement_message) +class SparkConnectPipelinesTest(ReusedConnectTestCase): + def test_dry_run(self): + dataflow_graph_id = create_dataflow_graph(self.spark, None, None, None) + registry = SparkConnectGraphElementRegistry(self.spark, dataflow_graph_id) + + with graph_element_registration_context(registry): + + @sdp.materialized_view + def mv(): + return self.spark.range(1) + + result_iter = start_run(self.spark, dataflow_graph_id, dry=True) + handle_pipeline_events(result_iter) + + def test_dry_run_failure(self): + dataflow_graph_id = create_dataflow_graph(self.spark, None, None, None) + registry = SparkConnectGraphElementRegistry(self.spark, dataflow_graph_id) + + with graph_element_registration_context(registry): + + @sdp.table + def st(): + # Invalid because a streaming query is expected + return self.spark.range(1) + + result_iter = start_run(self.spark, dataflow_graph_id, dry=True) + with self.assertRaises(AnalysisException) as context: + handle_pipeline_events(result_iter) + self.assertIn( + "INVALID_FLOW_QUERY_TYPE.BATCH_RELATION_FOR_STREAMING_TABLE", str(context.exception) + ) + + +if __name__ == "__main__": + try: + import xmlrunner # type: ignore + + testRunner = xmlrunner.XMLTestRunner(output="target/test-reports", verbosity=2) + except ImportError: + testRunner = None + unittest.main(testRunner=testRunner, verbosity=2) From 9e0dd6fd476c7f0eeb061603b7e5b5552b5e14aa Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Thu, 17 Jul 2025 10:45:28 -0700 Subject: [PATCH 3/6] fix lint --- .../pipelines/PipelineEventStreamSuite.scala | 42 +++++++------------ 1 file changed, 14 insertions(+), 28 deletions(-) diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PipelineEventStreamSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PipelineEventStreamSuite.scala index 4013908a6413..83862545a723 100644 --- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PipelineEventStreamSuite.scala +++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PipelineEventStreamSuite.scala @@ -31,21 +31,18 @@ class PipelineEventStreamSuite extends SparkDeclarativePipelinesServerTest { createTable( name = "a", datasetType = proto.DatasetType.MATERIALIZED_VIEW, - sql = Some("SELECT * FROM RANGE(5)") - ) + sql = Some("SELECT * FROM RANGE(5)")) createTable( name = "b", datasetType = proto.DatasetType.TABLE, - sql = Some("SELECT * FROM STREAM a") - ) + sql = Some("SELECT * FROM STREAM a")) } registerPipelineDatasets(pipeline) val capturedEvents = new ArrayBuffer[PipelineEvent]() withClient { client => val startRunRequest = buildStartRunPlan( - proto.PipelineCommand.StartRun.newBuilder().setDataflowGraphId(graphId).build() - ) + proto.PipelineCommand.StartRun.newBuilder().setDataflowGraphId(graphId).build()) val responseIterator = client.execute(startRunRequest) while 
(responseIterator.hasNext) { val response = responseIterator.next() @@ -63,13 +60,11 @@ class PipelineEventStreamSuite extends SparkDeclarativePipelinesServerTest { "Flow spark_catalog.default.b is STARTING", "Flow spark_catalog.default.b is RUNNING", "Flow spark_catalog.default.b has COMPLETED", - "Run is COMPLETED" - ) + "Run is COMPLETED") expectedEventMessages.foreach { eventMessage => assert( capturedEvents.exists(e => e.getMessage.contains(eventMessage)), - s"Did not receive expected event: $eventMessage" - ) + s"Did not receive expected event: $eventMessage") } } } @@ -85,13 +80,11 @@ class PipelineEventStreamSuite extends SparkDeclarativePipelinesServerTest { createTable( name = "a", datasetType = proto.DatasetType.MATERIALIZED_VIEW, - sql = Some("SELECT * FROM unknown_table") - ) + sql = Some("SELECT * FROM unknown_table")) createTable( name = "b", datasetType = proto.DatasetType.TABLE, - sql = Some("SELECT * FROM STREAM a") - ) + sql = Some("SELECT * FROM STREAM a")) } registerPipelineDatasets(pipeline) @@ -102,8 +95,7 @@ class PipelineEventStreamSuite extends SparkDeclarativePipelinesServerTest { .newBuilder() .setDataflowGraphId(graphId) .setDry(dry) - .build() - ) + .build()) val ex = intercept[AnalysisException] { val responseIterator = client.execute(startRunRequest) while (responseIterator.hasNext) { @@ -118,13 +110,11 @@ class PipelineEventStreamSuite extends SparkDeclarativePipelinesServerTest { assert(runFailureErrorMsg.matches(ex.getMessage)) val expectedLogPatterns = Set( "(?s).*Failed to resolve flow.*Failed to read dataset 'spark_catalog.default.a'.*".r, - "(?s).*Failed to resolve flow.*[TABLE_OR_VIEW_NOT_FOUND].*".r - ) + "(?s).*Failed to resolve flow.*[TABLE_OR_VIEW_NOT_FOUND].*".r) expectedLogPatterns.foreach { logPattern => assert( capturedEvents.exists(e => logPattern.matches(e.getMessage)), - s"Did not receive expected event matching pattern: $logPattern" - ) + s"Did not receive expected event matching pattern: $logPattern") } // Ensure that the error causing the run failure is not surfaced to the user twice assert(capturedEvents.forall(e => !runFailureErrorMsg.matches(e.getMessage))) @@ -140,13 +130,11 @@ class PipelineEventStreamSuite extends SparkDeclarativePipelinesServerTest { createTable( name = "a", datasetType = proto.DatasetType.MATERIALIZED_VIEW, - sql = Some("SELECT * FROM RANGE(5)") - ) + sql = Some("SELECT * FROM RANGE(5)")) createTable( name = "b", datasetType = proto.DatasetType.TABLE, - sql = Some("SELECT * FROM STREAM a") - ) + sql = Some("SELECT * FROM STREAM a")) } registerPipelineDatasets(pipeline) @@ -157,8 +145,7 @@ class PipelineEventStreamSuite extends SparkDeclarativePipelinesServerTest { .newBuilder() .setDataflowGraphId(graphId) .setDry(true) - .build() - ) + .build()) val responseIterator = client.execute(startRunRequest) while (responseIterator.hasNext) { val response = responseIterator.next() @@ -170,8 +157,7 @@ class PipelineEventStreamSuite extends SparkDeclarativePipelinesServerTest { expectedEventMessages.foreach { eventMessage => assert( capturedEvents.exists(e => e.getMessage.contains(eventMessage)), - s"Did not receive expected event: $eventMessage" - ) + s"Did not receive expected event: $eventMessage") } } From 5a000431072d2de39cfd8bcf5f91647ca197ed3e Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Thu, 17 Jul 2025 10:47:32 -0700 Subject: [PATCH 4/6] cli fix --- python/pyspark/pipelines/cli.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyspark/pipelines/cli.py 
b/python/pyspark/pipelines/cli.py index d22c78a7570f..f9d08a486ece 100644 --- a/python/pyspark/pipelines/cli.py +++ b/python/pyspark/pipelines/cli.py @@ -258,11 +258,11 @@ def run(spec_path: Path, dry: bool) -> None: run_parser.add_argument("--spec", help="Path to the pipeline spec.") # "dry-run" subcommand - run_parser = subparsers.add_parser( + dry_run_parser = subparsers.add_parser( "dry-run", help="Launch a run that just validates the graph and checks for errors.", ) - run_parser.add_argument("--spec", help="Path to the pipeline spec.") + dry_run_parser.add_argument("--spec", help="Path to the pipeline spec.") # "init" subcommand init_parser = subparsers.add_parser( From 5956c546449f020c363593701c1ea9d96cfef076 Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Thu, 17 Jul 2025 16:12:29 -0700 Subject: [PATCH 5/6] fix refresh tests after merge conflict --- python/pyspark/pipelines/tests/test_cli.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/python/pyspark/pipelines/tests/test_cli.py b/python/pyspark/pipelines/tests/test_cli.py index ded00e691db4..ff62ce42c4a3 100644 --- a/python/pyspark/pipelines/tests/test_cli.py +++ b/python/pyspark/pipelines/tests/test_cli.py @@ -373,6 +373,7 @@ def test_full_refresh_all_conflicts_with_full_refresh(self): full_refresh=["table1", "table2"], full_refresh_all=True, refresh=[], + dry=False, ) self.assertEqual( @@ -396,6 +397,7 @@ def test_full_refresh_all_conflicts_with_refresh(self): full_refresh=[], full_refresh_all=True, refresh=["table1", "table2"], + dry=False, ) self.assertEqual( @@ -421,6 +423,7 @@ def test_full_refresh_all_conflicts_with_both(self): full_refresh=["table1"], full_refresh_all=True, refresh=["table2"], + dry=False, ) self.assertEqual( From 63282a07519d8b5a59404ab555b98587f16f88d4 Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Thu, 17 Jul 2025 16:13:51 -0700 Subject: [PATCH 6/6] fix test_spark_connect after merge conflict --- .../pipelines/tests/test_spark_connect.py | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/python/pyspark/pipelines/tests/test_spark_connect.py b/python/pyspark/pipelines/tests/test_spark_connect.py index 0d24658cd760..935295ec4a8c 100644 --- a/python/pyspark/pipelines/tests/test_spark_connect.py +++ b/python/pyspark/pipelines/tests/test_spark_connect.py @@ -51,7 +51,14 @@ def test_dry_run(self): def mv(): return self.spark.range(1) - result_iter = start_run(self.spark, dataflow_graph_id, dry=True) + result_iter = start_run( + self.spark, + dataflow_graph_id, + full_refresh=None, + refresh=None, + full_refresh_all=False, + dry=True, + ) handle_pipeline_events(result_iter) def test_dry_run_failure(self): @@ -65,7 +72,14 @@ def st(): # Invalid because a streaming query is expected return self.spark.range(1) - result_iter = start_run(self.spark, dataflow_graph_id, dry=True) + result_iter = start_run( + self.spark, + dataflow_graph_id, + full_refresh=None, + refresh=None, + full_refresh_all=False, + dry=True, + ) with self.assertRaises(AnalysisException) as context: handle_pipeline_events(result_iter) self.assertIn(