Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ def should_preserve(

@dataclasses.dataclass
class EveryNSteps(PreservationPolicy):
"""Ensures checkpoints are preserved at least every N steps."""
"""Preserves checkpoints after at least N steps."""

interval_steps: int

Expand All @@ -157,7 +157,17 @@ def should_preserve(
) -> Sequence[bool]:
if self.interval_steps == 0:
raise ValueError("interval_steps must not be 0.")
result = [ckpt.step % self.interval_steps == 0 for ckpt in checkpoints]
result = []
previous_step = None
for i, ckpt in enumerate(checkpoints):
if i == 0:
result.append(True) # Always preserve the first checkpoint.
previous_step = ckpt.step
elif ckpt.step - previous_step >= self.interval_steps:
result.append(True)
previous_step = ckpt.step
else:
result.append(False)
_log_preservation_decision(
f"EveryNSteps (interval_steps={self.interval_steps})",
checkpoints,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ def setUp(self):
super().setUp()
self.preservation_context = preservation_policy_lib.PreservationContext()

def get_checkpoints(self, n: int):
def get_checkpoints(self, steps: Sequence[int] = (0, 1, 2, 3, 4)):
checkpoints = []
time_increment = datetime.timedelta(seconds=1)
start_time = datetime.datetime.now()
for i in range(n):
for i, step in enumerate(steps):
current_time = start_time + i * time_increment
checkpoints.append(
checkpoint_info.CheckpointInfo(
step=i,
step=step,
time=current_time,
metrics=None
)
Expand Down Expand Up @@ -79,7 +79,8 @@ def test_latest_n_policy(self, n, expected_preserved_steps):

self.assertSequenceEqual(
expected_preserved_steps,
self.get_preserved_checkpoints(self.get_checkpoints(5), policy),
self.get_preserved_checkpoints(
self.get_checkpoints(), policy),
)

@parameterized.parameters(
Expand All @@ -103,35 +104,45 @@ def test_every_n_seconds_policy(

self.assertSequenceEqual(
expected_preserved_steps,
self.get_preserved_checkpoints(self.get_checkpoints(5), policy),
self.get_preserved_checkpoints(self.get_checkpoints(), policy),
)

@parameterized.parameters(
dict(
interval_steps=1,
steps=[0, 1, 2, 3, 4],
expected_preserved_steps=[0, 1, 2, 3, 4],
),
dict(
interval_steps=3,
steps=[0, 1, 2, 3, 4],
expected_preserved_steps=[0, 3],
),
dict(
interval_steps=6,
steps=[0, 1, 2, 3, 4],
expected_preserved_steps=[0],
),
dict(
interval_steps=3,
steps=[0, 1, 2, 4, 5, 8, 9, 13, 14, 25],
expected_preserved_steps=[0, 4, 8, 13, 25],
),
)
def test_every_n_steps_policy(self, interval_steps, expected_preserved_steps):
def test_every_n_steps_policy(
self, interval_steps, steps, expected_preserved_steps
):
policy = preservation_policy_lib.EveryNSteps(interval_steps=interval_steps)

self.assertEqual(
expected_preserved_steps,
self.get_preserved_checkpoints(self.get_checkpoints(5), policy),
self.get_preserved_checkpoints(self.get_checkpoints(steps), policy),
)

def test_every_zero_steps_policy_raises_error(self):
policy = preservation_policy_lib.EveryNSteps(interval_steps=0)
with self.assertRaises(ValueError):
self.get_preserved_checkpoints(self.get_checkpoints(5), policy)
self.get_preserved_checkpoints(self.get_checkpoints(), policy)

@parameterized.parameters(
dict(
Expand All @@ -152,7 +163,7 @@ def test_custom_steps_policy(self, steps, expected_preserved_steps):

self.assertEqual(
expected_preserved_steps,
self.get_preserved_checkpoints(self.get_checkpoints(5), policy),
self.get_preserved_checkpoints(self.get_checkpoints(), policy),
)

@parameterized.parameters(
Expand Down Expand Up @@ -190,7 +201,7 @@ def test_best_n_policy(
n=n,
keep_checkpoints_without_metrics=keep_checkpoints_without_metrics,
)
checkpoints = self.get_checkpoints(5)
checkpoints = self.get_checkpoints()
for i, checkpoint in enumerate(checkpoints):
if loss[i]:
checkpoint.metrics = {'loss': loss[i]}
Expand Down Expand Up @@ -219,7 +230,8 @@ def test_joint_preservation_policy(self):
]
)
loss = [5, None, 4, None, 3, None, 11, None, 8, None, 12, None]
checkpoints = self.get_checkpoints(12)
steps = [0, 1, 2, 3, 4, 5, 6, 7, 9, 10, 11]
checkpoints = self.get_checkpoints(steps)
for i, checkpoint in enumerate(checkpoints):
if loss[i]:
checkpoint.metrics = {'loss': loss[i]}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,8 @@ class PersistentCheckpointOptions:
may be considered for deletion when there are more than `max_to_keep`
checkpoints present.
keep_period:
If set, any existing checkpoints matching checkpoint_step % keep_period == 0
will not be deleted.
If set, existing checkpoints that are at least keep_period steps apart will
be preserved.
should_save_fn:
Predicate callable to check if given step can be saved. This callable
accepts step number and optional latest step number as param and returns
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -810,7 +810,7 @@ def test_all_steps(
@parameterized.parameters(
(2, 2, 2, [0, 2, 4, 6, 8, 9]),
(3, 2, 4, [0, 4, 7, 8, 9]),
(2, 2, 5, [0, 8, 9]),
(2, 2, 5, [0, 6, 8, 9]),
(2, 6, 3, [0, 6, 8, 9]),
)
def test_all_steps_with_keep_interval(
Expand Down
Loading