146 changes: 92 additions & 54 deletions tests/diffusers_map.py
@@ -1,3 +1,6 @@
import itertools

import pytest
from diffusers.configuration_utils import ConfigMixin
from diffusers.schedulers.scheduling_ddim import DDIMScheduler
from diffusers.schedulers.scheduling_ddpm import DDPMScheduler
@@ -11,110 +14,145 @@
from testing_common import FLOW_CONFIG, SCALED_CONFIG

from skrample.diffusers import SkrampleWrapperScheduler
from skrample.sampling.models import FlowModel, NoiseModel, VelocityModel
from skrample.sampling.models import DiffusionModel, FlowModel, NoiseModel, VelocityModel
from skrample.sampling.structured import DPM, Adams, Euler, UniPC
from skrample.scheduling import Beta, Exponential, FlowShift, Karras, Linear, Scaled
from skrample.scheduling import Beta, Exponential, FlowShift, Karras, Linear, Scaled, ScheduleModifier

EPSILON = NoiseModel()
FLOW = FlowModel()
VELOCITY = VelocityModel()


def check_wrapper(wrapper: SkrampleWrapperScheduler, scheduler: ConfigMixin, params: list[str] = []) -> None:
def assert_wrapper(wrapper: SkrampleWrapperScheduler, scheduler: ConfigMixin) -> None:
a, b = wrapper, SkrampleWrapperScheduler.from_diffusers_config(scheduler)
a.fake_config = b.fake_config
assert a == b, " | ".join([type(scheduler).__name__] + [str(p) for p in params])


def test_dpm() -> None:
for flag, mod in [
("lower_order_final", None), # dummy flag always true
("use_karras_sigmas", Karras),
("use_exponential_sigmas", Exponential),
("use_beta_sigmas", Beta),
]:
for algo, noise in [
("dpmsolver", False),
("dpmsolver++", False),
("sde-dpmsolver", True),
("sde-dpmsolver++", True),
]:
for uniform, spacing in [(False, "leading"), (True, "trailing")]:
for skpred, dfpred in [(EPSILON, "epsilon"), (VELOCITY, "v_prediction")]:
for order in range(1, 4):
check_wrapper(
SkrampleWrapperScheduler(
DPM(add_noise=noise, order=order),
mod(Scaled(uniform=uniform)) if mod else Scaled(uniform=uniform),
skpred,
),
DPMSolverMultistepScheduler.from_config(
SCALED_CONFIG
| {
"prediction_type": dfpred,
"solver_order": order,
"timestep_spacing": spacing,
"algorithm_type": algo,
"final_sigmas_type": "sigma_min", # for non ++ to not err
flag: True,
}
),
[flag, algo, spacing, dfpred, f"o{order}"],
)

check_wrapper(
assert a == b


@pytest.mark.parametrize(
(
"modifiers",
"add_noise",
"schedule_uniform",
"model_transform",
"order",
),
itertools.product(
[
("lower_order_final", None), # dummy flag always true
("use_karras_sigmas", Karras),
("use_exponential_sigmas", Exponential),
("use_beta_sigmas", Beta),
],
[("dpmsolver", False), ("dpmsolver++", False), ("sde-dpmsolver", True), ("sde-dpmsolver++", True)],
[("leading", False), ("trailing", True)],
[("epsilon", EPSILON), ("v_prediction", VELOCITY)],
range(1, 4),
),
)
def test_dpm(
modifiers: tuple[str, type[ScheduleModifier] | None],
add_noise: tuple[str, bool],
schedule_uniform: tuple[str, bool],
model_transform: tuple[str, DiffusionModel],
order: int,
) -> None:
flag, mod = modifiers
algo, noise = add_noise
spacing, uniform = schedule_uniform
dfpred, skpred = model_transform
assert_wrapper(
SkrampleWrapperScheduler(
DPM(add_noise=noise, order=order),
mod(Scaled(uniform=uniform)) if mod else Scaled(uniform=uniform),
skpred,
),
DPMSolverMultistepScheduler.from_config(
SCALED_CONFIG
| {
"prediction_type": dfpred,
"solver_order": order,
"timestep_spacing": spacing,
"algorithm_type": algo,
"final_sigmas_type": "sigma_min", # for non ++ to not err
flag: True,
}
),
)


def test_dpm_flow() -> None:
assert_wrapper(
SkrampleWrapperScheduler(DPM(order=2), FlowShift(Linear()), FLOW),
DPMSolverMultistepScheduler.from_config(FLOW_CONFIG),
)


def test_euler() -> None:
check_wrapper(
assert_wrapper(
SkrampleWrapperScheduler(Euler(), Scaled(uniform=False)),
EulerDiscreteScheduler.from_config(SCALED_CONFIG),
)
check_wrapper(


def test_euler_a() -> None:
assert_wrapper(
SkrampleWrapperScheduler(DPM(add_noise=True), Scaled(uniform=False)),
EulerAncestralDiscreteScheduler.from_config(SCALED_CONFIG),
)
check_wrapper(


def test_euler_flow() -> None:
assert_wrapper(
SkrampleWrapperScheduler(Euler(), FlowShift(Linear()), FLOW),
FlowMatchEulerDiscreteScheduler.from_config(FLOW_CONFIG),
)
check_wrapper(


def test_euler_beta() -> None:
assert_wrapper(
SkrampleWrapperScheduler(Euler(), Beta(FlowShift(Linear())), FLOW),
FlowMatchEulerDiscreteScheduler.from_config(FLOW_CONFIG | {"use_beta_sigmas": True}),
)


def test_ipndm() -> None:
check_wrapper(
assert_wrapper(
SkrampleWrapperScheduler(Adams(order=4), Scaled(uniform=False)),
IPNDMScheduler.from_config(SCALED_CONFIG),
)


def test_unipc() -> None:
check_wrapper(
assert_wrapper(
SkrampleWrapperScheduler(UniPC(order=2), Scaled(uniform=False)),
UniPCMultistepScheduler.from_config(SCALED_CONFIG),
)
check_wrapper(


def test_unipc_flow() -> None:
assert_wrapper(
SkrampleWrapperScheduler(UniPC(order=2), FlowShift(Linear()), FLOW),
UniPCMultistepScheduler.from_config(FLOW_CONFIG),
)


def test_alias() -> None:
check_wrapper(
def test_dpmsde() -> None:
assert_wrapper(
SkrampleWrapperScheduler(DPM(add_noise=True), Scaled(uniform=False)),
DPMSolverSDEScheduler.from_config(SCALED_CONFIG),
)
check_wrapper(


def test_ddim() -> None:
assert_wrapper(
SkrampleWrapperScheduler(Euler(), Scaled(uniform=False)),
DDIMScheduler.from_config(SCALED_CONFIG),
)
check_wrapper(


def test_ddpm() -> None:
assert_wrapper(
SkrampleWrapperScheduler(DPM(add_noise=True), Scaled(uniform=False)),
DDPMScheduler.from_config(SCALED_CONFIG),
)
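
Note on the pattern used throughout this refactor: the nested for-loops become a single @pytest.mark.parametrize over itertools.product, so every combination runs as its own test case with its own report. A minimal, self-contained sketch of that pattern follows; the axis names and values (FLAGS, ORDERS, test_example) are hypothetical and not taken from this diff.

import itertools

import pytest

# Hypothetical axes for illustration only.
FLAGS = [("fast", True), ("slow", False)]
ORDERS = range(1, 4)


@pytest.mark.parametrize(
    ("flag", "order"),
    itertools.product(FLAGS, ORDERS),
)
def test_example(flag: tuple[str, bool], order: int) -> None:
    # Paired values travel together as one parameter and are unpacked in the body,
    # the same way (dfpred, skpred) and (algo, noise) are unpacked above.
    name, enabled = flag
    assert isinstance(enabled, bool)
    assert 1 <= order <= 3

Each element of the product is unpacked into the named arguments, so a failing combination is reported individually instead of aborting the rest of the loop.
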
141 changes: 79 additions & 62 deletions tests/diffusers_samplers.py
@@ -110,94 +110,111 @@ def dual_sample(
return a_sample, b_sample


def compare_samplers(
a: StructuredSampler,
b: DiffusersScheduler,
t: DiffusionModel = EPSILON,
mu: float | None = None,
margin: float = 1e-8,
message: str = "",
) -> None:
for step_range in [range(0, 2), range(0, 11), range(0, 201), range(3, 6), range(2, 23), range(31, 200)]:
compare_tensors(
*dual_sample(a, b, t, step_range, mu),
message=str(step_range) + (" | " + message if message else ""),
margin=margin,
)
STEP_RANGES = [range(0, 2), range(0, 11), range(0, 201), range(3, 6), range(2, 23), range(31, 200)]


def test_euler() -> None:
for predictor in [(EPSILON, "epsilon"), (VELOCITY, "v_prediction")]:
compare_samplers(
@pytest.mark.parametrize(
("predictor", "steps"),
itertools.product([(EPSILON, "epsilon"), (VELOCITY, "v_prediction")], STEP_RANGES),
)
def test_euler(predictor: tuple[DiffusionModel, str], steps: range) -> None:
compare_tensors(
*dual_sample(
Euler(),
EulerDiscreteScheduler.from_config(
SCALED_CONFIG,
prediction_type=predictor[1],
),
predictor[0],
message=type(predictor[0]).__name__,
steps,
)
)


def test_euler_ancestral() -> None:
for predictor in [(EPSILON, "epsilon"), (VELOCITY, "v_prediction")]:
compare_samplers(
@pytest.mark.parametrize(
("predictor", "steps"),
itertools.product([(EPSILON, "epsilon"), (VELOCITY, "v_prediction")], STEP_RANGES),
)
def test_euler_ancestral(predictor: tuple[DiffusionModel, str], steps: range) -> None:
compare_tensors(
*dual_sample(
DPM(add_noise=True),
EulerAncestralDiscreteScheduler.from_config(
SCALED_CONFIG,
prediction_type=predictor[1],
),
predictor[0],
message=type(predictor[0]).__name__,
steps,
)
)


@pytest.mark.parametrize("steps", STEP_RANGES)
def test_euler_flow(steps: range) -> None:
compare_tensors(
*dual_sample(
Euler(),
FlowMatchEulerDiscreteScheduler.from_config(FLOW_CONFIG),
FLOW,
steps,
mu=0.7,
)
)


def test_euler_flow() -> None:
compare_samplers(
Euler(),
FlowMatchEulerDiscreteScheduler.from_config(FLOW_CONFIG),
FLOW,
mu=0.7,
@pytest.mark.parametrize(
("predictor", "order", "stochastic", "steps"),
itertools.product(
[(EPSILON, "epsilon"), (VELOCITY, "v_prediction"), (FLOW, "flow_prediction")],
range(1, 3), # Their third-order solver is unstable and degrades badly at very high step counts
(False, True),
STEP_RANGES,
),
)
def test_dpm(predictor: tuple[DiffusionModel, str], order: int, stochastic: bool, steps: range) -> None:
compare_tensors(
*dual_sample(
DPM(order=order, add_noise=stochastic),
DPMSolverMultistepScheduler.from_config(
SCALED_CONFIG,
algorithm_type="sde-dpmsolver++" if stochastic else "dpmsolver++",
final_sigmas_type="zero",
solver_order=order,
prediction_type=predictor[1],
use_flow_sigmas=predictor[0] == FLOW,
),
predictor[0],
steps,
)
)


def test_dpm() -> None:
for predictor in [(EPSILON, "epsilon"), (VELOCITY, "v_prediction"), (FLOW, "flow_prediction")]:
for order in range(1, 3): # Their third-order solver is unstable and degrades badly at very high step counts
for stochastic in [False, True]:
compare_samplers(
DPM(order=order, add_noise=stochastic),
DPMSolverMultistepScheduler.from_config(
SCALED_CONFIG,
algorithm_type="sde-dpmsolver++" if stochastic else "dpmsolver++",
final_sigmas_type="zero",
solver_order=order,
prediction_type=predictor[1],
use_flow_sigmas=predictor[0] == FLOW,
),
predictor[0],
message=f"{type(predictor[0]).__name__} o{order} s{stochastic}",
)


def test_unipc() -> None:
for predictor in [(EPSILON, "epsilon"), (VELOCITY, "v_prediction"), (FLOW, "flow_prediction")]:
@pytest.mark.parametrize(
("predictor", "order", "steps"),
itertools.product(
[(EPSILON, "epsilon"), (VELOCITY, "v_prediction"), (FLOW, "flow_prediction")],
# Technically it can run at arbitrary order N, but diffusers breaks down hard with high order + step counts.
# They use torch scalars for everything, which accumulates error faster as steps and order increase
# (see the accumulation sketch after this test). Given that diffusers NaNs out at roughly half the
# order this implementation handles, fudging the margins here is acceptable.
for order in range(1, 5):
compare_samplers(
UniPC(order=order, fast_solve=True),
UniPCMultistepScheduler.from_config(
SCALED_CONFIG,
final_sigmas_type="zero",
solver_order=order,
prediction_type=predictor[1],
use_flow_sigmas=predictor[0] == FLOW,
),
predictor[0],
message=f"{type(predictor[0]).__name__} o{order}",
)
range(1, 5),
STEP_RANGES,
),
)
def test_unipc(predictor: tuple[DiffusionModel, str], order: int, steps: range) -> None:
compare_tensors(
*dual_sample(
UniPC(order=order, fast_solve=True),
UniPCMultistepScheduler.from_config(
SCALED_CONFIG,
final_sigmas_type="zero",
solver_order=order,
prediction_type=predictor[1],
use_flow_sigmas=predictor[0] == FLOW,
),
predictor[0],
steps,
)
)
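
The comments above blame the high-order breakdown on error accumulating in scalar arithmetic. As a generic illustration of that effect (not part of this diff, and assuming NumPy is available), repeated float32 updates drift much further from the true value than a float64 reference:

import numpy as np

# Repeatedly add a small increment, as an iterative solver effectively does.
n = 1_000_000
step32 = np.float32(0.1)

acc32 = np.float32(0.0)
for _ in range(n):
    acc32 += step32  # each addition rounds to float32; the rounding error compounds

acc64 = np.float64(0.1) * n  # float64 reference

print(acc32)  # drifts visibly away from 100000
print(acc64)  # ~100000.0 to within float64 rounding

The absolute drift grows with the number of steps, which matches why the tests above cap the diffusers-side order and widen the comparison margins at high step counts.
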


@pytest.mark.parametrize(