diff --git a/sdks/python/apache_beam/portability/python_urns.py b/sdks/python/apache_beam/portability/python_urns.py
index a025709f38a5..ed9ec6a07258 100644
--- a/sdks/python/apache_beam/portability/python_urns.py
+++ b/sdks/python/apache_beam/portability/python_urns.py
@@ -49,6 +49,10 @@
 # the state cache, as a decimal string, e.g. '2,1000'.
 EMBEDDED_PYTHON_GRPC = "beam:env:embedded_python_grpc:v1"
 
+# Invoke UserFns via a yet-to-be-started loopback external worker.
+# Payload: None.
+EMBEDDED_PYTHON_LOOPBACK = "beam:env:embedded_python_loopback:v1"
+
 # Instantiate SDK harness via a command line provided in the payload.
 # This is different than the standard process environment in that it
 # starts up the SDK harness directly, rather than the bootstrapping
diff --git a/sdks/python/apache_beam/runners/portability/expansion_service.py b/sdks/python/apache_beam/runners/portability/expansion_service.py
index 39697cbca20e..9670ac1ad7be 100644
--- a/sdks/python/apache_beam/runners/portability/expansion_service.py
+++ b/sdks/python/apache_beam/runners/portability/expansion_service.py
@@ -26,7 +26,7 @@
 from apache_beam.portability.api import beam_expansion_api_pb2
 from apache_beam.portability.api import beam_expansion_api_pb2_grpc
 from apache_beam.runners import pipeline_context
-from apache_beam.runners.portability import portable_runner
+from apache_beam.transforms import environments
 from apache_beam.transforms import external
 from apache_beam.transforms import ptransform
 
@@ -37,7 +37,7 @@ def __init__(self, options=None):
     self._options = options or beam_pipeline.PipelineOptions(
         environment_type=python_urns.EMBEDDED_PYTHON, sdk_location='container')
     self._default_environment = (
-        portable_runner.PortableRunner._create_environment(self._options))
+        environments.Environment.from_options(self._options))
 
   def Expand(self, request, context=None):
     try:
diff --git a/sdks/python/apache_beam/runners/portability/flink_runner.py b/sdks/python/apache_beam/runners/portability/flink_runner.py
index efa17cd01a93..c9bf15b46e22 100644
--- a/sdks/python/apache_beam/runners/portability/flink_runner.py
+++ b/sdks/python/apache_beam/runners/portability/flink_runner.py
@@ -35,14 +35,20 @@
 
 
 class FlinkRunner(portable_runner.PortableRunner):
-  def run_pipeline(self, pipeline, options):
+  """A runner for launching jobs on Flink, automatically starting a local
+  flink master if one is not given.
+  """
+
+  # Inherits run_portable_pipeline from PortableRunner.
+
+  def default_environment(self, options):
     portable_options = options.view_as(pipeline_options.PortableOptions)
     flink_options = options.view_as(pipeline_options.FlinkRunnerOptions)
     if (flink_options.flink_master in MAGIC_HOST_NAMES and
         not portable_options.environment_type and
         not portable_options.output_executable_path):
       portable_options.environment_type = 'LOOPBACK'
-    return super().run_pipeline(pipeline, options)
+    return super().default_environment(options)
 
   def default_job_server(self, options):
     flink_options = options.view_as(pipeline_options.FlinkRunnerOptions)
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
index ed09bb8f2236..5a4ef9374a56 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
@@ -1354,8 +1354,7 @@ def expand(self, pcoll):
     with self.create_pipeline() as p:
       _ = p | beam.Create([10, 20, 30]) | PackableCombines()
 
-    res = p.run()
-    res.wait_until_finish()
+    res = p.result
 
     packed_step_name_regex = (
         r'.*Packed.*PackableMin.*CombinePerKey.*PackableMax.*CombinePerKey.*' +
diff --git a/sdks/python/apache_beam/runners/portability/portable_runner.py b/sdks/python/apache_beam/runners/portability/portable_runner.py
index 2c5d4ab36093..a742a510e8de 100644
--- a/sdks/python/apache_beam/runners/portability/portable_runner.py
+++ b/sdks/python/apache_beam/runners/portability/portable_runner.py
@@ -19,6 +19,7 @@
 # mypy: check-untyped-defs
 
 import atexit
+import copy
 import functools
 import itertools
 import logging
@@ -36,16 +37,16 @@
 from apache_beam.metrics import metric
 from apache_beam.metrics.execution import MetricResult
 from apache_beam.options.pipeline_options import DebugOptions
+from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.options.pipeline_options import PortableOptions
-from apache_beam.options.pipeline_options import SetupOptions
 from apache_beam.options.pipeline_options import StandardOptions
-from apache_beam.options.pipeline_options import TypeOptions
 from apache_beam.options.value_provider import ValueProvider
 from apache_beam.portability import common_urns
+from apache_beam.portability import python_urns
 from apache_beam.portability.api import beam_artifact_api_pb2_grpc
 from apache_beam.portability.api import beam_job_api_pb2
+from apache_beam.portability.api import beam_runner_api_pb2
 from apache_beam.runners import runner
-from apache_beam.runners.common import group_by_key_input_visitor
 from apache_beam.runners.job import utils as job_utils
 from apache_beam.runners.portability import artifact_service
 from apache_beam.runners.portability import job_server
@@ -57,9 +58,7 @@
 
 if TYPE_CHECKING:
   from google.protobuf import struct_pb2  # pylint: disable=ungrouped-imports
-  from apache_beam.options.pipeline_options import PipelineOptions
   from apache_beam.pipeline import Pipeline
-  from apache_beam.portability.api import beam_runner_api_pb2
 
 __all__ = ['PortableRunner']
 
@@ -79,8 +78,6 @@
     beam_job_api_pb2.JobState.CANCELLED,
 ]
 
-ENV_TYPE_ALIASES = {'LOOPBACK': 'EXTERNAL'}
-
 _LOGGER = logging.getLogger(__name__)
 
 
@@ -268,29 +265,8 @@ def __init__(self):
   @staticmethod
   def _create_environment(options):
     # type: (PipelineOptions) -> environments.Environment
-    portable_options = options.view_as(PortableOptions)
-    # Do not set a Runner. Otherwise this can cause problems in Java's
-    # PipelineOptions, i.e. ClassNotFoundException, if the corresponding Runner
-    # does not exist in the Java SDK. In portability, the entry point is clearly
-    # defined via the JobService.
-    portable_options.view_as(StandardOptions).runner = None
-    environment_type = portable_options.environment_type
-    if not environment_type:
-      environment_urn = common_urns.environments.DOCKER.urn
-    elif environment_type.startswith('beam:env:'):
-      environment_urn = environment_type
-    else:
-      # e.g. handle LOOPBACK -> EXTERNAL
-      environment_type = ENV_TYPE_ALIASES.get(
-          environment_type, environment_type)
-      try:
-        environment_urn = getattr(
-            common_urns.environments, environment_type).urn
-      except AttributeError:
-        raise ValueError('Unknown environment type: %s' % environment_type)
-
-    env_class = environments.Environment.get_env_cls_from_urn(environment_urn)
-    return env_class.from_options(portable_options)
+    return environments.Environment.from_options(
+        options.view_as(PortableOptions))
 
   def default_job_server(self, options):
     raise NotImplementedError(
@@ -322,12 +298,16 @@ def create_job_service(self, options):
 
   @staticmethod
   def get_proto_pipeline(pipeline, options):
     # type: (Pipeline, PipelineOptions) -> beam_runner_api_pb2.Pipeline
-    portable_options = options.view_as(PortableOptions)
-
     proto_pipeline = pipeline.to_runner_api(
-        default_environment=PortableRunner._create_environment(
-            portable_options))
+        default_environment=environments.Environment.from_options(
+            options.view_as(PortableOptions)))
+
+    return PortableRunner._optimize_pipeline(proto_pipeline, options)
+
+  @staticmethod
+  def _optimize_pipeline(
+      proto_pipeline: beam_runner_api_pb2.Pipeline,
+      options: PipelineOptions) -> beam_runner_api_pb2.Pipeline:
     # TODO: https://github.com/apache/beam/issues/19493
     # Eventually remove the 'pre_optimize' option alltogether and only perform
     # the equivalent of the 'default' case below (minus the 'lift_combiners'
@@ -410,42 +390,24 @@ def get_proto_pipeline(pipeline, options):
 
     return proto_pipeline
 
-  def run_pipeline(self, pipeline, options):
-    # type: (Pipeline, PipelineOptions) -> PipelineResult
+  def run_portable_pipeline(
+      self, pipeline: beam_runner_api_pb2.Pipeline,
+      options: PipelineOptions) -> runner.PipelineResult:
     portable_options = options.view_as(PortableOptions)
 
-    # TODO: https://github.com/apache/beam/issues/19168
-    # portable runner specific default
-    if options.view_as(SetupOptions).sdk_location == 'default':
-      options.view_as(SetupOptions).sdk_location = 'container'
-
-    experiments = options.view_as(DebugOptions).experiments or []
-
-    # This is needed as we start a worker server if one is requested
-    # but none is provided.
-    if portable_options.environment_type == 'LOOPBACK':
-      use_loopback_process_worker = options.view_as(
-          DebugOptions).lookup_experiment('use_loopback_process_worker', False)
-      portable_options.environment_config, server = (
-          worker_pool_main.BeamFnExternalWorkerPoolServicer.start(
-              state_cache_size=
-              sdk_worker_main._get_state_cache_size(experiments),
-              data_buffer_time_limit_ms=
-              sdk_worker_main._get_data_buffer_time_limit_ms(experiments),
-              use_process=use_loopback_process_worker))
-      cleanup_callbacks = [functools.partial(server.stop, 1)]
-    else:
-      cleanup_callbacks = []
+    # Do not set a Runner. Otherwise this can cause problems in Java's
+    # PipelineOptions, i.e. ClassNotFoundException, if the corresponding Runner
+    # does not exist in the Java SDK. In portability, the entry point is clearly
+    # defined via the JobService.
+    portable_options.view_as(StandardOptions).runner = None
 
-    pipeline.visit(
-        group_by_key_input_visitor(
-            not options.view_as(TypeOptions).allow_non_deterministic_key_coders)
-    )
+    cleanup_callbacks = self.start_and_replace_loopback_environments(
+        pipeline, options)
 
-    proto_pipeline = self.get_proto_pipeline(pipeline, options)
+    optimized_pipeline = self._optimize_pipeline(pipeline, options)
 
     job_service_handle = self.create_job_service(options)
-    job_id, message_stream, state_stream = \
-      job_service_handle.submit(proto_pipeline)
+    job_id, message_stream, state_stream = job_service_handle.submit(
+        optimized_pipeline)
 
     result = PipelineResult(
         job_service_handle.job_service,
@@ -465,6 +427,32 @@ def run_pipeline(self, pipeline, options):
         portable_options.environment_type)
     return result
 
+  @staticmethod
+  def start_and_replace_loopback_environments(pipeline, options):
+    portable_options = copy.deepcopy(options.view_as(PortableOptions))
+    experiments = options.view_as(DebugOptions).experiments or []
+    cleanup_callbacks = []
+    for env in pipeline.components.environments.values():
+      if env.urn == python_urns.EMBEDDED_PYTHON_LOOPBACK:
+        # Start a worker and change the environment to point to that worker.
+        use_loopback_process_worker = options.view_as(
+            DebugOptions).lookup_experiment(
+                'use_loopback_process_worker', False)
+        portable_options.environment_type = 'EXTERNAL'
+        portable_options.environment_config, server = (
+            worker_pool_main.BeamFnExternalWorkerPoolServicer.start(
+                state_cache_size=
+                sdk_worker_main._get_state_cache_size(experiments),
+                data_buffer_time_limit_ms=
+                sdk_worker_main._get_data_buffer_time_limit_ms(experiments),
+                use_process=use_loopback_process_worker))
+        external_env = environments.ExternalEnvironment.from_options(
+            portable_options).to_runner_api(None)  # type: ignore
+        env.urn = external_env.urn
+        env.payload = external_env.payload
+        cleanup_callbacks.append(functools.partial(server.stop, 1))
+    return cleanup_callbacks
+
 
 class PortableMetrics(metric.MetricResults):
   def __init__(self, job_metrics_response):
diff --git a/sdks/python/apache_beam/runners/portability/spark_runner.py b/sdks/python/apache_beam/runners/portability/spark_runner.py
index c06d1b9c1a2c..480fbdecdce3 100644
--- a/sdks/python/apache_beam/runners/portability/spark_runner.py
+++ b/sdks/python/apache_beam/runners/portability/spark_runner.py
@@ -37,14 +37,20 @@
 
 
 class SparkRunner(portable_runner.PortableRunner):
-  def run_pipeline(self, pipeline, options):
+  """A runner for launching jobs on Spark, automatically starting a local
+  spark master if one is not given.
+  """
+
+  # Inherits run_portable_pipeline from PortableRunner.
+
+  def default_environment(self, options):
     spark_options = options.view_as(pipeline_options.SparkRunnerOptions)
     portable_options = options.view_as(pipeline_options.PortableOptions)
     if (re.match(LOCAL_MASTER_PATTERN, spark_options.spark_master_url) and
         not portable_options.environment_type and
         not portable_options.output_executable_path):
       portable_options.environment_type = 'LOOPBACK'
-    return super().run_pipeline(pipeline, options)
+    return super().default_environment(options)
 
   def default_job_server(self, options):
     spark_options = options.view_as(pipeline_options.SparkRunnerOptions)
diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py
index f2e09edb28ee..2e4a5cdaac6e 100644
--- a/sdks/python/apache_beam/runners/runner.py
+++ b/sdks/python/apache_beam/runners/runner.py
@@ -21,22 +21,22 @@
 
 import importlib
 import logging
-import os
-import shelve
-import shutil
-import tempfile
 from typing import TYPE_CHECKING
 from typing import Optional
 
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import PortableOptions
+from apache_beam.options.pipeline_options import SetupOptions
 from apache_beam.options.pipeline_options import StandardOptions
+from apache_beam.options.pipeline_options import TypeOptions
+from apache_beam.portability.api import beam_runner_api_pb2
+from apache_beam.runners.common import group_by_key_input_visitor
+from apache_beam.transforms import environments
 
 if TYPE_CHECKING:
   from apache_beam import pvalue
   from apache_beam import PTransform
-  from apache_beam.options.pipeline_options import PipelineOptions
-  from apache_beam.pipeline import AppliedPTransform
   from apache_beam.pipeline import Pipeline
-  from apache_beam.pipeline import PipelineVisitor
 
 __all__ = ['PipelineRunner', 'PipelineState', 'PipelineResult']
 
@@ -150,6 +150,24 @@
       transform(PBegin(p))
     return p.run()
 
+  def run_portable_pipeline(
+      self, pipeline: beam_runner_api_pb2.Pipeline,
+      options: PipelineOptions) -> 'PipelineResult':
+    """Execute the entire pipeline.
+
+    Runners should override this method.
+    """
+    raise NotImplementedError
+
+  def default_environment(
+      self, options: PipelineOptions) -> environments.Environment:
+    """Returns the default environment that should be used for this runner.
+
+    Runners may override this method to provide alternative environments.
+    """
+    return environments.Environment.from_options(
+        options.view_as(PortableOptions))
+
   def run_pipeline(
       self,
       pipeline,  # type: Pipeline
@@ -158,181 +176,39 @@ def run_pipeline(
       options  # type: PipelineOptions
   ):
     # type: (...) -> PipelineResult
     """Execute the entire pipeline or the sub-DAG reachable from a node.
-
-    Runners should override this method.
     """
-    raise NotImplementedError
+    pipeline.visit(
+        group_by_key_input_visitor(
+            not options.view_as(TypeOptions).allow_non_deterministic_key_coders)
+    )
+
+    # TODO: https://github.com/apache/beam/issues/19168
+    # portable runner specific default
+    if options.view_as(SetupOptions).sdk_location == 'default':
+      options.view_as(SetupOptions).sdk_location = 'container'
+
+    return self.run_portable_pipeline(
+        pipeline.to_runner_api(
+            default_environment=self.default_environment(options)),
+        options)
 
   def apply(self,
             transform,  # type: PTransform
             input,  # type: Optional[pvalue.PValue]
             options  # type: PipelineOptions
            ):
-    """Runner callback for a pipeline.apply call.
-
-    Args:
-      transform: the transform to apply.
-      input: transform's input (typically a PCollection).
-
-    A concrete implementation of the Runner class may want to do custom
-    pipeline construction for a given transform. To override the behavior
-    for a transform class Xyz, implement an apply_Xyz method with this same
-    signature.
-    """
-    for cls in transform.__class__.mro():
-      m = getattr(self, 'apply_%s' % cls.__name__, None)
-      if m:
-        return m(transform, input, options)
-    raise NotImplementedError(
-        'Execution of [%s] not implemented in runner %s.' % (transform, self))
-
-  def visit_transforms(
-      self,
-      pipeline,  # type: Pipeline
-      options  # type: PipelineOptions
-  ):
-    # type: (...) -> None
-    # Imported here to avoid circular dependencies.
-    # pylint: disable=wrong-import-order, wrong-import-position
-    from apache_beam.pipeline import PipelineVisitor
-
-    class RunVisitor(PipelineVisitor):
-      def __init__(self, runner):
-        # type: (PipelineRunner) -> None
-        self.runner = runner
-
-      def visit_transform(self, transform_node):
-        try:
-          self.runner.run_transform(transform_node, options)
-        except:
-          _LOGGER.error('Error while visiting %s', transform_node.full_label)
-          raise
-
-    pipeline.visit(RunVisitor(self))
+    # TODO(robertwb): Remove indirection once internal references are fixed.
+    return self.apply_PTransform(transform, input, options)
 
   def apply_PTransform(self, transform, input, options):
-    # The base case of apply is to call the transform's expand.
+    # TODO(robertwb): Remove indirection once internal references are fixed.
     return transform.expand(input)
 
-  def run_transform(self,
-                    transform_node,  # type: AppliedPTransform
-                    options  # type: PipelineOptions
-                   ):
-    """Runner callback for a pipeline.run call.
-
-    Args:
-      transform_node: transform node for the transform to run.
-
-    A concrete implementation of the Runner class must implement run_Abc for
-    some class Abc in the method resolution order for every non-composite
-    transform Xyz in the pipeline.
-    """
-    for cls in transform_node.transform.__class__.mro():
-      m = getattr(self, 'run_%s' % cls.__name__, None)
-      if m:
-        return m(transform_node, options)
-    raise NotImplementedError(
-        'Execution of [%s] not implemented in runner %s.' %
-        (transform_node.transform, self))
-
   def is_fnapi_compatible(self):
     """Whether to enable the beam_fn_api experiment by default."""
     return True
 
 
-class PValueCache(object):
-  """For internal use only; no backwards-compatibility guarantees.
-
-  Local cache for arbitrary information computed for PValue objects."""
-  def __init__(self, use_disk_backed_cache=False):
-    # Cache of values computed while a runner executes a pipeline. This is a
-    # dictionary of PValues and their computed values. Note that in principle
-    # the runner could contain PValues from several pipelines without clashes
-    # since a PValue is associated with one and only one pipeline. The keys of
-    # the dictionary are tuple of PValue instance addresses obtained using id()
-    # and tag names converted to strings.
-
-    self._use_disk_backed_cache = use_disk_backed_cache
-    if use_disk_backed_cache:
-      self._tempdir = tempfile.mkdtemp()
-      self._cache = shelve.open(os.path.join(self._tempdir, 'shelve'))
-    else:
-      self._cache = {}
-
-  def __del__(self):
-    if self._use_disk_backed_cache:
-      self._cache.close()
-      shutil.rmtree(self._tempdir)
-
-  def __len__(self):
-    return len(self._cache)
-
-  def to_cache_key(self, transform, tag):
-    return transform.full_label, tag
-
-  def _ensure_pvalue_has_real_producer(self, pvalue):
-    """Ensure the passed-in PValue has the real_producer attribute.
-
-    Args:
-      pvalue: A PValue instance whose cached value is requested.
-
-    During the runner's execution only the results of the primitive transforms
-    are cached. Whenever we are looking for a PValue that is the output of a
-    composite transform we need to find the output of its rightmost transform
-    part.
-    """
-    if not hasattr(pvalue, 'real_producer'):
-      real_producer = pvalue.producer
-      while real_producer.parts:
-        real_producer = real_producer.parts[-1]
-      pvalue.real_producer = real_producer
-
-  def is_cached(self, pobj):
-    from apache_beam.pipeline import AppliedPTransform
-    if isinstance(pobj, AppliedPTransform):
-      transform = pobj
-      tag = None
-    else:
-      self._ensure_pvalue_has_real_producer(pobj)
-      transform = pobj.real_producer
-      tag = pobj.tag
-    return self.to_cache_key(transform, tag) in self._cache
-
-  def cache_output(self, transform, tag_or_value, value=None):
-    if value is None:
-      value = tag_or_value
-      tag = None
-    else:
-      tag = tag_or_value
-    self._cache[self.to_cache_key(transform, tag)] = value
-
-  def get_pvalue(self, pvalue):
-    """Gets the value associated with a PValue from the cache."""
-    self._ensure_pvalue_has_real_producer(pvalue)
-    try:
-      return self._cache[self.key(pvalue)]
-    except KeyError:
-      if (pvalue.tag is not None and
-          self.to_cache_key(pvalue.real_producer, None) in self._cache):
-        # This is an undeclared, empty output of a DoFn executed
-        # in the local runner before this output was referenced.
-        return []
-      else:
-        raise
-
-  def get_unwindowed_pvalue(self, pvalue):
-    return [v.value for v in self.get_pvalue(pvalue)]
-
-  def clear_pvalue(self, pvalue):
-    """Removes a PValue from the cache."""
-    if self.is_cached(pvalue):
-      del self._cache[self.key(pvalue)]
-
-  def key(self, pobj):
-    self._ensure_pvalue_has_real_producer(pobj)
-    return self.to_cache_key(pobj.real_producer, pobj.tag)
-
-
 # FIXME: replace with PipelineState(str, enum.Enum)
 class PipelineState(object):
   """State of the Pipeline, as returned by :attr:`PipelineResult.state`.
diff --git a/sdks/python/apache_beam/transforms/environments.py b/sdks/python/apache_beam/transforms/environments.py
index f6cbc7047738..109fcb825347 100644
--- a/sdks/python/apache_beam/transforms/environments.py
+++ b/sdks/python/apache_beam/transforms/environments.py
@@ -45,19 +45,18 @@
 from google.protobuf import message
 
 from apache_beam import coders
+from apache_beam.options.pipeline_options import PortableOptions
 from apache_beam.options.pipeline_options import SetupOptions
 from apache_beam.portability import common_urns
 from apache_beam.portability import python_urns
 from apache_beam.portability.api import beam_runner_api_pb2
 from apache_beam.portability.api import endpoints_pb2
 from apache_beam.runners.portability import stager
-from apache_beam.runners.portability.sdk_container_builder import SdkContainerImageBuilder
 from apache_beam.transforms.resources import resource_hints_from_options
 from apache_beam.utils import proto_utils
 
 if TYPE_CHECKING:
   from apache_beam.options.pipeline_options import PipelineOptions
-  from apache_beam.options.pipeline_options import PortableOptions
   from apache_beam.runners.pipeline_context import PipelineContext
 
 __all__ = [
@@ -257,7 +256,26 @@ def from_options(cls, options):
     Args:
       options: The PortableOptions object.
     """
-    raise NotImplementedError
+    if cls != Environment:
+      raise NotImplementedError
+
+    portable_options = options.view_as(PortableOptions)
+    environment_type = portable_options.environment_type
+    if not environment_type:
+      environment_urn = common_urns.environments.DOCKER.urn
+    elif environment_type.startswith('beam:env:'):
+      environment_urn = environment_type
+    elif environment_type == 'LOOPBACK':
+      environment_urn = python_urns.EMBEDDED_PYTHON_LOOPBACK
+    else:
+      try:
+        environment_urn = getattr(
+            common_urns.environments, environment_type).urn
+      except AttributeError:
+        raise ValueError('Unknown environment type: %s' % environment_type)
+
+    env_class = Environment.get_env_cls_from_urn(environment_urn)
+    return env_class.from_options(portable_options)  # type: ignore
 
 
 @Environment.register_urn(common_urns.environments.DEFAULT.urn, None)
@@ -337,6 +355,9 @@ def from_runner_api_parameter(payload,  # type: beam_runner_api_pb2.DockerPayloa
   def from_options(cls, options):
     # type: (PortableOptions) -> DockerEnvironment
     if options.view_as(SetupOptions).prebuild_sdk_container_engine:
+      # Imported here to avoid circular dependencies.
+      # pylint: disable=wrong-import-order, wrong-import-position
+      from apache_beam.runners.portability.sdk_container_builder import SdkContainerImageBuilder
       prebuilt_container_image = SdkContainerImageBuilder.build_container_image(
           options)
       return cls.from_container_image(
@@ -699,6 +720,27 @@ def default(cls):
     return cls(capabilities=python_sdk_capabilities(), artifacts=())
 
 
+@Environment.register_urn(python_urns.EMBEDDED_PYTHON_LOOPBACK, None)
+class PythonLoopbackEnvironment(EmbeddedPythonEnvironment):
+  """Used as a stub when the loopback worker has not yet been started."""
+  def to_runner_api_parameter(self, context):
+    # type: (PipelineContext) -> Tuple[str, None]
+    return python_urns.EMBEDDED_PYTHON_LOOPBACK, None
+
+  @staticmethod
+  def from_runner_api_parameter(unused_payload,  # type: None
+                                capabilities,  # type: Iterable[str]
+                                artifacts,  # type: Iterable[beam_runner_api_pb2.ArtifactInformation]
+                                resource_hints,  # type: Mapping[str, bytes]
+                                context  # type: PipelineContext
+                               ):
+    # type: (...) -> PythonLoopbackEnvironment
+    return PythonLoopbackEnvironment(
+        capabilities=capabilities,
+        artifacts=artifacts,
+        resource_hints=resource_hints)
+
+
 @Environment.register_urn(python_urns.SUBPROCESS_SDK, bytes)
 class SubprocessSDKEnvironment(Environment):
   def __init__(