Fix DP Logging Aggregation (#4138)
* add option to step result to do aggregation on a specific device

* in dp: do aggregation on root gpu

* Update CHANGELOG.md

* pep8

* trailing whitespace

* move to root

  move result
  stupid result object
  revert to master
  undo import
  add "to" method to result
  generalize to
  try a test
  try a test
  Revert "try a test" (reverts commit 22e3c10)
  Revert "try a test" (reverts commit 4d2d8fb)
  new test
  max epochs
  super epoch end
  log in test
  hanging test
  undo test
  initial test that fails on master
  step end
  pass
  step end
  step end
  epoch end
  print
  step
  check dev
  clean up test
  sanity check
  wtf is going on
  frustration
  debugging test
  test (repeated eight times)
  unused import

* move chlog entry

* clean

* remove outdated changes

Co-authored-by: Adrian Wälchli <[email protected]>
Co-authored-by: Jirka Borovec <[email protected]>
3 people authored Dec 4, 2020
1 parent ed5bda3 commit f23f5e5
Showing 5 changed files with 82 additions and 13 deletions.
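
For context, a minimal plain-PyTorch sketch of the problem this commit addresses (illustrative only, not part of the diff, and it assumes at least two CUDA devices): under DataParallel, per-replica metrics live on different GPUs, so they have to be moved to a single root device before they can be aggregated for logging.

    import torch

    # Metrics produced on different replicas sit on different GPUs.
    root_device = torch.device("cuda", 0)
    per_replica = [torch.rand(1, device=torch.device("cuda", i)) for i in range(2)]
    # Move everything to the root GPU first, then aggregation is safe.
    on_root = [metric.to(root_device) for metric in per_replica]
    epoch_metric = torch.stack(on_root).mean()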
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -103,6 +103,7 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
 
 - Added feature to move tensors to CPU before saving ([#4309](https://github.com/PyTorchLightning/pytorch-lightning/pull/4309))
 
+- Fixed `LoggerConnector` to have logged metrics on root device in DP ([#4138](https://github.com/PyTorchLightning/pytorch-lightning/pull/4138))
 
 
 ## [1.0.8] - 2020-11-24
10 changes: 7 additions & 3 deletions pytorch_lightning/core/step_result.py
@@ -400,11 +400,15 @@ def detach(self):
             if isinstance(v, torch.Tensor):
                 self.__setitem__(k, v.detach())
 
-    def cpu(self):
-        """Move all self attributes to CPU."""
+    def to(self, *args, **kwargs):
+        """Move all self attributes to the given device."""
         for k, v in self.items():
             if isinstance(v, torch.Tensor):
-                self.__setitem__(k, v.cpu())
+                self.__setitem__(k, v.to(*args, **kwargs))
+
+    def cpu(self):
+        """Move all self attributes to CPU."""
+        self.to(torch.device("cpu"))
 
     def __repr__(self):
         self_copy = self.copy()
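
A hypothetical usage sketch of the generalized API added to step_result.py above (not taken from the repository): because Result behaves like a dict of tensors, the new to() call moves every entry, and cpu() becomes the special case to(torch.device("cpu")).

    import torch

    from pytorch_lightning.core.step_result import Result

    result = Result()
    result["loss"] = torch.tensor(0.5)      # Result stores tensors like a dict
    if torch.cuda.is_available():
        result.to(torch.device("cuda", 0))  # new: move every tensor entry to a chosen device
    result.cpu()                            # unchanged call, now routed through to()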
@@ -17,6 +17,8 @@
 from enum import Enum
 from typing import Any, Dict, List, Optional, Tuple, Union
 
+import torch
+
 from pytorch_lightning.core.step_result import Result
 from pytorch_lightning.utilities.exceptions import MisconfigurationException
@@ -402,6 +404,8 @@ def cache_result(self) -> None:
             hook_result.detach()
             if self.trainer.move_metrics_to_cpu:
                 hook_result.cpu()
+            elif self.trainer.use_dp:
+                hook_result.to(torch.device("cuda", self.trainer.root_gpu))
 
             self._internals[fx_name].append(
                 hook_result,
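
The elif branch added above encodes a small device-selection rule for cached hook results. The helper below is hypothetical (the commit inlines this logic rather than defining a function) and only restates that rule: move to CPU when move_metrics_to_cpu is set, otherwise to the DP root GPU, otherwise leave the result where it is.

    from typing import Optional

    import torch

    def cache_device(move_metrics_to_cpu: bool, use_dp: bool, root_gpu: int) -> Optional[torch.device]:
        # Hypothetical helper, not part of the commit.
        if move_metrics_to_cpu:
            return torch.device("cpu")
        if use_dp:
            # aggregate cached results on the root GPU under DataParallel
            return torch.device("cuda", root_gpu)
        return None  # no move needed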
@@ -15,7 +15,7 @@
 from collections import ChainMap
 from copy import deepcopy
 from pprint import pprint
-from typing import Iterable, Union, cast
+from typing import Iterable, Union, Optional
 
 import torch
@@ -289,10 +289,7 @@ def get_evaluate_epoch_results(self, test_mode):
         return results
 
     def _track_callback_metrics(self, eval_results, using_eval_result):
-        if (
-            len(eval_results) > 0 and
-            (eval_results[0] is None or not isinstance(eval_results[0], Result))
-        ):
+        if len(eval_results) > 0 and (eval_results[0] is None or not isinstance(eval_results[0], Result)):
             return
 
         if using_eval_result:
@@ -388,11 +385,13 @@ def on_train_epoch_end(self):
         # inform cached logger connector epoch finished
         self.cached_results.has_batch_loop_finished = True
 
-    def log_train_epoch_end_metrics(self,
-                                    epoch_output,
-                                    checkpoint_accumulator,
-                                    early_stopping_accumulator,
-                                    num_optimizers):
+    def log_train_epoch_end_metrics(
+        self,
+        epoch_output,
+        checkpoint_accumulator,
+        early_stopping_accumulator,
+        num_optimizers,
+    ):
         # epoch output is a list. Each item in that list has all the outputs per optimizer
         # epoch_output[optimizer_idx][training_step_idx][tbptt_index]
         # remember that not using truncated backprop is equivalent with truncated back prop of len(1)
61 changes: 61 additions & 0 deletions tests/trainer/logging/test_logger_connector.py
@@ -20,6 +20,7 @@
 
 import pytest
 import torch
+from torch.utils.data import DataLoader
 
 from pytorch_lightning.callbacks.base import Callback
 from pytorch_lightning.core.step_result import Result
@@ -387,3 +388,63 @@ def test_call_back_validator(tmpdir):
         on_step=None,
         on_epoch=None)
     assert result is None
+
+
+@pytest.mark.skipif(torch.cuda.device_count() < 2, reason="test requires two GPUs")
+def test_epoch_results_cache_dp(tmpdir):
+
+    root_device = torch.device("cuda", 0)
+
+    class TestModel(BoringModel):
+
+        def training_step(self, *args, **kwargs):
+            result = super().training_step(*args, **kwargs)
+            self.log("train_loss_epoch", result["loss"], on_step=False, on_epoch=True)
+            return result
+
+        def training_step_end(self, training_step_outputs):  # required for dp
+            loss = training_step_outputs["loss"].mean()
+            return loss
+
+        def training_epoch_end(self, outputs):
+            assert all(out["loss"].device == root_device for out in outputs)
+            assert self.trainer.callback_metrics["train_loss_epoch"].device == root_device
+
+        def validation_step(self, *args, **kwargs):
+            val_loss = torch.rand(1, device=torch.device("cuda", 1))
+            self.log("val_loss_epoch", val_loss, on_step=False, on_epoch=True)
+            return val_loss
+
+        def validation_epoch_end(self, outputs):
+            assert all(loss.device == root_device for loss in outputs)
+            assert self.trainer.callback_metrics["val_loss_epoch"].device == root_device
+
+        def test_step(self, *args, **kwargs):
+            test_loss = torch.rand(1, device=torch.device("cuda", 1))
+            self.log("test_loss_epoch", test_loss, on_step=False, on_epoch=True)
+            return test_loss
+
+        def test_epoch_end(self, outputs):
+            assert all(loss.device == root_device for loss in outputs)
+            assert self.trainer.callback_metrics["test_loss_epoch"].device == root_device
+
+        def train_dataloader(self):
+            return DataLoader(RandomDataset(32, 64), batch_size=4)
+
+        def val_dataloader(self):
+            return DataLoader(RandomDataset(32, 64), batch_size=4)
+
+        def test_dataloader(self):
+            return DataLoader(RandomDataset(32, 64), batch_size=4)
+
+    model = TestModel()
+    trainer = Trainer(
+        default_root_dir=tmpdir,
+        accelerator="dp",
+        gpus=2,
+        limit_train_batches=2,
+        limit_val_batches=2,
+        max_epochs=1,
+    )
+    trainer.fit(model)
+    trainer.test(model, ckpt_path=None)
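
The test relies on standard torch.nn.DataParallel behaviour: the batch is scattered across all configured GPUs, but the per-replica outputs are gathered back onto the root device, which is why training_step_end is where the reduction happens and why the assertions expect cuda:0. A small plain-PyTorch illustration of that gather step (assumes at least two CUDA devices; not part of the test file):

    import torch
    from torch import nn

    # Replicas run on cuda:0 and cuda:1; outputs are gathered on cuda:0 (the root device).
    model = nn.DataParallel(nn.Linear(32, 2), device_ids=[0, 1]).cuda()
    out = model(torch.randn(8, 32, device="cuda:0"))
    print(out.device)  # cuda:0
    loss = out.sum()   # any cross-replica reduction therefore happens on the root device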
