WIP Key value parquet to dynamo #46
Conversation
Coderabbit cleanup
…o a dataframe for each date. Converting types currently takes ~10 minutes.
…ay it is only slow once. Also, reduce number of samples to 10000 from 20000 to save 5 minutes.
Walkthrough

The changes in this pull request involve modifications to the …
Actionable comments posted: 6
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (3)
- docker-init/compose.yaml (2 hunks)
- docker-init/generate_anomalous_data.py (3 hunks)
- docker-init/requirements.txt (1 hunks)
🧰 Additional context used
🔇 Additional comments (3)
docker-init/requirements.txt (1)
Line range hint 1-9: LGTM, pending resolution of the botocore dependency.

The requirements file looks good overall, with appropriately pinned versions for all dependencies. Please address the concern regarding the botocore dependency as mentioned in the previous comment. Once that's resolved, these changes can be approved.
docker-init/compose.yaml (2)
52-52: Explicit dependency on DynamoDB service: Approved

Adding an explicit dependency on the 'dynamo' service is a good practice. This ensures that the 'app' service won't start until DynamoDB is ready, potentially preventing startup issues or race conditions.
54-54: Environment variable renamed: Approved with verification needed

Renaming 'DYNAMO_ENDPOINT' to 'AWS_ENDPOINT_URL_DYNAMODB' is a good change, as it aligns with AWS SDK naming conventions. This standardization can improve code readability and maintainability.
Please ensure that this change is reflected in all relevant application code and documentation. Run the following script to check for any remaining occurrences of the old variable name:
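The verification script itself isn't reproduced here; as a rough stand-in (hypothetical, written in Python for convenience rather than the original shell form), a check could look like:

```python
# Hypothetical helper: flag any file that still references the old variable name.
import pathlib

OLD_NAME = "DYNAMO_ENDPOINT"

for path in pathlib.Path(".").rglob("*"):
    if path.is_file():
        try:
            text = path.read_text(errors="ignore")
        except (OSError, UnicodeDecodeError):
            continue
        if OLD_NAME in text:
            print(f"{path}: still references {OLD_NAME}")
```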
| - "./docker/dynamodb:/home/dynamodblocal/data" | ||
| working_dir: /home/dynamodblocal | ||
| user: dynamodblocal | ||
| user: root |
Security concern: DynamoDB service running as root
Changing the user context to root for the DynamoDB service poses significant security risks. Running containers with root privileges can potentially allow unrestricted access to the host system if the container is compromised.
Consider the following alternatives:
- Identify the specific permissions needed and create a custom user with only those permissions.
- Use the least privileged user that still allows the service to function correctly.
- If file permission issues are the reason for this change, consider modifying the permissions of the mounted volumes instead.
Example:

```yaml
dynamo:
  user: dynamodb
  volumes:
    - "./docker/dynamodb:/home/dynamodblocal/data:rw"
```
DynamoDB Local has permission issues if it doesn't run as root. I haven't been able to identify a way to modify the permissions of the mounted volume to allow the user dynamodblocal to write to it.
@chewy-zlai, I understand that you're encountering permission issues with DynamoDB Local when it doesn't run as root. A common solution is to adjust the permissions of the mounted volume so that the dynamodblocal user inside the container has the necessary write access.
Here are some steps you could try:
1. **Determine the UID and GID of the `dynamodblocal` user inside the Docker image:**

   Run the following commands to find the user and group IDs:

   ```bash
   docker run --rm amazon/dynamodb-local:2.5.2 id -u dynamodblocal
   docker run --rm amazon/dynamodb-local:2.5.2 id -g dynamodblocal
   ```

   Suppose the output is:

   ```
   1000
   1000
   ```

   This means the `dynamodblocal` user has a UID and GID of `1000`.

2. **Adjust the ownership of the mounted directory on the host:**

   Change the ownership of `./docker/dynamodb` to match the UID and GID of the `dynamodblocal` user:

   ```bash
   sudo chown -R 1000:1000 ./docker/dynamodb
   ```

3. **Set appropriate permissions on the directory:**

   Ensure that the directory has the necessary read/write permissions:

   ```bash
   chmod -R u+rwx ./docker/dynamodb
   ```

4. **Keep the user as `dynamodblocal` in your Docker Compose file:**

   ```yaml
   user: dynamodblocal
   ```
By aligning the ownership and permissions, the dynamodblocal user inside the container should be able to write to the mounted volume without requiring root privileges. This approach enhances security by avoiding running the container as root.
Let me know if this helps or if you need further assistance.
✏️ Learnings added
Learnt from: chewy-zlai
PR: zipline-ai/chronon#46
File: docker-init/compose.yaml:12-12
Timestamp: 2024-10-15T19:03:27.898Z
Learning: DynamoDB Local may have permission issues when not running as root due to write permissions on the mounted volume. Adjusting the host directory's ownership and permissions can allow the `dynamodblocal` user inside the container to access it without running as root.
Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.
That command didn't work:
"$ docker run --rm amazon/dynamodb-local:2.5.2 id -u dynamodblocal
Unable to find image 'amazon/dynamodb-local:2.5.2' locally
2.5.2: Pulling from amazon/dynamodb-local
25f617ca51f7: Pull complete
88169dbe8767: Pull complete
4f4fb700ef54: Pull complete
b510243a5cd9: Pull complete
Digest: sha256:d7ebddeb60fa418bcda218a6c6a402a58441b2a20d54c9cb1d85fd5194341753
Status: Downloaded newer image for amazon/dynamodb-local:2.5.2
Error: Could not find or load main class id
Caused by: java.lang.ClassNotFoundException: id
"
@chewy-zlai, thank you for pointing that out. The issue occurs because the amazon/dynamodb-local:2.5.2 image has a default entrypoint that runs the DynamoDB application, so when we try to execute id, it doesn't recognize the command.
To resolve this, we need to override the entrypoint to allow us to run shell commands inside the container. Please try the following commands:
```bash
docker run --rm --entrypoint "" amazon/dynamodb-local:2.5.2 id -u dynamodblocal
docker run --rm --entrypoint "" amazon/dynamodb-local:2.5.2 id -g dynamodblocal
```

These commands override the default entrypoint, enabling you to execute the `id` command directly. This should output the UID and GID of the `dynamodblocal` user.
Once you have the UID and GID, you can adjust the ownership and permissions of the mounted directory on your host machine:
```bash
sudo chown -R [UID]:[GID] ./docker/dynamodb
chmod -R u+rwx ./docker/dynamodb
```

Replace `[UID]` and `[GID]` with the numbers obtained from the previous commands.
Let me know if this helps or if you have any questions!
✏️ Learnings added
Learnt from: chewy-zlai
PR: zipline-ai/chronon#46
File: docker-init/compose.yaml:12-12
Timestamp: 2024-10-15T19:24:11.223Z
Learning: When executing shell commands inside the `amazon/dynamodb-local` Docker image, you need to override the entrypoint using `--entrypoint ""` to prevent it from running the default Java application.
Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.
🧠 Learnings used
Learnt from: chewy-zlai
PR: zipline-ai/chronon#46
File: docker-init/compose.yaml:12-12
Timestamp: 2024-10-15T19:03:28.155Z
Learning: DynamoDB Local may have permission issues when not running as root due to write permissions on the mounted volume. Adjusting the host directory's ownership and permissions can allow the `dynamodblocal` user inside the container to access it without running as root.
Is there a way to change the owner in the Dockerfile, so that the change will persist on other host machines?
@chewy-zlai, yes, you can modify the Dockerfile to change the ownership of directories, which can help resolve permission issues across different host machines. By adjusting the ownership inside the Docker image, you ensure that the dynamodblocal user within the container has the necessary permissions, without requiring manual changes on the host system.
Here's how you can do it:
1. **Create a custom Dockerfile for your DynamoDB Local service:**

   ```dockerfile
   # Use the base image from Amazon
   FROM amazon/dynamodb-local:2.5.2

   # Switch to root user to change ownership
   USER root

   # Create the data directory and adjust ownership and permissions
   RUN mkdir -p /home/dynamodblocal/data && \
       chown -R dynamodblocal:dynamodblocal /home/dynamodblocal/data

   # Switch back to the dynamodblocal user
   USER dynamodblocal

   # Set the working directory
   WORKDIR /home/dynamodblocal

   # Expose the default DynamoDB Local port
   EXPOSE 8000

   # Set the default command to run DynamoDB Local
   CMD ["-jar", "DynamoDBLocal.jar", "-sharedDb", "-dbPath", "./data"]
   ```
This Dockerfile does the following:
- Uses the official DynamoDB Local image as the base.
- Switches to the root user to modify ownership.
- Creates the `/home/dynamodblocal/data` directory and changes its owner to `dynamodblocal`.
- Switches back to the `dynamodblocal` user for running the application.
2. **Update your `docker-compose.yaml` to build and use this custom image:**

   ```yaml
   services:
     dynamo:
       build:
         context: ./path_to_dynamo_dockerfile
         dockerfile: Dockerfile
       container_name: dynamodb-local
       ports:
         - "8000:8000"
       volumes:
         - "./docker/dynamodb:/home/dynamodblocal/data"
       working_dir: /home/dynamodblocal
       user: dynamodblocal
   ```

   Make sure to replace `./path_to_dynamo_dockerfile` with the actual relative path to your custom Dockerfile.

This setup ensures that:
- The `dynamodblocal` user inside the container has ownership of the `/home/dynamodblocal/data` directory.
- The service runs as `dynamodblocal`, enhancing security by not running as root.
- Other developers or environments using your Docker configuration won't need to manually adjust permissions on their host machines.
Note on Mounted Volumes:
When you mount a host directory into a container, the permissions and ownership are governed by the host's filesystem. Changing ownership inside the Dockerfile affects files and directories within the image and the container's filesystem, but not the host-mounted volumes.
To mitigate permission issues with mounted volumes:
- **Use Named Volumes:** Instead of mounting a host directory, use a Docker named volume, which Docker manages internally.

  ```yaml
  services:
    dynamo:
      build:
        context: ./path_to_dynamo_dockerfile
        dockerfile: Dockerfile
      container_name: dynamodb-local
      ports:
        - "8000:8000"
      volumes:
        - dynamodb_data:/home/dynamodblocal/data
      working_dir: /home/dynamodblocal
      user: dynamodblocal

  volumes:
    dynamodb_data:
  ```
This approach avoids host filesystem permission issues since Docker handles the volume's filesystem.
- **Adjust Permissions on Host Directory:** If you need to use a host directory, you might still need to adjust its permissions to ensure the container user can access it. However, by setting the ownership in the Dockerfile and ensuring the container runs as `dynamodblocal`, the need for manual host adjustments is minimized.
Let me know if this solution works for you or if you have any further questions!
🧠 Learnings used
Learnt from: chewy-zlai
PR: zipline-ai/chronon#46
File: docker-init/compose.yaml:12-12
Timestamp: 2024-10-15T19:03:28.155Z
Learning: DynamoDB Local may have permission issues when not running as root due to write permissions on the mounted volume. Adjusting the host directory's ownership and permissions can allow the `dynamodblocal` user inside the container to access it without running as root.
piyush-zlai left a comment
Chewy for this one, I think we need to ingest a df with the format:
```scala
private val baseRowSchema = StructType(
```
We could skip the key_json and value_json fields, as they're typically only used when we're exploring the data with a query engine like Trino and want to see the values without having to crack open the Avro.
One of the steps in either @nikhil-zlai 's Drift job PR or a subsequent one will be to extend the Drift job to write out this key_bytes, value_bytes shape of the data. See this example for an existing job that does it - https://github.com/zipline-ai/chronon/blob/main/spark/src/main/scala/ai/chronon/spark/GroupByUpload.scala#L233
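For illustration only, here is a rough PySpark sketch of producing that key_bytes / value_bytes shape. The input path, key column, and the JSON-then-UTF-8 serialization are assumptions made for the example; the actual jobs serialize with Avro, as in the GroupByUpload link above.

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, encode, struct, to_json

spark = SparkSession.builder.getOrCreate()

# Hypothetical input path and column names; adjust to the actual generated data.
df = spark.read.parquet("/path/to/anomalous_data")

key_cols = ["user_id"]  # assumed entity key column(s)
value_cols = [c for c in df.columns if c not in key_cols + ["ts"]]

kv_df = df.select(
    encode(to_json(struct(*[col(c) for c in key_cols])), "UTF-8").alias("key_bytes"),
    encode(to_json(struct(*[col(c) for c in value_cols])), "UTF-8").alias("value_bytes"),
    col("ts").alias("timestamp"),
)
```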
Actionable comments posted: 5
🧹 Outside diff range and nitpick comments (2)
docker-init/Dockerfile (1)
Line range hint 6-11: Consider updating Python and numpy versions.

While specifying Python 3.8 for numpy 1.22.0 due to security concerns is a good practice, there are a few points to consider:
- Python 3.8 will reach end-of-life in October 2024. It might be beneficial to use a more recent version of Python.
- The security concern with numpy might be outdated. Newer versions of numpy might have addressed these issues and provide better security.
- Pinning to a specific numpy version might conflict with other dependencies.
Consider the following actions:
- Upgrade to a more recent Python version (e.g., 3.9 or 3.10) if possible.
- Review the current numpy security status and upgrade if a newer secure version is available.
- Use a dependency management tool like `pip-compile` or `poetry` to manage package versions and their interdependencies more effectively.

To help verify the current numpy version and check for updates, please run:
```bash
#!/bin/bash
# Description: Check numpy version and available updates

# Test 1: Display current numpy version in requirements.txt
echo "Current numpy version:"
grep numpy docker-init/requirements.txt

# Test 2: Check for the latest numpy version
echo -e "\nLatest numpy version:"
pip3.8 index versions numpy | grep numpy
```

docker-init/parquet_to_dynamo.py (1)
72-76: Consider adjusting the batch size for optimal performance

The current batch size is set to 1000. Depending on the size of your data and network conditions, you might experience improved performance with a different batch size.
Experiment with different batch sizes to find the optimal setting. Additionally, you may consider implementing error handling within the loop to retry failed batches.
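As a sketch of that retry idea (not part of the PR; the retry count and backoff values are arbitrary assumptions):

```python
import time

import awswrangler as wr

def put_batch_with_retry(batch, table_name, max_retries=3):
    """Try to write one batch, retrying a few times with simple exponential backoff."""
    for attempt in range(1, max_retries + 1):
        try:
            wr.dynamodb.put_df(df=batch, table_name=table_name)
            return True
        except Exception as e:  # broad catch mirrors the existing script's error handling
            print(f"Attempt {attempt} failed: {e}", flush=True)
            time.sleep(2 ** attempt)  # back off before retrying
    return False
```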
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (2)
- docker-init/Dockerfile (1 hunks)
- docker-init/parquet_to_dynamo.py (1 hunks)
🧰 Additional context used
🪛 Ruff
docker-init/parquet_to_dynamo.py
4-4: `decimal.Decimal` imported but unused

Remove unused import: `decimal.Decimal` (F401)

5-5: `json` imported but unused

Remove unused import: `json` (F401)

7-7: `pandas` imported but unused

Remove unused import: `pandas` (F401)
🔇 Additional comments (1)
docker-init/Dockerfile (1)
Line range hint 1-35: Overall, the Dockerfile changes look good, with some areas for improvement.

The modifications to the Dockerfile align well with the PR objectives and enhance the build and runtime environment. The multi-stage build process is maintained, and the addition of the new `parquet_to_dynamo.py` script supports the goal of converting key-value pairs from Parquet to DynamoDB.

Key points to address:
- Clarify the integration and execution of the new `parquet_to_dynamo.py` script.
- Consider updating Python and numpy versions for better long-term maintainability and security.
- Ensure all dependencies are properly managed and compatible with the chosen Python version.
To ensure all necessary files are present and permissions are set correctly, please run:
```bash
#!/bin/bash
# Description: Verify presence and permissions of key files

# Test 1: Check if all mentioned files exist
echo "Checking for required files:"
for file in docker-init/requirements.txt docker-init/generate_anomalous_data.py docker-init/parquet_to_dynamo.py docker-init/start.sh
do
  if [ -f "$file" ]; then
    echo "$file exists"
  else
    echo "$file is missing"
  fi
done

# Test 2: Verify start.sh permissions
echo -e "\nChecking start.sh permissions:"
ls -l docker-init/start.sh
```
Actionable comments posted: 3
🧹 Outside diff range and nitpick comments (2)
docker-init/parquet_to_dynamo.py (2)
28-69: Adjust the success message for table creation

The current implementation prints "Created DynamoDB Table" even when the table already exists. This can be misleading. Consider moving the success message inside the try block and adding a more general message outside:

```python
try:
    dynamodb.create_table(
        # ... (table creation parameters)
    )
    print("Created DynamoDB Table")
except botocore.exceptions.ClientError as error:
    if error.response['Error']['Code'] == 'ResourceInUseException':
        print(f"Table {table_name} already exists. Skipping creation.")
    else:
        raise error

print(f"DynamoDB Table {table_name} is ready for use.")
```

This change ensures that the "Created" message is only printed when a new table is actually created.
82-85: Enhance completion message and remove extra lines
Consider providing more information in the completion message, such as the number of records processed:
print(f"Completed writing {simplified_df.count()} records from Parquet to DynamoDB table {table_name}")Remove the extra empty lines at the end of the file. Generally, it's good practice to have a single empty line at the end of a Python file.
These changes will make the script output more informative and adhere to common Python style guidelines.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (1)
- docker-init/parquet_to_dynamo.py (1 hunks)
🧰 Additional context used
🪛 Ruff
docker-init/parquet_to_dynamo.py
4-4: `decimal.Decimal` imported but unused

Remove unused import: `decimal.Decimal` (F401)

5-5: `json` imported but unused

Remove unused import: `json` (F401)

7-7: `pandas` imported but unused

Remove unused import: `pandas` (F401)

10-10: `pyspark.sql.functions.concat` imported but unused

Remove unused import: `pyspark.sql.functions.concat` (F401)
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (5)
docker-init/parquet_to_dynamo.py (5)
1-11: Remove unused import to improve code clarity

The `concat` function is imported from `pyspark.sql.functions` but is not used in the code. Remove this unused import to enhance code readability.

Apply this diff to remove the unused import:

```diff
-from pyspark.sql.functions import concat, encode, struct, to_json
+from pyspark.sql.functions import encode, struct, to_json
```

🧰 Tools
🪛 Ruff

7-7: `pyspark.sql.functions.concat` imported but unused

Remove unused import: `pyspark.sql.functions.concat` (F401)
13-18: LGTM! Consider adding error handling for Parquet file reading

The Parquet file processing and DataFrame transformations look good. The use of environment variables for the Parquet folder path is a good practice.
Consider adding error handling for the Parquet file reading operation to gracefully handle cases where the files might not exist or are inaccessible:
```python
try:
    parquet_files = spark.read.parquet(os.path.join(os.environ['PARQUET_FOLDER'], "*.parquet"))
except Exception as e:
    print(f"Error reading Parquet files: {str(e)}")
    sys.exit(1)  # note: requires `import sys` at the top of the script
```
20-60: Move success message inside try block for accuracy

The DynamoDB table creation logic and error handling look good. However, the success message for table creation is outside the try block, which could be misleading if an error occurs.
Move the success message inside the try block to ensure it's only printed when the table is actually created:
```diff
 try:
     dynamodb.create_table(
         # ... (table creation parameters)
     )
+    print("Created DynamoDB Table")
 except botocore.exceptions.ClientError as error:
     if error.response['Error']['Code'] == 'ResourceInUseException':
         # Table already exists
         print("Table " + table_name + " already exists. Skipping")
     else:
         raise error
-print("Created DynamoDB Table")
```
64-75: Consider optimizing for large datasets, but overall approach is good

The batch upload process to DynamoDB and the error handling look good. The progress printing is a nice touch for monitoring the upload process.
For large datasets, converting the entire Spark DataFrame to a Pandas DataFrame might be inefficient. Consider using Spark's `foreachPartition` method to process and upload data in parallel:

```python
def upload_partition(partition):
    for row in partition:
        try:
            wr.dynamodb.put_item(item=row.asDict(), table_name=table_name)
        except Exception as e:
            print(f"Error uploading item: {str(e)}", flush=True)

simplified_df.foreachPartition(upload_partition)
```

This approach avoids loading the entire dataset into memory and can be more efficient for large datasets.
77-80: Enhance completion message with operation summary

The completion message is clear, but it could be more informative.
Consider adding more detailed information about the operation, such as the number of records processed and the total time taken:
```python
import time

start_time = time.time()

# ... (rest of the code)

total_records = simplified_df.count()
end_time = time.time()
duration = end_time - start_time
print(f"Wrote {total_records} records from Parquet to Dynamo in {duration:.2f} seconds")
```

This provides more context about the operation's scale and performance.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (1)
- docker-init/parquet_to_dynamo.py (1 hunks)
🧰 Additional context used
🪛 Ruff
docker-init/parquet_to_dynamo.py
7-7: `pyspark.sql.functions.concat` imported but unused

Remove unused import: `pyspark.sql.functions.concat` (F401)
…ifies down to just renaming the ts column to timestamp.
Actionable comments posted: 2
🧹 Outside diff range and nitpick comments (2)
docker-init/parquet_to_dynamo.py (2)
1-11: Remove unused imports to improve code clarity

The following imports are not used in the code and should be removed:

- `botocore` (line 3)
- `concat`, `encode`, `struct`, `to_json` from `pyspark.sql.functions` (line 7)

Removing these unused imports will enhance code readability and maintainability.

Apply this diff to remove the unused imports:

```diff
 import awswrangler as wr
 import boto3
-import botocore
 import os
 from pyspark.sql import SparkSession
-from pyspark.sql.functions import concat, encode, struct, to_json
```

🧰 Tools
🪛 Ruff
3-3: `botocore` imported but unused

Remove unused import: `botocore` (F401)

7-7: `pyspark.sql.functions.concat` imported but unused

Remove unused import (F401)

7-7: `pyspark.sql.functions.encode` imported but unused

Remove unused import (F401)

7-7: `pyspark.sql.functions.struct` imported but unused

Remove unused import (F401)

7-7: `pyspark.sql.functions.to_json` imported but unused

Remove unused import (F401)
35-35: Enhance the completion message with more details

The current completion message is concise but could be more informative. Consider adding details such as the number of records processed or the time taken for the operation.
You could modify the print statement as follows:
print(f"Wrote {len(panda_df)} records from Parquet to DynamoDB table {table_name}")This provides more context about the operation that was completed.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (1)
- docker-init/parquet_to_dynamo.py (1 hunks)
🧰 Additional context used
🪛 Ruff
docker-init/parquet_to_dynamo.py
3-3: `botocore` imported but unused

Remove unused import: `botocore` (F401)

7-7: `pyspark.sql.functions.concat` imported but unused

Remove unused import (F401)

7-7: `pyspark.sql.functions.encode` imported but unused

Remove unused import (F401)

7-7: `pyspark.sql.functions.struct` imported but unused

Remove unused import (F401)

7-7: `pyspark.sql.functions.to_json` imported but unused

Remove unused import (F401)
🔇 Additional comments (1)
docker-init/parquet_to_dynamo.py (1)
13-15: LGTM: Parquet file reading and initial processing

The code for reading Parquet files and renaming the "ts" column to "timestamp" is well-implemented. This ensures consistency in column naming, which is a good practice.
```python
dynamodb = boto3.client('dynamodb')
table_name = "test-join_drift_batch"
```
🛠️ Refactor suggestion
Consider making the table name configurable
The table name "test-join_drift_batch" is currently hardcoded. To improve maintainability and flexibility, consider making it configurable through an environment variable or a configuration file.
You could modify the code as follows:
```python
table_name = os.environ.get('DYNAMODB_TABLE_NAME', 'test-join_drift_batch')
```

This allows you to easily change the table name without modifying the code, while still providing a default value.
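Relatedly, the DynamoDB endpoint discussed earlier in this review could be wired the same way. A hypothetical combination, with variable names assumed and defaults matching the current hardcoded behavior:

```python
import os

import boto3

# Hypothetical sketch: both values fall back to the current defaults when the env vars are unset.
table_name = os.environ.get('DYNAMODB_TABLE_NAME', 'test-join_drift_batch')
dynamodb = boto3.client(
    'dynamodb',
    endpoint_url=os.environ.get('AWS_ENDPOINT_URL_DYNAMODB'),  # None falls back to boto3's default endpoint resolution
)
```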
```python
panda_df = parquet_files.toPandas()

# Upload data in batches
batch_size = 1000  # Adjust based on your needs
for i in range(0, len(panda_df), batch_size):
    batch = panda_df.iloc[i:i+batch_size]
    try:
        wr.dynamodb.put_df(df=batch, table_name=table_name)
        print(f"Uploaded batch {i//batch_size + 1}/{len(panda_df)//batch_size + 1}", flush=True)
    except Exception as e:
        print(f"Error uploading batch {i + 1}: {str(e)}", flush=True)
```
🛠️ Refactor suggestion
Consider using Spark DataFrame for the entire process
Currently, the code converts the Spark DataFrame to a Pandas DataFrame before uploading to DynamoDB. This conversion might be unnecessary and could be inefficient for large datasets. Consider using the Spark DataFrame throughout the process for better performance and scalability.
You could modify the code to use Spark's foreachPartition method:
```python
import pandas as pd  # needed for pd.DataFrame below

def upload_partition(partition):
    for row in partition:
        try:
            wr.dynamodb.put_df(df=pd.DataFrame([row.asDict()]), table_name=table_name)
        except Exception as e:
            print(f"Error uploading row: {str(e)}", flush=True)

parquet_files.foreachPartition(upload_partition)
```