Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions python/pyspark/pipelines/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ def append_flow(
target: str,
name: Optional[str] = None,
spark_conf: Optional[Dict[str, str]] = None,
once: bool = False,
) -> Callable[[QueryFunction], None]:
"""
Return a decorator on a query function to define a flow in a pipeline.
Expand All @@ -46,8 +45,6 @@ def append_flow(
:param spark_conf: A dict whose keys are the conf names and values are the conf values. \
These confs will be set when the flow is executed; they can override confs set for the \
destination, for the pipeline, or on the cluster.
:param once: If True, indicates this flow should run only once. (It will be rerun upon a full \
refresh operation.)
"""
if name is not None and type(name) is not str:
raise PySparkTypeError(
Expand All @@ -67,7 +64,7 @@ def outer(func: QueryFunction) -> None:
target=target,
spark_conf=spark_conf,
source_code_location=source_code_location,
once=once,
once=False,
func=func,
)
get_active_graph_element_registry().register_flow(flow)
Expand Down
6 changes: 3 additions & 3 deletions python/pyspark/pipelines/tests/test_graph_element_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ def st():

sdp.create_streaming_table("st2")

@sdp.append_flow(target="st2", once=True)
@sdp.append_flow(target="st2")
def flow1():
raise NotImplementedError()

@sdp.append_flow(target="st2", once=False)
@sdp.append_flow(target="st2")
def flow2():
raise NotImplementedError()

Expand Down Expand Up @@ -74,7 +74,7 @@ def flow2():
st2_flow1_obj = registry.flows[2]
self.assertEqual(st2_flow1_obj.name, "flow1")
self.assertEqual(st2_flow1_obj.target, "st2")
self.assertEqual(st2_flow1_obj.once, True)
self.assertEqual(st2_flow1_obj.once, False)
assert mv_flow_obj.source_code_location.filename.endswith("test_graph_element_registry.py")

st2_flow1_obj = registry.flows[3]
Expand Down