Remove more legacy Runner v1 cruft. #27512

Merged: 3 commits, Aug 9, 2023
Changes from 2 commits
4 changes: 4 additions & 0 deletions sdks/python/apache_beam/portability/python_urns.py
@@ -49,6 +49,10 @@
# the state cache, as a decimal string, e.g. '2,1000'.
EMBEDDED_PYTHON_GRPC = "beam:env:embedded_python_grpc:v1"

# Invoke UserFns via a yet-to-be-started loopback external worker.
# Payload: None.
EMBEDDED_PYTHON_LOOPBACK = "beam:env:embedded_python_loopback:v1"

# Instantiate SDK harness via a command line provided in the payload.
# This is different than the standard process environment in that it
# starts up the SDK harness directly, rather than the bootstrapping
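
For illustration only (not part of this diff), a pipeline could opt into the new EMBEDDED_PYTHON_LOOPBACK environment added above by passing the URN as its environment type, in the same keyword style the expansion service below already uses for EMBEDDED_PYTHON:

# Illustrative sketch: select the new loopback environment by its URN.
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.portability import python_urns

options = PipelineOptions(
    environment_type=python_urns.EMBEDDED_PYTHON_LOOPBACK)
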
@@ -26,7 +26,7 @@
from apache_beam.portability.api import beam_expansion_api_pb2
from apache_beam.portability.api import beam_expansion_api_pb2_grpc
from apache_beam.runners import pipeline_context
from apache_beam.runners.portability import portable_runner
from apache_beam.transforms import environments
from apache_beam.transforms import external
from apache_beam.transforms import ptransform

@@ -37,7 +37,7 @@ def __init__(self, options=None):
self._options = options or beam_pipeline.PipelineOptions(
environment_type=python_urns.EMBEDDED_PYTHON, sdk_location='container')
self._default_environment = (
portable_runner.PortableRunner._create_environment(self._options))
environments.Environment.from_options(self._options))

def Expand(self, request, context=None):
try:
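
The factory adopted here, environments.Environment.from_options, is what the rest of this PR standardizes on in place of the removed PortableRunner._create_environment. A minimal sketch of the call, assuming default portable options:

# Minimal sketch: build the default environment straight from options;
# with no environment flags set, the factory picks its own default.
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import PortableOptions
from apache_beam.transforms import environments

opts = PipelineOptions()
default_env = environments.Environment.from_options(
    opts.view_as(PortableOptions))
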
10 changes: 8 additions & 2 deletions sdks/python/apache_beam/runners/portability/flink_runner.py
@@ -35,14 +35,20 @@


class FlinkRunner(portable_runner.PortableRunner):
def run_pipeline(self, pipeline, options):
"""A runner for launching jobs on Flink, automatically starting a local
flink master if one is not given.
"""

# Inherits run_full_pipeline from PortableRunner.

def default_environment(self, options):
portable_options = options.view_as(pipeline_options.PortableOptions)
flink_options = options.view_as(pipeline_options.FlinkRunnerOptions)
if (flink_options.flink_master in MAGIC_HOST_NAMES and
not portable_options.environment_type and
not portable_options.output_executable_path):
portable_options.environment_type = 'LOOPBACK'
return super().run_pipeline(pipeline, options)
return super().default_environment(options)

def default_job_server(self, options):
flink_options = options.view_as(pipeline_options.FlinkRunnerOptions)
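
A usage sketch of the effect of this override (an assumption about typical local use, not code from the PR): with the default '[auto]' Flink master and no explicit environment flags, default_environment() falls back to LOOPBACK, so a local run needs nothing extra:

# Assumes the default flink_master ('[auto]') and no --environment_type flag.
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(['--runner=FlinkRunner'])
with beam.Pipeline(options=options) as p:
    _ = p | beam.Create([1, 2, 3]) | beam.Map(print)
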
@@ -1354,8 +1354,7 @@ def expand(self, pcoll):
with self.create_pipeline() as p:
_ = p | beam.Create([10, 20, 30]) | PackableCombines()

res = p.run()
res.wait_until_finish()
res = p.result

packed_step_name_regex = (
r'.*Packed.*PackableMin.*CombinePerKey.*PackableMax.*CombinePerKey.*' +
118 changes: 53 additions & 65 deletions sdks/python/apache_beam/runners/portability/portable_runner.py
@@ -19,6 +19,7 @@
# mypy: check-untyped-defs

import atexit
import copy
import functools
import itertools
import logging
@@ -36,16 +37,16 @@
from apache_beam.metrics import metric
from apache_beam.metrics.execution import MetricResult
from apache_beam.options.pipeline_options import DebugOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import PortableOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import TypeOptions
from apache_beam.options.value_provider import ValueProvider
from apache_beam.portability import common_urns
from apache_beam.portability import python_urns
from apache_beam.portability.api import beam_artifact_api_pb2_grpc
from apache_beam.portability.api import beam_job_api_pb2
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.runners import runner
from apache_beam.runners.common import group_by_key_input_visitor
from apache_beam.runners.job import utils as job_utils
from apache_beam.runners.portability import artifact_service
from apache_beam.runners.portability import job_server
Expand All @@ -57,9 +58,7 @@

if TYPE_CHECKING:
from google.protobuf import struct_pb2 # pylint: disable=ungrouped-imports
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.pipeline import Pipeline
from apache_beam.portability.api import beam_runner_api_pb2

__all__ = ['PortableRunner']

@@ -79,8 +78,6 @@
beam_job_api_pb2.JobState.CANCELLED,
]

ENV_TYPE_ALIASES = {'LOOPBACK': 'EXTERNAL'}

_LOGGER = logging.getLogger(__name__)


@@ -268,29 +265,8 @@ def __init__(self):
@staticmethod
def _create_environment(options):
# type: (PipelineOptions) -> environments.Environment
portable_options = options.view_as(PortableOptions)
# Do not set a Runner. Otherwise this can cause problems in Java's
# PipelineOptions, i.e. ClassNotFoundException, if the corresponding Runner
# does not exist in the Java SDK. In portability, the entry point is clearly
# defined via the JobService.
portable_options.view_as(StandardOptions).runner = None
environment_type = portable_options.environment_type
if not environment_type:
environment_urn = common_urns.environments.DOCKER.urn
elif environment_type.startswith('beam:env:'):
environment_urn = environment_type
else:
# e.g. handle LOOPBACK -> EXTERNAL
environment_type = ENV_TYPE_ALIASES.get(
environment_type, environment_type)
try:
environment_urn = getattr(
common_urns.environments, environment_type).urn
except AttributeError:
raise ValueError('Unknown environment type: %s' % environment_type)

env_class = environments.Environment.get_env_cls_from_urn(environment_urn)
return env_class.from_options(portable_options)
return environments.Environment.from_options(
options.view_as(PortableOptions))

def default_job_server(self, options):
raise NotImplementedError(
@@ -322,12 +298,16 @@ def create_job_service(self, options):
@staticmethod
def get_proto_pipeline(pipeline, options):
# type: (Pipeline, PipelineOptions) -> beam_runner_api_pb2.Pipeline
portable_options = options.view_as(PortableOptions)

proto_pipeline = pipeline.to_runner_api(
default_environment=PortableRunner._create_environment(
portable_options))
default_environment=environments.Environment.from_options(
options.view_as(PortableOptions)))

return PortableRunner._optimize_pipeline(proto_pipeline, options)

@staticmethod
def _optimize_pipeline(
proto_pipeline: beam_runner_api_pb2.Pipeline,
options: PipelineOptions) -> beam_runner_api_pb2.Pipeline:
# TODO: https://github.com/apache/beam/issues/19493
# Eventually remove the 'pre_optimize' option altogether and only perform
# the equivalent of the 'default' case below (minus the 'lift_combiners'
@@ -410,42 +390,24 @@ def get_proto_pipeline(pipeline, options):

return proto_pipeline

def run_pipeline(self, pipeline, options):
# type: (Pipeline, PipelineOptions) -> PipelineResult
def run_full_pipeline(
self, pipeline: beam_runner_api_pb2.Pipeline,
options: PipelineOptions) -> runner.PipelineResult:
portable_options = options.view_as(PortableOptions)

# TODO: https://github.com/apache/beam/issues/19168
# portable runner specific default
if options.view_as(SetupOptions).sdk_location == 'default':
options.view_as(SetupOptions).sdk_location = 'container'

experiments = options.view_as(DebugOptions).experiments or []

# This is needed as we start a worker server if one is requested
# but none is provided.
if portable_options.environment_type == 'LOOPBACK':
use_loopback_process_worker = options.view_as(
DebugOptions).lookup_experiment('use_loopback_process_worker', False)
portable_options.environment_config, server = (
worker_pool_main.BeamFnExternalWorkerPoolServicer.start(
state_cache_size=
sdk_worker_main._get_state_cache_size(experiments),
data_buffer_time_limit_ms=
sdk_worker_main._get_data_buffer_time_limit_ms(experiments),
use_process=use_loopback_process_worker))
cleanup_callbacks = [functools.partial(server.stop, 1)]
else:
cleanup_callbacks = []
# Do not set a Runner. Otherwise this can cause problems in Java's
# PipelineOptions, i.e. ClassNotFoundException, if the corresponding Runner
# does not exist in the Java SDK. In portability, the entry point is clearly
# defined via the JobService.
portable_options.view_as(StandardOptions).runner = None

pipeline.visit(
group_by_key_input_visitor(
not options.view_as(TypeOptions).allow_non_deterministic_key_coders)
)
cleanup_callbacks = self.start_and_replace_loopback_environments(
pipeline, options)

proto_pipeline = self.get_proto_pipeline(pipeline, options)
optimized_pipeline = self._optimize_pipeline(pipeline, options)
job_service_handle = self.create_job_service(options)
job_id, message_stream, state_stream = \
job_service_handle.submit(proto_pipeline)
job_id, message_stream, state_stream = job_service_handle.submit(
optimized_pipeline)

result = PipelineResult(
job_service_handle.job_service,
@@ -465,6 +427,32 @@ def run_pipeline(self, pipeline, options):
portable_options.environment_type)
return result

@staticmethod
def start_and_replace_loopback_environments(pipeline, options):
portable_options = copy.deepcopy(options.view_as(PortableOptions))
experiments = options.view_as(DebugOptions).experiments or []
cleanup_callbacks = []
for env in pipeline.components.environments.values():
if env.urn == python_urns.EMBEDDED_PYTHON_LOOPBACK:
# Start a worker and change the environment to point to that worker.
use_loopback_process_worker = options.view_as(
DebugOptions).lookup_experiment(
'use_loopback_process_worker', False)
portable_options.environment_type = 'EXTERNAL'
portable_options.environment_config, server = (
worker_pool_main.BeamFnExternalWorkerPoolServicer.start(
state_cache_size=
sdk_worker_main._get_state_cache_size(experiments),
data_buffer_time_limit_ms=
sdk_worker_main._get_data_buffer_time_limit_ms(experiments),
use_process=use_loopback_process_worker))
external_env = environments.ExternalEnvironment.from_options(
portable_options).to_runner_api(None) # type: ignore
env.urn = external_env.urn
env.payload = external_env.payload
cleanup_callbacks.append(functools.partial(server.stop, 1))
return cleanup_callbacks
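
As an informal post-condition for this helper (an illustration, not part of the PR): once it returns, no environment in the pipeline proto should still carry the loopback URN; each has been rewritten in place to point at the external worker pool it just started.

# Illustrative check, using the same fields the loop above mutates.
assert all(
    env.urn != python_urns.EMBEDDED_PYTHON_LOOPBACK
    for env in pipeline.components.environments.values())
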


class PortableMetrics(metric.MetricResults):
def __init__(self, job_metrics_response):
10 changes: 8 additions & 2 deletions sdks/python/apache_beam/runners/portability/spark_runner.py
@@ -37,14 +37,20 @@


class SparkRunner(portable_runner.PortableRunner):
def run_pipeline(self, pipeline, options):
"""A runner for launching jobs on Spark, automatically starting a local
spark master if one is not given.
"""

# Inherits run_full_pipeline from PortableRunner.

def default_environment(self, options):
spark_options = options.view_as(pipeline_options.SparkRunnerOptions)
portable_options = options.view_as(pipeline_options.PortableOptions)
if (re.match(LOCAL_MASTER_PATTERN, spark_options.spark_master_url) and
not portable_options.environment_type and
not portable_options.output_executable_path):
portable_options.environment_type = 'LOOPBACK'
return super().run_pipeline(pipeline, options)
return super().default_environment(options)

def default_job_server(self, options):
spark_options = options.view_as(pipeline_options.SparkRunnerOptions)
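
Mirroring the Flink case, a hedged usage sketch (not from this PR): with a local master URL matching LOCAL_MASTER_PATTERN and no explicit environment flags, default_environment() now selects LOOPBACK:

# Assumes a local Spark master URL and no --environment_type flag.
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(
    ['--runner=SparkRunner', '--spark_master_url=local[4]'])
with beam.Pipeline(options=options) as p:
    _ = p | beam.Create(['a', 'b']) | beam.Map(print)
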