
Custom S3 path for uploading Spark configuration files #3200

@nebur395

Description


Describe the feature you'd like
In order to organise the S3 bucket into which the inputs and outputs of our SageMaker pipelines are uploaded, we want to be able to specify a custom Spark configuration path. To improve our data lineage, we are trying to store these files by pipeline name and execution ID.

Currently there is no way to do so; the Spark config files are uploaded automatically following this convention:
https://github.com/aws/sagemaker-python-sdk/blob/master/src/sagemaker/spark/processing.py#L391-L394.
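For reference, with the current behaviour the configuration for every run ends up under a job-scoped prefix of roughly this shape (the bucket and job name below are illustrative, and the conf/configuration.json naming is an assumption based on the processor's defaults):

s3://<default-bucket>/sm-spark-2022-06-20-10-30-00-123/input/conf/configuration.json

which means the location cannot be tied back to a pipeline name or execution ID.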

So ideally we would like to be able to specify a custom S3 path, or at least a prefix, for the uploaded files. It is important to be able to use ExecutionVariables in that S3 path. Something like this:

serialized_configuration = BytesIO(json.dumps(configuration["content"]).encode("utf-8"))
if configuration["s3_uri"] is not None:
    s3_uri = f"{configuration['s3_uri']}/{self._conf_file_name}"
else:
    s3_uri = (
        f"s3://{self.sagemaker_session.default_bucket()}/{self._current_job_name}/"
        f"input/{self._conf_container_input_name}/{self._conf_file_name}"
    )

How would this feature be used? Please describe.

import boto3
import sagemaker
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.spark.processing import PySparkProcessor
from sagemaker.workflow.execution_variables import ExecutionVariables
from sagemaker.workflow.functions import Join

boto_session = boto3.Session(region_name="eu-west-1")

sagemaker_client = boto_session.client("sagemaker")

default_bucket = "XYZ"

sagemaker_session = sagemaker.session.Session(
    boto_session=boto_session,
    sagemaker_client=sagemaker_client,
    default_bucket=default_bucket
)

bucket_pipeline_execution_prefix = [
    "s3:/", default_bucket, ExecutionVariables.PIPELINE_NAME, ExecutionVariables.PIPELINE_EXECUTION_ID
]
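
# Note: Join(on="/") concatenates its values with "/" at pipeline execution time, and the
# ExecutionVariables are substituted per execution, so the prefix above is expected to
# resolve to something like s3://XYZ/demo-pipeline/<execution-id> (illustrative value).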

spark_processor = PySparkProcessor(
    base_job_name="sm-spark",
    role="XYZ",
    instance_count=1,
    instance_type="ml.m5.xlarge",
    max_runtime_in_seconds=1200,
    image_uri="XYZ",
    sagemaker_session=sagemaker_session
)

spark_config = {}  # Spark configuration content, left empty for this example

run_args = spark_processor.get_run_args(
    submit_app="./processing.py",
    configuration={
        "content": spark_config,
        "s3_uri": Join(on="/", values=[
            *bucket_pipeline_execution_prefix, "spark-config"
        ])
    },
    inputs=[],
    outputs=[
        ProcessingOutput(
            source='/opt/ml/processing/output',
            output_name='demo-output',
            destination=Join(on="/", values=[
                *bucket_pipeline_execution_prefix, "preprocessed"
            ])
        )
    ],
    spark_event_logs_s3_uri=Join(on="/", values=[
        *bucket_pipeline_execution_prefix, "spark-event-logs"
    ])  # work in progress: https://github.com/aws/sagemaker-python-sdk/pull/3167
)

step_process = ProcessingStep(
    name="demo-processing",
    processor=spark_processor,
    inputs=run_args.inputs,
    outputs=run_args.outputs,
    code=run_args.code,
)

pipeline = Pipeline(
    name='demo-pipeline',
    parameters=[],
    sagemaker_session=sagemaker_session,
    steps=[step_process],
)

pipeline.upsert(role_arn="XYZ")

execution = pipeline.start()

In the end, the S3 bucket should look something like this:

|- XYZ 
|--- demo-pipeline
|----- execution-1
|------- spark-config
|--------- configuration.json
|------- spark-event-logs
|--------- application_0001
|------- preprocessed
|--------- *.* # processing output files

Therefore, everything related to that demo-processing step (Spark app logs, Spark app config files, Spark outputs, ...) now lives under s3://XYZ/demo-pipeline/execution-1/, improving data lineage and reproducibility.
