feat: Enhance Spark operator blueprint to add S3 Tables support (#721)
Signed-off-by: Manabu McCloskey <[email protected]>
Co-authored-by: Vara Bonthu <[email protected]>
Co-authored-by: Manabu McCloskey <[email protected]>
3 people authored Jan 9, 2025
1 parent e811724 commit e3f1a6b
Showing 11 changed files with 530 additions and 5 deletions.
9 changes: 6 additions & 3 deletions analytics/terraform/spark-k8s-operator/README.md
@@ -43,6 +43,8 @@ Checkout the [documentation website](https://awslabs.github.io/data-on-eks/docs/
|------|------|
| [aws_eks_access_entry.karpenter_nodes](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/eks_access_entry) | resource |
| [aws_iam_policy.grafana](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/iam_policy) | resource |
| [aws_iam_policy.s3tables](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/iam_policy) | resource |
| [aws_iam_policy.s3tables_policy](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/iam_policy) | resource |
| [aws_iam_policy.spark](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/iam_policy) | resource |
| [aws_prometheus_workspace.amp](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/prometheus_workspace) | resource |
| [aws_s3_object.this](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/s3_object) | resource |
@@ -61,6 +63,7 @@ Checkout the [documentation website](https://awslabs.github.io/data-on-eks/docs/
| [aws_ecrpublic_authorization_token.token](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/ecrpublic_authorization_token) | data source |
| [aws_eks_cluster_auth.this](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/eks_cluster_auth) | data source |
| [aws_iam_policy_document.grafana](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/iam_policy_document) | data source |
| [aws_iam_policy_document.s3tables_policy](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/iam_policy_document) | data source |
| [aws_iam_policy_document.spark_operator](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/iam_policy_document) | data source |
| [aws_iam_session_context.current](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/iam_session_context) | data source |
| [aws_partition.current](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/partition) | data source |
@@ -71,13 +74,13 @@ Checkout the [documentation website](https://awslabs.github.io/data-on-eks/docs/

| Name | Description | Type | Default | Required |
|------|-------------|------|---------|:--------:|
| <a name="input_eks_cluster_version"></a> [eks\_cluster\_version](#input\_eks\_cluster\_version) | EKS Cluster version | `string` | `"1.30"` | no |
| <a name="input_eks_cluster_version"></a> [eks\_cluster\_version](#input\_eks\_cluster\_version) | EKS Cluster version | `string` | `"1.31"` | no |
| <a name="input_eks_data_plane_subnet_secondary_cidr"></a> [eks\_data\_plane\_subnet\_secondary\_cidr](#input\_eks\_data\_plane\_subnet\_secondary\_cidr) | Secondary CIDR blocks. 32766 IPs per Subnet per Subnet/AZ for EKS Node and Pods | `list(string)` | <pre>[<br> "100.64.0.0/17",<br> "100.64.128.0/17"<br>]</pre> | no |
| <a name="input_enable_amazon_prometheus"></a> [enable\_amazon\_prometheus](#input\_enable\_amazon\_prometheus) | Enable AWS Managed Prometheus service | `bool` | `true` | no |
| <a name="input_enable_vpc_endpoints"></a> [enable\_vpc\_endpoints](#input\_enable\_vpc\_endpoints) | Enable VPC Endpoints | `bool` | `false` | no |
| <a name="input_enable_yunikorn"></a> [enable\_yunikorn](#input\_enable\_yunikorn) | Enable Apache YuniKorn Scheduler | `bool` | `true` | no |
| <a name="input_enable_yunikorn"></a> [enable\_yunikorn](#input\_enable\_yunikorn) | Enable Apache YuniKorn Scheduler | `bool` | `false` | no |
| <a name="input_kms_key_admin_roles"></a> [kms\_key\_admin\_roles](#input\_kms\_key\_admin\_roles) | list of role ARNs to add to the KMS policy | `list(string)` | `[]` | no |
| <a name="input_name"></a> [name](#input\_name) | Name of the VPC and EKS Cluster | `string` | `"spark-operator-doeks"` | no |
| <a name="input_name"></a> [name](#input\_name) | Name of the VPC and EKS Cluster | `string` | `"spark-eks-s3tables"` | no |
| <a name="input_private_subnets"></a> [private\_subnets](#input\_private\_subnets) | Private Subnets CIDRs. 254 IPs per Subnet/AZ for Private NAT + NLB + Airflow + EC2 Jumphost etc. | `list(string)` | <pre>[<br> "10.1.1.0/24",<br> "10.1.2.0/24"<br>]</pre> | no |
| <a name="input_public_subnets"></a> [public\_subnets](#input\_public\_subnets) | Public Subnets CIDRs. 62 IPs per Subnet/AZ | `list(string)` | <pre>[<br> "10.1.0.0/26",<br> "10.1.0.64/26"<br>]</pre> | no |
| <a name="input_region"></a> [region](#input\_region) | Region | `string` | `"us-west-2"` | no |
42 changes: 42 additions & 0 deletions analytics/terraform/spark-k8s-operator/addons.tf
@@ -502,6 +502,7 @@ module "eks_blueprints_addons" {
karpenter_node = {
iam_role_additional_policies = {
AmazonSSMManagedInstanceCore = "arn:aws:iam::aws:policy/AmazonSSMManagedInstanceCore"
S3TableAccess = aws_iam_policy.s3tables_policy.arn
}
}
karpenter = {
@@ -644,3 +645,44 @@ resource "aws_secretsmanager_secret_version" "grafana" {
secret_id = aws_secretsmanager_secret.grafana.id
secret_string = random_password.grafana.result
}

#---------------------------------------------------------------
# S3Table IAM policy for Karpenter nodes
#---------------------------------------------------------------
resource "aws_iam_policy" "s3tables_policy" {
name_prefix = "${local.name}-s3tables"
path = "/"
description = "S3Tables Metadata access for Nodes"

policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Sid = "VisualEditor0"
Effect = "Allow"
Action = [
"s3tables:UpdateTableMetadataLocation",
"s3tables:GetNamespace",
"s3tables:GetTableBucket",
"s3tables:GetTableBucketMaintenanceConfiguration",
"s3tables:GetTableBucketPolicy"
]
Resource = "arn:aws:s3tables:*:${data.aws_caller_identity.current.account_id}:bucket/*"
},
{
Sid = "VisualEditor1"
Effect = "Allow"
Action = [
"s3tables:GetTableMaintenanceJobStatus",
"s3tables:GetTablePolicy",
"s3tables:GetTable",
"s3tables:GetTableMetadataLocation",
"s3tables:UpdateTableMetadataLocation",
"s3tables:GetTableData",
"s3tables:GetTableMaintenanceConfiguration"
]
Resource = "arn:aws:s3tables:*:${data.aws_caller_identity.current.account_id}:bucket/*/table/*"
}
]
})
}
@@ -0,0 +1,53 @@
#--------------------------------------------------------------------------------------------
# Dockerfile for Apache Spark 3.5.3 with S3A and S3 Tables support on multi-arch platforms (AMD64 & ARM64)
#--------------------------------------------------------------------------------------------
# Step1: Create a Private or Public ECR repo from AWS Console or CLI
# e.g., aws ecr-public create-repository --repository-name spark --region us-east-1
#---
# Step2: Docker Login:
# aws ecr-public get-login-password --region us-east-1 | docker login --username AWS --password-stdin public.ecr.aws/<repoAlias>
#---
# Step3: Build multi arch image and push it to ECR:
# docker buildx build --platform linux/amd64,linux/arm64 -t public.ecr.aws/<repoAlias>/spark:3.5.3-scala2.12-java17-python3-ubuntu --push .
#--------------------------------------------------------------------------------------------

# Use the official Spark base image with Java 17 and Python 3, version 3.5.3
FROM apache/spark:3.5.3-scala2.12-java17-python3-ubuntu

# Arguments for version control
ARG HADOOP_VERSION=3.4.1
ARG PREV_HADOOP_VERSION=3.3.4
ARG AWS_SDK_VERSION=2.29.45
ARG ICEBERG_VERSION=1.6.1
ARG S3_TABLES_VERSION=0.1.3
ARG SPARK_UID=185

# Set environment variables
ENV SPARK_HOME=/opt/spark

# Set up as root to install dependencies and tools
USER root

# Remove any old Hadoop libraries to avoid conflicts
RUN rm -f ${SPARK_HOME}/jars/hadoop-client-* && \
rm -f ${SPARK_HOME}/jars/hadoop-yarn-server-web-proxy-*.jar

# Add Hadoop AWS connector and related Hadoop dependencies
RUN cd ${SPARK_HOME}/jars && \
wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/${HADOOP_VERSION}/hadoop-aws-${HADOOP_VERSION}.jar -O hadoop-aws-${PREV_HADOOP_VERSION}.jar && \
wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-client-api/${HADOOP_VERSION}/hadoop-client-api-${HADOOP_VERSION}.jar -O hadoop-client-api-${PREV_HADOOP_VERSION}.jar && \
wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-client-runtime/${HADOOP_VERSION}/hadoop-client-runtime-${HADOOP_VERSION}.jar -O hadoop-client-runtime-${PREV_HADOOP_VERSION}.jar && \
wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-common/${HADOOP_VERSION}/hadoop-common-${HADOOP_VERSION}.jar -O hadoop-common-${PREV_HADOOP_VERSION}.jar && \
wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-yarn-server-web-proxy/${HADOOP_VERSION}/hadoop-yarn-server-web-proxy-${HADOOP_VERSION}.jar -O hadoop-yarn-server-web-proxy-${PREV_HADOOP_VERSION}.jar

# Add Iceberg, AWS SDK bundle, and S3 Tables Catalog for Iceberg runtime
RUN cd ${SPARK_HOME}/jars && \
wget https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/${ICEBERG_VERSION}/iceberg-spark-runtime-3.5_2.12-${ICEBERG_VERSION}.jar && \
wget https://repo1.maven.org/maven2/software/amazon/awssdk/bundle/${AWS_SDK_VERSION}/bundle-${AWS_SDK_VERSION}.jar && \
wget https://repo1.maven.org/maven2/software/amazon/s3tables/s3-tables-catalog-for-iceberg-runtime/${S3_TABLES_VERSION}/s3-tables-catalog-for-iceberg-runtime-${S3_TABLES_VERSION}.jar

# Set working directory to Spark home
WORKDIR ${SPARK_HOME}

# Switch to non-root user for security best practices
USER ${SPARK_UID}
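
The jars added above are what let Spark reach S3 and S3 Tables through Iceberg. As a point of reference, a minimal sketch of how they are typically wired together at submit time is shown below; the catalog name `s3tablesbucket`, the placeholders, and the use of `spark-submit` here are illustrative assumptions (the blueprint's own example submits the job through the Spark operator), while the catalog classes come from the Iceberg and S3 Tables catalog runtimes bundled into the image.

```sh
# Illustrative only: register an Iceberg catalog backed by an S3 table bucket and run the sample job.
spark-submit \
  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
  --conf spark.sql.catalog.s3tablesbucket=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.s3tablesbucket.catalog-impl=software.amazon.s3tables.iceberg.S3TablesCatalog \
  --conf spark.sql.catalog.s3tablesbucket.warehouse="<S3TABLE_ARN>" \
  s3a://<S3_BUCKET>/s3table-example/scripts/s3table-iceberg-pyspark.py
```
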
167 changes: 167 additions & 0 deletions analytics/terraform/spark-k8s-operator/examples/s3-tables/README.md
@@ -0,0 +1,167 @@
# S3Table with OSS Spark on EKS Guide

This guide provides step-by-step instructions for setting up and running a Spark job on Amazon EKS using S3Table for data storage.

## Prerequisites

- Latest version of AWS CLI installed (must include S3Tables API support)

## Step 1: Deploy Spark Cluster on EKS

Follow the steps in the blueprint documentation to deploy the Spark cluster on EKS:

[Spark Operator on EKS with YuniKorn Scheduler](https://awslabs.github.io/data-on-eks/docs/blueprints/data-analytics/spark-operator-yunikorn#prerequisites)

Once your cluster is up and running, proceed with the following steps to execute a sample Spark job using S3Tables.
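
Before proceeding, it helps to confirm that the cluster nodes and the Spark operator are healthy. The `spark-operator` namespace below is an assumption based on the blueprint's defaults; adjust it if your deployment installs the operator elsewhere.

```sh
# Verify nodes are Ready and the Spark operator pods are Running (namespace is an assumption).
kubectl get nodes
kubectl get pods -n spark-operator
```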

## Step 2: Create Test Data for the job

Navigate to the example directory and generate the sample data:

```sh
cd analytics/terraform/spark-k8s-operator/examples/s3-tables
./input-data-gen.sh
```

This will create a file called `employee_data.csv` locally with 100 records. Modify the script to adjust the number of records as needed.
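
To sanity-check the generated file, preview the first few rows; the `id,name,level,salary` header comes from the `input-data-gen.sh` script included in this example.

```sh
head -5 employee_data.csv
```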

## Step 3: Upload Test Input data to your S3 Bucket

Replace `<S3_BUCKET>` with the name of the S3 bucket created by your blueprint, then run the command below.

```sh
aws s3 cp employee_data.csv s3://<S3_BUCKET>/s3table-example/input/
```

## Step 4: Upload PySpark Script to S3 Bucket

Replace `<S3_BUCKET>` with the name of the S3 bucket created by your blueprint, then run the command below to upload the sample Spark job to the S3 bucket.

```sh
aws s3 cp s3table-iceberg-pyspark.py s3://<S3_BUCKET>/s3table-example/scripts/
```
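
You can confirm that both the input data and the script landed where the job expects them:

```sh
aws s3 ls s3://<S3_BUCKET>/s3table-example/ --recursive
```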

## Step 5: Create S3Table

Replace `<REGION>` and `<S3TABLE_BUCKET_NAME>` with your desired values.

```sh
aws s3tables create-table-bucket \
--region "<REGION>" \
--name "<S3TABLE_BUCKET_NAME>"
```

Make a note of the S3 Table bucket ARN returned by this command; the later steps refer to it as `${S3TABLE_ARN}`.
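
If you prefer to capture the ARN directly into the `S3TABLE_ARN` variable used in later steps, a sketch like the following should work, assuming the `create-table-bucket` response returns the ARN under the `arn` key:

```sh
# Create the table bucket and keep its ARN for later steps ('arn' output key is an assumption).
S3TABLE_ARN=$(aws s3tables create-table-bucket \
  --region "<REGION>" \
  --name "<S3TABLE_BUCKET_NAME>" \
  --query 'arn' \
  --output text)
echo "${S3TABLE_ARN}"
```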

## Step 6: Update Spark Operator YAML File

- Open the `s3table-spark-operator.yaml` file in your preferred text editor.
- Replace `<S3_BUCKET>` with the name of the S3 bucket created by this blueprint (check the Terraform outputs); this is the bucket where you copied the test data and the sample Spark job in the previous steps.
- Replace `<S3TABLE_ARN>` with your S3 Table bucket ARN (see the sketch after this list).
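
A minimal sketch of making both substitutions from the shell, assuming you have exported `S3_BUCKET` and `S3TABLE_ARN` as environment variables (GNU `sed` syntax; the file is edited in place):

```sh
# Substitute the placeholders in the SparkApplication manifest in place.
sed -i \
  -e "s|<S3_BUCKET>|${S3_BUCKET}|g" \
  -e "s|<S3TABLE_ARN>|${S3TABLE_ARN}|g" \
  s3table-spark-operator.yaml
```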

## Step 7: Execute Spark Job

Apply the updated YAML file to your Kubernetes cluster to submit the Spark Job.

```sh
cd analytics/terraform/spark-k8s-operator/examples/s3-tables
kubectl apply -f s3table-spark-operator.yaml
```

## Step 8: Verify the Spark Driver log for the output

Check the Spark driver logs to verify job progress and output:

```sh
kubectl logs <spark-driver-pod-name> -n spark-team-a
```
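
If you are not sure of the driver pod name, you can look it up by label first; this assumes the driver pod carries the standard `spark-role=driver` label applied by Spark on Kubernetes:

```sh
# List driver pods in the team namespace (label is an assumption based on Spark's defaults).
kubectl get pods -n spark-team-a -l spark-role=driver
```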

## Step 9: Verify the S3Table using S3Table API

Use the S3Tables API to confirm the table was created successfully. Replace `<ACCOUNT_ID>` with your AWS account ID and run the command below.

```sh
aws s3tables get-table --table-bucket-arn arn:aws:s3tables:us-west-2:<ACCOUNT_ID>:bucket/doeks-spark-on-eks-s3table --namespace doeks_namespace --name employee_s3_table
```

The output looks like the following:

```json
{
"name": "employee_s3_table",
"type": "customer",
"tableARN": "arn:aws:s3tables:us-west-2:<ACCOUNT_ID>:bucket/doeks-spark-on-eks-s3table/table/55511111-7a03-4513-b921-e372b0030daf",
"namespace": [
"doeks_namespace"
],
"versionToken": "aafc39ddd462690d2a0c",
"metadataLocation": "s3://55511111-7a03-4513-bumiqc8ihp8rnxymuhyz8t1ammu7ausw2b--table-s3/metadata/00004-62cc4be3-59b5-4647-a78d-1cdf69ec5ed8.metadata.json",
"warehouseLocation": "s3://55511111-7a03-4513-bumiqc8ihp8rnxymuhyz8t1ammu7ausw2b--table-s3",
"createdAt": "2025-01-07T22:14:48.689581+00:00",
"createdBy": "<ACCOUNT_ID>",
"modifiedAt": "2025-01-09T00:06:09.222917+00:00",
"ownerAccountId": "<ACCOUNT_ID>",
"format": "ICEBERG"
}
```

Monitor the table maintenance job status:

```sh
aws s3tables get-table-maintenance-job-status --table-bucket-arn arn:aws:s3tables:us-west-2:<ACCOUNT_ID>:bucket/doeks-spark-on-eks-s3table --namespace doeks_namespace --name employee_s3_table
```

This command provides information about Iceberg compaction, snapshot management, and unreferenced file removal processes.

```json
{
"tableARN": "arn:aws:s3tables:us-west-2:<ACCOUNT_ID>:bucket/doeks-spark-on-eks-s3table/table/55511111-7a03-4513-b921-e372b0030daf",
"status": {
"icebergCompaction": {
"status": "Successful",
"lastRunTimestamp": "2025-01-08T01:18:08.857000+00:00"
},
"icebergSnapshotManagement": {
"status": "Successful",
"lastRunTimestamp": "2025-01-08T22:17:08.811000+00:00"
},
"icebergUnreferencedFileRemoval": {
"status": "Successful",
"lastRunTimestamp": "2025-01-08T22:17:10.377000+00:00"
}
}
}
```

## Step 10: Clean up

Delete the table:

```bash
aws s3tables delete-table \
--namespace doeks_namespace \
--table-bucket-arn ${S3TABLE_ARN} \
--name employee_s3_table
```

Delete the namespace:

```bash
aws s3tables delete-namespace \
--namespace doeks_namespace \
--table-bucket-arn ${S3TABLE_ARN}
```

Finally, delete the table bucket:

```bash
aws s3tables delete-table-bucket \
--region "<REGION>" \
--table-bucket-arn ${S3TABLE_ARN}
```
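
To confirm the cleanup completed, list the remaining table buckets in the region; the bucket you deleted should no longer appear:

```bash
aws s3tables list-table-buckets --region "<REGION>"
```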


## Conclusion
You have successfully set up and run a Spark job on Amazon EKS using S3Table for data storage. This setup provides a scalable and efficient way to process large datasets using Spark on Kubernetes with the added benefits of S3Table's data management capabilities.

For more advanced usage, refer to the official AWS documentation on S3Table and Spark on EKS.
@@ -0,0 +1,30 @@
#!/bin/bash

# Variables
output_file="employee_data.csv"
num_records=100

# Levels array
levels=("Junior" "Mid" "Senior" "Exec")

# Create or overwrite the CSV file with the header
echo "id,name,level,salary" > $output_file

# Generate data
for ((i=1; i<=num_records; i++))
do
# Generate random name
name="Employee_$i"

# Pick a random level
level=${levels[$RANDOM % ${#levels[@]}]}

# Generate a random salary between 50,000 and 200,000
salary=$(echo "scale=2; $RANDOM/32768 * (200000 - 50000) + 50000" | bc)

# Append the data to the CSV file
echo "$i,$name,$level,$salary" >> $output_file
done

# Print a success message
echo "Generated $num_records employee records in $output_file"

