Skip to content

Commit

Permalink
[Python]Enable state cache to 100 MB (#28781)
Browse files Browse the repository at this point in the history
* change state_cache_mb to 100 MB as default

* Address comments

* Reword help of pipeline option

* Reword help of pipeline option

* Fix doc string

* reword docstring

* Set default in the pipeline options

* Reword documentation

* Add warning

* Address comments

* Update CHANGES.md
  • Loading branch information
AnandInguva authored Oct 31, 2023
1 parent 16fee75 commit d329a7e
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ should handle this. ([#25252](https://github.com/apache/beam/issues/25252)).
jobs using the DataStream API. By default the option is set to false, so the batch jobs are still executed
using the DataSet API.
* `upload_graph` as one of the Experiments options for DataflowRunner is no longer required when the graph is larger than 10MB for Java SDK ([PR#28621](https://github.com/apache/beam/pull/28621).
* state amd side input cache has been enabled to a default of 100 MB. Use `--max_cache_memory_usage_mb=X` to provide cache size for the user state API and side inputs. (Python) ([#28770](https://github.com/apache/beam/issues/28770)).

## Breaking Changes

Expand Down
16 changes: 16 additions & 0 deletions sdks/python/apache_beam/options/pipeline_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -1135,6 +1135,22 @@ def _add_argparse_args(cls, parser):
dest='min_cpu_platform',
type=str,
help='GCE minimum CPU platform. Default is determined by GCP.')
parser.add_argument(
'--max_cache_memory_usage_mb',
dest='max_cache_memory_usage_mb',
type=int,
default=100,
help=(
'Size of the SDK Harness cache to store user state and side '
'inputs in MB. Default is 100MB. If the cache is full, least '
'recently used elements will be evicted. This cache is per '
'each SDK Harness instance. SDK Harness is a component '
'responsible for executing the user code and communicating with '
'the runner. Depending on the runner, there may be more than one '
'SDK Harness process running on the same worker node. Increasing '
'cache size might improve performance of some pipelines, but can '
'lead to an increase in memory consumption and OOM errors if '
'workers are not appropriately provisioned.'))

def validate(self, validator):
errors = []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,8 @@ def start_and_replace_loopback_environments(pipeline, options):
portable_options.environment_config, server = (
worker_pool_main.BeamFnExternalWorkerPoolServicer.start(
state_cache_size=
sdk_worker_main._get_state_cache_size(experiments),
sdk_worker_main._get_state_cache_size_bytes(
options=options),
data_buffer_time_limit_ms=
sdk_worker_main._get_data_buffer_time_limit_ms(experiments),
use_process=use_loopback_process_worker))
Expand Down
26 changes: 16 additions & 10 deletions sdks/python/apache_beam/runners/worker/sdk_worker_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import ProfilingOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import WorkerOptions
from apache_beam.options.value_provider import RuntimeValueProvider
from apache_beam.portability.api import endpoints_pb2
from apache_beam.runners.internal import names
Expand Down Expand Up @@ -159,7 +160,8 @@ def create_harness(environment, dry_run=False):
control_address=control_service_descriptor.url,
status_address=status_service_descriptor.url,
worker_id=_worker_id,
state_cache_size=_get_state_cache_size(experiments),
state_cache_size=_get_state_cache_size_bytes(
options=sdk_pipeline_options),
data_buffer_time_limit_ms=_get_data_buffer_time_limit_ms(experiments),
profiler_factory=profiler.Profile.factory_from_options(
sdk_pipeline_options.view_as(ProfilingOptions)),
Expand Down Expand Up @@ -239,24 +241,28 @@ def _parse_pipeline_options(options_json):
return PipelineOptions.from_dictionary(_load_pipeline_options(options_json))


def _get_state_cache_size(experiments):
"""Defines the upper number of state items to cache.
Note: state_cache_size is an experimental flag and might not be available in
future releases.
def _get_state_cache_size_bytes(options):
"""Return the maximum size of state cache in bytes.
Returns:
an int indicating the maximum number of megabytes to cache.
Default is 100 MB
an int indicating the maximum number of bytes to cache.
"""

max_cache_memory_usage_mb = options.view_as(
WorkerOptions).max_cache_memory_usage_mb
# to maintain backward compatibility
experiments = options.view_as(DebugOptions).experiments or []
for experiment in experiments:
# There should only be 1 match so returning from the loop
if re.match(r'state_cache_size=', experiment):
_LOGGER.warning(
'--experiments=state_cache_size=X is deprecated and will be removed '
'in future releases.'
'Please use --max_cache_memory_usage_mb=X to set the cache size for '
'user state API and side inputs.')
return int(
re.match(r'state_cache_size=(?P<state_cache_size>.*)',
experiment).group('state_cache_size')) << 20
return 100 << 20
return max_cache_memory_usage_mb << 20


def _get_data_buffer_time_limit_ms(experiments):
Expand Down
13 changes: 13 additions & 0 deletions sdks/python/apache_beam/runners/worker/sdk_worker_main_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,19 @@ def test_gcp_profiler_uses_job_name_when_enabled_as_experiment(self):
sdk_worker_main._start_profiler(gcp_profiler_name, "version")
sdk_worker_main._start_profiler.assert_called_with("sample_job", "version")

@unittest.mock.patch.dict(os.environ, {"JOB_NAME": "sample_job"}, clear=True)
def test_pipeline_option_max_cache_memory_usage_mb(self):
options = PipelineOptions(flags=['--max_cache_memory_usage_mb=50'])

cache_size = sdk_worker_main._get_state_cache_size_bytes(options)
self.assertEqual(cache_size, 50 << 20)

@unittest.mock.patch.dict(os.environ, {"JOB_NAME": "sample_job"}, clear=True)
def test_pipeline_option_max_cache_memory_usage_mb_with_experiments(self):
options = PipelineOptions(flags=['--experiments=state_cache_size=50'])
cache_size = sdk_worker_main._get_state_cache_size_bytes(options)
self.assertEqual(cache_size, 50 << 20)


if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
Expand Down

0 comments on commit d329a7e

Please sign in to comment.