Fix DP Logging Aggregation #4138

Merged 9 commits on Dec 4, 2020
Changes from all commits
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -103,6 +103,7 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
 
 - Added feature to move tensors to CPU before saving ([#4309](https://github.com/PyTorchLightning/pytorch-lightning/pull/4309))
 
+- Fixed `LoggerConnector` to have logged metrics on root device in DP ([#4138](https://github.com/PyTorchLightning/pytorch-lightning/pull/4138))
 
 
 ## [1.0.8] - 2020-11-24
10 changes: 7 additions & 3 deletions pytorch_lightning/core/step_result.py
@@ -400,11 +400,15 @@ def detach(self):
             if isinstance(v, torch.Tensor):
                 self.__setitem__(k, v.detach())
 
-    def cpu(self):
-        """Move all self attributes to CPU."""
+    def to(self, *args, **kwargs):
+        """Move all self attributes to the given device."""
         for k, v in self.items():
             if isinstance(v, torch.Tensor):
-                self.__setitem__(k, v.cpu())
+                self.__setitem__(k, v.to(*args, **kwargs))
+
+    def cpu(self):
+        """Move all self attributes to CPU."""
+        self.to(torch.device("cpu"))
 
     def __repr__(self):
         self_copy = self.copy()
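For intuition, here is a minimal sketch (not part of the PR) of the pattern this change introduces: a dict-like container whose `to()` walks its tensor values and forwards the device arguments, so that `cpu()` becomes a thin wrapper around it. The `TensorDict` name is illustrative only.

```python
import torch


class TensorDict(dict):
    """Illustrative stand-in for a Result-like mapping of metric names to tensors."""

    def to(self, *args, **kwargs):
        # Move every tensor value using the device/dtype arguments given.
        for k, v in self.items():
            if isinstance(v, torch.Tensor):
                self[k] = v.to(*args, **kwargs)

    def cpu(self):
        # CPU transfer is just a special case of `to`.
        self.to(torch.device("cpu"))


metrics = TensorDict(loss=torch.tensor(0.5))
metrics.to(torch.device("cpu"))  # no-op here, but would move CUDA tensors
assert metrics["loss"].device.type == "cpu"
```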
@@ -17,6 +17,8 @@
 from enum import Enum
 from typing import Any, Dict, List, Optional, Tuple, Union
 
+import torch
+
 from pytorch_lightning.core.step_result import Result
 from pytorch_lightning.utilities.exceptions import MisconfigurationException
 

@@ -402,6 +404,8 @@ def cache_result(self) -> None:
             hook_result.detach()
             if self.trainer.move_metrics_to_cpu:
                 hook_result.cpu()
+            elif self.trainer.use_dp:
+                hook_result.to(torch.device("cuda", self.trainer.root_gpu))
 
             self._internals[fx_name].append(
                 hook_result,
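The new `elif` branch is the substance of the fix: under DP, cached hook results produced on non-root replicas would otherwise stay on their own GPUs, and aggregating device-mismatched tensors later fails. A rough standalone illustration (not PR code, and it assumes at least two CUDA devices):

```python
import torch

if torch.cuda.device_count() >= 2:
    root_device = torch.device("cuda", 0)
    a = torch.rand(1, device=root_device)               # e.g. a metric from replica 0
    b = torch.rand(1, device=torch.device("cuda", 1))   # e.g. a metric from replica 1

    try:
        torch.stack([a, b]).mean()  # cross-device aggregation raises RuntimeError
    except RuntimeError as err:
        print(f"aggregation failed: {err}")

    # Moving everything onto the root device first, which mirrors what the cached
    # hook results now do, makes the epoch-level reduction well defined.
    epoch_mean = torch.stack([a, b.to(root_device)]).mean()
    assert epoch_mean.device == root_device
```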
@@ -15,7 +15,7 @@
 from collections import ChainMap
 from copy import deepcopy
 from pprint import pprint
-from typing import Iterable, Union, cast
+from typing import Iterable, Union, Optional
 
 import torch
 

@@ -289,10 +289,7 @@ def get_evaluate_epoch_results(self, test_mode):
         return results
 
     def _track_callback_metrics(self, eval_results, using_eval_result):
-        if (
-            len(eval_results) > 0 and
-            (eval_results[0] is None or not isinstance(eval_results[0], Result))
-        ):
+        if len(eval_results) > 0 and (eval_results[0] is None or not isinstance(eval_results[0], Result)):
             return
 
         if using_eval_result:

@@ -388,11 +385,13 @@ def on_train_epoch_end(self):
         # inform cached logger connector epoch finished
         self.cached_results.has_batch_loop_finished = True
 
-    def log_train_epoch_end_metrics(self,
-                                    epoch_output,
-                                    checkpoint_accumulator,
-                                    early_stopping_accumulator,
-                                    num_optimizers):
+    def log_train_epoch_end_metrics(
+        self,
+        epoch_output,
+        checkpoint_accumulator,
+        early_stopping_accumulator,
+        num_optimizers,
+    ):
         # epoch output is a list. Each item in that list has all the outputs per optimizer
         # epoch_output[optimizer_idx][training_step_idx][tbptt_index]
         # remember that not using truncated backprop is equivalent with truncated back prop of len(1)
61 changes: 61 additions & 0 deletions tests/trainer/logging/test_logger_connector.py
@@ -20,6 +20,7 @@
 
 import pytest
 import torch
+from torch.utils.data import DataLoader
 
 from pytorch_lightning.callbacks.base import Callback
 from pytorch_lightning.core.step_result import Result

@@ -387,3 +388,63 @@ def test_call_back_validator(tmpdir):
         on_step=None,
         on_epoch=None)
     assert result is None
+
+
+@pytest.mark.skipif(torch.cuda.device_count() < 2, reason="test requires two GPUs")
+def test_epoch_results_cache_dp(tmpdir):
+
+    root_device = torch.device("cuda", 0)
+
+    class TestModel(BoringModel):
+
+        def training_step(self, *args, **kwargs):
+            result = super().training_step(*args, **kwargs)
+            self.log("train_loss_epoch", result["loss"], on_step=False, on_epoch=True)
+            return result
+
+        def training_step_end(self, training_step_outputs):  # required for dp
+            loss = training_step_outputs["loss"].mean()
+            return loss
+
+        def training_epoch_end(self, outputs):
+            assert all(out["loss"].device == root_device for out in outputs)
+            assert self.trainer.callback_metrics["train_loss_epoch"].device == root_device
+
+        def validation_step(self, *args, **kwargs):
+            val_loss = torch.rand(1, device=torch.device("cuda", 1))
+            self.log("val_loss_epoch", val_loss, on_step=False, on_epoch=True)
+            return val_loss
+
+        def validation_epoch_end(self, outputs):
+            assert all(loss.device == root_device for loss in outputs)
+            assert self.trainer.callback_metrics["val_loss_epoch"].device == root_device
+
+        def test_step(self, *args, **kwargs):
+            test_loss = torch.rand(1, device=torch.device("cuda", 1))
+            self.log("test_loss_epoch", test_loss, on_step=False, on_epoch=True)
+            return test_loss
+
+        def test_epoch_end(self, outputs):
+            assert all(loss.device == root_device for loss in outputs)
+            assert self.trainer.callback_metrics["test_loss_epoch"].device == root_device
+
+        def train_dataloader(self):
+            return DataLoader(RandomDataset(32, 64), batch_size=4)
+
+        def val_dataloader(self):
+            return DataLoader(RandomDataset(32, 64), batch_size=4)
+
+        def test_dataloader(self):
+            return DataLoader(RandomDataset(32, 64), batch_size=4)
+
+    model = TestModel()
+    trainer = Trainer(
+        default_root_dir=tmpdir,
+        accelerator="dp",
+        gpus=2,
+        limit_train_batches=2,
+        limit_val_batches=2,
+        max_epochs=1,
+    )
+    trainer.fit(model)
+    trainer.test(model, ckpt_path=None)
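Assuming a machine with at least two CUDA devices, the new test can be selected directly by its node id; a programmatic equivalent via pytest is sketched below.

```python
# Runs only the new DP test; on machines with fewer than two GPUs the skipif marker skips it.
import pytest

pytest.main(["-v", "tests/trainer/logging/test_logger_connector.py::test_epoch_results_cache_dp"])
```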