diff --git a/tests/diffusers_map.py b/tests/diffusers_map.py
index f4eb279..4cadfe2 100644
--- a/tests/diffusers_map.py
+++ b/tests/diffusers_map.py
@@ -1,3 +1,6 @@
+import itertools
+
+import pytest
 from diffusers.configuration_utils import ConfigMixin
 from diffusers.schedulers.scheduling_ddim import DDIMScheduler
 from diffusers.schedulers.scheduling_ddpm import DDPMScheduler
@@ -11,110 +14,145 @@
 from testing_common import FLOW_CONFIG, SCALED_CONFIG
 
 from skrample.diffusers import SkrampleWrapperScheduler
-from skrample.sampling.models import FlowModel, NoiseModel, VelocityModel
+from skrample.sampling.models import DiffusionModel, FlowModel, NoiseModel, VelocityModel
 from skrample.sampling.structured import DPM, Adams, Euler, UniPC
-from skrample.scheduling import Beta, Exponential, FlowShift, Karras, Linear, Scaled
+from skrample.scheduling import Beta, Exponential, FlowShift, Karras, Linear, Scaled, ScheduleModifier
 
 EPSILON = NoiseModel()
 FLOW = FlowModel()
 VELOCITY = VelocityModel()
 
 
-def check_wrapper(wrapper: SkrampleWrapperScheduler, scheduler: ConfigMixin, params: list[str] = []) -> None:
+def assert_wrapper(wrapper: SkrampleWrapperScheduler, scheduler: ConfigMixin) -> None:
     a, b = wrapper, SkrampleWrapperScheduler.from_diffusers_config(scheduler)
     a.fake_config = b.fake_config
-    assert a == b, " | ".join([type(scheduler).__name__] + [str(p) for p in params])
-
-
-def test_dpm() -> None:
-    for flag, mod in [
-        ("lower_order_final", None),  # dummy flag always true
-        ("use_karras_sigmas", Karras),
-        ("use_exponential_sigmas", Exponential),
-        ("use_beta_sigmas", Beta),
-    ]:
-        for algo, noise in [
-            ("dpmsolver", False),
-            ("dpmsolver++", False),
-            ("sde-dpmsolver", True),
-            ("sde-dpmsolver++", True),
-        ]:
-            for uniform, spacing in [(False, "leading"), (True, "trailing")]:
-                for skpred, dfpred in [(EPSILON, "epsilon"), (VELOCITY, "v_prediction")]:
-                    for order in range(1, 4):
-                        check_wrapper(
-                            SkrampleWrapperScheduler(
-                                DPM(add_noise=noise, order=order),
-                                mod(Scaled(uniform=uniform)) if mod else Scaled(uniform=uniform),
-                                skpred,
-                            ),
-                            DPMSolverMultistepScheduler.from_config(
-                                SCALED_CONFIG
-                                | {
-                                    "prediction_type": dfpred,
-                                    "solver_order": order,
-                                    "timestep_spacing": spacing,
-                                    "algorithm_type": algo,
-                                    "final_sigmas_type": "sigma_min",  # for non ++ to not err
-                                    flag: True,
-                                }
-                            ),
-                            [flag, algo, spacing, dfpred, f"o{order}"],
-                        )
-
-    check_wrapper(
+    assert a == b
+
+
+@pytest.mark.parametrize(
+    (
+        "modifiers",
+        "add_noise",
+        "schedule_uniform",
+        "model_transform",
+        "order",
+    ),
+    itertools.product(
+        [
+            ("lower_order_final", None),  # dummy flag always true
+            ("use_karras_sigmas", Karras),
+            ("use_exponential_sigmas", Exponential),
+            ("use_beta_sigmas", Beta),
+        ],
+        [("dpmsolver", False), ("dpmsolver++", False), ("sde-dpmsolver", True), ("sde-dpmsolver++", True)],
+        [("leading", False), ("trailing", True)],
+        [("epsilon", EPSILON), ("v_prediction", VELOCITY)],
+        range(1, 4),
+    ),
+)
+def test_dpm(
+    modifiers: tuple[str, type[ScheduleModifier] | None],
+    add_noise: tuple[str, bool],
+    schedule_uniform: tuple[str, bool],
+    model_transform: tuple[str, DiffusionModel],
+    order: int,
+) -> None:
+    flag, mod = modifiers
+    algo, noise = add_noise
+    spacing, uniform = schedule_uniform
+    dfpred, skpred = model_transform
+    assert_wrapper(
+        SkrampleWrapperScheduler(
+            DPM(add_noise=noise, order=order),
+            mod(Scaled(uniform=uniform)) if mod else Scaled(uniform=uniform),
+            skpred,
+        ),
+        DPMSolverMultistepScheduler.from_config(
+            SCALED_CONFIG
+            | {
+                "prediction_type": dfpred,
+ "solver_order": order, + "timestep_spacing": spacing, + "algorithm_type": algo, + "final_sigmas_type": "sigma_min", # for non ++ to not err + flag: True, + } + ), + ) + + +def test_dpm_flow() -> None: + assert_wrapper( SkrampleWrapperScheduler(DPM(order=2), FlowShift(Linear()), FLOW), DPMSolverMultistepScheduler.from_config(FLOW_CONFIG), ) def test_euler() -> None: - check_wrapper( + assert_wrapper( SkrampleWrapperScheduler(Euler(), Scaled(uniform=False)), EulerDiscreteScheduler.from_config(SCALED_CONFIG), ) - check_wrapper( + + +def test_euler_a() -> None: + assert_wrapper( SkrampleWrapperScheduler(DPM(add_noise=True), Scaled(uniform=False)), EulerAncestralDiscreteScheduler.from_config(SCALED_CONFIG), ) - check_wrapper( + + +def test_euler_flow() -> None: + assert_wrapper( SkrampleWrapperScheduler(Euler(), FlowShift(Linear()), FLOW), FlowMatchEulerDiscreteScheduler.from_config(FLOW_CONFIG), ) - check_wrapper( + + +def test_euler_beta() -> None: + assert_wrapper( SkrampleWrapperScheduler(Euler(), Beta(FlowShift(Linear())), FLOW), FlowMatchEulerDiscreteScheduler.from_config(FLOW_CONFIG | {"use_beta_sigmas": True}), ) def test_ipndm() -> None: - check_wrapper( + assert_wrapper( SkrampleWrapperScheduler(Adams(order=4), Scaled(uniform=False)), IPNDMScheduler.from_config(SCALED_CONFIG), ) def test_unipc() -> None: - check_wrapper( + assert_wrapper( SkrampleWrapperScheduler(UniPC(order=2), Scaled(uniform=False)), UniPCMultistepScheduler.from_config(SCALED_CONFIG), ) - check_wrapper( + + +def test_unipc_flow() -> None: + assert_wrapper( SkrampleWrapperScheduler(UniPC(order=2), FlowShift(Linear()), FLOW), UniPCMultistepScheduler.from_config(FLOW_CONFIG), ) -def test_alias() -> None: - check_wrapper( +def test_dpmsde() -> None: + assert_wrapper( SkrampleWrapperScheduler(DPM(add_noise=True), Scaled(uniform=False)), DPMSolverSDEScheduler.from_config(SCALED_CONFIG), ) - check_wrapper( + + +def test_ddim() -> None: + assert_wrapper( SkrampleWrapperScheduler(Euler(), Scaled(uniform=False)), DDIMScheduler.from_config(SCALED_CONFIG), ) - check_wrapper( + + +def test_ddpm() -> None: + assert_wrapper( SkrampleWrapperScheduler(DPM(add_noise=True), Scaled(uniform=False)), DDPMScheduler.from_config(SCALED_CONFIG), ) diff --git a/tests/diffusers_samplers.py b/tests/diffusers_samplers.py index 0621522..2a7ce47 100644 --- a/tests/diffusers_samplers.py +++ b/tests/diffusers_samplers.py @@ -110,94 +110,111 @@ def dual_sample( return a_sample, b_sample -def compare_samplers( - a: StructuredSampler, - b: DiffusersScheduler, - t: DiffusionModel = EPSILON, - mu: float | None = None, - margin: float = 1e-8, - message: str = "", -) -> None: - for step_range in [range(0, 2), range(0, 11), range(0, 201), range(3, 6), range(2, 23), range(31, 200)]: - compare_tensors( - *dual_sample(a, b, t, step_range, mu), - message=str(step_range) + (" | " + message if message else ""), - margin=margin, - ) +STEP_RANGES = [range(0, 2), range(0, 11), range(0, 201), range(3, 6), range(2, 23), range(31, 200)] -def test_euler() -> None: - for predictor in [(EPSILON, "epsilon"), (VELOCITY, "v_prediction")]: - compare_samplers( +@pytest.mark.parametrize( + ("predictor", "steps"), + itertools.product([(EPSILON, "epsilon"), (VELOCITY, "v_prediction")], STEP_RANGES), +) +def test_euler(predictor: tuple[DiffusionModel, str], steps: range) -> None: + compare_tensors( + *dual_sample( Euler(), EulerDiscreteScheduler.from_config( SCALED_CONFIG, prediction_type=predictor[1], ), predictor[0], - message=type(predictor[0]).__name__, + steps, ) + ) -def 
test_euler_ancestral() -> None: - for predictor in [(EPSILON, "epsilon"), (VELOCITY, "v_prediction")]: - compare_samplers( +@pytest.mark.parametrize( + ("predictor", "steps"), + itertools.product([(EPSILON, "epsilon"), (VELOCITY, "v_prediction")], STEP_RANGES), +) +def test_euler_ancestral(predictor: tuple[DiffusionModel, str], steps: range) -> None: + compare_tensors( + *dual_sample( DPM(add_noise=True), EulerAncestralDiscreteScheduler.from_config( SCALED_CONFIG, prediction_type=predictor[1], ), predictor[0], - message=type(predictor[0]).__name__, + steps, + ) + ) + + +@pytest.mark.parametrize("steps", STEP_RANGES) +def test_euler_flow(steps: range) -> None: + compare_tensors( + *dual_sample( + Euler(), + FlowMatchEulerDiscreteScheduler.from_config(FLOW_CONFIG), + FLOW, + steps, + mu=0.7, ) + ) -def test_euler_flow() -> None: - compare_samplers( - Euler(), - FlowMatchEulerDiscreteScheduler.from_config(FLOW_CONFIG), - FLOW, - mu=0.7, +@pytest.mark.parametrize( + ("predictor", "order", "stochastic", "steps"), + itertools.product( + [(EPSILON, "epsilon"), (VELOCITY, "v_prediction"), (FLOW, "flow_prediction")], + range(1, 3), # Their third order is fucked up. Turns into barf @ super high steps + (False, True), + STEP_RANGES, + ), +) +def test_dpm(predictor: tuple[DiffusionModel, str], order: int, stochastic: bool, steps: range) -> None: + compare_tensors( + *dual_sample( + DPM(order=order, add_noise=stochastic), + DPMSolverMultistepScheduler.from_config( + SCALED_CONFIG, + algorithm_type="sde-dpmsolver++" if stochastic else "dpmsolver++", + final_sigmas_type="zero", + solver_order=order, + prediction_type=predictor[1], + use_flow_sigmas=predictor[0] == FLOW, + ), + predictor[0], + steps, + ) ) -def test_dpm() -> None: - for predictor in [(EPSILON, "epsilon"), (VELOCITY, "v_prediction"), (FLOW, "flow_prediction")]: - for order in range(1, 3): # Their third order is fucked up. 
Turns into barf @ super high steps - for stochastic in [False, True]: - compare_samplers( - DPM(order=order, add_noise=stochastic), - DPMSolverMultistepScheduler.from_config( - SCALED_CONFIG, - algorithm_type="sde-dpmsolver++" if stochastic else "dpmsolver++", - final_sigmas_type="zero", - solver_order=order, - prediction_type=predictor[1], - use_flow_sigmas=predictor[0] == FLOW, - ), - predictor[0], - message=f"{type(predictor[0]).__name__} o{order} s{stochastic}", - ) - - -def test_unipc() -> None: - for predictor in [(EPSILON, "epsilon"), (VELOCITY, "v_prediction"), (FLOW, "flow_prediction")]: +@pytest.mark.parametrize( + ("predictor", "order", "steps"), + itertools.product( + [(EPSILON, "epsilon"), (VELOCITY, "v_prediction"), (FLOW, "flow_prediction")], # technically it can do N order, but diffusers actually breaks down super hard with high order + steps # They use torch scalars for everything which accumulates error faster as steps and order increase # Considering Diffusers just NaNs out in like half the order as mine, I'm fine with fudging the margins - for order in range(1, 5): - compare_samplers( - UniPC(order=order, fast_solve=True), - UniPCMultistepScheduler.from_config( - SCALED_CONFIG, - final_sigmas_type="zero", - solver_order=order, - prediction_type=predictor[1], - use_flow_sigmas=predictor[0] == FLOW, - ), - predictor[0], - message=f"{type(predictor[0]).__name__} o{order}", - ) + range(1, 5), + STEP_RANGES, + ), +) +def test_unipc(predictor: tuple[DiffusionModel, str], order: int, steps: range) -> None: + compare_tensors( + *dual_sample( + UniPC(order=order, fast_solve=True), + UniPCMultistepScheduler.from_config( + SCALED_CONFIG, + final_sigmas_type="zero", + solver_order=order, + prediction_type=predictor[1], + use_flow_sigmas=predictor[0] == FLOW, + ), + predictor[0], + steps, + ) + ) @pytest.mark.parametrize( diff --git a/tests/diffusers_schedules.py b/tests/diffusers_schedules.py index 41c0a23..bc63b25 100644 --- a/tests/diffusers_schedules.py +++ b/tests/diffusers_schedules.py @@ -1,5 +1,6 @@ import math +import pytest import torch from diffusers.schedulers.scheduling_euler_discrete import EulerDiscreteScheduler from diffusers.schedulers.scheduling_flow_match_euler_discrete import FlowMatchEulerDiscreteScheduler @@ -7,56 +8,63 @@ from skrample.scheduling import ZSNR, Beta, Exponential, FlowShift, Karras, Linear, Scaled, SkrampleSchedule +STEPS = range(1, 12) + def compare_schedules( a: SkrampleSchedule, b: EulerDiscreteScheduler | FlowMatchEulerDiscreteScheduler, + steps: int, mu: float | None = None, ts_margin: float = 1.0, sig_margin: float = 1e-3, ) -> None: - for steps in range(1, 12): - if isinstance(b, FlowMatchEulerDiscreteScheduler): - # b.set_timesteps(num_inference_steps=steps, mu=mu) - # # flux pipe hardcodes sigmas to this... - b.set_timesteps(sigmas=torch.linspace(1.0, 1 / steps, steps), mu=mu) - else: - b.set_timesteps(num_inference_steps=steps) - - compare_tensors( - torch.from_numpy(a.timesteps_np(steps)), - b.timesteps, - f"TIMESTEPS @ {steps}", - margin=ts_margin, - ) - compare_tensors( - torch.from_numpy(a.sigmas_np(steps)), - b.sigmas[:-1], - f"SIGMAS @ {steps}", - margin=sig_margin, - ) - - -def test_scaled() -> None: + if isinstance(b, FlowMatchEulerDiscreteScheduler): + # b.set_timesteps(num_inference_steps=steps, mu=mu) + # # flux pipe hardcodes sigmas to this... 
+        b.set_timesteps(sigmas=torch.linspace(1.0, 1 / steps, steps), mu=mu)
+    else:
+        b.set_timesteps(num_inference_steps=steps)
+
+    compare_tensors(
+        torch.from_numpy(a.timesteps_np(steps)),
+        b.timesteps,
+        f"TIMESTEPS @ {steps}",
+        margin=ts_margin,
+    )
+    compare_tensors(
+        torch.from_numpy(a.sigmas_np(steps)),
+        b.sigmas[:-1],
+        f"SIGMAS @ {steps}",
+        margin=sig_margin,
+    )
+
+
+@pytest.mark.parametrize("steps", STEPS)
+def test_scaled(steps: int) -> None:
     compare_schedules(
         Scaled(uniform=False),
         EulerDiscreteScheduler.from_config(
             SCALED_CONFIG,
         ),
+        steps,
     )
 
 
-def test_scaled_uniform() -> None:
+@pytest.mark.parametrize("steps", STEPS)
+def test_scaled_uniform(steps: int) -> None:
     compare_schedules(
         Scaled(),
         EulerDiscreteScheduler.from_config(
             SCALED_CONFIG,
             timestep_spacing="trailing",
         ),
+        steps,
     )
 
 
-def test_scaled_beta() -> None:
+@pytest.mark.parametrize("steps", STEPS)
+def test_scaled_beta(steps: int) -> None:
     compare_schedules(
         Beta(Scaled()),
         EulerDiscreteScheduler.from_config(
@@ -64,10 +72,12 @@ def test_scaled_beta() -> None:
             timestep_spacing="trailing",
             use_beta_sigmas=True,
         ),
+        steps,
     )
 
 
-def test_scaled_exponential() -> None:
+@pytest.mark.parametrize("steps", STEPS)
+def test_scaled_exponential(steps: int) -> None:
     compare_schedules(
         Exponential(Scaled()),
         EulerDiscreteScheduler.from_config(
@@ -75,10 +85,12 @@ def test_scaled_exponential() -> None:
             timestep_spacing="trailing",
             use_exponential_sigmas=True,
         ),
+        steps,
     )
 
 
-def test_scaled_karras() -> None:
+@pytest.mark.parametrize("steps", STEPS)
+def test_scaled_karras(steps: int) -> None:
     compare_schedules(
         Karras(Scaled()),
         EulerDiscreteScheduler.from_config(
@@ -86,61 +98,74 @@ def test_scaled_karras() -> None:
             timestep_spacing="trailing",
             use_karras_sigmas=True,
         ),
+        steps,
     )
 
 
-def test_zsnr() -> None:
+@pytest.mark.parametrize("steps", STEPS)
+def test_zsnr(steps: int) -> None:
     compare_schedules(
         ZSNR(),
         EulerDiscreteScheduler.from_config(
             SCALED_CONFIG | {"timestep_spacing": "trailing", "rescale_betas_zero_snr": True}
         ),
+        steps,
     )
 
 
-def test_flow_dynamic() -> None:
+@pytest.mark.parametrize("steps", STEPS)
+def test_flow_dynamic(steps: int) -> None:
     compare_schedules(
         FlowShift(Linear(), shift=math.exp(0.7)),
         FlowMatchEulerDiscreteScheduler.from_config(
             FLOW_CONFIG,
         ),
+        steps,
         mu=0.7,
     )
 
 
-def test_flow() -> None:
+@pytest.mark.parametrize("steps", STEPS)
+def test_flow(steps: int) -> None:
     compare_schedules(
         FlowShift(Linear()),
         FlowMatchEulerDiscreteScheduler.from_config(FLOW_CONFIG | {"use_dynamic_shifting": False}),
+        steps,
         mu=None,
     )
 
 
-def test_flow_beta() -> None:
+@pytest.mark.parametrize("steps", STEPS)
+def test_flow_beta(steps: int) -> None:
     compare_schedules(
         Beta(FlowShift(Linear())),
         FlowMatchEulerDiscreteScheduler.from_config(
             FLOW_CONFIG | {"use_dynamic_shifting": False},
             use_beta_sigmas=True,
         ),
+        steps,
     )
 
 
-def test_flow_exponential() -> None:
+@pytest.mark.parametrize("steps", STEPS)
+def test_flow_exponential(steps: int) -> None:
     compare_schedules(
         Exponential(FlowShift(Linear())),
         FlowMatchEulerDiscreteScheduler.from_config(
             FLOW_CONFIG | {"use_dynamic_shifting": False},
             use_exponential_sigmas=True,
         ),
+        steps,
     )
 
 
-def test_flow_karras() -> None:
+@pytest.mark.parametrize("steps", STEPS)
+def test_flow_karras(steps: int) -> None:
     compare_schedules(
         Karras(FlowShift(Linear())),
         FlowMatchEulerDiscreteScheduler.from_config(
             FLOW_CONFIG | {"use_dynamic_shifting": False},
             use_karras_sigmas=True,
         ),
+        steps,
     )
diff --git a/tests/miscellaneous.py b/tests/miscellaneous.py
index 4ad3254..8145c9c 100644
--- a/tests/miscellaneous.py
+++ b/tests/miscellaneous.py
@@ -82,17 +82,14 @@
 ]
 
 
-def test_sigmas_to_timesteps() -> None:
-    for schedule in [*(cls() for cls in ALL_SCHEDULES), Scaled(beta_scale=1)]:  # base schedules
-        timesteps = schedule.timesteps_np(123)
-        timesteps_inv = schedule.sigmas_to_timesteps(schedule.sigmas_np(123))
-        compare_tensors(torch.tensor(timesteps), torch.tensor(timesteps_inv), margin=0)  # shocked this rounds good
+@pytest.mark.parametrize("schedule", [*(cls() for cls in ALL_SCHEDULES), Scaled(beta_scale=1)])
+def test_sigmas_to_timesteps(schedule: ScheduleCommon) -> None:
+    timesteps = schedule.timesteps_np(123)
+    timesteps_inv = schedule.sigmas_to_timesteps(schedule.sigmas_np(123))
+    compare_tensors(torch.tensor(timesteps), torch.tensor(timesteps_inv), margin=0)  # shocked this rounds good
 
 
-@pytest.mark.parametrize(
-    ("model_type", "sigma_transform"),
-    itertools.product(ALL_MODELS, ALL_TRANSFROMS),
-)
+@pytest.mark.parametrize(("model_type", "sigma_transform"), itertools.product(ALL_MODELS, ALL_TRANSFROMS))
 def test_model_transforms(model_type: type[DiffusionModel], sigma_transform: SigmaTransform) -> None:
     model_transform = model_type()
     sample = 0.8
@@ -151,40 +148,45 @@ def model(x: float, t: float, s: float) -> float:
     assert abs(x_from - x_to) < 1e-12
 
 
-def test_sampler_generics() -> None:
+@pytest.mark.parametrize(
+    ("sampler", "schedule"),
+    itertools.product(
+        [
+            *(cls() for cls in ALL_STRUCTURED),
+            *(cls(order=cls.max_order()) for cls in ALL_STRUCTURED if issubclass(cls, StructuredMultistep)),
+        ],
+        [Scaled(), FlowShift(Linear())],
+    ),
+)
+def test_sampler_generics(sampler: StructuredSampler, schedule: ScheduleCommon) -> None:
     eps = 1e-12
-    for sampler in [
-        *(cls() for cls in ALL_STRUCTURED),
-        *(cls(order=cls.max_order()) for cls in ALL_STRUCTURED if issubclass(cls, StructuredMultistep)),
-    ]:
-        for schedule in Scaled(), FlowShift(Linear()):
-            i, o = random.random(), random.random()
-            prev = [SKSamples(random.random(), random.random(), random.random()) for _ in range(9)]
-
-            scalar = sampler.sample(i, o, 4, schedule.schedule(10), schedule.sigma_transform, previous=prev).final
-
-            # Enforce FP64 as that should be equivalent to python scalar
-            ndarr = sampler.sample(
-                np.array([i], dtype=np.float64),
-                np.array([o], dtype=np.float64),
-                4,
-                schedule.schedule(10),
-                schedule.sigma_transform,
-                previous=prev,  # type: ignore
-            ).final.item()
-
-            tensor = sampler.sample(
-                torch.tensor([i], dtype=torch.float64),
-                torch.tensor([o], dtype=torch.float64),
-                4,
-                schedule.schedule(10),
-                schedule.sigma_transform,
-                previous=prev,  # type: ignore
-            ).final.item()
-
-            assert abs(tensor - scalar) < eps
-            assert abs(tensor - ndarr) < eps
-            assert abs(scalar - ndarr) < eps
+    i, o = random.random(), random.random()
+    prev = [SKSamples(random.random(), random.random(), random.random()) for _ in range(9)]
+
+    scalar = sampler.sample(i, o, 4, schedule.schedule(10), schedule.sigma_transform, previous=tuple(prev)).final
+
+    # Enforce FP64 as that should be equivalent to python scalar
+    ndarr = sampler.sample(
+        np.array([i], dtype=np.float64),
+        np.array([o], dtype=np.float64),
+        4,
+        schedule.schedule(10),
+        schedule.sigma_transform,
+        previous=prev,  # type: ignore
+    ).final.item()
+
+    tensor = sampler.sample(
+        torch.tensor([i], dtype=torch.float64),
+        torch.tensor([o], dtype=torch.float64),
+        4,
+        schedule.schedule(10),
+        schedule.sigma_transform,
+        previous=prev,  # type: ignore
+    ).final.item()
+
+    assert abs(tensor - scalar) < eps
+    assert abs(tensor - ndarr) < eps
+    assert abs(scalar - ndarr) < eps
 
 
 def test_mu_set() -> None:
@@ -195,124 +197,138 @@ def test_mu_set() -> None:
     assert a.schedule == b.schedule
 
 
-def test_require_previous() -> None:
-    samplers: list[StructuredSampler] = []
-    for cls in ALL_STRUCTURED:
-        if issubclass(cls, StructuredMultistep):
-            samplers.extend([cls(order=o + 1) for o in range(cls.min_order(), cls.max_order())])
-        else:
-            samplers.append(cls())
-
-    for o1 in range(1, 4):
-        for o2 in range(1, 4):
-            samplers.append(UniPC(order=o1, solver=Adams(order=o2)))
-            samplers.append(SPC(predictor=Adams(order=o1), corrector=Adams(order=o2)))
-
-    for sampler in samplers:
-        sample = 1.5
-        prediction = 0.5
-        previous = tuple(SKSamples(n / 2, n * 2, n * 1.5) for n in range(100))
-
-        a = sampler.sample(
-            sample,
-            prediction,
-            31,
-            Linear().schedule(100),
-            sigma_complement,
-            None,
-            previous,
-        )
-        b = sampler.sample(
-            sample,
-            prediction,
-            31,
-            Linear().schedule(100),
-            sigma_complement,
-            None,
-            previous[len(previous) - sampler.require_previous :],
-        )
+@pytest.mark.parametrize(
+    ("sampler"),
+    [
+        *(
+            sampler
+            for samplers in (
+                (cls(order=o + 1) for o in range(cls.min_order(), cls.max_order()))
+                if issubclass(cls, StructuredMultistep)
+                else (cls(),)
+                for cls in ALL_STRUCTURED
+            )
+            for sampler in samplers
+        ),
+        *(UniPC(order=o1, solver=Adams(order=o2)) for o1 in range(1, 4) for o2 in range(1, 4)),
+        *(SPC(predictor=Adams(order=o1), corrector=Adams(order=o2)) for o1 in range(1, 4) for o2 in range(1, 4)),
+    ],
+)
+def test_require_previous(sampler: StructuredSampler) -> None:
+    sample = 1.5
+    prediction = 0.5
+    previous = tuple(SKSamples(n / 2, n * 2, n * 1.5) for n in range(100))
 
-        assert a == b, (sampler, sampler.require_previous)
-
-
-def test_require_noise() -> None:
-    samplers: list[StructuredSampler] = []
-    for cls in ALL_STRUCTURED:
-        if issubclass(cls, StructuredStochastic):
-            samplers.extend([cls(add_noise=n) for n in (False, True)])
-        else:
-            samplers.append(cls())
-
-    for n1 in (False, True):
-        for n2 in (False, True):
-            samplers.append(UniPC(solver=DPM(add_noise=n2)))
-            samplers.append(SPC(predictor=DPM(add_noise=n1), corrector=DPM(add_noise=n2)))
-
-    for sampler in samplers:
-        sample = 1.5
-        prediction = 0.5
-        previous = tuple(SKSamples(n / 2, n * 2, n * 1.5) for n in range(100))
-        noise = -0.5
-
-        a = sampler.sample(
-            sample,
-            prediction,
-            31,
-            Linear().schedule(100),
-            sigma_complement,
-            noise,
-            previous,
-        )
-        b = sampler.sample(
-            sample,
-            prediction,
-            31,
-            Linear().schedule(100),
-            sigma_complement,
-            noise if sampler.require_noise else None,
-            previous,
-        )
+    a = sampler.sample(
+        sample,
+        prediction,
+        31,
+        Linear().schedule(100),
+        sigma_complement,
+        None,
+        previous,
+    )
+    b = sampler.sample(
+        sample,
+        prediction,
+        31,
+        Linear().schedule(100),
+        sigma_complement,
+        None,
+        previous[len(previous) - sampler.require_previous :],
+    )
+
+    assert a == b
+
+
+@pytest.mark.parametrize(
+    ("sampler"),
+    [
+        *(
+            sampler
+            for samplers in (
+                (cls(add_noise=n) for n in (False, True)) if issubclass(cls, StructuredStochastic) else (cls(),)
+                for cls in ALL_STRUCTURED
+            )
+            for sampler in samplers
+        ),
+        *(UniPC(solver=DPM(add_noise=n1)) for n1 in (False, True)),
+        *(
+            SPC(predictor=DPM(add_noise=n1), corrector=DPM(add_noise=n2))
+            for n1 in (False, True)
+            for n2 in (False, True)
+        ),
+    ],
+)
+def test_require_noise(sampler: StructuredSampler) -> None:
+    sample = 1.5
+    prediction = 0.5
+    previous = tuple(SKSamples(n / 2, n * 2, n * 1.5) for n in range(100))
+    noise = -0.5
+
+    a = sampler.sample(
+        sample,
+        prediction,
+        31,
+        Linear().schedule(100),
+        sigma_complement,
+        noise,
+        previous,
+    )
+    b = sampler.sample(
+        sample,
+        prediction,
+        31,
+        Linear().schedule(100),
+        sigma_complement,
+        noise if sampler.require_noise else None,
+        previous,
+    )
 
-        # Don't compare stored noise since it's expected diff
-        b = replace(b, noise=a.noise)
+    # Don't compare stored noise since it's expected diff
+    b = replace(b, noise=a.noise)
 
-        assert a == b, (sampler, sampler.require_noise)
+    assert a == b
 
 
-def test_functional_adapter() -> None:
+@pytest.mark.parametrize(
+    ("sampler", "schedule", "steps"),
+    itertools.product(
+        [DPM(n, o) for o in range(1, 4) for n in [False, True]],
+        (cls() for cls in ALL_SCHEDULES),
+        [1, 3, 4, 9, 512, 999],
+    ),
+)
+def test_functional_adapter(sampler: StructuredSampler, schedule: ScheduleCommon, steps: int) -> None:
     def fake_model(x: float, _: float, s: float) -> float:
         return x + math.sin(x) * s
 
-    samplers: list[StructuredSampler] = [DPM(n, o) for o in range(1, 4) for n in [False, True]]
-    for schedule in Linear(), Scaled():
-        for sampler in samplers:
-            for steps in [1, 3, 4, 9, 512, 999]:
-                sample = 1.5
-                adapter = StructuredFunctionalAdapter(schedule, sampler)
-                noise = [random.random() for _ in range(steps)]
-
-                rng = iter(noise)
-                model_transform = FlowModel()
-                sample_f = adapter.sample_model(sample, fake_model, model_transform, steps, rng=lambda: next(rng))
-
-                rng = iter(noise)
-                float_schedule = schedule.schedule(steps)
-                sample_s = sample
-                previous: list[SKSamples[float]] = []
-                for n, (t, s) in enumerate(float_schedule):
-                    results = sampler.sample(
-                        sample_s,
-                        model_transform.to_x(sample_s, fake_model(sample_s, t, s), s, schedule.sigma_transform),
-                        n,
-                        float_schedule,
-                        schedule.sigma_transform,
-                        next(rng),
-                        tuple(previous),
-                    )
-                    previous.append(results)
-                    sample_s = results.final
-
-                assert sample_s == sample_f, (sample_s, sample_f, sampler, schedule, steps)
+    sample = 1.5
+    adapter = StructuredFunctionalAdapter(schedule, sampler)
+    noise = [random.random() for _ in range(steps)]
+
+    rng = iter(noise)
+    model_transform = FlowModel()
+    sample_f = adapter.sample_model(sample, fake_model, model_transform, steps, rng=lambda: next(rng))
+
+    rng = iter(noise)
+    float_schedule = schedule.schedule(steps)
+    sample_s = sample
+    previous: list[SKSamples[float]] = []
+    for n, (t, s) in enumerate(float_schedule):
+        results = sampler.sample(
+            sample_s,
+            model_transform.to_x(sample_s, fake_model(sample_s, t, s), s, schedule.sigma_transform),
+            n,
+            float_schedule,
+            schedule.sigma_transform,
+            next(rng),
+            tuple(previous),
+        )
+        previous.append(results)
+        sample_s = results.final
+
+    assert sample_s == sample_f
 
 
 def test_bashforth() -> None:
@@ -322,19 +338,25 @@ def test_bashforth() -> None:
         assert np.allclose(coeffs, np.array(bashforth(n + 1)), atol=1e-12, rtol=1e-12)
 
 
-def test_tableau_providers() -> None:
-    for provider in [
-        tableaux.RK2,
-        tableaux.RK3,
-        tableaux.RK4,
-        tableaux.RKZ,
-        tableaux.RKE2,
-        tableaux.RKE3,
-        tableaux.RKE5,
-    ]:
-        for variant in provider:
-            if error := tableaux.validate_tableau(variant.tableau()):
-                raise error
+@pytest.mark.parametrize(
+    ("provider"),
+    [
+        variant
+        for provider in [
+            tableaux.RK2,
+            tableaux.RK3,
+            tableaux.RK4,
+            tableaux.RKZ,
+            tableaux.RKE2,
+            tableaux.RKE3,
+            tableaux.RKE5,
+        ]
+        for variant in provider
+    ],
+)
+def test_tableau_providers(provider: tableaux.TableauProvider) -> None:
+    if error := tableaux.validate_tableau(provider.tableau()):
+        raise error
 
 
 def flat_tableau(t: tuple[float | tuple[float | tuple[float | tuple[float, ...], ...], ...], ...]) -> tuple[float, ...]:
@@ -378,6 +400,24 @@ def test_rk3_tableau() -> None:
     )
 
 
+def test_rk4_tableau() -> None:
+    assert (
+        tableau_distance(
+            (  # Eighth
+                (
+                    (0, ()),
+                    (1 / 3, (1 / 3,)),
+                    (2 / 3, (-1 / 3, 1)),
+                    (1, (1, -1, 1)),
+                ),
+                (1 / 8, 3 / 8, 3 / 8, 1 / 8),
+            ),
+            tableaux.rk4_tableau(1 / 3, 2 / 3),
+        )
+        < 1e-12  # Something like 4x the amount of math as RK3
+    )
+
+
 def test_sigmoid() -> None:
     items = spowf(torch.linspace(-2, 2, 9, dtype=torch.float64), 2)
     a = torch.sigmoid(items)
diff --git a/tests/testing_common.py b/tests/testing_common.py
index da79c9a..472a56d 100644
--- a/tests/testing_common.py
+++ b/tests/testing_common.py
@@ -43,7 +43,7 @@ def compare_tensors(
     a: torch.Tensor,
     b: torch.Tensor,
     message: str | None = "",
-    margin: float = 1e-4,
+    margin: float = 1e-8,
 ) -> None:
     assert a.isfinite().all(), message
     assert b.isfinite().all(), message