Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 75 additions & 0 deletions tests/v1/metrics/test_engine_logger_apis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import copy

import pytest

from vllm.v1.engine.async_llm import AsyncEngineArgs, AsyncLLM
from vllm.v1.metrics.ray_wrappers import RayPrometheusStatLogger


@pytest.fixture
def log_stats_enabled_engine_args():
    """
    Engine arguments reused by the tests in this module.

    Stats logging is left enabled (``disable_log_stats=False``); tests that
    need it disabled copy the returned object and flip that flag themselves.
    """
    engine_kwargs = dict(
        model="distilbert/distilgpt2",
        dtype="half",
        disable_log_stats=False,
        enforce_eager=True,
    )
    return AsyncEngineArgs(**engine_kwargs)


@pytest.fixture
def default_dp_shared_loggers_len(log_stats_enabled_engine_args):
    """
    Fixture to provide the length of the default dp_shared_loggers
    for AsyncLLM when an empty custom stat logger list is passed.

    A throwaway engine is built only to read the length; it is shut down
    before the value is handed to the test, even if reading the logger
    list raises.
    """
    engine = AsyncLLM.from_engine_args(log_stats_enabled_engine_args,
                                       stat_loggers=[])
    try:
        return len(engine.logger_manager.dp_shared_loggers)
    finally:
        # Always release the engine so a failure here cannot leak it into
        # the tests that follow.
        engine.shutdown()


@pytest.mark.asyncio
async def test_async_llm_replace_default_loggers(
        log_stats_enabled_engine_args, default_dp_shared_loggers_len):
    """
    The default stats loggers should be used regardless of whether additional
    custom ones are added.

    Passing one custom logger with stats logging enabled must yield exactly
    one more shared logger than the default configuration.

    NOTE(review): the test name says "replace", but the body checks additive
    behavior -- confirm the name is intended.
    """
    engine = AsyncLLM.from_engine_args(log_stats_enabled_engine_args,
                                       stat_loggers=[RayPrometheusStatLogger])
    try:
        assert len(engine.logger_manager.dp_shared_loggers
                   ) == default_dp_shared_loggers_len + 1
    finally:
        # Shut down even when the assertion fails so the engine is not
        # leaked.
        engine.shutdown()


@pytest.mark.asyncio
async def test_async_llm_add_to_default_loggers(log_stats_enabled_engine_args):
    """
    It's still possible to use custom stat loggers exclusively by passing
    disable_log_stats=True in addition to a list of custom stat loggers.
    """
    # Create engine_args with disable_log_stats=True for this test.
    # A deep copy keeps the fixture's object untouched.
    disabled_log_engine_args = copy.deepcopy(log_stats_enabled_engine_args)
    disabled_log_engine_args.disable_log_stats = True

    # Disable default loggers whilst passing a custom stat logger
    engine = AsyncLLM.from_engine_args(disabled_log_engine_args,
                                       stat_loggers=[RayPrometheusStatLogger])
    try:
        # Only RayPrometheusStatLogger is available
        assert len(engine.logger_manager.dp_shared_loggers) == 1
        assert isinstance(engine.logger_manager.dp_shared_loggers[0],
                          RayPrometheusStatLogger)

        # log_stats is still True, since custom stat loggers are used
        assert engine.log_stats
    finally:
        # Shut down even when an assertion fails so the engine is not
        # leaked.
        engine.shutdown()
8 changes: 7 additions & 1 deletion vllm/v1/engine/async_llm.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,14 @@
self.model_config = vllm_config.model_config
self.vllm_config = vllm_config
self.log_requests = log_requests
self.log_stats = log_stats

# Passing custom stat loggers turns stats logging on even when the caller
# asked for log_stats=False; in that case only the custom loggers are used.
self.log_stats = log_stats or (stat_loggers is not None)
if not log_stats and stat_loggers is not None:
    logger.info(
        "AsyncLLM created with log_stats=False and non-empty custom "
        "logger list; enabling logging without default stat loggers")

if self.model_config.skip_tokenizer_init:

Check failure on line 103 in vllm/v1/engine/async_llm.py

View workflow job for this annotation

GitHub Actions / pre-commit

Name "stats_loggers" is not defined [name-defined]

Check failure on line 103 in vllm/v1/engine/async_llm.py

View workflow job for this annotation

GitHub Actions / pre-commit

Name "stats_loggers" is not defined [name-defined]

Check failure on line 103 in vllm/v1/engine/async_llm.py

View workflow job for this annotation

GitHub Actions / pre-commit

Name "stats_loggers" is not defined [name-defined]

Check failure on line 103 in vllm/v1/engine/async_llm.py

View workflow job for this annotation

GitHub Actions / pre-commit

Name "stats_loggers" is not defined [name-defined]

Check failure on line 103 in vllm/v1/engine/async_llm.py

View workflow job for this annotation

GitHub Actions / pre-commit

Name "stats_loggers" is not defined [name-defined]

Check failure on line 103 in vllm/v1/engine/async_llm.py

View workflow job for this annotation

GitHub Actions / pre-commit

Name "stats_loggers" is not defined [name-defined]

Check failure on line 103 in vllm/v1/engine/async_llm.py

View workflow job for this annotation

GitHub Actions / pre-commit

Name "stats_loggers" is not defined [name-defined]

Check failure on line 103 in vllm/v1/engine/async_llm.py

View workflow job for this annotation

GitHub Actions / pre-commit

Name "stats_loggers" is not defined [name-defined]

Check failure on line 103 in vllm/v1/engine/async_llm.py

View workflow job for this annotation

GitHub Actions / pre-commit

Name "stats_loggers" is not defined [name-defined]

Check failure on line 103 in vllm/v1/engine/async_llm.py

View workflow job for this annotation

GitHub Actions / pre-commit

Ruff (F821)

vllm/v1/engine/async_llm.py:103:30: F821 Undefined name `stats_loggers`
self.tokenizer = None
else:
# Tokenizer (+ ensure liveness if running in another process).
Expand Down Expand Up @@ -132,6 +137,7 @@
vllm_config=vllm_config,
engine_idxs=self.engine_core.engine_ranks_managed,
custom_stat_loggers=stat_loggers,
enable_default_loggers=log_stats,
)
self.logger_manager.log_engine_initialized()

Expand Down
47 changes: 26 additions & 21 deletions vllm/v1/metrics/loggers.py
Original file line number Diff line number Diff line change
Expand Up @@ -635,37 +635,41 @@ def __init__(
vllm_config: VllmConfig,
engine_idxs: Optional[list[int]] = None,
custom_stat_loggers: Optional[list[StatLoggerFactory]] = None,
enable_default_loggers: bool = True,
):
self.engine_idxs = engine_idxs if engine_idxs else [0]

factories: list[StatLoggerFactory]
factories: list[StatLoggerFactory] = []
if custom_stat_loggers is not None:
factories = custom_stat_loggers
else:
factories = []
if logger.isEnabledFor(logging.INFO):
factories.append(LoggingStatLogger)
factories.extend(custom_stat_loggers)

if enable_default_loggers and logger.isEnabledFor(logging.INFO):
factories.append(LoggingStatLogger)

# For Prometheus, need to share the metrics between EngineCores.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of all the Prometheus logger changes made below, I'd suggest keeping the self.prometheus_logger field and just making it Optional. Use PrometheusStatLogger if no factory subclass of it is found in the custom stat loggers and enable_default_loggers is True.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gotcha. In that case, though, we wouldn't get the intended API behavior (extending the default stat loggers with custom ones), right? E.g. PrometheusStatLogger would be overwritten by RayPrometheusStatLogger?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes this would be a special case ... if a subclass of PrometheusStatLogger is passed in then it would replace the built-in one. Really this is so it can be overridden in the ray case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Point taken. I think it makes sense to keep the change minimal/narrow for now and defer a dp_shared_loggers concept until it's clearly needed; keeping the single prometheus_logger field.

# Each EngineCore's metrics are expressed as a unique label.
self.dp_shared_loggers = []
if enable_default_loggers:
self.dp_shared_loggers.append(
PrometheusStatLogger(vllm_config, engine_idxs))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason not to keep this as a single prometheus_logger field?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the above self.dp_shared_loggers code can also be removed now? It's not used right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, thanks.


# engine_idx: StatLogger
self.per_engine_logger_dict: dict[int, list[StatLoggerBase]] = {}
prometheus_factory = PrometheusStatLogger
for engine_idx in self.engine_idxs:
loggers: list[StatLoggerBase] = []
for logger_factory in factories:
# If we get a custom prometheus logger, use that
# instead. This is typically used for the ray case.
# If we get a custom prometheus logger, add that to the shared
# DP logger list. This is typically used for the ray case.
if (isinstance(logger_factory, type)
and issubclass(logger_factory, PrometheusStatLogger)):
prometheus_factory = logger_factory
continue
loggers.append(logger_factory(vllm_config,
engine_idx)) # type: ignore
self.dp_shared_loggers.append(
logger_factory(vllm_config,
engine_idxs)) # type: ignore
else:
loggers.append(logger_factory(vllm_config,
engine_idx)) # type: ignore
self.per_engine_logger_dict[engine_idx] = loggers

# For Prometheus, need to share the metrics between EngineCores.
# Each EngineCore's metrics are expressed as a unique label.
self.prometheus_logger = prometheus_factory(vllm_config, engine_idxs)

def record(
self,
scheduler_stats: Optional[SchedulerStats],
Expand All @@ -679,17 +683,18 @@ def record(
for logger in per_engine_loggers:
logger.record(scheduler_stats, iteration_stats, engine_idx)

self.prometheus_logger.record(scheduler_stats, iteration_stats,
engine_idx)
for logger in self.dp_shared_loggers:
logger.record(scheduler_stats, iteration_stats, engine_idx)

def log(self):
    """Call ``log()`` on every per-engine stat logger, engine by engine."""
    all_loggers = (stat_logger
                   for engine_loggers in self.per_engine_logger_dict.values()
                   for stat_logger in engine_loggers)
    for stat_logger in all_loggers:
        stat_logger.log()

def log_engine_initialized(self):
self.prometheus_logger.log_engine_initialized()

for per_engine_loggers in self.per_engine_logger_dict.values():
for logger in per_engine_loggers:
logger.log_engine_initialized()

for logger in self.dp_shared_loggers:
logger.log_engine_initialized()