Changes from all commits (102 commits)
5db8b9a
syncing to 1.7.2
dmlyubim Jan 10, 2023
55bc018
common public rllib cql renames
dmlyubim Jan 10, 2023
31b77f5
patching sac dist class get
dmlyubim Jan 10, 2023
844dba4
retrofitting rllib/offline package to 1.7.2
dmlyubim Jan 10, 2023
ace3f85
retrofit space_utils 1.7.2
dmlyubim Jan 10, 2023
6640af3
retrofit ray.tune.registry to 1.7.2 (add input registry)
dmlyubim Jan 10, 2023
a9b7a56
test changes
dmlyubim Jan 10, 2023
e09c33d
cql test pendulum data
dmlyubim Jan 10, 2023
313f88a
in 1.3, replay buffer isn't reworked to track capacity vs. current size
dmlyubim Jan 10, 2023
1e5159a
Updating metrics to 1.7.2 (update sampled count on request to enable …
dmlyubim Jan 11, 2023
5f12afa
slight test refactoring to enable intermediate debugging
dmlyubim Jan 11, 2023
8598a97
fixing bazel test //rllib:test_cql
dmlyubim Jan 11, 2023
0c20a1b
additional cql_sac cleanup
dmlyubim Jan 11, 2023
2dbfb9d
removing cql apex sac tests
dmlyubim Jan 12, 2023
016bde6
rolling back non-existent policy call signature in offline component
dmlyubim Jan 12, 2023
e099a1d
trying to fix macos python verison at 3.8.15
dmlyubim Jan 12, 2023
625bf4b
changing bazel definition for test_cql.
dmlyubim Jan 13, 2023
d5abccb
parity with BUILD for test_cql in 1.7.2 (removing data glob) -- does …
dmlyubim Jan 13, 2023
d56abda
fixes -- this now runs with the benchmark
dmlyubim Jan 13, 2023
fb7ef1a
Rolling back cql_dqn cleanup
dmlyubim Jan 19, 2023
90660d0
trying to add data label to test
dmlyubim Jan 21, 2023
057eebf
set recursive mod 777 on /home/vsts/work/_temp/_bazel_vsts directory …
Kiko-Aumond Jan 23, 2023
aa2eae4
use $TEST_TMPDIR env variable instead of literal directory name
Kiko-Aumond Jan 24, 2023
e2f9e7f
Kiko/cql 1.7.2 port (#172)
Kiko-Aumond Jan 24, 2023
f809b8f
brining more changes from 1.13.0 to update timesteps_total metric cor…
dmlyubim Jan 25, 2023
65ddbce
Merge branch 'dmlyubim/cql-1.7.2-port' of github.com:BonsaiAI/ray int…
dmlyubim Jan 25, 2023
bf7c81d
REVERTING TO PYTHON 3.8 FOR MAC
dmlyubim Jan 26, 2023
1f8c889
explicitly set MACOSX_DEPLOYMENT_TARGET env variable
Kiko-Aumond Jan 26, 2023
3008348
Merge remote-tracking branch 'origin/dmlyubim/cql-1.7.2-port' into ki…
Kiko-Aumond Jan 26, 2023
fcadf4b
removed minor version of Python; renamed steps to relect correct Pyth…
Kiko-Aumond Jan 26, 2023
e19f432
get latest pip version to test MacOs wheels
Kiko-Aumond Jan 27, 2023
2dfff9d
updated hash
Kiko-Aumond Jan 27, 2023
0752873
undid changes to info,yml
Kiko-Aumond Jan 27, 2023
278c16c
unbounded setuptools
Kiko-Aumond Jan 27, 2023
9326393
undid change
Kiko-Aumond Jan 27, 2023
1473ead
Fix MacOs version if bdist_wheel generates incorrect MacOS version ta…
Kiko-Aumond Jan 27, 2023
af5edfe
undid changes
Kiko-Aumond Jan 27, 2023
4c89d1e
undid changes
Kiko-Aumond Jan 27, 2023
e0f89f8
undid changes
Kiko-Aumond Jan 27, 2023
53fb100
force reinstall tune and upstream requirements
Kiko-Aumond Jan 28, 2023
a6543c9
updatd CI hash
Kiko-Aumond Jan 28, 2023
62c1e59
updated dependencies
Kiko-Aumond Jan 28, 2023
e8e2720
updated requirements
Kiko-Aumond Jan 28, 2023
dc74f94
updated requirements
Kiko-Aumond Jan 28, 2023
25c4738
updated requirements
Kiko-Aumond Jan 28, 2023
4ffb1fe
updated requirements
Kiko-Aumond Jan 28, 2023
3cd0f4d
updated requirements
Kiko-Aumond Jan 28, 2023
9075ae5
updated ci folder hash
Kiko-Aumond Jan 28, 2023
46972ee
updated requirements
Kiko-Aumond Jan 28, 2023
0d91855
updated requirements
Kiko-Aumond Jan 30, 2023
d99f6d4
updates CI hash
Kiko-Aumond Jan 30, 2023
3d7bfcb
updated requirements
Kiko-Aumond Jan 30, 2023
8853407
updated requirements
Kiko-Aumond Jan 30, 2023
6eeac08
updated requirements
Kiko-Aumond Jan 30, 2023
9b42244
updated requirements
Kiko-Aumond Jan 30, 2023
fa6a757
undid requirement changes
Kiko-Aumond Jan 30, 2023
c6c1561
updated ci folder hash
Kiko-Aumond Jan 30, 2023
ed41b13
updated requirements
Kiko-Aumond Jan 30, 2023
7e076a0
updated requirements
Kiko-Aumond Jan 30, 2023
6316d3b
updated requirements
Kiko-Aumond Jan 30, 2023
9b943ca
updated requirements
Kiko-Aumond Jan 30, 2023
7fa7440
updated requirements
Kiko-Aumond Jan 30, 2023
a6a468f
updated requirements
Kiko-Aumond Jan 30, 2023
c880a18
updated requirements
Kiko-Aumond Jan 30, 2023
1f024d1
updated dependencies
Kiko-Aumond Jan 30, 2023
68d169f
updated requirements
Kiko-Aumond Jan 30, 2023
f2b8c73
updated dependencies
Kiko-Aumond Jan 30, 2023
ec88717
apt update
Kiko-Aumond Jan 30, 2023
3e4af46
fixed GCC download, set Ubuntu 20.04 as default OS for pipeline
Kiko-Aumond Jan 31, 2023
3fd1ad1
updated requirements
Kiko-Aumond Jan 31, 2023
5cb8de8
updated requirements
Kiko-Aumond Jan 31, 2023
17f2a7a
fixed setup.py
Kiko-Aumond Jan 31, 2023
3e1e2b4
updated ci hash
Kiko-Aumond Jan 31, 2023
9604db6
fixed setup.py
Kiko-Aumond Jan 31, 2023
bc9c7b7
fixed setup.py
Kiko-Aumond Jan 31, 2023
42b0881
fixed setup.py
Kiko-Aumond Jan 31, 2023
0b7d070
updated requirements
Kiko-Aumond Jan 31, 2023
5124b5d
fixed setup.py
Kiko-Aumond Jan 31, 2023
91df95f
force reintall of torch and torchvision
Kiko-Aumond Jan 31, 2023
8041c80
updated ci hash
Kiko-Aumond Jan 31, 2023
6cf662b
fixed rllib requirements
Kiko-Aumond Feb 1, 2023
1cf1948
updated requirements
Kiko-Aumond Feb 1, 2023
52683ff
updated requirements
Kiko-Aumond Feb 1, 2023
b90db59
updated requirements
Kiko-Aumond Feb 1, 2023
fb6bb23
updated requirements
Kiko-Aumond Feb 1, 2023
0e7ba4e
updated requirements
Kiko-Aumond Feb 1, 2023
feea4f5
updated requirements
Kiko-Aumond Feb 1, 2023
a6836af
updated dependencies
Kiko-Aumond Feb 1, 2023
1f33ba2
updated dependencies
Kiko-Aumond Feb 1, 2023
f965da8
updated requirements
Kiko-Aumond Feb 1, 2023
c268d44
updated requirements
Kiko-Aumond Feb 2, 2023
fe4a03f
updated requirements
Kiko-Aumond Feb 2, 2023
5ecffd4
explicitly set locale in MacOS to fix test_signal
Kiko-Aumond Feb 2, 2023
e780cf1
set build Ubuntu OS version to focal
Kiko-Aumond Feb 6, 2023
1402b2f
do not bubble up StopIterations in ParallelIteratr
Kiko-Aumond Feb 6, 2023
4489a5c
fixed memory_monitor import
Kiko-Aumond Feb 7, 2023
ceab8cd
undid non-build changes
Kiko-Aumond Feb 7, 2023
50dc53e
Merge remote-tracking branch 'origin/releases/1.3.0' into kiko/focal_…
Kiko-Aumond Feb 7, 2023
4a09ad5
Merge remote-tracking branch 'origin/kiko/focal_build' into kiko/fix_…
Kiko-Aumond Feb 7, 2023
4043d11
added new NoSamplesAvailable exception
Kiko-Aumond Feb 9, 2023
bed44d2
fixed exception handling
Kiko-Aumond Feb 9, 2023
8ddc5e9
fixed tests
Kiko-Aumond Feb 9, 2023
4 changes: 2 additions & 2 deletions .travis.yml
@@ -1,6 +1,6 @@
language: generic
# Use Ubuntu 18.04
dist: bionic
# Use Ubuntu 20.04
dist: focal

git:
clone: false # Clone manually to work around Travis issues like https://github.com/travis-ci/travis-ci/issues/6337
2 changes: 1 addition & 1 deletion ci/azure_pipelines/templates/info.yml
@@ -5,7 +5,7 @@ steps:
# the hash was computed in Mac
if [[ $AGENT_OS == "Darwin" ]]; then
pushd $BUILD_SOURCESDIRECTORY
EXPECTED_HASH_TRAVIS='7e5de1d2a8ccd0947747164f845b3c195dab93d12e3692f82de969a9b849d937'
EXPECTED_HASH_TRAVIS='ee20821c018bbc7aa7d4ca551d5542f9ae9399b255701a70ccdd808d814adacc'
CURRENT_HASH_TRAVIS=$(shasum -a 256 ./.travis.yml | awk '{print $1}')
if [[ $EXPECTED_HASH_TRAVIS != $CURRENT_HASH_TRAVIS ]]; then
echo "The original Travis file of the project has changed"
18,975 changes: 18 additions & 18,957 deletions dashboard/client/package-lock.json

(Large diff not rendered.)

6 changes: 3 additions & 3 deletions python/ray/tests/test_iter.py
@@ -9,7 +9,7 @@

import ray
from ray.util.iter import from_items, from_iterators, from_range, \
from_actors, ParallelIteratorWorker, LocalIterator, was_cause_by_stop_iteration
from_actors, NoSamplesAvailable, ParallelIteratorWorker, LocalIterator, was_cause_by_stop_iteration
from ray.test_utils import Semaphore


@@ -551,7 +551,7 @@ def test_batch_across_shards(ray_start_regular_shared):
for x in it.batch_across_shards():
collected.append(x)
attempts_collect_counts[attempts] += 1
except (StopIteration, RuntimeError) as ex:
except (StopIteration, RuntimeError, NoSamplesAvailable) as ex:
if was_cause_by_stop_iteration(ex):
continue
else:
@@ -582,7 +582,7 @@ def test_batch_across_unbalanced_shards(ray_start_regular_shared):
for x in it.batch_across_shards():
collected.append(x)
attempts_collect_counts[attempts] += 1
except (StopIteration, RuntimeError) as ex:
except (StopIteration, RuntimeError, NoSamplesAvailable) as ex:
if was_cause_by_stop_iteration(ex):
continue
else:
72 changes: 42 additions & 30 deletions python/ray/util/iter.py
@@ -8,6 +8,7 @@
from typing import TypeVar, Generic, Iterable, List, Callable, Any, Iterator

import ray
from ray._private.memory_monitor import RayOutOfMemoryError
from ray.util.iter_metrics import MetricsContext, SharedMetrics

logger = logging.getLogger(__name__)
@@ -18,7 +19,7 @@


def was_cause_by_stop_iteration(ex) -> bool:
if isinstance(ex, StopIteration):
if isinstance(ex, (StopIteration, NoSamplesAvailable)):
return True
elif ex.__cause__ is not None:
return was_cause_by_stop_iteration(ex.__cause__)
@@ -457,7 +458,7 @@ def base_iterator(num_partitions, partition_index, timeout=None):
batch_ms=batch_ms)] = actor
for item in batch:
yield item
except (StopIteration, RuntimeError) as ex:
except (StopIteration, RuntimeError, NoSamplesAvailable) as ex:
if was_cause_by_stop_iteration(ex):
pass
else:
@@ -525,7 +526,10 @@ def base_iterator(timeout=None):
yield _NextValueNotReady()
except TimeoutError:
yield _NextValueNotReady()
except (StopIteration, RuntimeError) as ex:
# Propagate OOM exceptions up the stack
except RayOutOfMemoryError:
raise
except (StopIteration, RuntimeError, NoSamplesAvailable) as ex:
if was_cause_by_stop_iteration(ex):
# If we are streaming (infinite sequence) then
# we want to try again as long as at least one
@@ -536,21 +540,20 @@
for a, f in zip(list(active), futures):
try:
results.append(ray.get(f))
except (StopIteration, RuntimeError) as ex_i:
except (StopIteration, RuntimeError, NoSamplesAvailable) as ex_i:
if was_cause_by_stop_iteration(ex_i):
if self.is_infinite_sequence:
stoped_actors.append(a)
else:
active.remove(a)
else:
# BONSAI changes begin - added sampling error logging
exc_info = ex_i
if isinstance(ex_i, StopIteration) and ex_i.__cause__ is not None:
exc_info = ex_i.__cause__
# ex_is is never a StopIteration since was_cause_by_stop_iteration
# in the if part of the clause will catch all StopIterations
logger.exception(
"Encountered an exception while extracting "
"valid data from `futures`.",
exc_info=exc_info
exc_info=ex_i
)
# BONSAI changes end
raise ex_i
@@ -564,20 +567,19 @@
# BONSAI changes end
yield results
elif self.is_infinite_sequence and len(stoped_actors) == len(active):
raise ex
raise NoSamplesAvailable(ex)
# BONSAI changes begin - added logging
logger.info(f"Kicking off {len(active)} new sampling tasks.")
# BONSAI changes end
futures = [a.par_iter_next.remote() for a in active]
else:
# BONSAI changes begin - added sampling error logging
exc_info = ex
if isinstance(ex, StopIteration) and ex.__cause__ is not None:
exc_info = ex.__cause__
# ex is never a StopIteration since was_cause_by_stop_iteration
# in the if part of the clause will catch all StopIterations
logger.exception(
"Encountered an exception while extracting "
"valid data from `futures`.",
exc_info=exc_info
exc_info=ex
)
# BONSAI changes end
raise ex
@@ -651,7 +653,7 @@ def base_iterator(timeout=None):
active_actors.add(actor)
for item in batch:
yield item
except (StopIteration, RuntimeError) as ex:
except (StopIteration, RuntimeError, NoSamplesAvailable) as ex:
if was_cause_by_stop_iteration(ex):
# If we are streaming (infinite sequence) then
# we want to try again as long as at least one
@@ -791,7 +793,7 @@ def base_iterator(timeout=None):
yield _NextValueNotReady()
except TimeoutError:
yield _NextValueNotReady()
except (StopIteration, RuntimeError) as ex:
except (StopIteration, RuntimeError, NoSamplesAvailable) as ex:
if was_cause_by_stop_iteration(ex):
break
else:
@@ -899,7 +901,7 @@ def __next__(self):
self._build_once()
try:
return next(self.built_iterator)
except (StopIteration, RuntimeError) as ex:
except (StopIteration, RuntimeError, NoSamplesAvailable) as ex:
if was_cause_by_stop_iteration(ex):
# Force the regeneration of the base iterator
if self.is_infinite_sequence:
@@ -932,7 +934,7 @@ def for_each(self, fn: Callable[[T], U], max_concurrency=1,

def apply_foreach(it):
for item in it:
if isinstance(item, _NextValueNotReady):
if isinstance(item, (_NextValueNotReady, NoSamplesAvailable)):
if hasattr(fn, LocalIterator.HANDLE_NEXT_VALUE_NOT_READY_HOOK_NAME):
with self._metrics_context():
result = fn._handle_next_value_not_ready(item)
@@ -957,7 +959,7 @@ def apply_foreach(it):
remote = ray.remote(fn).options(**resources)
remote_fn = remote.remote
for item in it:
if isinstance(item, _NextValueNotReady):
if isinstance(item, (_NextValueNotReady, NoSamplesAvailable)):
yield item
else:
if max_concurrency and len(cur) >= max_concurrency:
@@ -982,7 +984,7 @@ def add_wait_hooks(it):
fn._on_fetch_start()
new_item = False
item = next(it)
if not isinstance(item, _NextValueNotReady):
if not isinstance(item, (_NextValueNotReady, NoSamplesAvailable)):
new_item = True
yield item

@@ -999,7 +1001,7 @@ def filter(self, fn: Callable[[T], bool]) -> "LocalIterator[T]":
def apply_filter(it):
for item in it:
with self._metrics_context():
if isinstance(item, _NextValueNotReady) or fn(item):
if isinstance(item, (_NextValueNotReady, NoSamplesAvailable)) or fn(item):
yield item

return LocalIterator(
@@ -1013,7 +1015,7 @@ def batch(self, n: int) -> "LocalIterator[List[T]]":
def apply_batch(it):
batch = []
for item in it:
if isinstance(item, _NextValueNotReady):
if isinstance(item, (_NextValueNotReady, NoSamplesAvailable)):
yield item
else:
batch.append(item)
@@ -1034,12 +1036,12 @@ def flatten(self) -> "LocalIterator[T[0]]":
def apply_flatten(it):
try:
for item in it:
if isinstance(item, _NextValueNotReady):
if isinstance(item, (_NextValueNotReady, NoSamplesAvailable)):
yield item
else:
for subitem in item:
yield subitem
except (StopIteration, RuntimeError) as ex:
except (StopIteration, RuntimeError, NoSamplesAvailable) as ex:
if not was_cause_by_stop_iteration(ex):
raise ex

@@ -1071,7 +1073,7 @@ def shuffle(self, shuffle_buffer_size: int,
def apply_shuffle(it):
buffer = []
for item in it:
if isinstance(item, _NextValueNotReady):
if isinstance(item, (_NextValueNotReady, NoSamplesAvailable)):
yield item
else:
buffer.append(item)
@@ -1164,7 +1166,7 @@ def gen(timeout):
if len(queues[i]) == 0:
try:
fill_next(timeout)
except (StopIteration, RuntimeError) as ex:
except (StopIteration, RuntimeError, NoSamplesAvailable) as ex:
if was_cause_by_stop_iteration(ex):
return
else:
@@ -1267,7 +1269,7 @@ def build_union(timeout=None):
yield_counts[i] += 1
pull_counts[i] = 0
yield item
except (StopIteration, RuntimeError) as ex:
except (StopIteration, RuntimeError, NoSamplesAvailable) as ex:
if was_cause_by_stop_iteration(ex):
fix_weights = [
w != "*" for w in round_robin_weights
@@ -1357,7 +1359,7 @@ def __iter__(self) -> Iterator[Any]:
def __next__(self) -> Any:
try:
return next(self.inner_iterator)
except (StopIteration, RuntimeError) as ex:
except (StopIteration, RuntimeError, NoSamplesAvailable) as ex:
if was_cause_by_stop_iteration(ex):
self._make_inner_iterator()
# If we have an infinite sequence means that we have an stream
@@ -1406,7 +1408,7 @@ def par_iter_next_batch(self, batch_ms: int):
while time.time() < t_end:
try:
batch.append(self.par_iter_next())
except (StopIteration, RuntimeError) as ex:
except (StopIteration, RuntimeError, NoSamplesAvailable) as ex:
if was_cause_by_stop_iteration(ex):
if len(batch) == 0:
raise StopIteration
@@ -1431,7 +1433,7 @@ def par_iter_slice(self, step: int, start: int):
try:
val = next(self.local_it)
self.next_ith_buffer[j].append(val)
except (StopIteration, RuntimeError) as ex:
except (StopIteration, RuntimeError, NoSamplesAvailable) as ex:
if was_cause_by_stop_iteration(ex):
pass
else:
@@ -1452,7 +1454,7 @@ def par_iter_slice_batch(self, step: int, start: int, batch_ms: int):
while time.time() < t_end:
try:
batch.append(self.par_iter_slice(step, start))
except (StopIteration, RuntimeError) as ex:
except (StopIteration, RuntimeError, NoSamplesAvailable) as ex:
if was_cause_by_stop_iteration(ex):
if len(batch) == 0:
raise StopIteration
@@ -1479,6 +1481,16 @@ class _NextValueNotReady(Exception):
pass


class NoSamplesAvailable(Exception):
"""
Indicates that a ParallelIterator has no samples currently available.
    It is ParallelIterator's equivalent of LocalIterator's _NextValueNotReady.
    This could be due to a slow simulator, for instance, and reflects a
    situation where a retry is warranted.
"""
pass


class _ActorSet(object):
"""Helper class that represents a set of actors and transforms."""

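
The new `NoSamplesAvailable` exception lets a consumer distinguish "no data yet, retry" from a hard failure. A minimal, hypothetical consumer loop under assumed names (`consume_with_retry`, `flaky_next`, and the backoff policy are illustrations, not Ray API):

```python
import time

class NoSamplesAvailable(Exception):
    """Stand-in for ray.util.iter.NoSamplesAvailable."""

def consume_with_retry(next_batch, max_retries=3, backoff_s=0.01):
    """Retry fetching when the iterator has no samples yet (e.g. a slow sim)."""
    for attempt in range(max_retries):
        try:
            return next_batch()
        except NoSamplesAvailable:
            time.sleep(backoff_s * (attempt + 1))  # simple linear backoff
    raise RuntimeError("no samples after %d retries" % max_retries)

# Fake sampler that succeeds on the third call, like a sim warming up.
calls = {"n": 0}
def flaky_next():
    calls["n"] += 1
    if calls["n"] < 3:
        raise NoSamplesAvailable("sim still warming up")
    return [1, 2, 3]

print(consume_with_retry(flaky_next))  # [1, 2, 3]
```

The key design choice in the diff is that `NoSamplesAvailable` participates in `was_cause_by_stop_iteration`, so existing stop-handling paths treat it as retryable rather than fatal.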
67 changes: 54 additions & 13 deletions rllib/agents/trainer.py
@@ -22,6 +22,7 @@
from ray.rllib.policy.sample_batch import DEFAULT_POLICY_ID
from ray.rllib.evaluation.metrics import collect_metrics
from ray.rllib.evaluation.worker_set import WorkerSet
from ray.util.iter import NoSamplesAvailable
from ray.rllib.utils import FilterManager, deep_update, merge_dicts
from ray.rllib.utils.spaces import space_utils
from ray.rllib.utils.framework import try_import_tf, TensorStructType
@@ -461,6 +462,26 @@
# yapf: enable


def is_memory_error(e: Exception) -> bool:
"""Check if an exception occurred due to a process running out of memory."""
memory_error_names = [
"ray.memory_monitor.RayOutOfMemoryError",
"RayOutOfMemoryError",
]
ename = type(e).__name__

if ename in memory_error_names:
return True

msg_list = list(filter(lambda s: len(s) > 0, str(e).split("\n")))

if ename.startswith("RayTaskError"):
return any(
any(ename in msg for msg in msg_list) for ename in memory_error_names
)
return False


@DeveloperAPI
def with_common_config(
extra_config: PartialTrainerConfigDict) -> TrainerConfigDict:
@@ -601,20 +622,40 @@ def train(self) -> ResultDict:
for _ in range(1 + MAX_WORKER_FAILURE_RETRIES):
try:
result = Trainable.train(self)
except RayError as e:
if self.config["ignore_worker_failures"]:
logger.exception(
"Error in train call, attempting to recover")
self._try_recover()
else:
logger.info(
"Worker crashed during call to train(). To attempt to "
"continue training without the failed worker, set "
"`'ignore_worker_failures': True`.")
raise e
except Exception as e:
time.sleep(0.5) # allow logs messages to propagate
raise e
if issubclass(e, RayError):
# do not retry in case of OOM errors
if is_memory_error(e):
logger.exception("Not attempting to recover from error in train call "
"since it was caused by an OOM error",
exc_info=e)
time.sleep(0.5) # allow logs messages to propagate
raise e
else:
# always retry on NoSamplesAvailable as this is by definition
# a retryable situation
if isinstance(e, NoSamplesAvailable):

[Review comment (Collaborator)] Rather than catching it here and doing _try_recover(), we need to handle it in our bonsai code (I'll show you).

logger.info("No samples available yet, retrying.")
self._try_recover()
elif self.config["ignore_worker_failures"]:

[Review comment (Collaborator)] Same here.

logger.exception("Error in train call and ignore_worker_failures==True, "
"attempting to recover",
exc_info=e)
self._try_recover()
else:
logger.info(
"Worker crashed during call to train(). To attempt to "
"continue training without the failed worker, set "
"`'ignore_worker_failures': True`.")
raise e
else:
if isinstance(e, StopIteration):

[Review comment (Collaborator)] We don't need to handle it here.

pass
else:
logger.exception("Not attempting to recover from error in train call",
exc_info=e)
time.sleep(0.5) # allow logs messages to propagate
raise e
else:
break
if result is None:
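
The rewritten `train()` loop triages exceptions into three outcomes: OOM errors are never retried, `NoSamplesAvailable` is always retried, and other Ray errors are retried only when `ignore_worker_failures` is set. The exact nesting in the diff is hard to read from the fragment, so this is one plausible reading expressed as a hypothetical helper (not RLlib API):

```python
class RayError(Exception): pass
class RayOutOfMemoryError(RayError): pass
class NoSamplesAvailable(Exception): pass

def triage(e: Exception, ignore_worker_failures: bool) -> str:
    """Return 'retry' or 'raise' per (one reading of) the policy above."""
    if isinstance(e, RayError):
        if isinstance(e, RayOutOfMemoryError):
            return "raise"  # recovery would likely just OOM again
        return "retry" if ignore_worker_failures else "raise"
    if isinstance(e, NoSamplesAvailable):
        return "retry"      # by definition a retryable situation
    return "raise"

print(triage(RayOutOfMemoryError(), True))   # raise
print(triage(NoSamplesAvailable(), False))   # retry
print(triage(RayError(), True))              # retry
print(triage(RayError(), False))             # raise
```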