
How to use multiple GPU from a node #411

Open
udupicloud opened this issue Nov 4, 2024 · 10 comments

@udupicloud

udupicloud commented Nov 4, 2024

Hi, I have a multi-node setup with multiple GPUs (screenshots of the node setup and the cluster view attached). I was able to get the cluster up, but I don't see the remaining GPUs from each node. How do I do that? I also observed the error below while using Llama 3.2 and Mistral Large (screenshot of the error attached).

I am running the setup on Ubuntu 22.04 with Python 3.12, and all the NVIDIA drivers, including CUDA 12.4, have been installed. I have installed the Llama 3.2 models. Each machine has 2 GPUs (an RTX 3070 and an RTX 3090).
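
As a quick sanity check before touching exo (a minimal sketch, assuming PyTorch is installed on each node), you can confirm that the CUDA runtime actually sees both GPUs; if only one shows up here, the problem is at the driver/CUDA level rather than in exo:

# Sanity check (sketch): list the GPUs the CUDA runtime exposes on this node.
# Assumes PyTorch is installed; any CUDA-aware library would work the same way.
import torch

if torch.cuda.is_available():
    for i in range(torch.cuda.device_count()):
        props = torch.cuda.get_device_properties(i)
        print(f"GPU {i}: {props.name}, {props.total_memory / 1024**3:.1f} GiB")
else:
    print("No CUDA devices visible - check drivers and CUDA_VISIBLE_DEVICES")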

@jorge123255

I had to configure inference.py and inference_engine.py to detect two GPUs. Tinygrad detects them now, but I'm still trying to figure out how to have them displayed when you run exo.

@udupicloud

Hi @jorge123255 - could you please help me with the steps to fix this and what needs to be added or edited? By the way, I tried to find those files but couldn't locate them either.

Thank you,
Mark.

@jorge123255

/exo/inference

inference_engine.py
import torch # Assuming PyTorch or similar GPU access library is available
import numpy as np
import os
from exo.helpers import DEBUG # Make sure to import DEBUG
from typing import Tuple, Optional, List
from abc import ABC, abstractmethod
from .shard import Shard

class InferenceEngine(ABC):
    @abstractmethod
    async def infer_prompt(self, request_id: str, shard: Shard, prompt: str, image_str: Optional[str] = None, inference_state: Optional[str] = None) -> Tuple[np.ndarray, str, bool]:
        pass

    @abstractmethod
    async def infer_tensor(self, request_id: str, shard: Shard, input_data: np.ndarray, inference_state: Optional[str] = None) -> Tuple[np.ndarray, str, bool]:
        pass

def get_available_gpus() -> List[int]:
    """Returns a list of available GPU indices."""
    if torch.cuda.is_available():
        return list(range(torch.cuda.device_count()))
    else:
        print("Warning: No GPUs detected. Running on CPU.")
        return []

def get_inference_engine(inference_engine_name: str, shard_downloader: 'ShardDownloader'):
    if DEBUG >= 2:
        print(f"get_inference_engine called with: {inference_engine_name}")

    available_gpus = get_available_gpus()
    if DEBUG >= 1:
        print(f"Detected GPUs: {available_gpus}")

    if inference_engine_name == "mlx":
        from exo.inference.mlx.sharded_inference_engine import MLXDynamicShardInferenceEngine
        return MLXDynamicShardInferenceEngine(shard_downloader, devices=available_gpus)

    elif inference_engine_name == "tinygrad":
        from exo.inference.tinygrad.inference import TinygradDynamicShardInferenceEngine
        import tinygrad.helpers
        tinygrad.helpers.DEBUG.value = int(os.getenv("TINYGRAD_DEBUG", default="0"))

        return TinygradDynamicShardInferenceEngine(shard_downloader, devices=available_gpus)

    elif inference_engine_name == "dummy":
        from exo.inference.dummy_inference_engine import DummyInferenceEngine
        return DummyInferenceEngine()

    raise ValueError(f"Unsupported inference engine: {inference_engine_name}")
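
As a side note (purely illustrative, not part of this patch): if you want to pin a node to a subset of its detected GPUs while debugging, a small override on top of get_available_gpus could read a hypothetical EXO_GPUS environment variable:

import os

def get_visible_gpus(detected):
    """Sketch: optionally restrict detected GPU indices via a hypothetical EXO_GPUS variable, e.g. EXO_GPUS=0,1."""
    override = os.getenv("EXO_GPUS")
    if override:
        return [int(i) for i in override.split(",")]
    return detected

With that in place, get_inference_engine would call get_visible_gpus(get_available_gpus()) instead of using the detected list directly.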

and under /exo/inference/tinygrad, inference.py:

from pathlib import Path
import json
import os
from exo.inference.tinygrad.models.llama import Transformer, convert_from_huggingface, fix_bf16
from exo.inference.shard import Shard
from exo.inference.tokenizers import resolve_tokenizer
from tinygrad.nn.state import load_state_dict
from tinygrad import Tensor, nn, Context
from exo.inference.inference_engine import InferenceEngine
from typing import Optional, Tuple
import numpy as np
from exo.inference.tinygrad.tinygrad_helpers import concat_weights, load
from exo.download.shard_download import ShardDownloader
from concurrent.futures import ThreadPoolExecutor
import asyncio

Tensor.no_grad = True

# default settings

TEMPERATURE = float(os.getenv("TEMPERATURE", 0.85))
TOP_K = 25
TOP_P = 0.9
ALPHA_F = 0.1
ALPHA_P = 0.0
MODEL_PARAMS = {
    "8B": {"args": {"dim": 4096, "n_heads": 32, "n_kv_heads": 8, "n_layers": 32, "norm_eps": 1e-5, "rope_theta": 500000, "vocab_size": 128256, "hidden_dim": 14336}, "files": 1},
    "70B": {"args": {"dim": 8192, "n_heads": 64, "n_kv_heads": 8, "n_layers": 80, "norm_eps": 1e-5, "rope_theta": 500000, "vocab_size": 128256, "hidden_dim": 28672}, "files": 8}
}

def build_transformer(model_path: Path, shard: Shard, model_size="8B", devices=None):
    # build model
    linear = nn.Linear
    with Context(THREEFRY=0):
        model = Transformer(**MODEL_PARAMS[model_size]["args"], linear=linear, max_context=8192, jit=True, shard=shard)

    # load weights and distribute across devices if multiple are available
    if model_path.is_dir():
        if (model_path / "model.safetensors.index.json").exists():
            weights = load(str(model_path / "model.safetensors.index.json"), shard)
        elif (model_path / "model.safetensors").exists():
            weights = load(str(model_path / "model.safetensors"), shard)
        else:
            weights = concat_weights(
                [load(str(model_path / f"consolidated.{i:02d}.pth"), shard) for i in range(MODEL_PARAMS[model_size]["files"])],
                devices[0] if isinstance(devices, list) and devices else None
            )
    else:
        weights = load(str(model_path), shard)

    weights = convert_from_huggingface(weights, model, MODEL_PARAMS[model_size]["args"]["n_heads"], MODEL_PARAMS[model_size]["args"]["n_kv_heads"])
    weights = fix_bf16(weights)

    for i, device in enumerate(devices or [None]):
        with Context(device=device):
            load_state_dict(model, weights, strict=False, consume=False)  # consume=True if needed

    return model

class TinygradDynamicShardInferenceEngine(InferenceEngine):
    def __init__(self, shard_downloader: ShardDownloader, devices=None):
        self.shard = None
        self.shard_downloader = shard_downloader
        self.executor = ThreadPoolExecutor(max_workers=1)
        self.devices = devices or [0]  # default to GPU 0 if no devices are provided

    async def infer_prompt(self, request_id: str, shard: Shard, prompt: str, image_str: Optional[str] = None, inference_state: Optional[str] = None) -> Tuple[np.ndarray, str, bool]:
        await self.ensure_shard(shard)
        start_pos = json.loads(inference_state or "{}").get("start_pos", 0)
        n_captured_toks = json.loads(inference_state or "{}").get("n_captured_toks", 0)

        toks = await asyncio.get_event_loop().run_in_executor(self.executor, self.tokenizer.encode, prompt)
        h_results = []

        for device in self.devices:
            with Context(device=device):
                h = await asyncio.get_event_loop().run_in_executor(self.executor, lambda: self.model(Tensor([toks]), start_pos, TEMPERATURE).realize())
                h_results.append(h)

        h = self.aggregate_results(h_results)

        if h.shape == (1,):
            start_pos += len(toks) + 1
            n_captured_toks = 0
            return np.array([[h.item()]]), json.dumps({"start_pos": start_pos, "n_captured_toks": n_captured_toks}), h.item() == self.tokenizer.eos_token_id
        else:
            n_captured_toks = len(toks)
            return h.numpy(), json.dumps({"start_pos": start_pos, "n_captured_toks": n_captured_toks}), False

    async def infer_tensor(self, request_id: str, shard: Shard, input_data: np.ndarray, inference_state: Optional[str] = None) -> Tuple[np.ndarray, str, bool]:
        await self.ensure_shard(shard)
        start_pos = json.loads(inference_state or "{}").get("start_pos", 0)
        n_captured_toks = json.loads(inference_state or "{}").get("n_captured_toks", 0)

        h_results = []
        for device in self.devices:
            with Context(device=device):
                h = await asyncio.get_event_loop().run_in_executor(self.executor, lambda: self.model(Tensor(input_data), start_pos, TEMPERATURE).realize())
                h_results.append(h)

        h = self.aggregate_results(h_results)

        if h.shape == (1,):
            start_pos += n_captured_toks + 1
            n_captured_toks = 0
            return np.array([[h.item()]]), json.dumps({"start_pos": start_pos, "n_captured_toks": n_captured_toks}), h.item() == self.tokenizer.eos_token_id
        else:
            return h.numpy(), json.dumps({"start_pos": start_pos, "n_captured_toks": n_captured_toks}), False

    async def ensure_shard(self, shard: Shard):
        if self.shard == shard:
            return

        model_path = await self.shard_downloader.ensure_shard(shard)

        if self.shard != shard:
            self.model = await asyncio.get_event_loop().run_in_executor(
                self.executor, build_transformer, model_path, shard, "8B" if "8b" in shard.model_id.lower() else "70B", self.devices
            )

            tokenizer_path = str((model_path if model_path.is_dir() else model_path.parent))
            self.tokenizer = await resolve_tokenizer(tokenizer_path)
            self.shard = shard

    def aggregate_results(self, results):
        # This function can be adjusted based on how aggregation should be handled. Currently, it returns the first result.
        return results[0]
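
One note on aggregate_results: since every device in this sketch runs the same forward pass on the same tokens, returning results[0] is effectively arbitrary but harmless. If you want to confirm during testing that the per-device outputs actually agree, a minimal illustrative variant (not part of the patch above, using the np already imported in this file) could be:

    def aggregate_results(self, results):
        # Illustrative check: with replicated inputs, all devices should produce the same output.
        first = results[0].numpy()
        for other in results[1:]:
            assert np.allclose(first, other.numpy()), "per-device outputs diverged"
        return results[0]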

@udupicloud

udupicloud commented Nov 13, 2024

@jorge123255 - Thanks for the update. I tried but couldn't get it going, as I am a beginner at this. If I could get the files, it would be a great help.
Basically, I need exo to fetch all the GPUs it has on those nodes; it can be 1 or n GPUs. I am building this for large models.

@jorge123255

@udupicloud here, I uploaded the changes to my GitHub:
https://github.com/jorge123255/exo.git

@svm87601

After modifying the code according to your method, I encountered the following error:
"Error: Failed to fetch completions: Error processing prompt (see logs with DEBUG>=2): HFShardDownloader.ensure_shard() missing 1 required positional argument: 'inference_engine_name'."
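
For what it's worth, that error suggests the downloader's ensure_shard in this version of exo expects the inference engine name as a second argument, so the call inside TinygradDynamicShardInferenceEngine.ensure_shard would need to pass it through. A hedged sketch of the adjusted call (the exact value expected depends on your exo version):

# Sketch (version-dependent): pass the engine name through to the downloader
# instead of only the shard, inside ensure_shard.
model_path = await self.shard_downloader.ensure_shard(shard, self.__class__.__name__)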

@jorge123255

Ah, let me fix that.

@svm87601

Is it solved now?

@jorge123255

jorge123255 commented Nov 25, 2024 via email

@svm87601

I'm sorry to hear this, and I believe everything will be fine.
