
Commit 7c84bf0

feat: add --fixed-schedule-speedup parameter for scaling loadgen
# Conflicts:
#   aiperf/timing/config.py
1 parent 1b14d24 commit 7c84bf0

5 files changed: +186 / -1 lines changed

aiperf/common/config/config_defaults.py

Lines changed: 1 addition & 0 deletions
@@ -47,6 +47,7 @@ class InputDefaults:
     FIXED_SCHEDULE_AUTO_OFFSET = False
     FIXED_SCHEDULE_START_OFFSET = None
     FIXED_SCHEDULE_END_OFFSET = None
+    FIXED_SCHEDULE_SPEEDUP = None
     PUBLIC_DATASET = None
     CUSTOM_DATASET_TYPE = None
     RANDOM_SEED = None

aiperf/common/config/input_config.py

Lines changed: 15 additions & 0 deletions
@@ -178,6 +178,21 @@ def validate_dataset_type(self) -> Self:
         ),
     ] = InputDefaults.FIXED_SCHEDULE_END_OFFSET

+    # NEW AIPerf Option
+    fixed_schedule_speedup: Annotated[
+        float | None,
+        Field(
+            gt=0,
+            description="Specifies a scaling factor to apply to the timestamps in the fixed schedule. "
+            "For example, a value of 2.0 will make the schedule run twice as fast, while a value of 0.5 "
+            "will make it run half as fast.",
+        ),
+        CLIParameter(
+            name=("--fixed-schedule-speedup",),
+            group=_CLI_GROUP,
+        ),
+    ] = InputDefaults.FIXED_SCHEDULE_SPEEDUP
+
     public_dataset: Annotated[
         PublicDatasetType | None,
         Field(description="The public dataset to use for the requests."),
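
The field description above defines the semantics of the new option: the speedup value acts as a divisor on the schedule's time offsets. A minimal illustration of that scaling follows; the schedule offsets are made-up numbers, and only the arithmetic mirrors the description:

# Illustrative sketch only: how --fixed-schedule-speedup rescales a fixed
# schedule's timestamp offsets (made-up timestamps, not AIPerf code).
schedule_offsets_ms = [0, 100, 200]

for speedup in (0.5, 1.0, 2.0):
    scaled = [round(t / speedup) for t in schedule_offsets_ms]
    print(f"speedup={speedup}: {scaled}")
# speedup=0.5: [0, 200, 400]  -> runs half as fast
# speedup=1.0: [0, 100, 200]  -> unchanged
# speedup=2.0: [0, 50, 100]   -> runs twice as fast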

aiperf/timing/config.py

Lines changed: 2 additions & 0 deletions
@@ -25,6 +25,7 @@ class TimingManagerConfig(AIPerfBaseModel):
     auto_offset_timestamps: bool = InputDefaults.FIXED_SCHEDULE_AUTO_OFFSET
     fixed_schedule_start_offset: int | None = InputDefaults.FIXED_SCHEDULE_START_OFFSET
     fixed_schedule_end_offset: int | None = InputDefaults.FIXED_SCHEDULE_END_OFFSET
+    fixed_schedule_speedup: float | None = InputDefaults.FIXED_SCHEDULE_SPEEDUP
     request_cancellation_rate: float = LoadGeneratorDefaults.REQUEST_CANCELLATION_RATE
     request_cancellation_delay: float = LoadGeneratorDefaults.REQUEST_CANCELLATION_DELAY

@@ -45,6 +46,7 @@ def from_user_config(cls, user_config: UserConfig) -> "TimingManagerConfig":
             auto_offset_timestamps=user_config.input.fixed_schedule_auto_offset,
             fixed_schedule_start_offset=user_config.input.fixed_schedule_start_offset,
             fixed_schedule_end_offset=user_config.input.fixed_schedule_end_offset,
+            fixed_schedule_speedup=user_config.input.fixed_schedule_speedup,
             request_cancellation_rate=user_config.loadgen.request_cancellation_rate,
             request_cancellation_delay=user_config.loadgen.request_cancellation_delay,
         )

aiperf/timing/fixed_schedule_strategy.py

Lines changed: 4 additions & 1 deletion
@@ -35,6 +35,7 @@ def __init__(
         self._auto_offset_timestamps = config.auto_offset_timestamps
         self._start_offset = config.fixed_schedule_start_offset
         self._end_offset = config.fixed_schedule_end_offset
+        self._time_scale = 1 / (config.fixed_schedule_speedup or 1.0)
         super().__init__(config=config, credit_manager=credit_manager)

     def _create_timestamp_groups(self) -> None:
@@ -90,8 +91,10 @@ async def _execute_single_phase(self, phase_stats: CreditPhaseStats) -> None:

             # Calculate the wait duration for this timestamp
             # (timestamp - schedule_zero_ms) is the offset of the conversation(s) from the start of the schedule
+            # Apply time scaling to the offset, then subtract elapsed time
             # (self._perf_counter_ms() - start_time_ms) is how much time has passed since we started dropping credits
-            wait_duration_ms = (timestamp - self._schedule_zero_ms) - (
+            scaled_offset_ms = (timestamp - self._schedule_zero_ms) * self._time_scale
+            wait_duration_ms = scaled_offset_ms - (
                 self._perf_counter_ms() - start_time_ms
             )
             wait_duration_sec = wait_duration_ms / MILLIS_PER_SECOND
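
To make the change above concrete: _time_scale is the reciprocal of the speedup, and the schedule offset is multiplied by it before the elapsed time is subtracted. Below is a standalone sketch of that arithmetic with illustrative inputs; the helper function is hypothetical and not part of AIPerf:

# Illustrative sketch only: reproduces the wait-duration arithmetic from
# _execute_single_phase with made-up inputs; not AIPerf code.
MILLIS_PER_SECOND = 1000

def wait_duration_ms(timestamp_ms: int, schedule_zero_ms: int,
                     elapsed_ms: float, speedup: float | None) -> float:
    """Milliseconds to wait before dispatching one schedule entry."""
    time_scale = 1 / (speedup or 1.0)      # e.g. speedup=2.0 -> time_scale=0.5
    scaled_offset_ms = (timestamp_ms - schedule_zero_ms) * time_scale
    return scaled_offset_ms - elapsed_ms   # <= 0 means dispatch immediately

# A 200 ms schedule replayed at 2x speed: offsets shrink to 0, 50 and 100 ms.
for ts in (0, 100, 200):
    print(ts, wait_duration_ms(ts, schedule_zero_ms=0, elapsed_ms=0.0, speedup=2.0))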

tests/timing_manager/test_fixed_schedule_strategy.py

Lines changed: 164 additions & 0 deletions
@@ -39,12 +39,14 @@ def _create_strategy(
         schedule: list[tuple[int, str]],
         auto_offset: bool = False,
         manual_offset: int | None = None,
+        speedup: float | None = None,
     ) -> tuple[FixedScheduleStrategy, CreditPhaseStats]:
         """Helper to create a strategy with optional config overrides."""
         config = TimingManagerConfig.model_construct(
             timing_mode=TimingMode.FIXED_SCHEDULE,
             auto_offset_timestamps=auto_offset,
             fixed_schedule_start_offset=manual_offset,
+            fixed_schedule_speedup=speedup,
         )
         return FixedScheduleStrategy(
             config=config,
@@ -194,3 +196,165 @@ async def test_execution_with_auto_offset(
         assert phase_stats.sent == 3
         expected_zero_ms = first_timestamp_ms if auto_offset else 0
         assert strategy._schedule_zero_ms == expected_zero_ms
+
+    @pytest.mark.parametrize(
+        "speedup,expected_time_scale",
+        [
+            (None, 1.0),  # Default behavior (no speedup)
+            (1.0, 1.0),   # No speedup
+            (2.0, 0.5),   # 2x faster
+            (0.5, 2.0),   # 2x slower
+            (10.0, 0.1),  # 10x faster
+            (0.1, 10.0),  # 10x slower
+        ],
+    )  # fmt: skip
+    def test_speedup_time_scale_calculation(
+        self,
+        simple_schedule: list[tuple[int, str]],
+        mock_credit_manager: MockCreditManager,
+        speedup: float | None,
+        expected_time_scale: float,
+    ):
+        """Test that speedup parameter correctly calculates time scale."""
+        strategy, _ = self._create_strategy(
+            mock_credit_manager, simple_schedule, speedup=speedup
+        )
+
+        assert strategy._time_scale == expected_time_scale
+
+    @pytest.mark.asyncio
+    @pytest.mark.parametrize(
+        "speedup,schedule",
+        [
+            # 2x faster - should take half the time
+            (2.0, [(0, "conv1"), (100, "conv2"), (200, "conv3")]),
+            # 4x faster - should take quarter the time
+            (4.0, [(0, "conv1"), (100, "conv2"), (200, "conv3")]),
+            # 2x slower - should take double the time
+            (0.5, [(0, "conv1"), (100, "conv2"), (200, "conv3")]),
+            # Different schedule with larger gaps
+            (2.0, [(0, "conv1"), (500, "conv2"), (1000, "conv3")]),
+            # Edge case: all at same timestamp should still be instant
+            (2.0, [(0, "conv1"), (0, "conv2"), (0, "conv3")]),
+            (0.5, [(0, "conv1"), (0, "conv2"), (0, "conv3")]),
+        ],
+    )  # fmt: skip
+    async def test_speedup_execution_timing(
+        self,
+        mock_credit_manager: MockCreditManager,
+        time_traveler: TimeTraveler,
+        speedup: float,
+        schedule: list[tuple[int, str]],
+    ):
+        """Test that speedup parameter affects actual execution timing."""
+        strategy, phase_stats = self._create_strategy(
+            mock_credit_manager, schedule, speedup=speedup
+        )
+
+        # Calculate expected duration: (last_timestamp - first_timestamp) / 1000 / speedup
+        # Since auto_offset is default True, we use the relative duration
+        first_timestamp_ms = schedule[0][0]
+        last_timestamp_ms = schedule[-1][0]
+        base_duration_sec = (last_timestamp_ms - first_timestamp_ms) / MILLIS_PER_SECOND
+        expected_duration = base_duration_sec / speedup
+
+        with time_traveler.sleeps_for(expected_duration):
+            await strategy._execute_single_phase(phase_stats)
+            await strategy.wait_for_tasks()
+
+        assert phase_stats.sent == len(schedule)
+        assert len(mock_credit_manager.dropped_credits) == len(schedule)
+
+    @pytest.mark.asyncio
+    async def test_speedup_with_auto_offset(
+        self,
+        mock_credit_manager: MockCreditManager,
+        time_traveler: TimeTraveler,
+    ):
+        """Test speedup works correctly with auto offset timestamps."""
+        # Schedule starts at 1000ms with 200ms total duration
+        schedule = [(1000, "conv1"), (1100, "conv2"), (1200, "conv3")]
+        speedup = 2.0  # 2x faster
+
+        strategy, phase_stats = self._create_strategy(
+            mock_credit_manager, schedule, auto_offset=True, speedup=speedup
+        )
+
+        # With auto offset, the duration should be (1200-1000)ms = 200ms
+        # At 2x speed: 200ms / 2 = 100ms = 0.1s
+        base_duration_sec = (1200 - 1000) / MILLIS_PER_SECOND  # 0.2s
+        expected_duration = base_duration_sec / speedup  # 0.1s
+
+        with time_traveler.sleeps_for(expected_duration):
+            await strategy._execute_single_phase(phase_stats)
+            await strategy.wait_for_tasks()
+
+        assert phase_stats.sent == 3
+        assert strategy._schedule_zero_ms == 1000  # First timestamp
+
+    @pytest.mark.asyncio
+    async def test_speedup_with_manual_offset(
+        self,
+        mock_credit_manager: MockCreditManager,
+        time_traveler: TimeTraveler,
+    ):
+        """Test speedup works correctly with manual offset."""
+        schedule = [(1000, "conv1"), (1100, "conv2"), (1200, "conv3")]
+        manual_offset = 500
+        speedup = 0.5  # 2x slower
+
+        strategy, phase_stats = self._create_strategy(
+            mock_credit_manager, schedule, manual_offset=manual_offset, speedup=speedup
+        )
+
+        # With manual offset of 500, effective duration is (1200-500) = 700ms
+        # At 0.5x speed: 700ms / 0.5 = 1400ms = 1.4s
+        base_duration_sec = (1200 - manual_offset) / MILLIS_PER_SECOND  # 0.7s
+        expected_duration = base_duration_sec / speedup  # 1.4s
+
+        with time_traveler.sleeps_for(expected_duration):
+            await strategy._execute_single_phase(phase_stats)
+            await strategy.wait_for_tasks()
+
+        assert phase_stats.sent == 3
+        assert strategy._schedule_zero_ms == manual_offset
+
+    @pytest.mark.asyncio
+    async def test_speedup_with_negative_timestamps(
+        self,
+        mock_credit_manager: MockCreditManager,
+        time_traveler: TimeTraveler,
+    ):
+        """Test speedup behavior with negative timestamps (past events)."""
+        # All timestamps are in the past, should execute immediately
+        schedule = [(-100, "conv1"), (-50, "conv2"), (0, "conv3")]
+        speedup = 2.0  # Even with speedup, past events should be immediate
+
+        strategy, phase_stats = self._create_strategy(
+            mock_credit_manager, schedule, speedup=speedup
+        )
+
+        # Should still take no time since all timestamps are <= 0
+        with time_traveler.sleeps_for(0.0):
+            await strategy._execute_single_phase(phase_stats)
+            await strategy.wait_for_tasks()
+
+        assert phase_stats.sent == 3
+
+    def test_speedup_edge_cases(
+        self,
+        simple_schedule: list[tuple[int, str]],
+        mock_credit_manager: MockCreditManager,
+    ):
+        """Test edge cases for speedup parameter."""
+        # Test very small speedup (very slow execution)
+        strategy_slow, _ = self._create_strategy(
+            mock_credit_manager, simple_schedule, speedup=0.001
+        )
+        assert strategy_slow._time_scale == 1000.0
+
+        # Test very large speedup (very fast execution)
+        strategy_fast, _ = self._create_strategy(
+            mock_credit_manager, simple_schedule, speedup=1000.0
+        )
+        assert strategy_fast._time_scale == 0.001
