Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions cacheflow/http_frontend/fastapi_frontend.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
process_server_arguments,
initialize_cluster)
from cacheflow.worker.controller import DeviceID
from cacheflow.utils import Counter, get_gpu_memory, get_cpu_memory
from cacheflow.utils import Counter

TIMEOUT_TO_PREVENT_DEADLOCK = 1 # seconds
app = FastAPI()
Expand All @@ -34,6 +34,7 @@ def __init__(
dtype: str,
seed: int,
swap_space: int,
cache_block_memory_utilization: float,
Comment thread
zhuohan123 marked this conversation as resolved.
Outdated
max_num_batched_tokens: int,
max_num_sequences: int,
num_nodes: int,
Expand Down Expand Up @@ -62,14 +63,13 @@ def __init__(
dtype=dtype,
seed=seed,
swap_space=swap_space,
cache_block_memory_utilization=cache_block_memory_utilization,
max_num_batched_tokens=max_num_batched_tokens,
max_num_sequences=max_num_sequences,
num_nodes=num_nodes,
num_devices_per_node=num_devices_per_node,
distributed_init_method=distributed_init_method,
all_stage_devices=all_stage_devices,
gpu_memory=get_gpu_memory(),
cpu_memory=get_cpu_memory(),
use_ray=server_use_ray,
)

Expand Down Expand Up @@ -182,6 +182,7 @@ async def generate_stream(request: Request):
dtype=args.dtype,
seed=args.seed,
swap_space=args.swap_space,
cache_block_memory_utilization=args.cache_block_memory_utilization,
max_num_batched_tokens=args.max_num_batched_tokens,
max_num_sequences=args.max_num_sequences,
num_nodes=num_nodes,
Expand Down
47 changes: 23 additions & 24 deletions cacheflow/master/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from typing import List, Tuple, Optional
import random

import numpy as np
import torch
try:
import ray
Expand All @@ -10,11 +11,9 @@

from cacheflow.master.scheduler import Scheduler
from cacheflow.master.simple_frontend import SimpleFrontend
from cacheflow.models import get_memory_analyzer
from cacheflow.worker.controller import Controller, DeviceID
from cacheflow.sequence import SequenceGroup
from cacheflow.sampling_params import SamplingParams
from cacheflow.utils import get_gpu_memory, get_cpu_memory


class Server:
Expand All @@ -30,14 +29,13 @@ def __init__(
dtype: str,
seed: int,
swap_space: int,
cache_block_memory_utilization: float,
max_num_batched_tokens: int,
max_num_sequences: int,
num_nodes: int,
num_devices_per_node: int,
distributed_init_method: str,
all_stage_devices: List[List[DeviceID]],
gpu_memory: int,
cpu_memory: int,
use_ray: bool,
collect_stats: bool = False,
do_memory_analysis: bool = False,
Expand All @@ -50,21 +48,6 @@ def __init__(
assert self.world_size == 1, (
"Only support single GPU without Ray.")

self.memory_analyzer = get_memory_analyzer(
model_name=model,
block_size=block_size,
dtype=dtype,
gpu_memory=gpu_memory,
cpu_memory=cpu_memory,
tensor_parallel_size=tensor_parallel_size,
)
self.num_gpu_blocks = self.memory_analyzer.get_max_num_gpu_blocks(
max_num_batched_tokens=max_num_batched_tokens)
self.num_cpu_blocks = self.memory_analyzer.get_max_num_cpu_blocks(
swap_space=swap_space)
print(f'# GPU blocks: {self.num_gpu_blocks}, '
f'# CPU blocks: {self.num_cpu_blocks}')

# Create a controller for each pipeline stage.
self.controllers: List[Controller] = []
for i in range(pipeline_parallel_size):
Expand All @@ -76,9 +59,6 @@ def __init__(
tensor_parallel_size=tensor_parallel_size,
distributed_init_method=distributed_init_method,
model_name=model,
block_size=block_size,
Comment thread
zhuohan123 marked this conversation as resolved.
num_gpu_blocks=self.num_gpu_blocks,
num_cpu_blocks=self.num_cpu_blocks,
dtype=dtype,
seed=seed,
cache_dir=cache_dir,
Expand All @@ -89,6 +69,21 @@ def __init__(
)
self.controllers.append(controller)

# Initialize cache engine.
all_worker_num_available_blocks = []
for controller in self.controllers:
all_worker_num_available_blocks.extend(
controller.get_num_available_blocks(
block_size, swap_space, cache_block_memory_utilization)
)
self.num_gpu_blocks = np.min([b[0] for b in all_worker_num_available_blocks])
self.num_cpu_blocks = np.min([b[1] for b in all_worker_num_available_blocks])
Comment thread
zhuohan123 marked this conversation as resolved.
print(f'# GPU blocks: {self.num_gpu_blocks}, '
f'# CPU blocks: {self.num_cpu_blocks}')
for controller in self.controllers:
controller.init_cache_engine(block_size, self.num_gpu_blocks,
self.num_cpu_blocks)

# Create a scheduler.
self.scheduler = Scheduler(
controllers=self.controllers,
Expand Down Expand Up @@ -204,6 +199,9 @@ def initialize_cluster(
all_stage_devices)


_GiB = 1 << 30


def add_server_arguments(parser: argparse.ArgumentParser):
# Model arguments
parser.add_argument('--model', type=str, default='facebook/opt-125m', help='model name')
Expand All @@ -228,6 +226,7 @@ def add_server_arguments(parser: argparse.ArgumentParser):
# TODO(woosuk): Support fine-grained seeds (e.g., seed per request).
parser.add_argument('--seed', type=int, default=0, help='random seed')
parser.add_argument('--swap-space', type=int, default=20, help='CPU swap space size (GiB) per GPU')
parser.add_argument('--cache-block-memory-utilization', type=float, default=0.9, help='the percentage of free GPU memory to be used for KV cache blocks')
Comment thread
zhuohan123 marked this conversation as resolved.
Outdated
parser.add_argument('--max-num-batched-tokens', type=int, default=2560, help='maximum number of batched tokens per iteration')
parser.add_argument('--max-num-sequences', type=int, default=256, help='maximum number of sequences per iteration')
return parser
Expand All @@ -236,6 +235,7 @@ def add_server_arguments(parser: argparse.ArgumentParser):
def process_server_arguments(args: argparse.Namespace):
if args.pipeline_parallel_size * args.tensor_parallel_size > 1:
args.use_ray = True
args.swap_space = args.swap_space * _GiB
return args


Expand Down Expand Up @@ -263,14 +263,13 @@ def init_local_server_and_frontend_with_arguments(args: argparse.Namespace):
dtype=args.dtype,
seed=args.seed,
swap_space=args.swap_space,
cache_block_memory_utilization=args.cache_block_memory_utilization,
max_num_batched_tokens=args.max_num_batched_tokens,
max_num_sequences=args.max_num_sequences,
num_nodes=num_nodes,
num_devices_per_node=num_devices_per_node,
distributed_init_method=distributed_init_method,
all_stage_devices=all_stage_devices,
gpu_memory=get_gpu_memory(),
cpu_memory=get_cpu_memory(),
use_ray=args.use_ray,
)

Expand Down
5 changes: 2 additions & 3 deletions cacheflow/models/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
from cacheflow.models.input_metadata import InputMetadata
from cacheflow.models.model_utils import get_memory_analyzer
from cacheflow.models.model_utils import get_cache_block_size
from cacheflow.models.model_utils import get_model


__all__ = [
'InputMetadata',
'get_memory_analyzer',
'get_cache_block_size',
'get_model',
]
50 changes: 29 additions & 21 deletions cacheflow/models/attention.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,23 @@
from cacheflow.models import InputMetadata


_SUPPORTED_HEAD_SIZES = [32, 64, 80, 96, 128, 160, 192, 256]

class GPTCacheFlowAttention(nn.Module):

def __init__(self, scale: float) -> None:
def __init__(self, num_heads: int, head_size: int, scale: float) -> None:
super().__init__()
self.num_heads = num_heads
self.head_size = head_size
self.scale = float(scale)
self.attn_op = xops.fmha.cutlass.FwOp()

if self.head_size not in _SUPPORTED_HEAD_SIZES:
raise ValueError(f'head_size ({self.head_size}) is not supported by '
'the single_query_cached_kv_attention kernel. '
'Use one of the following head sizes: '
f'{_SUPPORTED_HEAD_SIZES}.')

def multi_query_kv_attention(
self,
output: torch.Tensor, # [num_prompt_tokens, num_heads, head_size]
Expand Down Expand Up @@ -47,14 +57,6 @@ def single_query_cached_kv_attention(
value_cache: torch.Tensor, # [num_blocks, num_heads, head_size, block_size]
input_metadata: InputMetadata,
) -> None:
head_size = value_cache.shape[2]
supported_head_sizes = [32, 64, 80, 96, 128, 160, 192, 256]
if head_size not in supported_head_sizes:
raise ValueError(f'head_size ({head_size}) is not supported by '
'the single_query_cached_kv_attention kernel. '
'Use one of the following head sizes: '
f'{supported_head_sizes}.')

block_size = value_cache.shape[3]
attention_ops.single_query_cached_kv_attention(
output,
Expand All @@ -73,20 +75,18 @@ def forward(
query: torch.Tensor, # [num_tokens, num_heads * head_size]
key: torch.Tensor, # [num_tokens, num_heads * head_size]
value: torch.Tensor, # [num_tokens, num_heads * head_size]
key_cache: torch.Tensor, # [num_blocks, num_heads, head_size/x, block_size, x]
value_cache: torch.Tensor, # [num_blocks, num_heads, head_size, block_size]
key_cache: Optional[torch.Tensor], # [num_blocks, num_heads, head_size/x, block_size, x]
value_cache: Optional[torch.Tensor], # [num_blocks, num_heads, head_size, block_size]
input_metadata: InputMetadata,
cache_event: Optional[torch.cuda.Event],
) -> torch.Tensor: # [num_tokens, num_heads * head_size]
# NOTE: The query, key, and value tensors must be sliced from a qkv
# tensor of shape [num_tokens, 3 * num_heads * head_size].

# Reshape the query, key, and value tensors.
num_heads = value_cache.shape[1]
head_size = value_cache.shape[2]
query = query.view(-1, num_heads, head_size)
key = key.view(-1, num_heads, head_size)
value = value.view(-1, num_heads, head_size)
query = query.view(-1, self.num_heads, self.head_size)
key = key.view(-1, self.num_heads, self.head_size)
value = value.view(-1, self.num_heads, self.head_size)

# Pre-allocate the output tensor.
output = torch.empty_like(query)
Expand All @@ -107,8 +107,11 @@ def forward(
cache_event.wait()

# Reshape the keys and values and store them in the cache.
# When key_cache and value_cache are not provided, the new key
# and value vectors will not be cached.
num_valid_tokens = input_metadata.num_valid_tokens
if num_valid_tokens > 0:
if (num_valid_tokens > 0 and key_cache is not None
and value_cache is not None):
# The stride is 3 because the key and value are sliced from qkv.
cache_ops.reshape_and_cache(
key[:num_valid_tokens],
Expand All @@ -119,6 +122,10 @@ def forward(
)

if input_metadata.num_generation_tokens > 0:
assert key_cache is not None and value_cache is not None, (
"key_cache and value_cache must be provided when "
"generating tokens."
)
# Compute the attention op for generation tokens.
self.single_query_cached_kv_attention(
output[num_prompt_tokens:num_valid_tokens],
Expand All @@ -129,20 +136,22 @@ def forward(

# Reshape the output tensor.
# NOTE(woosuk): The output tensor may include paddings.
return output.view(-1, num_heads * head_size)
return output.view(-1, self.num_heads * self.head_size)


class GPTNeoXCacheFlowAttention(GPTCacheFlowAttention):
"""Attention with GPT-NeoX style rotary embedding."""

def __init__(
self,
num_heads: int,
head_size: int,
scale: float,
rotary_dim: int,
max_position: int = 8192,
base: int = 10000,
) -> None:
super().__init__(scale)
super().__init__(num_heads, head_size, scale)

# Create the cos and sin cache.
inv_freq = 1.0 / (base ** (torch.arange(0, rotary_dim, 2) / rotary_dim))
Expand Down Expand Up @@ -172,12 +181,11 @@ def forward(
) -> torch.Tensor: # [num_tokens, num_heads * head_size]
# Apply rotary embedding to the query and key before passing them
# to the attention op.
head_size = value_cache.shape[2]
pos_encoding_ops.rotary_embedding_neox(
positions,
query,
key,
head_size,
self.head_size,
self.cos_sin_cache,
)
return super().forward(
Expand Down
3 changes: 2 additions & 1 deletion cacheflow/models/gpt2.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ def __init__(self, config: GPT2Config):
self.c_proj = RowParallelLinear(self.hidden_size, self.hidden_size, bias=True,
input_is_parallel=True,
perform_initialization=False)
self.attn = GPTCacheFlowAttention(scale=self.scale)
self.attn = GPTCacheFlowAttention(self.num_heads, self.head_dim,
scale=self.scale)

def forward(
self,
Expand Down
3 changes: 2 additions & 1 deletion cacheflow/models/gpt_neox.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ def __init__(self, config):
scaling = self.head_size ** -0.5
rotary_dim = int(self.head_size * config.rotary_pct)
assert rotary_dim % 2 == 0
self.attn = GPTNeoXCacheFlowAttention(scaling, rotary_dim)
self.attn = GPTNeoXCacheFlowAttention(self.num_heads, self.head_size,
scaling, rotary_dim)

def forward(
self,
Expand Down
3 changes: 2 additions & 1 deletion cacheflow/models/llama.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ def __init__(
input_is_parallel=True,
perform_initialization=False,
)
self.attn = GPTNeoXCacheFlowAttention(self.scaling, self.head_dim)
self.attn = GPTNeoXCacheFlowAttention(self.num_heads, self.head_dim,
self.scaling, self.head_dim)
Comment thread
zhuohan123 marked this conversation as resolved.
Outdated

def forward(
self,
Expand Down
Loading