
Conversation

@semioz (Contributor) commented Aug 24, 2025

Resumes the data buffer properly from a checkpoint. When saving, it stores the rollout buffer, metadata, and the dataset to disk. Whether to restore from a checkpoint or start fresh is controlled by a simple config flag, just like we had with resume_step.


GitHub Issue: #748
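
For illustration, the resume toggle described above might look roughly like this (a minimal sketch; the field names mirror resume_step and the flag used in the snippets reviewed below, while the actual config classes are an assumption):

from dataclasses import dataclass


@dataclass
class CkptConfig:
    # step to resume training from, as before
    resume_step: int | None = None
    # restore the data buffer from the checkpoint at resume_step, or start fresh
    resume_buffer_from_checkpoint: bool = False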

@mikasenghaas (Member)

sorry, we're gonna fix the w&b api key so the integration test runs properly.

@semioz (Contributor, Author) commented Aug 25, 2025

@mikasenghaas i was about to test, but should i also convert my torch.save calls to safetensors as you mentioned in this issue? #821

@mikasenghaas (Member)

@semioz nope, hold off on this for now. we will rewrite from torch.save to safetensors.save in a separate pr. depending on whether this or the other pr goes in first, you may have to rebase.

@mikasenghaas self-requested a review on August 25, 2025 at 08:15
@mikasenghaas (Member) left a comment


couple of comments, but it's on the right track! nice job

Comment on lines 105 to 108
torch.save(self.rollout_buffer, path / "rollout_buffer.pt")
torch.save(self.metadata, path / "metadata.pt")

self.dataset.save_to_disk(path / "buffer_dataset")

hm, i can see how this is the easiest way to implement this, but i wonder if we could somehow save/load from a single dataset instance. e.g. metadata could easily be a column in the dataset. rollout buffer is a bit trickier though.. i think it's fine like this for now, but it feels like a single hf dataset might be the cleanest way of serializing
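
A rough sketch of the metadata-as-columns idea, assuming the rollouts are plain dicts and the metadata is a small dict of scalars (function and column names here are illustrative, not the PR's code):

from datasets import Dataset, load_from_disk


def save_as_single_dataset(rollouts: list[dict], metadata: dict, path: str) -> None:
    # attach the same metadata values to every row as prefixed extra columns
    rows = [{**r, **{f"meta_{k}": v for k, v in metadata.items()}} for r in rollouts]
    Dataset.from_list(rows).save_to_disk(path)


def load_from_single_dataset(path: str) -> tuple[list[dict], dict]:
    # recover the metadata from the first row and strip the extra columns again
    ds = load_from_disk(path)
    meta_cols = [c for c in ds.column_names if c.startswith("meta_")]
    metadata = {c.removeprefix("meta_"): ds[0][c] for c in meta_cols}
    rollouts = [{k: v for k, v in row.items() if k not in meta_cols} for row in ds]
    return rollouts, metadata

The rollout buffer could in principle ride along the same way if its entries are row-like, which is where it gets trickier for nested or ragged structures.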

return sampled_rollouts


def load_buffer(path: Path, buffer_config: DataBufferConfigType) -> Buffer:

can we make this function in-place like our other checkpoint logic?
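
For example, an in-place restore might look roughly like this (a sketch only: the attribute names follow the save snippet reviewed above, while the method and the Buffer class shown here are hypothetical):

from pathlib import Path

import torch
from datasets import load_from_disk


class Buffer:
    def load(self, path: Path) -> None:
        # mutate the existing instance instead of returning a new one,
        # mirroring the other in-place checkpoint logic
        # weights_only=False since these are arbitrary pickled python objects
        self.rollout_buffer = torch.load(path / "rollout_buffer.pt", weights_only=False)
        self.metadata = torch.load(path / "metadata.pt", weights_only=False)
        self.dataset = load_from_disk(str(path / "buffer_dataset"))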

Comment on lines 94 to 95
buffer_path = step_path / "buffer"
buffer.save(buffer_path)

this should be inside _save_to_path for async checkpointing to work properly
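
A sketch of the intent here, assuming the checkpoint manager runs _save_to_path on a background worker (the class and method bodies below are illustrative, not the repo's actual code):

import threading
from pathlib import Path


class CkptManager:
    def __init__(self, buffer):
        self.buffer = buffer

    def _save_to_path(self, step_path: Path) -> None:
        # everything written here runs on the async worker, so the buffer
        # write is covered by the same non-blocking checkpoint as the rest
        step_path.mkdir(parents=True, exist_ok=True)
        self.buffer.save(step_path / "buffer")

    def save_async(self, step_path: Path) -> None:
        threading.Thread(target=self._save_to_path, args=(step_path,), daemon=True).start()

A buffer.save left at the call site outside _save_to_path would run synchronously instead, which seems to be the point of the comment above.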

Comment on lines 100 to 107
# Load buffer from checkpoint
if config.ckpt.resume_buffer_from_checkpoint:
logger.info("Resuming buffer from checkpoint")
buffer_path = ckpt_manager.get_ckpt_path(config.ckpt.resume_step) / "buffer"
buffer = load_buffer(str(buffer_path), config.buffer)
else:
logger.info("Initializing buffer from scratch")
buffer = setup_buffer(dataset, config.buffer)

if we make the suggested changes to the buffer load function and the checkpoint, there should be very minimal changes to the orchestrator code
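
With an in-place buffer.load and the buffer write moved into the checkpoint path, the call site could shrink to something like this (sketch only, reusing setup_buffer, config, ckpt_manager and logger from the snippet above):

buffer = setup_buffer(dataset, config.buffer)
if config.ckpt.resume_buffer_from_checkpoint:
    logger.info("Resuming buffer from checkpoint")
    buffer.load(ckpt_manager.get_ckpt_path(config.ckpt.resume_step) / "buffer")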

@semioz (Contributor, Author) commented Aug 25, 2025

@mikasenghaas thanks. converted the load/save process to be fully hf via columns and moved the resume option to the buffer config (lmk if you guys don't wanna make it configurable at all). also made the other changes you requested. ready for your review again i guess.

@mikasenghaas (Member)

Closing as continued in #839
