From d3f40d6a9e8f2a4918d2897fcc91d05165212ef7 Mon Sep 17 00:00:00 2001
From: ananthsub
Date: Wed, 7 Oct 2020 04:28:23 -0700
Subject: [PATCH] Update to_disk to use fsspec for remote file support (#3930)

* Update supporters.py

* Update CHANGELOG.md

* Update supporters.py

* Update supporters.py

* Update supporters.py

* Update supporters.py

* Update supporters.py

* Update supporters.py

* Update CHANGELOG.md

Co-authored-by: Jirka Borovec
---
 CHANGELOG.md                            |  7 ++--
 pytorch_lightning/trainer/supporters.py | 43 ++++++++++++++++---------
 2 files changed, 30 insertions(+), 20 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0db2548e2727f..9ba860f65f363 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -40,6 +40,9 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
 - Changed `LearningRateLogger` to `LearningRateMonitor` ([#3251](https://github.com/PyTorchLightning/pytorch-lightning/pull/3251))
 
 - Used `fsspec` instead of `gfile` for all IO ([#3320](https://github.com/PyTorchLightning/pytorch-lightning/pull/3320))
+    * Swap `torch.load` for `fsspec` load in DDP spawn backend ([#3787](https://github.com/PyTorchLightning/pytorch-lightning/pull/3787))
+    * Swap `torch.load` for `fsspec` load in cloud_io loading ([#3692](https://github.com/PyTorchLightning/pytorch-lightning/pull/3692))
+    * Added support for `to_disk()` to use remote filepaths with `fsspec` ([#3930](https://github.com/PyTorchLightning/pytorch-lightning/pull/3930))
 
 - Refactor `GPUStatsMonitor` to improve training speed ([#3257](https://github.com/PyTorchLightning/pytorch-lightning/pull/3257))
 
@@ -55,10 +58,6 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
 
 - `row_log_interval` and `log_save_interval` are now based on training loop's `global_step` instead of epoch-internal batch index ([#3667](https://github.com/PyTorchLightning/pytorch-lightning/pull/3667))
 
-- Swap `torch.load` for `fsspec` load in DDP spawn backend ([#3787](https://github.com/PyTorchLightning/pytorch-lightning/pull/3787))
-
-- Swap `torch.load` for `fsspec` load in cloud_io loading ([#3692](https://github.com/PyTorchLightning/pytorch-lightning/pull/3692))
-
 ### Deprecated
 
 - Rename Trainer arguments `row_log_interval` >> `log_every_n_steps` and `log_save_interval` >> `flush_logs_every_n_steps` ([#3748](https://github.com/PyTorchLightning/pytorch-lightning/pull/3748))

diff --git a/pytorch_lightning/trainer/supporters.py b/pytorch_lightning/trainer/supporters.py
index c954a78e17c22..ee98e3614da67 100644
--- a/pytorch_lightning/trainer/supporters.py
+++ b/pytorch_lightning/trainer/supporters.py
@@ -12,10 +12,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from pathlib import Path
+import os
 from typing import Optional
 
+import fsspec
 import torch
+from pytorch_lightning.utilities.cloud_io import get_filesystem
 from torch import Tensor
 
 
@@ -91,7 +93,7 @@ def _agg_memory(self, how: str):
         if self.rotated:
             return getattr(self.memory, how)()
         else:
-            return getattr(self.memory[:self.current_idx], how)()
+            return getattr(self.memory[: self.current_idx], how)()
 
 
 class Accumulator(object):
@@ -109,7 +111,6 @@ def mean(self):
 
 
 class PredictionCollection(object):
-
     def __init__(self, global_rank: int, world_size: int):
         self.global_rank = global_rank
         self.world_size = world_size
@@ -122,7 +123,9 @@ def _add_prediction(self, name, values, filename):
         elif name not in self.predictions[filename]:
             self.predictions[filename][name] = values
         elif isinstance(values, Tensor):
-            self.predictions[filename][name] = torch.cat((self.predictions[filename][name], values))
+            self.predictions[filename][name] = torch.cat(
+                (self.predictions[filename][name], values)
+            )
         elif isinstance(values, list):
             self.predictions[filename][name].extend(values)
 
@@ -135,25 +138,32 @@ def add(self, predictions):
         for feature_name, values in pred_dict.items():
             self._add_prediction(feature_name, values, filename)
 
-    def to_disk(self):
+    def to_disk(self) -> None:
         """Write predictions to file(s).
         """
-        for filename, predictions in self.predictions.items():
-
-            # Absolute path to defined prediction file. rank added to name if in multi-gpu environment
-            outfile = Path(filename).absolute()
-            outfile = outfile.with_name(
-                f"{outfile.stem}{f'_rank_{self.global_rank}' if self.world_size > 1 else ''}{outfile.suffix}"
-            )
-            outfile.parent.mkdir(exist_ok=True, parents=True)
+        for filepath, predictions in self.predictions.items():
+            fs = get_filesystem(filepath)
+            # normalize local filepaths only
+            if fs.protocol == "file":
+                filepath = os.path.realpath(filepath)
+            if self.world_size > 1:
+                stem, extension = os.path.splitext(filepath)
+                filepath = f"{stem}_rank_{self.global_rank}{extension}"
+            dirpath = os.path.split(filepath)[0]
+            fs.mkdirs(dirpath, exist_ok=True)
 
             # Convert any tensor values to list
-            predictions = {k: v if not isinstance(v, Tensor) else v.tolist() for k, v in predictions.items()}
+            predictions = {
+                k: v if not isinstance(v, Tensor) else v.tolist()
+                for k, v in predictions.items()
+            }
 
             # Check if all features for this file add up to same length
             feature_lens = {k: len(v) for k, v in predictions.items()}
             if len(set(feature_lens.values())) != 1:
-                raise ValueError('Mismatching feature column lengths found in stored EvalResult predictions.')
+                raise ValueError(
+                    "Mismatching feature column lengths found in stored EvalResult predictions."
+                )
 
             # Switch predictions so each entry has its own dict
             outputs = []
@@ -162,4 +172,5 @@ def to_disk(self):
             outputs.append(output_element)
 
         # Write predictions for current file to disk
-        torch.save(outputs, outfile)
+        with fs.open(filepath, "wb") as fp:
+            torch.save(outputs, fp)
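Note: the core pattern above is that torch.save accepts any writable binary
file object, so an fsspec file handle works for local and remote targets
alike. Below is a minimal standalone sketch of that pattern under stated
assumptions: the helper name save_predictions and the example output path are
illustrative and not part of this patch, and a remote URL (e.g. an s3:// path)
would additionally require the matching fsspec backend such as s3fs to be
installed.

    import os

    import torch
    from pytorch_lightning.utilities.cloud_io import get_filesystem

    def save_predictions(outputs, filepath: str) -> None:
        # Hypothetical helper, not part of this patch. Resolve the filesystem
        # implementation from the path's protocol (local file, s3, gcs, ...),
        # as to_disk() does above.
        fs = get_filesystem(filepath)
        # Create the parent directory on that filesystem if needed.
        dirpath = os.path.split(filepath)[0]
        fs.mkdirs(dirpath, exist_ok=True)
        # Stream the serialized predictions through the fsspec file handle.
        with fs.open(filepath, "wb") as fp:
            torch.save(outputs, fp)

    # Usage with a local path (illustrative); a remote URL works the same way
    # once its fsspec backend is installed.
    save_predictions([{"feature": [1.0, 2.0]}], "/tmp/predictions/preds.pt")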