fix: concurrency file order TDE-1213 (#998)
#### Motivation

When multiple source locations are supplied for standardising and the source
images overlap, the VRT that is created does not honour the priority order of
the supplied datasets. This pull request retains that priority order when the
images are merged.

#### Modification

Move away from using `as_completed` for parallel processing so that the
output file list is in the same order as the input file list.
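
For context, `as_completed` yields futures in whatever order the tasks happen to finish, so a fast task can overtake a slow one; iterating the list of futures in the order they were submitted keeps results aligned with the inputs. A minimal sketch of the difference, using a placeholder workload rather than the repository's `write_file`:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def collect_in_input_order(items: list[str]) -> list[str]:
    """Results come back in the same order as `items`, regardless of completion order."""
    with ThreadPoolExecutor(max_workers=4) as executor:
        # Keep the Future objects in submission order.
        futures = [executor.submit(str.upper, item) for item in items]
    # Iterating the list itself (not as_completed(futures)) preserves input order.
    return [future.result() for future in futures]


def collect_in_completion_order(items: list[str]) -> list[str]:
    """Results come back as tasks finish, so the order depends on task timing."""
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(str.upper, item) for item in items]
        return [future.result() for future in as_completed(futures)]


print(collect_in_input_order(["b.tiff", "a.tiff", "c.tiff"]))  # always ['B.TIFF', 'A.TIFF', 'C.TIFF']
```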

#### Checklist

_If not applicable, provide explanation of why._

- [x] Tests updated
- [x] Docs updated
- [x] Issue linked in Title
amfage authored Jul 15, 2024
1 parent 289880c commit 37dcb8b
Showing 2 changed files with 40 additions and 24 deletions.
48 changes: 24 additions & 24 deletions scripts/files/fs.py
@@ -1,5 +1,5 @@
 import os
-from concurrent.futures import Future, ThreadPoolExecutor, as_completed
+from concurrent.futures import Future, ThreadPoolExecutor
 from datetime import datetime
 from pathlib import Path
 from typing import TYPE_CHECKING
@@ -105,14 +105,17 @@ def write_all(inputs: list[str], target: str, concurrency: int | None = 4, gener
     Returns:
         list of written file paths
     """
+    results: list[Future] = []  # type: ignore
     written_tiffs: list[str] = []
     with ThreadPoolExecutor(max_workers=concurrency) as executor:
-        futuress = {write_file(executor, input_, target, generate_name): input_ for input_ in inputs}
-        for future in as_completed(futuress):
-            if future.exception():
-                get_log().warn("Failed Read-Write", error=future.exception())
-            else:
-                written_tiffs.append(future.result())
+        for input_ in inputs:
+            results.append(executor.submit(write_file, input_, target, generate_name))
+
+    for future in results:
+        if future.exception():
+            get_log().warn("Failed Read-Write", error=future.exception())
+        else:
+            written_tiffs.append(future.result())

     if len(inputs) != len(written_tiffs):
         get_log().error("Missing Files", count=len(inputs) - len(written_tiffs))
Expand All @@ -130,26 +133,28 @@ def write_sidecars(inputs: list[str], target: str, concurrency: int | None = 4)
         target: target folder to write to
         concurrency: max thread pool workers
     """
+    results: list[Future] = []  # type: ignore
     with ThreadPoolExecutor(max_workers=concurrency) as executor:
-        results = {write_file(executor, input_, target): input_ for input_ in inputs}
-        for future in as_completed(results):
-            future_ex = future.exception()
-            if isinstance(future_ex, NoSuchFileError):
-                get_log().info("No sidecar file found; skipping", path=future_ex.path)
-            else:
-                get_log().info("wrote_sidecar_file", path=future.result())
+        for input_ in inputs:
+            results.append(executor.submit(write_file, input_, target))
+
+    for future in results:
+        future_ex = future.exception()
+        if isinstance(future_ex, NoSuchFileError):
+            get_log().info("No sidecar file found; skipping", error=future.exception())
+        else:
+            get_log().info("wrote_sidecar_file", path=future.result())


-def write_file(executor: ThreadPoolExecutor, input_: str, target: str, generate_name: bool | None = True) -> Future[str]:
+def write_file(input_: str, target: str, generate_name: bool | None = True) -> str:
     """Read a file from a path and write it to a target path.
     Args:
-        executor: A ThreadPoolExecutor instance.
-        input_: A path to a file to read.
+        input: A path to a file to read.
         target: A path to write the file to.
         generate_name: create a target file name based on multihash the source filename
     Returns:
-        Future[str]: The result of the execution.
+        str: Target file name.
     """
     get_log().info(f"Trying write from file: {input_}")

Expand All @@ -159,12 +164,7 @@ def write_file(executor: ThreadPoolExecutor, input_: str, target: str, generate_
     else:
         target_file_name = os.path.basename(input_)

-    try:
-        return executor.submit(copy, input_, os.path.join(target, target_file_name))
-    except NoSuchFileError as nsfe:
-        future: Future[str] = Future()
-        future.set_exception(nsfe)
-        return future
+    return copy(input_, os.path.join(target, target_file_name))


 class NoSuchFileError(Exception):
16 changes: 16 additions & 0 deletions scripts/files/tests/fs_test.py
@@ -89,6 +89,22 @@ def test_write_sidecars_one_found(capsys: CaptureFixture[str], subtests: SubTest
     rmtree(target)


+def test_write_all_in_order(setup: str) -> None:
+    inputs: list[str] = []
+    file_contents = "a" * 1000 * 1000
+    i = 0
+    while i < 10:
+        path = Path(os.path.join(setup, str(i)))
+        if i % 2 == 0:
+            path.write_text(file_contents, encoding="utf-8")  # 1MB
+        else:
+            path.touch()
+        inputs.append(path.as_posix())
+        i += 1
+    written_files = write_all(inputs=inputs, target=setup, generate_name=False)
+    assert written_files == inputs
+
+
 @mock_aws
 def test_should_get_s3_object_modified_datetime() -> None:
     bucket_name = "any-bucket-name"
