9 changes: 8 additions & 1 deletion docs/declarative-pipelines-programming-guide.md
@@ -94,7 +94,7 @@ The `spark-pipelines init` command, described below, makes it easy to generate a

 ## The `spark-pipelines` Command Line Interface
 
-The `spark-pipelines` command line interface (CLI) is the primary way to execute a pipeline. It also contains an `init` subcommand for generating a pipeline project.
+The `spark-pipelines` command line interface (CLI) is the primary way to execute a pipeline. It also contains an `init` subcommand for generating a pipeline project and a `dry-run` subcommand for validating a pipeline.
 
 `spark-pipelines` is built on top of `spark-submit`, meaning that it supports all cluster managers supported by `spark-submit`. It supports all `spark-submit` arguments except for `--class`.
 
@@ -106,6 +106,13 @@ The `spark-pipelines` command line interface (CLI) is the primary way to execute

 `spark-pipelines run` launches an execution of a pipeline and monitors its progress until it completes. The `--spec` parameter allows selecting the pipeline spec file. If not provided, the CLI will look in the current directory and parent directories for a file named `pipeline.yml` or `pipeline.yaml`.
 
+### `spark-pipelines dry-run`
+
+`spark-pipelines dry-run` launches an execution of a pipeline that doesn't read or write any data, but catches many kinds of errors that would surface if the pipeline were actually run. For example:
+- Syntax errors – e.g. invalid Python or SQL code
+- Analysis errors – e.g. selecting from a table that doesn't exist or referencing a column that doesn't exist
+- Graph validation errors – e.g. cyclic dependencies
+
 ## Programming with SDP in Python
 
 SDP Python functions are defined in the `pyspark.pipelines` module. Your pipelines implemented with the Python API must import this module. It's common to alias the module to `sdp` to limit the number of characters you need to type when using its APIs.
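To make the dry-run error categories documented above concrete, here is a sketch of a pipeline definition that `spark-pipelines dry-run` would reject with an analysis error rather than a syntax error: the Python parses fine, but the source table it selects from doesn't exist. How a definition file obtains its Spark session is an assumption here (the active session), not something this diff specifies.

```python
from pyspark import pipelines as sdp
from pyspark.sql import SparkSession

# Assumption: an active Spark session is available to pipeline definition files.
spark = SparkSession.getActiveSession()


@sdp.materialized_view
def mv_with_analysis_error():
    # Valid Python and valid Spark API usage, so this passes the syntax check,
    # but dry-run reports an analysis error: the source table doesn't exist.
    return spark.read.table("table_that_does_not_exist")
```

A plain `spark-pipelines run` would hit the same error, but only after the pipeline had started executing; `dry-run` surfaces it without reading or writing any data.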
17 changes: 12 additions & 5 deletions python/pyspark/pipelines/cli.py
@@ -217,7 +217,7 @@ def change_dir(path: Path) -> Generator[None, None, None]:
         os.chdir(prev)
 
 
-def run(spec_path: Path) -> None:
+def run(spec_path: Path, dry: bool) -> None:
     """Run the pipeline defined with the given spec."""
     log_with_curr_timestamp(f"Loading pipeline spec from {spec_path}...")
     spec = load_pipeline_spec(spec_path)
@@ -242,7 +242,7 @@ def run(spec_path: Path) -> None:
     register_definitions(spec_path, registry, spec)
 
     log_with_curr_timestamp("Starting run...")
-    result_iter = start_run(spark, dataflow_graph_id)
+    result_iter = start_run(spark, dataflow_graph_id, dry=dry)
     try:
         handle_pipeline_events(result_iter)
     finally:
@@ -257,6 +257,13 @@ def run(spec_path: Path) -> None:
     run_parser = subparsers.add_parser("run", help="Run a pipeline.")
     run_parser.add_argument("--spec", help="Path to the pipeline spec.")
 
+    # "dry-run" subcommand
+    run_parser = subparsers.add_parser(
Member: @sryza shall we have an end-to-end test for the dry run mode? We should check that it can detect failures without side effects.

Contributor Author: Sure thing – just added, in test_spark_connect.py

+        "dry-run",
+        help="Launch a run that just validates the graph and checks for errors.",
+    )
+    run_parser.add_argument("--spec", help="Path to the pipeline spec.")
Member: It seems to be added mistakenly. Please remove this duplication because we already have this at line 258, @sryza. 😄

Contributor Author: Thanks for catching – just fixed


     # "init" subcommand
     init_parser = subparsers.add_parser(
         "init",
@@ -270,9 +277,9 @@ def run(spec_path: Path) -> None:
     )
 
     args = parser.parse_args()
-    assert args.command in ["run", "init"]
+    assert args.command in ["run", "dry-run", "init"]
 
-    if args.command == "run":
+    if args.command in ["run", "dry-run"]:
         if args.spec is not None:
             spec_path = Path(args.spec)
             if not spec_path.is_file():
@@ -283,6 +290,6 @@ def run(spec_path: Path) -> None:
         else:
             spec_path = find_pipeline_spec(Path.cwd())
 
-        run(spec_path=spec_path)
+        run(spec_path=spec_path, dry=(args.command == "dry-run"))
     elif args.command == "init":
         init(args.name)
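Because the parser changes above are spread across several hunks, a consolidated sketch of the resulting entry point may make the control flow easier to follow. `run`, `init`, and `find_pipeline_spec` are the module's own helpers that appear in the diff; the parser description, the `dest="command"` wiring, and the exact `init` arguments are assumptions, not copied from the file.

```python
import argparse
from pathlib import Path

# Sketch of how cli.py's entry point reads after this change; run(), init(), and
# find_pipeline_spec() are assumed to be available in that module, and anything
# not visible in the hunks above (prog name, init arguments) is guessed.
parser = argparse.ArgumentParser(prog="spark-pipelines")
subparsers = parser.add_subparsers(dest="command", required=True)

run_parser = subparsers.add_parser("run", help="Run a pipeline.")
run_parser.add_argument("--spec", help="Path to the pipeline spec.")

dry_run_parser = subparsers.add_parser(
    "dry-run",
    help="Launch a run that just validates the graph and checks for errors.",
)
dry_run_parser.add_argument("--spec", help="Path to the pipeline spec.")

init_parser = subparsers.add_parser("init", help="Generate a pipeline project.")
init_parser.add_argument("--name", help="Name of the generated project.")  # assumed flag

args = parser.parse_args()
assert args.command in ["run", "dry-run", "init"]

if args.command in ["run", "dry-run"]:
    # "run" and "dry-run" resolve the spec file the same way; only the dry flag
    # differs. (The real module also raises if the given --spec path is not a file.)
    spec_path = Path(args.spec) if args.spec is not None else find_pipeline_spec(Path.cwd())
    run(spec_path=spec_path, dry=(args.command == "dry-run"))
elif args.command == "init":
    init(args.name)
```

The only behavioral difference between the two pipeline-executing subcommands is the `dry` flag handed to `run`, which is forwarded to the `StartRun` command in `spark_connect_pipeline.py` below.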
4 changes: 2 additions & 2 deletions python/pyspark/pipelines/spark_connect_pipeline.py
@@ -65,12 +65,12 @@ def handle_pipeline_events(iter: Iterator[Dict[str, Any]]) -> None:
         log_with_provided_timestamp(event.message, dt)
 
 
-def start_run(spark: SparkSession, dataflow_graph_id: str) -> Iterator[Dict[str, Any]]:
+def start_run(spark: SparkSession, dataflow_graph_id: str, dry: bool) -> Iterator[Dict[str, Any]]:
     """Start a run of the dataflow graph in the Spark Connect server.
 
     :param dataflow_graph_id: The ID of the dataflow graph to start.
     """
-    inner_command = pb2.PipelineCommand.StartRun(dataflow_graph_id=dataflow_graph_id)
+    inner_command = pb2.PipelineCommand.StartRun(dataflow_graph_id=dataflow_graph_id, dry=dry)
     command = pb2.Command()
     command.pipeline_command.start_run.CopyFrom(inner_command)
     # Cast because mypy seems to think `spark`` is a function, not an object. Likely related to
83 changes: 83 additions & 0 deletions python/pyspark/pipelines/tests/test_spark_connect.py
@@ -0,0 +1,83 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Tests that run Pipelines against a Spark Connect server.
+"""
+
+import unittest
+
+from pyspark.errors.exceptions.connect import AnalysisException
+from pyspark.pipelines.graph_element_registry import graph_element_registration_context
+from pyspark.pipelines.spark_connect_graph_element_registry import (
+    SparkConnectGraphElementRegistry,
+)
+from pyspark.pipelines.spark_connect_pipeline import (
+    create_dataflow_graph,
+    start_run,
+    handle_pipeline_events,
+)
+from pyspark import pipelines as sdp
+from pyspark.testing.connectutils import (
+    ReusedConnectTestCase,
+    should_test_connect,
+    connect_requirement_message,
+)
+
+
+@unittest.skipIf(not should_test_connect, connect_requirement_message)
+class SparkConnectPipelinesTest(ReusedConnectTestCase):
+    def test_dry_run(self):
+        dataflow_graph_id = create_dataflow_graph(self.spark, None, None, None)
+        registry = SparkConnectGraphElementRegistry(self.spark, dataflow_graph_id)
+
+        with graph_element_registration_context(registry):
+
+            @sdp.materialized_view
+            def mv():
+                return self.spark.range(1)
+
+        result_iter = start_run(self.spark, dataflow_graph_id, dry=True)
+        handle_pipeline_events(result_iter)
+
+    def test_dry_run_failure(self):
+        dataflow_graph_id = create_dataflow_graph(self.spark, None, None, None)
+        registry = SparkConnectGraphElementRegistry(self.spark, dataflow_graph_id)
+
+        with graph_element_registration_context(registry):
+
+            @sdp.table
+            def st():
+                # Invalid because a streaming query is expected
+                return self.spark.range(1)
+
+        result_iter = start_run(self.spark, dataflow_graph_id, dry=True)
+        with self.assertRaises(AnalysisException) as context:
+            handle_pipeline_events(result_iter)
+        self.assertIn(
+            "INVALID_FLOW_QUERY_TYPE.BATCH_RELATION_FOR_STREAMING_TABLE", str(context.exception)
+        )
+
+
+if __name__ == "__main__":
+    try:
+        import xmlrunner  # type: ignore
+
+        testRunner = xmlrunner.XMLTestRunner(output="target/test-reports", verbosity=2)
+    except ImportError:
+        testRunner = None
+    unittest.main(testRunner=testRunner, verbosity=2)