Skip to content

Commit

Permalink
Remove unused map_unordered methods
Browse files Browse the repository at this point in the history
  • Loading branch information
daniel-wer committed Sep 17, 2024
1 parent 3639af2 commit f0b0dbe
Show file tree
Hide file tree
Showing 5 changed files with 1 addition and 63 deletions.
4 changes: 0 additions & 4 deletions cluster_tools/cluster_tools/executor_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,6 @@ def submit(
**kwargs: _P.kwargs,
) -> Future[_T]: ...

def map_unordered(
self, fn: Callable[[_S], _T], args: Iterable[_S]
) -> Iterator[_T]: ...

def map_to_futures(
self,
fn: Callable[[_S], _T],
Expand Down
12 changes: 0 additions & 12 deletions cluster_tools/cluster_tools/executors/dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,18 +236,6 @@ def check_resources(
enrich_future_with_uncaught_warning(fut)
return fut

def map_unordered(self, fn: Callable[[_S], _T], args: Iterable[_S]) -> Iterator[_T]:
futs: List[Future[_T]] = self.map_to_futures(fn, args)

# Return a separate generator to avoid that map_unordered
# is executed lazily (otherwise, jobs would be submitted
# lazily, as well).
def result_generator() -> Iterator:
for fut in self.as_completed(futs):
yield fut.result()

return result_generator()

def map_to_futures(
self,
fn: Callable[[_S], _T],
Expand Down
14 changes: 1 addition & 13 deletions cluster_tools/cluster_tools/executors/multiprocessing_.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class CFutDict(TypedDict):
class MultiprocessingExecutor(ProcessPoolExecutor):
"""
Wraps the ProcessPoolExecutor to add various features:
- map_to_futures and map_unordered method
- map_to_futures method
- pickling of job's output (see output_pickle_path_getter and output_pickle_path)
"""

Expand Down Expand Up @@ -182,18 +182,6 @@ def _execute_and_persist_function(
pickling.dump((True, result), file)
return result

def map_unordered(self, fn: Callable[_P, _T], args: Any) -> Iterator[_T]:
futs: List[Future[_T]] = self.map_to_futures(fn, args)

# Return a separate generator to avoid that map_unordered
# is executed lazily (otherwise, jobs would be submitted
# lazily, as well).
def result_generator() -> Iterator:
for fut in futures.as_completed(futs):
yield fut.result()

return result_generator()

def map_to_futures(
self,
fn: Callable[[_S], _T],
Expand Down
11 changes: 0 additions & 11 deletions cluster_tools/cluster_tools/schedulers/cluster_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -657,17 +657,6 @@ def result_generator() -> Iterator[_T]:

return result_generator()

def map_unordered(self, fn: Callable[_P, _T], args: Any) -> Iterator[_T]:
futs = self.map_to_futures(fn, args)

# Return a separate generator to avoid that map_unordered
# is executed lazily.
def result_generator() -> Iterator[_T]:
for fut in futures.as_completed(futs):
yield fut.result()

return result_generator()

def forward_log(self, fut: Future[_T]) -> _T:
"""
Takes a future from which the log file is forwarded to the active
Expand Down
23 changes: 0 additions & 23 deletions cluster_tools/tests/test_all.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,29 +237,6 @@ def test_unordered_sleep(exc: cluster_tools.Executor) -> None:
assert result == duration


@pytest.mark.parametrize("exc", get_executors(), ids=type)
def test_unordered_map(exc: cluster_tools.Executor) -> None:
with exc:
durations = [3, 1]
results_gen = exc.map_unordered(sleep, durations)
results = list(results_gen)

if isinstance(
exc,
(
cluster_tools.SequentialExecutor,
cluster_tools.SequentialPickleExecutor,
),
):
# futures.as_completed does not return previously completed futures in completion order.
# For sequential executors as_completed is only called after all futures completed, though.
results.sort()

durations.sort()
for duration, result in zip(durations, results):
assert result == duration


@pytest.mark.parametrize("exc", get_executors(), ids=type)
def test_map_to_futures(exc: cluster_tools.Executor) -> None:
with exc:
Expand Down

0 comments on commit f0b0dbe

Please sign in to comment.