Make runner entrypoint more portable.
Legacy runners can still override the Pipeline-object-taking run_pipeline
method, but this now has a default implementation.

As part of this, it was necessary to refactor environments so that loopback
is less of a special case.
robertwb committed Jul 28, 2023
1 parent b333268 commit 27c7cb3
Showing 8 changed files with 160 additions and 80 deletions.
4 changes: 4 additions & 0 deletions sdks/python/apache_beam/portability/python_urns.py

@@ -49,6 +49,10 @@
 # the state cache, as a decimal string, e.g. '2,1000'.
 EMBEDDED_PYTHON_GRPC = "beam:env:embedded_python_grpc:v1"
 
+# Invoke UserFns via a yet-to-be-started loopback external worker.
+# Payload: None.
+EMBEDDED_PYTHON_LOOPBACK = "beam:env:embedded_python_loopback:v1"
+
 # Instantiate SDK harness via a command line provided in the payload.
 # This is different than the standard process environment in that it
 # starts up the SDK harness directly, rather than the bootstrapping
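
A hedged usage sketch, not part of this commit: environment_type accepts full
beam:env: URNs as well as aliases like 'LOOPBACK', so the new URN can be
requested directly through pipeline options.

from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.portability import python_urns

# Ask for the new loopback environment by its URN; the runner will start a
# loopback external worker on demand.
options = PipelineOptions(
    environment_type=python_urns.EMBEDDED_PYTHON_LOOPBACK)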
sdks/python/apache_beam/runners/portability/expansion_service.py

@@ -26,7 +26,7 @@
 from apache_beam.portability.api import beam_expansion_api_pb2
 from apache_beam.portability.api import beam_expansion_api_pb2_grpc
 from apache_beam.runners import pipeline_context
-from apache_beam.runners.portability import portable_runner
+from apache_beam.transforms import environments
 from apache_beam.transforms import external
 from apache_beam.transforms import ptransform
 
@@ -37,7 +37,7 @@ def __init__(self, options=None):
     self._options = options or beam_pipeline.PipelineOptions(
         environment_type=python_urns.EMBEDDED_PYTHON, sdk_location='container')
     self._default_environment = (
-        portable_runner.PortableRunner._create_environment(self._options))
+        environments.Environment.from_options(self._options))
 
   def Expand(self, request, context=None):
     try:
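
After this change, environment resolution is a one-liner available to any
component, not just the PortableRunner. A minimal hedged sketch of the new
call (the 'DOCKER' value is illustrative):

from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import PortableOptions
from apache_beam.transforms import environments

# Resolve an Environment directly from options, as the expansion service now
# does; previously this went through PortableRunner._create_environment.
options = PipelineOptions(environment_type='DOCKER')
default_env = environments.Environment.from_options(
    options.view_as(PortableOptions))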
10 changes: 8 additions & 2 deletions sdks/python/apache_beam/runners/portability/flink_runner.py

@@ -35,14 +35,20 @@
 
 
 class FlinkRunner(portable_runner.PortableRunner):
-  def run_pipeline(self, pipeline, options):
+  """A runner for launching jobs on Flink, automatically starting a local
+  flink master if one is not given.
+  """
+
+  # Inherits run_full_pipeline from PortableRunner.
+
+  def default_environment(self, options):
     portable_options = options.view_as(pipeline_options.PortableOptions)
     flink_options = options.view_as(pipeline_options.FlinkRunnerOptions)
     if (flink_options.flink_master in MAGIC_HOST_NAMES and
         not portable_options.environment_type and
         not portable_options.output_executable_path):
       portable_options.environment_type = 'LOOPBACK'
-    return super().run_pipeline(pipeline, options)
+    return super().default_environment(options)
 
   def default_job_server(self, options):
     flink_options = options.view_as(pipeline_options.FlinkRunnerOptions)
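
The same hook is available to downstream runner authors: instead of wrapping
run_pipeline, a runner now overrides default_environment. A hedged sketch
(MyLocalRunner is hypothetical) mirroring the FlinkRunner change above:

from apache_beam.options import pipeline_options
from apache_beam.runners.portability import portable_runner


class MyLocalRunner(portable_runner.PortableRunner):  # hypothetical
  def default_environment(self, options):
    # Default to LOOPBACK when the user did not pick an environment, then
    # defer to the base class to construct the Environment object.
    portable_opts = options.view_as(pipeline_options.PortableOptions)
    if not portable_opts.environment_type:
      portable_opts.environment_type = 'LOOPBACK'
    return super().default_environment(options)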
@@ -1354,8 +1354,7 @@ def expand(self, pcoll):
     with self.create_pipeline() as p:
       _ = p | beam.Create([10, 20, 30]) | PackableCombines()
 
-    res = p.run()
-    res.wait_until_finish()
+    res = p.result
 
     packed_step_name_regex = (
         r'.*Packed.*PackableMin.*CombinePerKey.*PackableMax.*CombinePerKey.*' +
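
Context for this test change: exiting a `with` block already runs the pipeline
and waits for it to finish, and the resulting PipelineResult is stored on the
pipeline, so the extra p.run()/wait_until_finish() was redundant. A minimal
sketch of the pattern:

import apache_beam as beam

# The with-block runs the pipeline on exit and stores the result on `p`.
with beam.Pipeline() as p:
  _ = p | beam.Create([1, 2, 3]) | beam.Map(print)

res = p.result  # no second p.run() needed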
118 changes: 53 additions & 65 deletions sdks/python/apache_beam/runners/portability/portable_runner.py

@@ -19,6 +19,7 @@
 # mypy: check-untyped-defs
 
 import atexit
+import copy
 import functools
 import itertools
 import logging
@@ -36,16 +37,16 @@
 from apache_beam.metrics import metric
 from apache_beam.metrics.execution import MetricResult
 from apache_beam.options.pipeline_options import DebugOptions
+from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.options.pipeline_options import PortableOptions
-from apache_beam.options.pipeline_options import SetupOptions
 from apache_beam.options.pipeline_options import StandardOptions
-from apache_beam.options.pipeline_options import TypeOptions
 from apache_beam.options.value_provider import ValueProvider
 from apache_beam.portability import common_urns
 from apache_beam.portability import python_urns
 from apache_beam.portability.api import beam_artifact_api_pb2_grpc
 from apache_beam.portability.api import beam_job_api_pb2
+from apache_beam.portability.api import beam_runner_api_pb2
 from apache_beam.runners import runner
-from apache_beam.runners.common import group_by_key_input_visitor
 from apache_beam.runners.job import utils as job_utils
 from apache_beam.runners.portability import artifact_service
 from apache_beam.runners.portability import job_server
@@ -57,9 +58,7 @@
 
 if TYPE_CHECKING:
   from google.protobuf import struct_pb2  # pylint: disable=ungrouped-imports
-  from apache_beam.options.pipeline_options import PipelineOptions
   from apache_beam.pipeline import Pipeline
-  from apache_beam.portability.api import beam_runner_api_pb2
 
 __all__ = ['PortableRunner']
 
@@ -79,8 +78,6 @@
     beam_job_api_pb2.JobState.CANCELLED,
 ]
 
-ENV_TYPE_ALIASES = {'LOOPBACK': 'EXTERNAL'}
-
 _LOGGER = logging.getLogger(__name__)
 
@@ -268,29 +265,8 @@ def __init__(self):
   @staticmethod
   def _create_environment(options):
     # type: (PipelineOptions) -> environments.Environment
-    portable_options = options.view_as(PortableOptions)
-    # Do not set a Runner. Otherwise this can cause problems in Java's
-    # PipelineOptions, i.e. ClassNotFoundException, if the corresponding Runner
-    # does not exist in the Java SDK. In portability, the entry point is clearly
-    # defined via the JobService.
-    portable_options.view_as(StandardOptions).runner = None
-    environment_type = portable_options.environment_type
-    if not environment_type:
-      environment_urn = common_urns.environments.DOCKER.urn
-    elif environment_type.startswith('beam:env:'):
-      environment_urn = environment_type
-    else:
-      # e.g. handle LOOPBACK -> EXTERNAL
-      environment_type = ENV_TYPE_ALIASES.get(
-          environment_type, environment_type)
-      try:
-        environment_urn = getattr(
-            common_urns.environments, environment_type).urn
-      except AttributeError:
-        raise ValueError('Unknown environment type: %s' % environment_type)
-
-    env_class = environments.Environment.get_env_cls_from_urn(environment_urn)
-    return env_class.from_options(portable_options)
+    return environments.Environment.from_options(
+        options.view_as(PortableOptions))
 
   def default_job_server(self, options):
     raise NotImplementedError(
@@ -322,12 +298,16 @@ def create_job_service(self, options):
   @staticmethod
   def get_proto_pipeline(pipeline, options):
     # type: (Pipeline, PipelineOptions) -> beam_runner_api_pb2.Pipeline
-    portable_options = options.view_as(PortableOptions)
-
     proto_pipeline = pipeline.to_runner_api(
-        default_environment=PortableRunner._create_environment(
-            portable_options))
+        default_environment=environments.Environment.from_options(
+            options.view_as(PortableOptions)))
+
+    return PortableRunner._optimize_pipeline(proto_pipeline, options)
 
+  @staticmethod
+  def _optimize_pipeline(
+      proto_pipeline: beam_runner_api_pb2.Pipeline,
+      options: PipelineOptions) -> beam_runner_api_pb2.Pipeline:
     # TODO: https://github.com/apache/beam/issues/19493
     # Eventually remove the 'pre_optimize' option alltogether and only perform
     # the equivalent of the 'default' case below (minus the 'lift_combiners'
@@ -410,42 +390,24 @@ def get_proto_pipeline(pipeline, options):
 
     return proto_pipeline
 
-  def run_pipeline(self, pipeline, options):
-    # type: (Pipeline, PipelineOptions) -> PipelineResult
+  def run_full_pipeline(
+      self, pipeline: beam_runner_api_pb2.Pipeline,
+      options: PipelineOptions) -> runner.PipelineResult:
     portable_options = options.view_as(PortableOptions)
 
-    # TODO: https://github.com/apache/beam/issues/19168
-    # portable runner specific default
-    if options.view_as(SetupOptions).sdk_location == 'default':
-      options.view_as(SetupOptions).sdk_location = 'container'
-
-    experiments = options.view_as(DebugOptions).experiments or []
-
-    # This is needed as we start a worker server if one is requested
-    # but none is provided.
-    if portable_options.environment_type == 'LOOPBACK':
-      use_loopback_process_worker = options.view_as(
-          DebugOptions).lookup_experiment('use_loopback_process_worker', False)
-      portable_options.environment_config, server = (
-          worker_pool_main.BeamFnExternalWorkerPoolServicer.start(
-              state_cache_size=
-              sdk_worker_main._get_state_cache_size(experiments),
-              data_buffer_time_limit_ms=
-              sdk_worker_main._get_data_buffer_time_limit_ms(experiments),
-              use_process=use_loopback_process_worker))
-      cleanup_callbacks = [functools.partial(server.stop, 1)]
-    else:
-      cleanup_callbacks = []
+    # Do not set a Runner. Otherwise this can cause problems in Java's
+    # PipelineOptions, i.e. ClassNotFoundException, if the corresponding Runner
+    # does not exist in the Java SDK. In portability, the entry point is clearly
+    # defined via the JobService.
+    portable_options.view_as(StandardOptions).runner = None
 
-    pipeline.visit(
-        group_by_key_input_visitor(
-            not options.view_as(TypeOptions).allow_non_deterministic_key_coders)
-    )
+    cleanup_callbacks = self.start_and_replace_loopback_environments(
+        pipeline, options)
 
-    proto_pipeline = self.get_proto_pipeline(pipeline, options)
+    optimized_pipeline = self._optimize_pipeline(pipeline, options)
     job_service_handle = self.create_job_service(options)
-    job_id, message_stream, state_stream = \
-      job_service_handle.submit(proto_pipeline)
+    job_id, message_stream, state_stream = job_service_handle.submit(
+        optimized_pipeline)
 
     result = PipelineResult(
         job_service_handle.job_service,
@@ -465,6 +427,32 @@ def run_pipeline(self, pipeline, options):
         portable_options.environment_type)
     return result
 
+  @staticmethod
+  def start_and_replace_loopback_environments(pipeline, options):
+    portable_options = copy.deepcopy(options.view_as(PortableOptions))
+    experiments = options.view_as(DebugOptions).experiments or []
+    cleanup_callbacks = []
+    for env in pipeline.components.environments.values():
+      if env.urn == python_urns.EMBEDDED_PYTHON_LOOPBACK:
+        # Start a worker and change the environment to point to that worker.
+        use_loopback_process_worker = options.view_as(
+            DebugOptions).lookup_experiment(
+                'use_loopback_process_worker', False)
+        portable_options.environment_type = 'EXTERNAL'
+        portable_options.environment_config, server = (
+            worker_pool_main.BeamFnExternalWorkerPoolServicer.start(
+                state_cache_size=
+                sdk_worker_main._get_state_cache_size(experiments),
+                data_buffer_time_limit_ms=
+                sdk_worker_main._get_data_buffer_time_limit_ms(experiments),
+                use_process=use_loopback_process_worker))
+        external_env = environments.ExternalEnvironment.from_options(
+            portable_options).to_runner_api(None)  # type: ignore
+        env.urn = external_env.urn
+        env.payload = external_env.payload
+        cleanup_callbacks.append(functools.partial(server.stop, 1))
+    return cleanup_callbacks
 
 
 class PortableMetrics(metric.MetricResults):
   def __init__(self, job_metrics_response):
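
The net effect of start_and_replace_loopback_environments, in a simplified
hedged sketch: every EMBEDDED_PYTHON_LOOPBACK environment in the pipeline
proto is rewritten in place to an EXTERNAL environment pointing at a freshly
started local worker pool. Here replace_loopback and start_worker are
illustrative stand-ins, and the real payload is a serialized ExternalPayload
proto rather than a bare address.

def replace_loopback(pipeline_proto, start_worker):
  """Swap loopback environments for EXTERNAL ones, returning stop callbacks."""
  cleanup_callbacks = []
  for env in pipeline_proto.components.environments.values():
    if env.urn == 'beam:env:embedded_python_loopback:v1':
      # start_worker stands in for BeamFnExternalWorkerPoolServicer.start;
      # it yields the worker's address and a function that stops it.
      address, stop = start_worker()
      env.urn = 'beam:env:external:v1'
      # Simplified: the real code builds the payload via
      # ExternalEnvironment.from_options(...).to_runner_api(None).
      env.payload = address.encode('utf-8')
      cleanup_callbacks.append(stop)
  return cleanup_callbacks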
10 changes: 8 additions & 2 deletions sdks/python/apache_beam/runners/portability/spark_runner.py

@@ -37,14 +37,20 @@
 
 
 class SparkRunner(portable_runner.PortableRunner):
-  def run_pipeline(self, pipeline, options):
+  """A runner for launching jobs on Spark, automatically starting a local
+  spark master if one is not given.
+  """
+
+  # Inherits run_full_pipeline from PortableRunner.
+
+  def default_environment(self, options):
     spark_options = options.view_as(pipeline_options.SparkRunnerOptions)
     portable_options = options.view_as(pipeline_options.PortableOptions)
     if (re.match(LOCAL_MASTER_PATTERN, spark_options.spark_master_url) and
         not portable_options.environment_type and
         not portable_options.output_executable_path):
       portable_options.environment_type = 'LOOPBACK'
-    return super().run_pipeline(pipeline, options)
+    return super().default_environment(options)
 
   def default_job_server(self, options):
     spark_options = options.view_as(pipeline_options.SparkRunnerOptions)
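
A hedged usage sketch: with a master URL matching LOCAL_MASTER_PATTERN and no
explicit environment, the LOOPBACK default now comes from default_environment
rather than from inside run_pipeline (flag values are illustrative):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# A local master plus an unset environment_type yields LOOPBACK by default.
options = PipelineOptions([
    '--runner=SparkRunner',
    '--spark_master_url=local[4]',
])
with beam.Pipeline(options=options) as p:
  _ = p | beam.Create(['a', 'b']) | beam.Map(print)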
43 changes: 39 additions & 4 deletions sdks/python/apache_beam/runners/runner.py

@@ -24,12 +24,18 @@
 from typing import TYPE_CHECKING
 from typing import Optional
 
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import PortableOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.options.pipeline_options import StandardOptions
+from apache_beam.options.pipeline_options import TypeOptions
+from apache_beam.portability.api import beam_runner_api_pb2
+from apache_beam.runners.common import group_by_key_input_visitor
+from apache_beam.transforms import environments
 
 if TYPE_CHECKING:
   from apache_beam import pvalue
   from apache_beam import PTransform
-  from apache_beam.options.pipeline_options import PipelineOptions
   from apache_beam.pipeline import Pipeline
 
 __all__ = ['PipelineRunner', 'PipelineState', 'PipelineResult']
@@ -144,6 +150,24 @@ def run_async(self,
       transform(PBegin(p))
     return p.run()
 
+  def run_full_pipeline(
+      self, pipeline: beam_runner_api_pb2.Pipeline,
+      options: PipelineOptions) -> 'PipelineResult':
+    """Execute the entire pipeline.
+
+    Runners should override this method.
+    """
+    raise NotImplementedError
+
+  def default_environment(
+      self, options: PipelineOptions) -> environments.Environment:
+    """Returns the default environment that should be used for this runner.
+
+    Runners may override this method to provide alternative environments.
+    """
+    return environments.Environment.from_options(
+        options.view_as(PortableOptions))
+
   def run_pipeline(
       self,
       pipeline,  # type: Pipeline
@@ -152,10 +176,21 @@ def run_pipeline(
     # type: (...) -> PipelineResult
 
     """Execute the entire pipeline or the sub-DAG reachable from a node.
-
-    Runners should override this method.
     """
-    raise NotImplementedError
+    pipeline.visit(
+        group_by_key_input_visitor(
+            not options.view_as(TypeOptions).allow_non_deterministic_key_coders)
+    )
+
+    # TODO: https://github.com/apache/beam/issues/19168
+    # portable runner specific default
+    if options.view_as(SetupOptions).sdk_location == 'default':
+      options.view_as(SetupOptions).sdk_location = 'container'
+
+    return self.run_full_pipeline(
+        pipeline.to_runner_api(
+            default_environment=self.default_environment(options)),
+        options)
 
   def apply(self,
             transform,  # type: PTransform
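
Taken together, a new-style runner only needs the proto-taking entrypoint:
the base class's run_pipeline applies the GroupByKey input visitor, patches
sdk_location, converts the Pipeline to its proto form with the runner's
default environment, and delegates. A hedged sketch (ProtoOnlyRunner is
hypothetical, and returning a DONE result stands in for real submission):

from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.runners.runner import PipelineResult
from apache_beam.runners.runner import PipelineRunner
from apache_beam.runners.runner import PipelineState


class ProtoOnlyRunner(PipelineRunner):  # hypothetical example runner
  def run_full_pipeline(
      self, pipeline: beam_runner_api_pb2.Pipeline,
      options: PipelineOptions) -> PipelineResult:
    # `pipeline` arrives here already converted to a portable proto;
    # a real runner would submit it to a job service at this point.
    return PipelineResult(PipelineState.DONE)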