Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge branch 'master' into ci/docs-deploy
Browse files Browse the repository at this point in the history
Borda authored Feb 8, 2023
2 parents 8ff9274 + 1288e4c commit 97121ee
Showing 29 changed files with 726 additions and 370 deletions.
4 changes: 2 additions & 2 deletions examples/app_installation_commands/app.py
Original file line number Diff line number Diff line change
@@ -11,7 +11,7 @@ class YourComponent(L.LightningWork):
def run(self):
print(lmdb.version())
print("lmdb successfully installed")
print("accessing a module in a Work or Flow body works!")
print("Accessing a module in a Work or Flow body works!")


class RootFlow(L.LightningFlow):
@@ -23,7 +23,7 @@ def run(self):
self.work.run()


print(f"accessing an object in main code body works!: version={lmdb.version()}")
print(f"Accessing an object in main code body works!: version = {lmdb.version()}")


# run on a cloud machine
6 changes: 3 additions & 3 deletions src/lightning/app/cli/commands/connection.py
Original file line number Diff line number Diff line change
@@ -24,7 +24,7 @@
from lightning_utilities.core.imports import package_available
from rich.progress import Progress

from lightning.app.utilities.cli_helpers import _LightningAppOpenAPIRetriever
from lightning.app.utilities.cli_helpers import _get_app_display_name, _LightningAppOpenAPIRetriever
from lightning.app.utilities.cloud import _get_project
from lightning.app.utilities.enum import OpenAPITags
from lightning.app.utilities.log import get_logfile
@@ -169,8 +169,8 @@ def connect(app_name_or_id: str):
project = _get_project(client)
apps = client.lightningapp_instance_service_list_lightningapp_instances(project_id=project.project_id)
click.echo(
"We didn't find a matching App. Here are the available Apps that you can"
f"connect to {[app.name for app in apps.lightningapps]}."
"We didn't find a matching App. Here are the available Apps that you can "
f"connect to {[_get_app_display_name(app) for app in apps.lightningapps]}."
)
return

9 changes: 5 additions & 4 deletions src/lightning/app/cli/commands/logs.py
Original file line number Diff line number Diff line change
@@ -31,7 +31,7 @@
@click.argument("components", nargs=-1, required=False)
@click.option("-f", "--follow", required=False, is_flag=True, help="Wait for new logs, to exit use CTRL+C.")
def logs(app_name: str, components: List[str], follow: bool) -> None:
"""Show cloud application logs. By default prints logs for all currently available components.
"""Show cloud application logs. By default, prints logs for all currently available components.
Example uses:
@@ -57,7 +57,7 @@ def _show_logs(app_name: str, components: List[str], follow: bool) -> None:
project = _get_project(client)

apps = {
app.name: app
getattr(app, "display_name", None) or app.name: app
for app in client.lightningapp_instance_service_list_lightningapp_instances(
project_id=project.project_id
).lightningapps
@@ -70,12 +70,13 @@ def _show_logs(app_name: str, components: List[str], follow: bool) -> None:

if not app_name:
raise click.ClickException(
f"You have not specified any Lightning App. Please select one of available: [{', '.join(apps.keys())}]"
f"You have not specified any Lightning App. Please select one of the following: [{', '.join(apps.keys())}]."
)

if app_name not in apps:
raise click.ClickException(
f"The Lightning App '{app_name}' does not exist. Please select one of following: [{', '.join(apps.keys())}]"
f"The Lightning App '{app_name}' does not exist. "
f"Please select one of the following: [{', '.join(apps.keys())}]."
)

# Fetch all lightning works from given application
17 changes: 17 additions & 0 deletions src/lightning/app/core/flow.py
Original file line number Diff line number Diff line change
@@ -21,6 +21,7 @@

from deepdiff import DeepHash

from lightning.app.core.plugin import Plugin
from lightning.app.core.work import LightningWork
from lightning.app.frontend import Frontend
from lightning.app.storage import Path
@@ -740,6 +741,22 @@ def configure_api(self):
"""
raise NotImplementedError

def configure_plugins(self) -> Optional[List[Dict[str, Plugin]]]:
"""Configure the plugins of this LightningFlow.
Returns a list of dictionaries mapping a plugin name to a :class:`lightning_app.core.plugin.Plugin`.
.. code-block:: python
class Flow(LightningFlow):
def __init__(self):
super().__init__()
def configure_plugins(self):
return [{"my_plugin_name": MyPlugin()}]
"""
pass

def state_dict(self):
"""Returns the current flow state but not its children."""
return {
170 changes: 170 additions & 0 deletions src/lightning/app/core/plugin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
# Copyright The Lightning team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import tempfile
from pathlib import Path
from typing import Any, Dict, Optional

import requests
import uvicorn
from fastapi import FastAPI, HTTPException, status
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel

from lightning.app.utilities.app_helpers import Logger
from lightning.app.utilities.cloud import _get_project
from lightning.app.utilities.component import _set_flow_context
from lightning.app.utilities.enum import AppStage
from lightning.app.utilities.network import LightningClient

logger = Logger(__name__)


class Plugin:
"""A ``Plugin`` is a single-file Python class that can be executed within a cloudspace to perform actions."""

def __init__(self) -> None:
self.app_url = None

def run(self, name: str, entrypoint: str) -> None:
"""Override with the logic to execute on the client side."""

def run_app_command(self, command_name: str, config: Optional[BaseModel] = None) -> Dict[str, Any]:
"""Run a command on the app associated with this plugin.
Args:
command_name: The name of the command to run.
config: The command config or ``None`` if the command doesn't require configuration.
"""
if self.app_url is None:
raise RuntimeError("The plugin must be set up before `run_app_command` can be called.")

command = command_name.replace(" ", "_")
resp = requests.post(self.app_url + f"/command/{command}", data=config.json() if config else None)
if resp.status_code != 200:
try:
detail = str(resp.json())
except Exception:
detail = "Internal Server Error"
raise RuntimeError(f"Failed with status code {resp.status_code}. Detail: {detail}")

return resp.json()

def _setup(self, app_id: str) -> None:
client = LightningClient()
project_id = _get_project(client).project_id
response = client.lightningapp_instance_service_list_lightningapp_instances(
project_id=project_id, app_id=app_id
)
if len(response.lightningapps) > 1:
raise RuntimeError(f"Found multiple apps with ID: {app_id}")
if len(response.lightningapps) == 0:
raise RuntimeError(f"Found no apps with ID: {app_id}")
self.app_url = response.lightningapps[0].status.url


class _Run(BaseModel):
plugin_name: str
project_id: str
cloudspace_id: str
name: str
entrypoint: str
cluster_id: Optional[str] = None
app_id: Optional[str] = None


def _run_plugin(run: _Run) -> None:
"""Create a run with the given name and entrypoint under the cloudspace with the given ID."""
if run.app_id is None and run.plugin_name == "app":
from lightning.app.runners.cloud import CloudRuntime

# TODO: App dispatch should be a plugin
# Dispatch the run
_set_flow_context()

entrypoint_file = Path("/content") / run.entrypoint

app = CloudRuntime.load_app_from_file(str(entrypoint_file.resolve().absolute()))

app.stage = AppStage.BLOCKING

runtime = CloudRuntime(
app=app,
entrypoint=entrypoint_file,
start_server=True,
env_vars={},
secrets={},
run_app_comment_commands=True,
)
# Used to indicate Lightning has been dispatched
os.environ["LIGHTNING_DISPATCHED"] = "1"

try:
runtime.cloudspace_dispatch(
project_id=run.project_id,
cloudspace_id=run.cloudspace_id,
name=run.name,
cluster_id=run.cluster_id,
)
except Exception as e:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
elif run.app_id is not None:
from lightning.app.utilities.cli_helpers import _LightningAppOpenAPIRetriever
from lightning.app.utilities.commands.base import _download_command

retriever = _LightningAppOpenAPIRetriever(run.app_id)

metadata = retriever.api_commands[run.plugin_name] # type: ignore

with tempfile.TemporaryDirectory() as tmpdir:

target_file = os.path.join(tmpdir, f"{run.plugin_name}.py")
plugin = _download_command(
run.plugin_name,
metadata["cls_path"],
metadata["cls_name"],
run.app_id,
target_file=target_file,
)

if isinstance(plugin, Plugin):
plugin._setup(app_id=run.app_id)
plugin.run(run.name, run.entrypoint)
else:
# This should never be possible but we check just in case
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"The plugin {run.plugin_name} is an incorrect type.",
)
else:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail="App ID must be specified unless `plugin_name='app'`."
)


def _start_plugin_server(host: str, port: int) -> None:
"""Start the plugin server which can be used to dispatch apps or run plugins."""
fastapi_service = FastAPI()

fastapi_service.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)

fastapi_service.post("/v1/runs")(_run_plugin)

uvicorn.run(app=fastapi_service, host=host, port=port, log_level="error")
72 changes: 70 additions & 2 deletions src/lightning/app/runners/cloud.py
Original file line number Diff line number Diff line change
@@ -208,6 +208,67 @@ def open(self, name: str, cluster_id: Optional[str] = None):
logger.error(e.body)
sys.exit(1)

def cloudspace_dispatch(
self,
project_id: str,
cloudspace_id: str,
name: str,
cluster_id: str = None,
):
"""Slim dispatch for creating runs from a cloudspace. This dispatch avoids resolution of some properties
such as the project and cluster IDs that are instead passed directly.
Args:
project_id: The ID of the project.
cloudspace_id: The ID of the cloudspace.
name: The name for the run.
cluster_id: The ID of the cluster to run on.
Raises:
ApiException: If there was an issue in the backend.
RuntimeError: If there are validation errors.
ValueError: If there are validation errors.
"""
# Dispatch in four phases: resolution, validation, spec creation, API transactions
# Resolution
root = self._resolve_root()
ignore_functions = self._resolve_open_ignore_functions()
repo = self._resolve_repo(root, ignore_functions)
cloudspace = self._resolve_cloudspace(project_id, cloudspace_id)
cluster_id = self._resolve_cluster_id(cluster_id, project_id, [cloudspace])
queue_server_type = self._resolve_queue_server_type()

self.app._update_index_file()

# Validation
# TODO: Validate repo and surface to the user
# self._validate_repo(root, repo)
self._validate_work_build_specs_and_compute()
self._validate_drives()
self._validate_mounts()

# Spec creation
flow_servers = self._get_flow_servers()
network_configs = self._get_network_configs(flow_servers)
works = self._get_works()
run_body = self._get_run_body(cluster_id, flow_servers, network_configs, works, False, root, True)
env_vars = self._get_env_vars(self.env_vars, self.secrets, self.run_app_comment_commands)

# API transactions
run = self._api_create_run(project_id, cloudspace_id, run_body)
self._api_package_and_upload_repo(repo, run)

self._api_create_run_instance(
cluster_id,
project_id,
name,
cloudspace_id,
run.id,
V1LightningappInstanceState.RUNNING,
queue_server_type,
env_vars,
)

def dispatch(
self,
name: str = "",
@@ -410,6 +471,13 @@ def _resolve_project(self) -> V1Membership:
"""Determine the project to run on, choosing a default if multiple projects are found."""
return _get_project(self.backend.client)

def _resolve_cloudspace(self, project_id: str, cloudspace_id: str) -> V1CloudSpace:
"""Get a cloudspace by project / cloudspace ID."""
return self.backend.client.cloud_space_service_get_cloud_space(
project_id=project_id,
id=cloudspace_id,
)

def _resolve_existing_cloudspaces(self, project, cloudspace_name: str) -> List[V1CloudSpace]:
"""Lists all the cloudspaces with a name matching the provided cloudspace name."""
# TODO: Add pagination, otherwise this could break if users have a lot of cloudspaces.
@@ -871,7 +939,7 @@ def _api_create_run_instance(
self,
cluster_id: str,
project_id: str,
cloudspace_name: str,
run_name: str,
cloudspace_id: str,
run_id: str,
desired_state: V1LightningappInstanceState,
@@ -886,7 +954,7 @@ def _api_create_run_instance(
id=run_id,
body=IdGetBody1(
cluster_id=cluster_id,
name=cloudspace_name,
name=run_name,
desired_state=desired_state,
queue_server_type=queue_server_type,
env=env_vars,
Loading

0 comments on commit 97121ee

Please sign in to comment.