diff --git a/README2.md b/README2.md
new file mode 100644
index 0000000..f640e55
--- /dev/null
+++ b/README2.md
@@ -0,0 +1,3 @@
+- `curl -fsSL https://syftboxdev.openmined.org/install.sh | sh`
+- `syftbox init -e khoa@openmined.org`
+- `syftbox init`
diff --git a/runtimes/fl_tabular.Dockerfile b/runtimes/fl_tabular.Dockerfile
new file mode 100644
index 0000000..6ddac72
--- /dev/null
+++ b/runtimes/fl_tabular.Dockerfile
@@ -0,0 +1,46 @@
+# Use an official Python runtime as a parent image
+FROM python:3.11-slim
+
+# Install git and other required packages
+RUN apt-get update && apt-get install -y \
+    build-essential \
+    python3-dev \
+    libffi-dev \
+    git \
+    tmux \
+    && rm -rf /var/lib/apt/lists/*
+
+RUN pip install uv
+
+WORKDIR /app
+
+# TODO: remove this copy and install syft-flwr from pip
+COPY ./syft-flwr/ ./syft-flwr/
+# Install dependencies (will be cached if dependencies don't change)
+RUN uv venv && \
+    . .venv/bin/activate && \
+    cd syft-flwr && \
+    uv sync --active && \
+    uv pip install "scikit-learn==1.6.1" "torch==2.5.1"
+
+# Define environment variables needed by main.py
+# You might need to adjust these paths or mount volumes when running the container
+ENV DATA_DIR=/app/data
+ENV OUTPUT_DIR=/app/output
+ENV SYFTBOX_CLIENT_CONFIG_PATH=/app/config.json
+
+# Create the data and output directories
+RUN mkdir -p $OUTPUT_DIR $DATA_DIR
+
+# Run main.py when the container launches
+# Set the working directory to the fl-tabular project folder
+# WORKDIR /app/fl-tabular
+# ENTRYPOINT ["./.venv/bin/python", "fl-tabular/main.py"]
+# CMD ["--active"]
+RUN echo '#!/bin/sh\n\
+tmux new-session -d -s rds-server "uv run syft-rds server"\n\
+sleep 1\n\
+exec ./.venv/bin/python /app/code/main.py --active\n\
+' > /app/start.sh && chmod +x /app/start.sh
+
+ENTRYPOINT ["/app/start.sh"]
\ No newline at end of file
diff --git a/syft-rds/src/syft_rds/client/rds_client.py b/syft-rds/src/syft_rds/client/rds_client.py
index 7a2941f..894aef7 100644
--- a/syft-rds/src/syft_rds/client/rds_client.py
+++ b/syft-rds/src/syft_rds/client/rds_client.py
@@ -141,7 +141,7 @@ def get_default_config_for_job(self, job: Job) -> JobConfig:
         dataset = self.dataset.get(name=job.dataset_name)
         runtime = dataset.runtime or self.config.runner_config.runtime
         runner_config = self.config.runner_config
-        return JobConfig(
+        job_config = JobConfig(
             function_folder=user_code.local_dir,
             args=[user_code.entrypoint],
             data_path=dataset.get_private_path(),
@@ -150,6 +150,8 @@
             timeout=runner_config.timeout,
             use_docker=runner_config.use_docker,
         )
+        logger.debug(f"Job config: {job_config}")
+        return job_config
 
     def run_private(self, job: Job, config: Optional[JobConfig] = None) -> Job:
         if job.status == JobStatus.rejected:
diff --git a/syft-rds/src/syft_rds/client/rds_clients/jobs.py b/syft-rds/src/syft_rds/client/rds_clients/jobs.py
index 04d12ef..8654d0d 100644
--- a/syft-rds/src/syft_rds/client/rds_clients/jobs.py
+++ b/syft-rds/src/syft_rds/client/rds_clients/jobs.py
@@ -26,6 +26,8 @@ def submit(
         name: str | None = None,
         description: str | None = None,
         tags: list[str] | None = None,
+        runtime_type: str = "python",
+        runtime_config: dict | None = None,
     ) -> Job:
         """`submit` is a convenience method to create both a UserCode and a Job in one call."""
         user_code = self.rds.user_code.create(
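A minimal sketch of how the two parameters added to `jobs.submit` might be exercised from the data-scientist side. The `client` handle and all argument values are illustrative assumptions (the remaining required arguments of `submit` are not visible in this hunk); only `runtime_type` and `runtime_config` come from the change above.

```python
# Hypothetical call site; `client` is an already-initialised RDS client and
# all values are placeholders. Only runtime_type/runtime_config are new here.
job = client.jobs.submit(
    # ... user-code and dataset arguments elided (not shown in this hunk) ...
    name="fl-tabular-run",
    tags=["flwr", "tabular"],
    runtime_type="docker",  # defaults to "python"
    runtime_config={"image": "syft_python_runtime"},
)
```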
diff --git a/syft-rds/src/syft_rds/syft_runtime/config.py b/syft-rds/src/syft_rds/syft_runtime/config.py
new file mode 100644
index 0000000..ac9f09c
--- /dev/null
+++ b/syft-rds/src/syft_rds/syft_runtime/config.py
@@ -0,0 +1,23 @@
+from pathlib import Path
+from pydantic import BaseModel
+from typing import Literal
+
+
+class PythonRuntimeConfig(BaseModel):
+    type: Literal["python"] = "python"
+    # Add python-specific options if needed, e.g., python_version
+
+
+class DockerRuntimeConfig(BaseModel):
+    type: Literal["docker"] = "docker"
+    image: str | None = None
+    dockerfile: Path | None = None
+    # Add other docker options, e.g., build_args, context_path
+
+    # Add validation if needed, e.g., ensure image or dockerfile is provided
+
+
+class KubernetesRuntimeConfig(BaseModel):
+    type: Literal["kubernetes"] = "kubernetes"
+    image: str
+    namespace: str | None = None
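The `type` fields are `Literal` discriminators, so these three models can be parsed from a plain dict (for example the `runtime_config: dict` accepted by `jobs.submit` above) via a pydantic discriminated union. A minimal sketch, assuming pydantic v2; the `RuntimeConfig` alias below is not part of this diff.

```python
from typing import Annotated, Union

from pydantic import Field, TypeAdapter

from syft_rds.syft_runtime.config import (
    DockerRuntimeConfig,
    KubernetesRuntimeConfig,
    PythonRuntimeConfig,
)

# Hypothetical union type keyed on the `type` Literal discriminator.
RuntimeConfig = Annotated[
    Union[PythonRuntimeConfig, DockerRuntimeConfig, KubernetesRuntimeConfig],
    Field(discriminator="type"),
]

# Validating a raw dict selects the right model based on its "type" key.
cfg = TypeAdapter(RuntimeConfig).validate_python(
    {"type": "docker", "image": "syft_python_runtime"}
)
assert isinstance(cfg, DockerRuntimeConfig)
```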
diff --git a/syft-rds/src/syft_rds/syft_runtime/main.py b/syft-rds/src/syft_rds/syft_runtime/main.py
index 47b76f2..a561fe3 100644
--- a/syft-rds/src/syft_rds/syft_runtime/main.py
+++ b/syft-rds/src/syft_rds/syft_runtime/main.py
@@ -5,13 +5,14 @@
 from pathlib import Path
 from typing import Optional, Protocol, Tuple
 
+from loguru import logger
 from pydantic import BaseModel, Field
 from rich.console import Console
 from rich.live import Live
 from rich.panel import Panel
 from rich.spinner import Spinner
 
-DEFAULT_OUTPUT_DIR = "/output"
+DEFAULT_OUTPUT_DIR = "output"
 
 
 class CodeRuntime(BaseModel):
@@ -29,6 +30,7 @@ def default(cls):
 
 class JobConfig(BaseModel):
     """Configuration for a job run"""
 
+    client_email: str
     function_folder: Path
     args: list[str]
     data_path: Path
@@ -37,9 +39,12 @@
         default_factory=lambda: Path("jobs") / datetime.now().strftime("%Y%m%d_%H%M%S")
     )
     timeout: int = 60
-    data_mount_dir: str = "/data"
+    data_mount_dir: str = "/app/data"
     use_docker: bool = True
     extra_env: dict[str, str] = {}
+    extra_mounts: list[
+        dict[str, str]
+    ] = []  # [{"source": Path, "target": Path, "mode": str}]
 
     @property
     def job_path(self) -> Path:
@@ -211,40 +216,54 @@ def validate_paths(self, config: JobConfig) -> None:
 
     def build_docker_command(self, config: JobConfig) -> list[str]:
         """Build the Docker run command with security constraints"""
-
         if not config.use_docker:
             # For direct Python execution, build a command that runs Python directly
             # Assuming the first arg is the Python script to run
+            logger.info(
+                f"Running job without Docker with command: `{' '.join(config.runtime.cmd)} {' '.join(config.args)}`"
+            )
             return [
                 *config.runtime.cmd,
                 str(Path(config.function_folder) / config.args[0]),
                 *config.args[1:],
             ]
 
         config.output_dir.absolute().mkdir(parents=True, exist_ok=True)
+
+        workdir = "/app"
+
         docker_mounts = [
-            "-v",
-            f"{Path(config.function_folder).absolute()}:/code:ro",
-            "-v",
-            f"{Path(config.data_path).absolute()}:{config.data_mount_dir}:ro",
-            "-v",
-            f"{config.output_dir.absolute()}:{DEFAULT_OUTPUT_DIR}:rw",
+            "--volume",
+            f"{Path(config.function_folder).absolute()}:{workdir}/code:ro",
+            "--volume",
+            f"{Path(config.data_path).absolute()}:{workdir}/data:ro",
+            "--volume",
+            f"{config.output_dir.absolute()}:{workdir}/{DEFAULT_OUTPUT_DIR}:rw",
         ]
         if config.runtime.mount_dir:
             docker_mounts.extend(
                 [
-                    "-v",
-                    f"{config.runtime.mount_dir.absolute()}:{config.runtime.mount_dir.absolute()}:ro",
+                    "--volume",
+                    f"{config.runtime.mount_dir.absolute()}:{workdir}/{config.runtime.mount_dir.absolute()}:ro",
                 ]
             )
+        if config.extra_mounts:
+            for extra_mount in config.extra_mounts:
+                docker_mounts.extend(
+                    [
+                        "--volume",
+                        f"{extra_mount['source']}:{extra_mount['target']}:{extra_mount['mode']}",
+                    ]
+                )
+
         limits = [
             # Security constraints
             "--cap-drop",
             "ALL",  # Drop all capabilities
             "--network",
             "none",  # Disable networking
-            "--read-only",  # Read-only root filesystem
+            # "--read-only",  # Read-only root filesystem
             "--tmpfs",
             "/tmp:size=16m,noexec,nosuid,nodev",  # Secure temp directory
             # Resource limits
@@ -263,12 +282,7 @@
         ]
         interpreter = " ".join(config.runtime.cmd)
         interpreter_str = f'"{interpreter}"' if " " in interpreter else interpreter
-        return [
-            "docker",
-            "run",
-            "--rm",  # Remove container after completion
-            *limits,
-            # Environment variables
+        docker_env = [
             "-e",
             f"TIMEOUT={config.timeout}",
             "-e",
@@ -279,11 +293,25 @@
             f"INTERPRETER={interpreter_str}",
             "-e",
             f"INPUT_FILE='{config.function_folder / config.args[0]}'",
+        ]
+
+        image_name = "syft_python_runtime"
+
+        docker_command = [
+            "docker",
+            "run",
+            "--rm",
+            *limits,
+            *docker_env,
             *config.get_extra_env_as_docker_args(),
             *docker_mounts,
-            "syft_python_runtime",
+            "--workdir",
+            workdir,
+            image_name,
             *config.args,
         ]
+        logger.debug(f"Running job with Docker command: \n{' '.join(docker_command)}")
+        return docker_command
 
     def validate_docker(self, config: JobConfig) -> bool:
         """Validate Docker image availability"""
@@ -309,8 +337,33 @@
             raise RuntimeError("Docker not installed or not in PATH")
 
     def run(self, config: JobConfig) -> Tuple[Path, int | None]:
-        """Run a job in a Docker container or directly as Python"""
+        """Run a job in a Docker container or directly as Python
+        TODO: refactor into 2 functions:
+        1. run_docker: run a job in a Docker container
+        2. run_python: run a job directly as Python
+        """
         # Check Docker availability first if using Docker
+
+        # TODO: force run + hard code with Docker. Remove later
+        config.use_docker = True
+        client_email = config.client_email
+        cwd = "/Users/khoaguin/Desktop/projects/OpenMined/syft-rds/experimentals/rpc_docker"
+        config.extra_mounts = [
+            {
+                "source": Path(f"{cwd}/.modified_configs/{client_email}.config.json"),
+                "target": "/app/config.json",
+                "mode": "ro",
+            },
+            {
+                "source": Path(
+                    f"{cwd}/SyftBox/datasites/{client_email}/app_data/flwr/rpc/messages"
+                ),
+                "target": f"/app/SyftBox/datasites/{client_email}/app_data/flwr/rpc/messages",
+                "mode": "rw",
+            },
+        ]
+        # END TODO
+
         if not self.validate_docker(config):
             return -1
@@ -336,22 +389,29 @@
                 text=True,
                 env=env,
             )
-            # Stream output
-            while True:
-                stdout_line = process.stdout.readline()
-                stderr_line = process.stderr.readline()
+            stream_output = False
+            if stream_output:
+                # Stream output
+                while True:
+                    stdout_line = process.stdout.readline()
+                    stderr_line = process.stderr.readline()
 
-                for handler in self.handlers:
-                    handler.on_job_progress(stdout_line, stderr_line)
+                    for handler in self.handlers:
+                        handler.on_job_progress(stdout_line, stderr_line)
 
-                if not stdout_line and not stderr_line and process.poll() is not None:
-                    break
+                    if not stdout_line and not stderr_line and process.poll() is not None:
+                        break
 
-                if not stdout_line and not stderr_line:
-                    time.sleep(0.5)
+                    if not stdout_line and not stderr_line:
+                        time.sleep(0.5)
 
-            process.wait()
-            for handler in self.handlers:
-                handler.on_job_completion(process.returncode)
+                process.wait()
+                for handler in self.handlers:
+                    handler.on_job_completion(process.returncode)
+
+                return process.returncode
+
+            # TODO: save output to file
 
-            return process.returncode
+            return process
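For completeness, a hedged sketch of building a `JobConfig` with the new `client_email` and `extra_mounts` fields, mirroring the hard-coded TODO block in `run()`. All paths and the email are placeholders, fields elided from this diff may add further required arguments, and note that `run()` itself stores `Path` objects in `source` even though `extra_mounts` is annotated `list[dict[str, str]]`.

```python
from pathlib import Path

from syft_rds.syft_runtime.main import CodeRuntime, JobConfig

# All values below are placeholders; only the field names come from the diff.
config = JobConfig(
    client_email="do@openmined.org",  # new required field
    function_folder=Path("./code"),
    args=["main.py"],
    data_path=Path("./data"),
    runtime=CodeRuntime.default(),  # assumes the default runtime suffices
    extra_mounts=[
        # shape documented above: [{"source": Path, "target": Path, "mode": str}]
        {
            "source": Path("/host/config.json"),
            "target": "/app/config.json",
            "mode": "ro",
        },
    ],
)
```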