Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions README2.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
- `curl -fsSL https://syftboxdev.openmined.org/install.sh | sh`
- `syftbox init -e khoa@openmined.org`
- `syftbox init`
46 changes: 46 additions & 0 deletions runtimes/fl_tabular.Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Use an official Python runtime as a parent image
FROM python:3.11-slim

# Install build tooling, git (for VCS-based deps) and tmux (used by start.sh)
RUN apt-get update && apt-get install -y \
    build-essential \
    python3-dev \
    libffi-dev \
    git \
    tmux \
    && rm -rf /var/lib/apt/lists/*

RUN pip install uv

WORKDIR /app

# TODO: remove this copy and install syft-flwr from pip
COPY ./syft-flwr/ ./syft-flwr/
# Install dependencies (will be cached if dependencies don't change)
RUN uv venv && \
    . .venv/bin/activate && \
    cd syft-flwr && \
    uv sync --active && \
    uv pip install "scikit-learn==1.6.1" "torch==2.5.1"

# Define environment variables needed by main.py
# You might need to adjust these paths or mount volumes when running the container
ENV DATA_DIR=/app/data
ENV OUTPUT_DIR=/app/output
ENV SYFTBOX_CLIENT_CONFIG_PATH=/app/config.json

# Create the data and output directories
RUN mkdir -p $OUTPUT_DIR $DATA_DIR

# Run main.py when the container launches
# Set the working directory to the fl-tabular project folder
# WORKDIR /app/fl-tabular
# ENTRYPOINT ["./.venv/bin/python", "fl-tabular/main.py"]
# CMD ["--active"]
#
# Build the startup script with printf instead of `echo '...\n...'`:
# POSIX leaves echo's backslash-escape handling implementation-defined, so
# the echo form could emit literal "\n" text under some /bin/sh variants.
# NOTE(review): /app/code/main.py is not COPY'd by this Dockerfile — it is
# presumably bind-mounted at run time; confirm before shipping.
RUN printf '%s\n' \
    '#!/bin/sh' \
    'tmux new-session -d -s rds-server "uv run syft-rds server"' \
    'sleep 1' \
    'exec ./.venv/bin/python /app/code/main.py --active' \
    > /app/start.sh && chmod +x /app/start.sh

ENTRYPOINT ["/app/start.sh"]
4 changes: 3 additions & 1 deletion syft-rds/src/syft_rds/client/rds_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ def get_default_config_for_job(self, job: Job) -> JobConfig:
dataset = self.dataset.get(name=job.dataset_name)
runtime = dataset.runtime or self.config.runner_config.runtime
runner_config = self.config.runner_config
return JobConfig(
job_config = JobConfig(
function_folder=user_code.local_dir,
args=[user_code.entrypoint],
data_path=dataset.get_private_path(),
Expand All @@ -150,6 +150,8 @@ def get_default_config_for_job(self, job: Job) -> JobConfig:
timeout=runner_config.timeout,
use_docker=runner_config.use_docker,
)
logger.debug(f"Job config: {job_config}")
return job_config

def run_private(self, job: Job, config: Optional[JobConfig] = None) -> Job:
if job.status == JobStatus.rejected:
Expand Down
2 changes: 2 additions & 0 deletions syft-rds/src/syft_rds/client/rds_clients/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ def submit(
name: str | None = None,
description: str | None = None,
tags: list[str] | None = None,
runtime_type: str = "python",
runtime_config: dict | None = None,
) -> Job:
"""`submit` is a convenience method to create both a UserCode and a Job in one call."""
user_code = self.rds.user_code.create(
Expand Down
22 changes: 22 additions & 0 deletions syft-rds/src/syft_rds/syft_runtime/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from pathlib import Path
from typing import Literal

from pydantic import BaseModel


class PythonRuntimeConfig(BaseModel):
    """Runtime configuration for running a job with a plain Python interpreter."""

    # Discriminator used to select this runtime variant in a tagged union.
    type: Literal["python"] = "python"
    # TODO: add python-specific options if needed, e.g. python_version.


class DockerRuntimeConfig(BaseModel):
    """Runtime configuration for running a job inside a Docker container."""

    # Discriminator used to select this runtime variant in a tagged union.
    type: Literal["docker"] = "docker"
    # Name/tag of a pre-built image to run.
    image: str | None = None
    # Path to a Dockerfile to build instead of using a pre-built image.
    # Fix: the original annotated this as `PathLike`, a name that was never
    # imported — pydantic evaluates annotations at class creation, so the
    # module raised NameError on import. `Path` is natively validated.
    dockerfile: Path | None = None
    # TODO: add other docker options, e.g. build_args, context_path.
    # TODO: add validation that at least one of `image`/`dockerfile` is set.


class KubernetesRuntimeConfig(BaseModel):
    """Runtime configuration for running a job in a Kubernetes cluster."""

    # Discriminator used to select this runtime variant in a tagged union.
    type: Literal["kubernetes"] = "kubernetes"
    # Container image to deploy; required for this runtime.
    image: str
    # Target namespace; cluster default is used when left unset.
    namespace: str | None = None
128 changes: 94 additions & 34 deletions syft-rds/src/syft_rds/syft_runtime/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@
from pathlib import Path
from typing import Optional, Protocol, Tuple

from loguru import logger
from pydantic import BaseModel, Field
from rich.console import Console
from rich.live import Live
from rich.panel import Panel
from rich.spinner import Spinner

DEFAULT_OUTPUT_DIR = "/output"
DEFAULT_OUTPUT_DIR = "output"


class CodeRuntime(BaseModel):
Expand All @@ -29,6 +30,7 @@ def default(cls):
class JobConfig(BaseModel):
"""Configuration for a job run"""

client_email: str
function_folder: Path
args: list[str]
data_path: Path
Expand All @@ -37,9 +39,12 @@ class JobConfig(BaseModel):
default_factory=lambda: Path("jobs") / datetime.now().strftime("%Y%m%d_%H%M%S")
)
timeout: int = 60
data_mount_dir: str = "/data"
data_mount_dir: str = "/app/data"
use_docker: bool = True
extra_env: dict[str, str] = {}
extra_mounts: list[
dict[str, str]
] = [] # [{"source": Path, "target": Path, "mode": str}]

@property
def job_path(self) -> Path:
Expand Down Expand Up @@ -211,40 +216,54 @@ def validate_paths(self, config: JobConfig) -> None:

def build_docker_command(self, config: JobConfig) -> list[str]:
"""Build the Docker run command with security constraints"""

if not config.use_docker:
# For direct Python execution, build a command that runs Python directly
# Assuming the first arg is the Python script to run
logger.info(
f"Running job without Docker with command: `{' '.join(config.runtime.cmd)} {' '.join(config.args)}`"
)
return [
*config.runtime.cmd,
str(Path(config.function_folder) / config.args[0]),
*config.args[1:],
]
config.output_dir.absolute().mkdir(parents=True, exist_ok=True)

workdir = "/app"

docker_mounts = [
"-v",
f"{Path(config.function_folder).absolute()}:/code:ro",
"-v",
f"{Path(config.data_path).absolute()}:{config.data_mount_dir}:ro",
"-v",
f"{config.output_dir.absolute()}:{DEFAULT_OUTPUT_DIR}:rw",
"--volume",
f"{Path(config.function_folder).absolute()}:{workdir}/code:ro",
"--volume",
f"{Path(config.data_path).absolute()}:{workdir}/data:ro",
"--volume",
f"{config.output_dir.absolute()}:{workdir}/{DEFAULT_OUTPUT_DIR}:rw",
]

if config.runtime.mount_dir:
docker_mounts.extend(
[
"-v",
f"{config.runtime.mount_dir.absolute()}:{config.runtime.mount_dir.absolute()}:ro",
"--volume",
f"{config.runtime.mount_dir.absolute()}:{workdir}/{config.runtime.mount_dir.absolute()}:ro",
]
)

if config.extra_mounts:
for extra_mount in config.extra_mounts:
docker_mounts.extend(
[
"--volume",
f"{extra_mount['source']}:{extra_mount['target']}:{extra_mount['mode']}",
]
)

limits = [
# Security constraints
"--cap-drop",
"ALL", # Drop all capabilities
"--network",
"none", # Disable networking
"--read-only", # Read-only root filesystem
# "--read-only", # Read-only root filesystem
"--tmpfs",
"/tmp:size=16m,noexec,nosuid,nodev", # Secure temp directory
# Resource limits
Expand All @@ -263,12 +282,7 @@ def build_docker_command(self, config: JobConfig) -> list[str]:
]
interpreter = " ".join(config.runtime.cmd)
interpreter_str = f'"{interpreter}"' if " " in interpreter else interpreter
return [
"docker",
"run",
"--rm", # Remove container after completion
*limits,
# Environment variables
docker_env = [
"-e",
f"TIMEOUT={config.timeout}",
"-e",
Expand All @@ -279,11 +293,25 @@ def build_docker_command(self, config: JobConfig) -> list[str]:
f"INTERPRETER={interpreter_str}",
"-e",
f"INPUT_FILE='{config.function_folder / config.args[0]}'",
]

image_name = "syft_python_runtime"

docker_command = [
"docker",
"run",
"--rm",
*limits,
*docker_env,
*config.get_extra_env_as_docker_args(),
*docker_mounts,
"syft_python_runtime",
"--workdir",
workdir,
image_name,
*config.args,
]
logger.debug(f"Running job with Docker command: \n{' '.join(docker_command)}")
return docker_command

def validate_docker(self, config: JobConfig) -> bool:
"""Validate Docker image availability"""
Expand All @@ -309,8 +337,33 @@ def validate_docker(self, config: JobConfig) -> bool:
raise RuntimeError("Docker not installed or not in PATH")

def run(self, config: JobConfig) -> Tuple[Path, int | None]:
"""Run a job in a Docker container or directly as Python"""
"""Run a job in a Docker container or directly as Python
TODO: refactor into 2 functions:
1. run_docker: run a job in a Docker container
2. run_python: run a job directly as Python
"""
# Check Docker availability first if using Docker

# TODO: force run + hard code with Docker. Remove later
config.use_docker = True
client_email = config.client_email
cwd = "/Users/khoaguin/Desktop/projects/OpenMined/syft-rds/experimentals/rpc_docker"
config.extra_mounts = [
{
"source": Path(f"{cwd}/.modified_configs/{client_email}.config.json"),
"target": "/app/config.json",
"mode": "ro",
},
{
"source": Path(
f"{cwd}/SyftBox/datasites/{client_email}/app_data/flwr/rpc/messages"
),
"target": f"/app/SyftBox/datasites/{client_email}/app_data/flwr/rpc/messages",
"mode": "rw",
},
]
# END TODO

if not self.validate_docker(config):
return -1

Expand All @@ -336,22 +389,29 @@ def run(self, config: JobConfig) -> Tuple[Path, int | None]:
text=True,
env=env,
)
# Stream output
while True:
stdout_line = process.stdout.readline()
stderr_line = process.stderr.readline()

for handler in self.handlers:
handler.on_job_progress(stdout_line, stderr_line)
stream_output = False
if stream_output:
# Stream output
while True:
stdout_line = process.stdout.readline()
stderr_line = process.stderr.readline()

if not stdout_line and not stderr_line and process.poll() is not None:
break
for handler in self.handlers:
handler.on_job_progress(stdout_line, stderr_line)

if not stdout_line and not stderr_line:
time.sleep(0.5)
if not stdout_line and not stderr_line and process.poll() is not None:
break

process.wait()
for handler in self.handlers:
handler.on_job_completion(process.returncode)
if not stdout_line and not stderr_line:
time.sleep(0.5)

process.wait()
for handler in self.handlers:
handler.on_job_completion(process.returncode)

return process.returncode

# TODO: save output to file

return process.returncode
return process
Loading