diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 9f28feb8..b6d2772b 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -6,7 +6,7 @@ on: - "main" jobs: - windows-and-linux-build: + build: name: Build on ${{ matrix.platform }} with ${{ matrix.build_type }} configuration strategy: matrix: @@ -16,11 +16,17 @@ jobs: platform: - "windows-latest" - "ubuntu-latest" + - "macos-latest" # arm + - "macos-13" # x86_64 include: - platform: "windows-latest" vcpkg_triplet: "x64-windows-static" - platform: "ubuntu-latest" vcpkg_triplet: "x64-linux" + - platform: "macos-latest" + vcpkg_triplet: "arm64-osx" + - platform: "macos-13" + vcpkg_triplet: "x64-osx" runs-on: ${{ matrix.platform }} @@ -38,13 +44,18 @@ jobs: - name: Install vcpkg run: | - git clone https://github.com/microsoft/vcpkg.git + git clone https://github.com/microsoft/vcpkg.git -b 2025.02.14 cd vcpkg && ./bootstrap-vcpkg.sh echo "VCPKG_ROOT=${{github.workspace}}/vcpkg" >> $GITHUB_ENV echo "${{github.workspace}}/vcpkg" >> $GITHUB_PATH ./vcpkg integrate install shell: bash + - name: Install OpenMP + if: matrix.platform == 'macos-latest' || matrix.platform == 'macos-13' + run: | + brew install libomp + - name: CMake run: | cmake --preset=default -DVCPKG_TARGET_TRIPLET=${{matrix.vcpkg_triplet}} @@ -57,62 +68,7 @@ jobs: - uses: actions/upload-artifact@v4 with: - name: ${{matrix.platform}} ${{matrix.build_type}} binaries - path: ${{github.workspace}}/*.zip - - mac-build: - strategy: - matrix: - build_type: - - "Debug" - - "Release" - - runs-on: "macos-latest" - - permissions: - actions: write - - concurrency: - group: ${{ github.workflow }}-${{ github.ref }}-macos-latest-${{ matrix.build_type }} - cancel-in-progress: ${{ github.ref != 'refs/heads/main' }} - - steps: - - uses: actions/checkout@v3 - with: - submodules: true - - - name: Install vcpkg - run: | - git clone https://github.com/microsoft/vcpkg.git - cd vcpkg && ./bootstrap-vcpkg.sh - echo "VCPKG_ROOT=${{github.workspace}}/vcpkg" >> $GITHUB_ENV - echo "${{github.workspace}}/vcpkg" >> $GITHUB_PATH - ./vcpkg integrate install - shell: bash - - - name: Build for x64 - run: | - cmake --preset=default -DVCPKG_TARGET_TRIPLET=x64-osx -DVCPKG_INSTALLED_DIR=${{github.workspace}}/vcpkg-x64 -B ${{github.workspace}}/build-x64 -DCMAKE_BUILD_TYPE=${{matrix.build_type}} -DCMAKE_OSX_ARCHITECTURES="x86_64" -DBUILD_TESTING=OFF - cmake --build ${{github.workspace}}/build-x64 --config ${{matrix.build_type}} - - - name: Build for arm64 - run: | - cmake --preset=default -DVCPKG_TARGET_TRIPLET=arm64-osx -DVCPKG_INSTALLED_DIR=${{github.workspace}}/vcpkg-arm64 -B ${{github.workspace}}/build-arm64 -DCMAKE_BUILD_TYPE=${{matrix.build_type}} -DCMAKE_OSX_ARCHITECTURES="arm64" -DBUILD_TESTING=OFF - cmake --build ${{github.workspace}}/build-arm64 --config ${{matrix.build_type}} - - - name: Create a universal binary - run: | - cp -r ${{github.workspace}}/build-x64 ${{github.workspace}}/build && cd ${{github.workspace}}/build - for filename in $(find . -type f -exec grep -H "build-x64" {} \; | awk '{print $1}' | sed -e 's/:.*//' | sort -u); do sed -i.bak -e "s/build-x64/build/g" $filename && rm ${filename}.bak; done - for lib in `find . -type f \( -name "*.so" -o -name "*.a" \)`; do rm $lib && lipo -create ../build-x64/${lib} ../build-arm64/${lib} -output $lib; done - - - name: Package - run: | - cpack --config ${{github.workspace}}/build/CPackConfig.cmake -C ${{matrix.build_type}} -G ZIP - - - uses: actions/upload-artifact@v4 - with: - name: macos-latest ${{matrix.build_type}} binaries + name: ${{matrix.platform == 'macos-latest' && 'macos-arm64' || (matrix.platform == 'macos-13' && 'macos-x86_64' || matrix.platform)}} ${{matrix.build_type}} binaries path: ${{github.workspace}}/*.zip build-wheel: @@ -121,7 +77,8 @@ jobs: platform: - "windows-latest" - "ubuntu-22.04" - - "macos-latest" # TODO (aliddell): universal binary? + - "macos-latest" # arm + - "macos-13" # x86_64 runs-on: ${{ matrix.platform }} @@ -140,11 +97,11 @@ jobs: - name: Set up Python uses: actions/setup-python@v4 with: - python-version: "3.10" + python-version: "3.13" - name: Install vcpkg run: | - git clone https://github.com/microsoft/vcpkg.git + git clone https://github.com/microsoft/vcpkg.git -b 2025.02.14 cd vcpkg && ./bootstrap-vcpkg.sh echo "VCPKG_ROOT=${{github.workspace}}/vcpkg" >> $GITHUB_ENV echo "${{github.workspace}}/vcpkg" >> $GITHUB_PATH diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 33956703..11711329 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -10,18 +10,24 @@ env: BUILD_TYPE: Release jobs: - windows-and-linux-build: + build: name: Build on ${{ matrix.platform }} strategy: matrix: platform: - "windows-latest" - "ubuntu-latest" + - "macos-latest" + - "macos-13" include: - platform: "windows-latest" vcpkg_triplet: "x64-windows-static" - platform: "ubuntu-latest" vcpkg_triplet: "x64-linux" + - platform: "macos-latest" + vcpkg_triplet: "arm64-osx" + - platform: "macos-13" + vcpkg_triplet: "x64-osx" runs-on: ${{ matrix.platform }} @@ -39,13 +45,18 @@ jobs: - name: Install vcpkg run: | - git clone https://github.com/microsoft/vcpkg.git + git clone https://github.com/microsoft/vcpkg.git -b 2025.02.14 cd vcpkg && ./bootstrap-vcpkg.sh echo "VCPKG_ROOT=${{github.workspace}}/vcpkg" >> $GITHUB_ENV echo "${{github.workspace}}/vcpkg" >> $GITHUB_PATH ./vcpkg integrate install shell: bash + - name: Install OpenMP + if: matrix.platform == 'macos-latest' || matrix.platform == 'macos-13' + run: | + brew install libomp + - name: Build run: | cmake --preset=default -DVCPKG_TARGET_TRIPLET=${{matrix.vcpkg_triplet}} -B ${{github.workspace}}/build -DCMAKE_BUILD_TYPE=Release @@ -66,60 +77,6 @@ jobs: name: ${{matrix.platform}} binaries path: ${{github.workspace}}/*.zip - mac-build: - name: Build on macos-latest - runs-on: "macos-latest" - - permissions: - actions: write - - concurrency: - group: ${{ github.workflow }}-${{ github.ref }}-${{ matrix.platform }} - cancel-in-progress: ${{ github.ref != 'refs/heads/main' }} - - steps: - - uses: actions/checkout@v3 - with: - submodules: true - - - name: Install vcpkg - run: | - git clone https://github.com/microsoft/vcpkg.git - cd vcpkg && ./bootstrap-vcpkg.sh - echo "VCPKG_ROOT=${{github.workspace}}/vcpkg" >> $GITHUB_ENV - echo "${{github.workspace}}/vcpkg" >> $GITHUB_PATH - ./vcpkg integrate install - shell: bash - - - name: Build for x64 - run: | - cmake --preset=default -DVCPKG_TARGET_TRIPLET=x64-osx -DVCPKG_INSTALLED_DIR=${{github.workspace}}/vcpkg-x64 -B ${{github.workspace}}/build-x64 -DCMAKE_BUILD_TYPE=Release -DCMAKE_OSX_ARCHITECTURES="x86_64" -DBUILD_TESTING=OFF - cmake --build ${{github.workspace}}/build-x64 --config Release - - - name: Build for arm64 - run: | - cmake --preset=default -DVCPKG_TARGET_TRIPLET=arm64-osx -DVCPKG_INSTALLED_DIR=${{github.workspace}}/vcpkg-arm64 -B ${{github.workspace}}/build-arm64 -DCMAKE_BUILD_TYPE=Release -DCMAKE_OSX_ARCHITECTURES="arm64" -DBUILD_TESTING=OFF - cmake --build ${{github.workspace}}/build-arm64 --config Release - - - name: Test # don't release if tests are failing - working-directory: ${{github.workspace}}/build-arm64 - run: ctest -C Release -L anyplatform --output-on-failure - - - name: Create a universal binary - run: | - cp -r ${{github.workspace}}/build-x64 ${{github.workspace}}/build && cd ${{github.workspace}}/build - for filename in $(find . -type f -exec grep -H "build-x64" {} \; | awk '{print $1}' | sed -e 's/:.*//' | sort -u); do sed -i.bak -e "s/build-x64/build/g" $filename && rm ${filename}.bak; done - for lib in `find . -type f \( -name "*.so" -o -name "*.a" \)`; do rm $lib && lipo -create ../build-x64/${lib} ../build-arm64/${lib} -output $lib; done - - - name: Package - run: | - cpack --config ${{github.workspace}}/build/CPackConfig.cmake -C Release -G ZIP - - - uses: actions/upload-artifact@v4 - with: - name: macos-latest binaries - path: ${{github.workspace}}/*.zip - build-wheel: name: Build wheels for Python strategy: @@ -127,7 +84,8 @@ jobs: platform: - "windows-latest" - "ubuntu-22.04" - - "macos-latest" # TODO (aliddell): universal binary? + - "macos-latest" # arm + - "macos-13" # x86_64 python: - "3.9" - "3.10" @@ -156,13 +114,18 @@ jobs: - name: Install vcpkg run: | - git clone https://github.com/microsoft/vcpkg.git + git clone https://github.com/microsoft/vcpkg.git -b 2025.02.14 cd vcpkg && ./bootstrap-vcpkg.sh echo "VCPKG_ROOT=${{github.workspace}}/vcpkg" >> $GITHUB_ENV echo "${{github.workspace}}/vcpkg" >> $GITHUB_PATH ./vcpkg integrate install shell: bash + - name: Install OpenMP + if: matrix.platform == 'macos-latest' || matrix.platform == 'macos-13' + run: | + brew install libomp + - name: Install system dependencies if: ${{ matrix.platform == 'ubuntu-22.04' }} run: | @@ -190,8 +153,7 @@ jobs: release: needs: - - windows-and-linux-build - - mac-build + - build - build-wheel name: "Release" runs-on: "ubuntu-latest" diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 9907176a..ed6dd4e0 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -45,13 +45,18 @@ jobs: - name: Install vcpkg run: | - git clone https://github.com/microsoft/vcpkg.git + git clone https://github.com/microsoft/vcpkg.git -b 2025.02.14 cd vcpkg && ./bootstrap-vcpkg.sh echo "VCPKG_ROOT=${{github.workspace}}/vcpkg" >> $GITHUB_ENV echo "${{github.workspace}}/vcpkg" >> $GITHUB_PATH ./vcpkg integrate install shell: bash + - name: Install OpenMP + if: matrix.platform == 'macos-latest' + run: | + brew install libomp + - name: Configure CMake run: | cmake --preset=default -DVCPKG_TARGET_TRIPLET=${{matrix.vcpkg_triplet}} @@ -104,7 +109,7 @@ jobs: - name: Install vcpkg run: | - git clone https://github.com/microsoft/vcpkg.git + git clone https://github.com/microsoft/vcpkg.git -b 2025.02.14 cd vcpkg && ./bootstrap-vcpkg.sh echo "VCPKG_ROOT=${{github.workspace}}/vcpkg" >> $GITHUB_ENV echo "${{github.workspace}}/vcpkg" >> $GITHUB_PATH @@ -155,20 +160,25 @@ jobs: submodules: true ref: ${{ github.event.pull_request.head.sha }} - - name: Set up Python 3.11 + - name: Set up Python 3.13 uses: actions/setup-python@v4 with: - python-version: "3.11" + python-version: "3.13" - name: Install vcpkg run: | - git clone https://github.com/microsoft/vcpkg.git + git clone https://github.com/microsoft/vcpkg.git -b 2025.02.14 cd vcpkg && ./bootstrap-vcpkg.sh echo "VCPKG_ROOT=${{github.workspace}}/vcpkg" >> $GITHUB_ENV echo "${{github.workspace}}/vcpkg" >> $GITHUB_PATH ./vcpkg integrate install shell: bash + - name: Install OpenMP + if: matrix.platform == 'macos-latest' + run: | + brew install libomp + - name: Install dependencies run: python -m pip install -U pip "pybind11[global]" cmake build numpy pytest diff --git a/CMakeLists.txt b/CMakeLists.txt index c694e791..a9ddb521 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -4,17 +4,19 @@ cmake_policy(SET CMP0057 NEW) # allows IN_LIST operator (for pybind11) cmake_policy(SET CMP0079 NEW) # allows use with targets in other directories enable_testing() -find_package(nlohmann_json CONFIG REQUIRED) -find_package(blosc CONFIG REQUIRED) -find_package(miniocpp CONFIG REQUIRED) -find_package(Crc32c CONFIG REQUIRED) - include(cmake/aq_require.cmake) include(cmake/git-versioning.cmake) include(cmake/ide.cmake) include(cmake/install-prefix.cmake) include(cmake/wsl.cmake) include(cmake/simd.cmake) +include(cmake/openmp.cmake) + +find_package(nlohmann_json CONFIG REQUIRED) +find_package(blosc CONFIG REQUIRED) +find_package(miniocpp CONFIG REQUIRED) +find_package(Crc32c CONFIG REQUIRED) +find_package(OpenMP REQUIRED) set(CMAKE_C_STANDARD 11) set(CMAKE_CXX_STANDARD 20) diff --git a/README.md b/README.md index f01bf1c1..d6930abe 100644 --- a/README.md +++ b/README.md @@ -43,6 +43,12 @@ EOF If you're using Windows, learn how to set environment variables [here](https://learn.microsoft.com/en-us/powershell/module/microsoft.powershell.core/about/about_environment_variables?view=powershell-7.4#set-environment-variables-in-the-system-control-panel). You will need to set both the `VCPKG_ROOT` and `PATH` variables in the system control panel. +On the Mac, you will also need to install OpenMP using Homebrew: + +```bash +brew install libomp +``` + ### Configuring To build the library, you can use CMake: diff --git a/cmake/openmp.cmake b/cmake/openmp.cmake new file mode 100644 index 00000000..d05d3e08 --- /dev/null +++ b/cmake/openmp.cmake @@ -0,0 +1,31 @@ +if (APPLE) + # Determine architecture + if (${CMAKE_HOST_SYSTEM_PROCESSOR} STREQUAL "arm64") + # Apple Silicon + set(LIBOMP_PATH "/opt/homebrew/opt/libomp") + set(CMAKE_OSX_ARCHITECTURES "arm64") + else () + # Intel Mac + set(LIBOMP_PATH "/usr/local/opt/libomp") + set(CMAKE_OSX_ARCHITECTURES "x86_64") + endif () + + # OpenMP support + set(OpenMP_C_FLAGS "-Xclang -fopenmp -I${LIBOMP_PATH}/include") + set(OpenMP_CXX_FLAGS "-Xclang -fopenmp -I${LIBOMP_PATH}/include") + set(OpenMP_C_LIB_NAMES "omp") + set(OpenMP_CXX_LIB_NAMES "omp") + + # Use static library + set(OpenMP_omp_LIBRARY "${LIBOMP_PATH}/lib/libomp.a") + + # Ensure consistent architecture for all libraries + add_compile_options(-march=native) + + # Add linking flags + set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -L${LIBOMP_PATH}/lib") + set(CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} -L${LIBOMP_PATH}/lib -lomp") + + # Disable universal binaries as they can cause architecture issues + set(CMAKE_XCODE_ATTRIBUTE_ONLY_ACTIVE_ARCH "YES") +endif () \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 2dcdc3d6..01679e5a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,7 +10,7 @@ build-backend = "setuptools.build_meta" [project] name = "acquire-zarr" -version = "0.2.3" +version = "0.2.4" description = "Performant streaming to Zarr storage, on filesystem or cloud" authors = [ {name = "Alan Liddell", email = "aliddell@chanzuckerberg.com"} diff --git a/python/acquire-zarr-py.cpp b/python/acquire-zarr-py.cpp index 5ce0ff12..af2bb3c6 100644 --- a/python/acquire-zarr-py.cpp +++ b/python/acquire-zarr-py.cpp @@ -402,10 +402,14 @@ class PyZarrStream auto buf = image_data.request(); auto* ptr = (uint8_t*)buf.ptr; + py::gil_scoped_release release; + size_t bytes_out; auto status = ZarrStream_append( stream_.get(), ptr, buf.itemsize * buf.size, &bytes_out); + py::gil_scoped_acquire acquire; + if (status != ZarrStatusCode_Success) { std::string err = "Failed to append data to Zarr stream: " + std::string(Zarr_get_status_message(status)); diff --git a/src/streaming/CMakeLists.txt b/src/streaming/CMakeLists.txt index 53b0c7d1..7f2d0c25 100644 --- a/src/streaming/CMakeLists.txt +++ b/src/streaming/CMakeLists.txt @@ -1,5 +1,11 @@ set(tgt acquire-zarr) +if (WIN32) + set(PLATFORM_FILE_SINK_CPP win32/file.sink.impl.cpp) +else () + set(PLATFORM_FILE_SINK_CPP posix/file.sink.impl.cpp) +endif () + add_library(${tgt} macros.hh acquire.zarr.cpp @@ -19,6 +25,7 @@ add_library(${tgt} sink.cpp file.sink.hh file.sink.cpp + ${PLATFORM_FILE_SINK_CPP} s3.sink.hh s3.sink.cpp array.writer.hh @@ -44,6 +51,7 @@ target_link_libraries(${tgt} PRIVATE blosc_static miniocpp::miniocpp Crc32c::crc32c + OpenMP::OpenMP_CXX ) target_compile_definitions(${tgt} PRIVATE diff --git a/src/streaming/array.writer.cpp b/src/streaming/array.writer.cpp index 17a98042..5eb5ec07 100644 --- a/src/streaming/array.writer.cpp +++ b/src/streaming/array.writer.cpp @@ -5,11 +5,13 @@ #include "sink.hh" #include +#include #include #include -#ifdef min +#if defined(min) || defined(max) #undef min +#undef max #endif bool @@ -176,7 +178,7 @@ zarr::ArrayWriter::make_metadata_sink_() metadata_sink_ = is_s3_array_() ? make_s3_sink(*config_.bucket_name, metadata_path, s3_connection_pool_) - : make_file_sink(metadata_path, true); + : make_file_sink(metadata_path); if (!metadata_sink_) { LOG_ERROR("Failed to create metadata sink: ", metadata_path); @@ -209,8 +211,6 @@ zarr::ArrayWriter::write_frame_to_chunks_(std::span data) const auto bytes_per_chunk = dimensions->bytes_per_chunk(); const auto bytes_per_row = tile_cols * bytes_per_px; - size_t bytes_written = 0; - const auto n_tiles_x = (frame_cols + tile_cols - 1) / tile_cols; const auto n_tiles_y = (frame_rows + tile_rows - 1) / tile_rows; @@ -224,44 +224,58 @@ zarr::ArrayWriter::write_frame_to_chunks_(std::span data) const auto chunk_offset = static_cast(dimensions->chunk_internal_offset(frame_id)); - for (auto i = 0; i < n_tiles_y; ++i) { - // TODO (aliddell): we can optimize this when tiles_per_frame_x_ is 1 - for (auto j = 0; j < n_tiles_x; ++j) { - const auto c = group_offset + i * n_tiles_x + j; - auto chunk_ptr = get_chunk_data_(c) + chunk_offset; - const auto chunk_end = chunk_ptr + bytes_per_chunk; - - for (auto k = 0; k < tile_rows; ++k) { - const auto frame_row = i * tile_rows + k; - if (frame_row < frame_rows) { - const auto frame_col = j * tile_cols; - - const auto region_width = - std::min(frame_col + tile_cols, frame_cols) - frame_col; - - const auto region_start = static_cast( - bytes_per_px * (frame_row * frame_cols + frame_col)); - const auto nbytes = - static_cast(region_width * bytes_per_px); - const auto region_stop = region_start + nbytes; - if (region_stop > data.size()) { - LOG_ERROR("Buffer overflow"); - return bytes_written; - } - - // copy region - if (nbytes > std::distance(chunk_ptr, chunk_end)) { - LOG_ERROR("Buffer overflow"); - return bytes_written; - } - std::copy(data.begin() + region_start, - data.begin() + region_stop, - chunk_ptr); - - bytes_written += (region_stop - region_start); - } - chunk_ptr += static_cast(bytes_per_row); + const auto* data_ptr = data.data(); + const auto data_size = data.size(); + + size_t bytes_written = 0; + const auto n_tiles = n_tiles_x * n_tiles_y; + + // Using the entire thread pool breaks in CI due to a likely resource + // contention. Using 75% of the thread pool should be enough to avoid, but + // we should still find a fix if we can. +#pragma omp parallel for reduction(+ : bytes_written) \ + num_threads(std::max(3 * thread_pool_->n_threads() / 4, 1u)) + for (auto tile = 0; tile < n_tiles; ++tile) { + const auto tile_idx_y = tile / n_tiles_x; + const auto tile_idx_x = tile % n_tiles_x; + + const auto chunk_idx = + group_offset + tile_idx_y * n_tiles_x + tile_idx_x; + const auto chunk_start = get_chunk_data_(chunk_idx); + auto chunk_pos = chunk_offset; + + for (auto k = 0; k < tile_rows; ++k) { + const auto frame_row = tile_idx_y * tile_rows + k; + if (frame_row < frame_rows) { + const auto frame_col = tile_idx_x * tile_cols; + + const auto region_width = + std::min(frame_col + tile_cols, frame_cols) - frame_col; + + const auto region_start = + bytes_per_px * (frame_row * frame_cols + frame_col); + const auto nbytes = region_width * bytes_per_px; + + // copy region + EXPECT(region_start + nbytes <= data_size, + "Buffer overflow in framme. Region start: ", + region_start, + " nbytes: ", + nbytes, + " data size: ", + data_size); + EXPECT(chunk_pos + nbytes <= bytes_per_chunk, + "Buffer overflow in chunk. Chunk pos: ", + chunk_pos, + " nbytes: ", + nbytes, + " bytes per chunk: ", + bytes_per_chunk); + memcpy( + chunk_start + chunk_pos, data_ptr + region_start, nbytes); + bytes_written += nbytes; } + chunk_pos += bytes_per_row; } } diff --git a/src/streaming/file.sink.cpp b/src/streaming/file.sink.cpp index e4d296e8..d553f924 100644 --- a/src/streaming/file.sink.cpp +++ b/src/streaming/file.sink.cpp @@ -1,33 +1,42 @@ #include "file.sink.hh" #include "macros.hh" -#include +#include -namespace fs = std::filesystem; +void +init_handle(void**, std::string_view); -zarr::FileSink::FileSink(std::string_view filename, bool truncate) - : file_(filename.data(), - truncate ? (std::ios::binary | std::ios::trunc) : std::ios::binary) +void +destroy_handle(void**); +bool +seek_and_write(void**, size_t, ConstByteSpan); + +bool +flush_file(void**); + +zarr::FileSink::FileSink(std::string_view filename) { - EXPECT(file_.is_open(), "Failed to open file ", filename); + init_handle(&handle_, filename); +} + +zarr::FileSink::~FileSink() +{ + destroy_handle(&handle_); } bool -zarr::FileSink::write(size_t offset, std::span data) +zarr::FileSink::write(size_t offset, ConstByteSpan data) { const auto bytes_of_buf = data.size(); if (data.data() == nullptr || bytes_of_buf == 0) { return true; } - file_.seekp(offset); - file_.write(reinterpret_cast(data.data()), bytes_of_buf); - return true; + return seek_and_write(&handle_, offset, data); } bool zarr::FileSink::flush_() { - file_.flush(); - return true; -} + return flush_file(&handle_); +} \ No newline at end of file diff --git a/src/streaming/file.sink.hh b/src/streaming/file.sink.hh index f54d7868..90601598 100644 --- a/src/streaming/file.sink.hh +++ b/src/streaming/file.sink.hh @@ -9,14 +9,19 @@ namespace zarr { class FileSink : public Sink { public: - FileSink(std::string_view filename, bool truncate = true); + FileSink(std::string_view filename); + ~FileSink() override; - bool write(size_t offset, std::span data) override; + bool write(size_t offset, ConstByteSpan data) override; protected: bool flush_() override; private: - std::ofstream file_; + std::mutex mutex_; + size_t page_size_; + size_t sector_size_; + + void* handle_; }; } // namespace zarr diff --git a/src/streaming/posix/file.sink.impl.cpp b/src/streaming/posix/file.sink.impl.cpp new file mode 100644 index 00000000..3eb41a1b --- /dev/null +++ b/src/streaming/posix/file.sink.impl.cpp @@ -0,0 +1,83 @@ +#include "definitions.hh" +#include "macros.hh" + +#include + +#include +#include +#include +#include + +std::string +get_last_error_as_string() +{ + return strerror(errno); +} + +void +init_handle(void** handle, std::string_view filename) +{ + EXPECT(handle, "Expected nonnull pointer file handle."); + auto* fd = new int; + + *fd = open(filename.data(), O_WRONLY | O_CREAT, 0644); + if (*fd < 0) { + const auto err = get_last_error_as_string(); + delete fd; + throw std::runtime_error("Failed to open file: '" + + std::string(filename) + "': " + err); + } + *handle = (void*)fd; +} + +bool +seek_and_write(void** handle, size_t offset, ConstByteSpan data) +{ + CHECK(handle); + auto* fd = reinterpret_cast(*handle); + + auto* cur = reinterpret_cast(data.data()); + auto* end = cur + data.size(); + + int retries = 0; + const auto max_retries = 3; + while (cur < end && retries < max_retries) { + size_t remaining = end - cur; + ssize_t written = pwrite(*fd, cur, remaining, offset); + if (written < 0) { + const auto err = get_last_error_as_string(); + throw std::runtime_error("Failed to write to file: " + err); + } + retries += (written == 0) ? 1 : 0; + offset += written; + cur += written; + } + + return (retries < max_retries); +} + +bool +flush_file(void** handle) +{ + CHECK(handle); + auto* fd = reinterpret_cast(*handle); + + const auto res = fsync(*fd); + if (res < 0) { + LOG_ERROR("Failed to flush file: ", get_last_error_as_string()); + } + + return res == 0; +} + +void +destroy_handle(void** handle) +{ + auto* fd = reinterpret_cast(*handle); + if (fd) { + if (*fd >= 0) { + close(*fd); + } + delete fd; + } +} \ No newline at end of file diff --git a/src/streaming/s3.sink.cpp b/src/streaming/s3.sink.cpp index 849236f6..14e4d527 100644 --- a/src/streaming/s3.sink.cpp +++ b/src/streaming/s3.sink.cpp @@ -51,7 +51,7 @@ zarr::S3Sink::flush_() } bool -zarr::S3Sink::write(size_t offset, std::span data) +zarr::S3Sink::write(size_t offset, ConstByteSpan data) { if (data.data() == nullptr || data.empty()) { return true; diff --git a/src/streaming/s3.sink.hh b/src/streaming/s3.sink.hh index 49d22194..e3283854 100644 --- a/src/streaming/s3.sink.hh +++ b/src/streaming/s3.sink.hh @@ -17,7 +17,7 @@ class S3Sink : public Sink std::string_view object_key, std::shared_ptr connection_pool); - bool write(size_t offset, std::span data) override; + bool write(size_t offset, ConstByteSpan data) override; protected: bool flush_() override; diff --git a/src/streaming/sink.cpp b/src/streaming/sink.cpp index 8352a350..2f2031e1 100644 --- a/src/streaming/sink.cpp +++ b/src/streaming/sink.cpp @@ -81,7 +81,7 @@ make_file_sinks(std::vector& file_paths, try { if (all_successful) { - *psink = std::make_unique(filename, true); + *psink = std::make_unique(filename); } success = true; } catch (const std::exception& exc) { @@ -146,7 +146,7 @@ make_file_sinks( try { if (all_successful) { *psink = - std::make_unique(filename, true); + std::make_unique(filename); } success = true; } catch (const std::exception& exc) { @@ -352,7 +352,7 @@ zarr::make_dirs(const std::vector& dir_paths, } std::unique_ptr -zarr::make_file_sink(std::string_view file_path, bool truncate) +zarr::make_file_sink(std::string_view file_path) { if (file_path.starts_with("file://")) { file_path = file_path.substr(7); @@ -374,7 +374,7 @@ zarr::make_file_sink(std::string_view file_path, bool truncate) } } - return std::make_unique(file_path, truncate); + return std::make_unique(file_path); } bool diff --git a/src/streaming/sink.hh b/src/streaming/sink.hh index 7a4226fd..746b5cca 100644 --- a/src/streaming/sink.hh +++ b/src/streaming/sink.hh @@ -1,5 +1,6 @@ #pragma once +#include "definitions.hh" #include "s3.connection.hh" #include "thread.pool.hh" #include "zarr.dimension.hh" @@ -17,12 +18,11 @@ class Sink /** * @brief Write data to the sink. * @param offset The offset in the sink to write to. - * @param buf The buffer to write to the sink. + * @param data The buffer to write to the sink. * @param bytes_of_buf The number of bytes to write from @p buf. * @return True if the write was successful, false otherwise. */ - [[nodiscard]] virtual bool write(size_t offset, - std::span buf) = 0; + [[nodiscard]] virtual bool write(size_t offset, ConstByteSpan data) = 0; protected: [[nodiscard]] virtual bool flush_() = 0; @@ -76,13 +76,12 @@ make_dirs(const std::vector& dir_paths, /** * @brief Create a file sink from a path. * @param file_path The path to the file. - * @param truncate If true, the file is truncated to zero length. * @return Pointer to the sink created, or nullptr if the file cannot be * opened. * @throws std::runtime_error if the file path is not valid. */ -std::unique_ptr -make_file_sink(std::string_view file_path, bool truncate); +std::unique_ptr +make_file_sink(std::string_view file_path); /** * @brief Create a collection of file sinks for a Zarr dataset. diff --git a/src/streaming/thread.pool.cpp b/src/streaming/thread.pool.cpp index aede31de..c20072e4 100644 --- a/src/streaming/thread.pool.cpp +++ b/src/streaming/thread.pool.cpp @@ -29,12 +29,12 @@ bool zarr::ThreadPool::push_job(Task&& job) { std::unique_lock lock(jobs_mutex_); - if (!is_accepting_jobs_) { + if (!accepting_jobs) { return false; } jobs_.push(std::move(job)); - cv_.notify_one(); + jobs_cv_.notify_one(); return true; } @@ -44,9 +44,9 @@ zarr::ThreadPool::await_stop() noexcept { { std::scoped_lock lock(jobs_mutex_); - is_accepting_jobs_ = false; + accepting_jobs = false; - cv_.notify_all(); + jobs_cv_.notify_all(); } // spin down threads @@ -72,7 +72,7 @@ zarr::ThreadPool::pop_from_job_queue_() noexcept bool zarr::ThreadPool::should_stop_() const noexcept { - return !is_accepting_jobs_ && jobs_.empty(); + return !accepting_jobs && jobs_.empty(); } void @@ -80,7 +80,7 @@ zarr::ThreadPool::process_tasks_() { while (true) { std::unique_lock lock(jobs_mutex_); - cv_.wait(lock, [&] { return should_stop_() || !jobs_.empty(); }); + jobs_cv_.wait(lock, [&] { return should_stop_() || !jobs_.empty(); }); if (should_stop_()) { break; @@ -93,4 +93,10 @@ zarr::ThreadPool::process_tasks_() } } } +} + +uint32_t +zarr::ThreadPool::n_threads() const +{ + return threads_.size(); } \ No newline at end of file diff --git a/src/streaming/thread.pool.hh b/src/streaming/thread.pool.hh index c04c966b..98fa031f 100644 --- a/src/streaming/thread.pool.hh +++ b/src/streaming/thread.pool.hh @@ -39,15 +39,22 @@ class ThreadPool */ void await_stop() noexcept; + /** + * @brief Get the number of threads running. + * @return The number of threads running. + */ + uint32_t n_threads() const; + private: ErrorCallback error_handler_; std::vector threads_; + + std::atomic accepting_jobs{ true }; std::mutex jobs_mutex_; - std::condition_variable cv_; + std::condition_variable jobs_cv_; std::queue jobs_; - std::atomic is_accepting_jobs_{ true }; std::optional pop_from_job_queue_() noexcept; [[nodiscard]] bool should_stop_() const noexcept; diff --git a/src/streaming/win32/file.sink.impl.cpp b/src/streaming/win32/file.sink.impl.cpp new file mode 100644 index 00000000..c6b61f66 --- /dev/null +++ b/src/streaming/win32/file.sink.impl.cpp @@ -0,0 +1,118 @@ +#include "definitions.hh" +#include "macros.hh" + +#include + +#include + +std::string +get_last_error_as_string() +{ + auto error_message_id = ::GetLastError(); + if (error_message_id == 0) { + return std::string(); // No error message has been recorded + } + + LPSTR message_buffer = nullptr; + + const auto format = FORMAT_MESSAGE_ALLOCATE_BUFFER | + FORMAT_MESSAGE_FROM_SYSTEM | + FORMAT_MESSAGE_IGNORE_INSERTS; + const auto lang_id = MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT); + size_t size = FormatMessageA(format, + nullptr, + error_message_id, + lang_id, + reinterpret_cast(&message_buffer), + 0, + nullptr); + + std::string message(message_buffer, size); + + LocalFree(message_buffer); + + return message; +} + +void +init_handle(void** handle, std::string_view filename) +{ + EXPECT(handle, "Expected nonnull pointer to file handle."); + auto* fd = new HANDLE; + + *fd = CreateFileA(filename.data(), + GENERIC_WRITE, + 0, // No sharing + nullptr, + OPEN_ALWAYS, + FILE_FLAG_OVERLAPPED, + nullptr); + + if (*fd == INVALID_HANDLE_VALUE) { + const auto err = get_last_error_as_string(); + delete fd; + throw std::runtime_error("Failed to open file: '" + + std::string(filename) + "': " + err); + } + *handle = reinterpret_cast(fd); +} + +bool +seek_and_write(void** handle, size_t offset, ConstByteSpan data) +{ + CHECK(handle); + auto* fd = reinterpret_cast(*handle); + + auto* cur = reinterpret_cast(data.data()); + auto* end = cur + data.size(); + + int retries = 0; + OVERLAPPED overlapped = { 0 }; + overlapped.hEvent = CreateEventA(nullptr, TRUE, FALSE, nullptr); + + const auto max_retries = 3; + while (cur < end && retries < max_retries) { + DWORD written = 0; + auto remaining = static_cast(end - cur); // may truncate + overlapped.Pointer = reinterpret_cast(offset); + if (!WriteFile(*fd, cur, remaining, nullptr, &overlapped) && + GetLastError() != ERROR_IO_PENDING) { + const auto err = get_last_error_as_string(); + LOG_ERROR("Failed to write to file: ", err); + CloseHandle(overlapped.hEvent); + return false; + } + + if (!GetOverlappedResult(*fd, &overlapped, &written, TRUE)) { + LOG_ERROR("Failed to get overlapped result: ", + get_last_error_as_string()); + CloseHandle(overlapped.hEvent); + return false; + } + retries += (written == 0) ? 1 : 0; + offset += written; + cur += written; + } + + CloseHandle(overlapped.hEvent); + return (retries < max_retries); +} + +bool +flush_file(void** handle) +{ + CHECK(handle); + return true; +} + +void +destroy_handle(void** handle) +{ + auto* fd = reinterpret_cast(*handle); + if (fd) { + if (*fd != INVALID_HANDLE_VALUE) { + CloseHandle(*fd); + } + delete fd; + } +} \ No newline at end of file diff --git a/src/streaming/zarr.dimension.cpp b/src/streaming/zarr.dimension.cpp index dcf4dec0..eaf00e54 100644 --- a/src/streaming/zarr.dimension.cpp +++ b/src/streaming/zarr.dimension.cpp @@ -6,8 +6,28 @@ ArrayDimensions::ArrayDimensions(std::vector&& dims, ZarrDataType dtype) : dims_(std::move(dims)) , dtype_(dtype) + , chunks_per_shard_(1) + , number_of_shards_(1) + , bytes_per_chunk_(zarr::bytes_of_type(dtype)) + , number_of_chunks_in_memory_(1) { EXPECT(dims_.size() > 2, "Array must have at least three dimensions."); + + for (auto i = 0; i < dims_.size(); ++i) { + const auto& dim = dims_[i]; + bytes_per_chunk_ *= dim.chunk_size_px; + chunks_per_shard_ *= dim.shard_size_chunks; + + if (i > 0) { + number_of_chunks_in_memory_ *= zarr::chunks_along_dimension(dim); + number_of_shards_ *= zarr::shards_along_dimension(dim); + } + } + + for (auto i = 0; i < chunks_per_shard_ * number_of_shards_; ++i) { + shard_indices_.insert_or_assign(i, shard_index_for_chunk_(i)); + shard_internal_indices_.insert_or_assign(i, shard_internal_index_(i)); + } } size_t @@ -127,46 +147,25 @@ ArrayDimensions::chunk_internal_offset(uint64_t frame_id) const uint32_t ArrayDimensions::number_of_chunks_in_memory() const { - uint32_t n_chunks = 1; - for (auto i = 1; i < ndims(); ++i) { - n_chunks *= zarr::chunks_along_dimension(dims_[i]); - } - - return n_chunks; + return number_of_chunks_in_memory_; } size_t ArrayDimensions::bytes_per_chunk() const { - auto n_bytes = zarr::bytes_of_type(dtype_); - for (const auto& d : dims_) { - n_bytes *= d.chunk_size_px; - } - - return n_bytes; + return bytes_per_chunk_; } uint32_t ArrayDimensions::number_of_shards() const { - size_t n_shards = 1; - for (auto i = 1; i < ndims(); ++i) { - const auto& dim = dims_[i]; - n_shards *= zarr::shards_along_dimension(dim); - } - - return n_shards; + return number_of_shards_; } uint32_t ArrayDimensions::chunks_per_shard() const { - size_t n_chunks = 1; - for (const auto& dim : dims_) { - n_chunks *= dim.shard_size_chunks; - } - - return n_chunks; + return chunks_per_shard_; } uint32_t @@ -177,6 +176,18 @@ ArrayDimensions::chunk_layers_per_shard() const uint32_t ArrayDimensions::shard_index_for_chunk(uint32_t chunk_index) const +{ + return shard_indices_.at(chunk_index); +} + +uint32_t +ArrayDimensions::shard_internal_index(uint32_t chunk_index) const +{ + return shard_internal_indices_.at(chunk_index); +} + +uint32_t +ArrayDimensions::shard_index_for_chunk_(uint32_t chunk_index) const { // make chunk strides std::vector chunk_strides; @@ -200,7 +211,8 @@ ArrayDimensions::shard_index_for_chunk(uint32_t chunk_index) const std::vector shard_strides(ndims(), 1); for (auto i = ndims() - 1; i > 0; --i) { const auto& dim = dims_[i]; - shard_strides[i-1] = shard_strides[i] * zarr::shards_along_dimension(dim); + shard_strides[i - 1] = + shard_strides[i] * zarr::shards_along_dimension(dim); } std::vector shard_lattice_indices; @@ -218,7 +230,7 @@ ArrayDimensions::shard_index_for_chunk(uint32_t chunk_index) const } uint32_t -ArrayDimensions::shard_internal_index(uint32_t chunk_index) const +ArrayDimensions::shard_internal_index_(uint32_t chunk_index) const { // make chunk strides std::vector chunk_strides; @@ -261,4 +273,4 @@ ArrayDimensions::shard_internal_index(uint32_t chunk_index) const } return index; -} \ No newline at end of file +} diff --git a/src/streaming/zarr.dimension.hh b/src/streaming/zarr.dimension.hh index 59cd3011..6d6db0a0 100644 --- a/src/streaming/zarr.dimension.hh +++ b/src/streaming/zarr.dimension.hh @@ -5,6 +5,7 @@ #include #include #include +#include #include struct ZarrDimension @@ -120,6 +121,18 @@ class ArrayDimensions private: std::vector dims_; ZarrDataType dtype_; + + size_t bytes_per_chunk_; + + uint32_t number_of_chunks_in_memory_; + uint32_t chunks_per_shard_; + uint32_t number_of_shards_; + + std::unordered_map shard_indices_; + std::unordered_map shard_internal_indices_; + + uint32_t shard_index_for_chunk_(uint32_t chunk_index) const; + uint32_t shard_internal_index_(uint32_t chunk_index) const; }; using DimensionPartsFun = std::function; \ No newline at end of file diff --git a/src/streaming/zarr.stream.cpp b/src/streaming/zarr.stream.cpp index eb0b5ba2..40dd32dd 100644 --- a/src/streaming/zarr.stream.cpp +++ b/src/streaming/zarr.stream.cpp @@ -7,6 +7,7 @@ #include "sink.hh" #include +#include #include @@ -14,6 +15,10 @@ #undef min #endif +#ifdef max +#undef max +#endif + namespace fs = std::filesystem; namespace { @@ -373,22 +378,7 @@ ZarrStream::ZarrStream_s(struct ZarrStreamSettings_s* settings) commit_settings_(settings); - // spin up thread pool - unsigned int max_threads = settings->max_threads; - const auto hardware_concurrency = std::thread::hardware_concurrency(); - - if (max_threads == 0) { - if (hardware_concurrency > 0) { - LOG_DEBUG("Using ", hardware_concurrency, " threads"); - max_threads = hardware_concurrency; - } else { - LOG_WARNING( - "Unable to determine hardware concurrency, using 1 thread"); - max_threads = 1; - } - } - thread_pool_ = std::make_shared( - max_threads, [this](const std::string& err) { this->set_error_(err); }); + start_thread_pool_(settings->max_threads); // allocate a frame buffer frame_buffer_.resize( @@ -531,6 +521,20 @@ ZarrStream_s::commit_settings_(const struct ZarrStreamSettings_s* settings) multiscale_ = settings->multiscale; } +void +ZarrStream_s::start_thread_pool_(uint32_t max_threads) +{ + max_threads = + max_threads == 0 ? std::thread::hardware_concurrency() : max_threads; + if (max_threads == 0) { + LOG_WARNING("Unable to determine hardware concurrency, using 1 thread"); + max_threads = 1; + } + + thread_pool_ = std::make_shared( + max_threads, [this](const std::string& err) { this->set_error_(err); }); +} + void ZarrStream_s::set_error_(const std::string& msg) { @@ -791,7 +795,7 @@ ZarrStream_s::write_external_metadata_() s3_settings_->bucket_name, sink_path, s3_connection_pool_)); } else { metadata_sinks_.emplace(metadata_key, - zarr::make_file_sink(sink_path, true)); + zarr::make_file_sink(sink_path)); } } diff --git a/src/streaming/zarr.stream.hh b/src/streaming/zarr.stream.hh index 4c473cbc..71c8b08e 100644 --- a/src/streaming/zarr.stream.hh +++ b/src/streaming/zarr.stream.hh @@ -68,6 +68,11 @@ struct ZarrStream_s */ void commit_settings_(const struct ZarrStreamSettings_s* settings); + /** + * @brief Spin up the thread pool. + */ + void start_thread_pool_(uint32_t max_threads); + /** * @brief Set an error message. * @param msg The error message to set. diff --git a/src/streaming/zarrv2.array.writer.cpp b/src/streaming/zarrv2.array.writer.cpp index 66d3dfc9..4285a2e9 100644 --- a/src/streaming/zarrv2.array.writer.cpp +++ b/src/streaming/zarrv2.array.writer.cpp @@ -190,7 +190,7 @@ zarr::ZarrV2ArrayWriter::compress_and_flush_data_() sink = make_s3_sink( *bucket_name, data_path, connection_pool); } else { - sink = make_file_sink(data_path, true); + sink = make_file_sink(data_path); } // write the chunk to the sink diff --git a/src/streaming/zarrv3.array.writer.cpp b/src/streaming/zarrv3.array.writer.cpp index e1cf2a45..86dfe434 100644 --- a/src/streaming/zarrv3.array.writer.cpp +++ b/src/streaming/zarrv3.array.writer.cpp @@ -354,8 +354,7 @@ zarr::ZarrV3ArrayWriter::compress_and_flush_data_() sink = make_s3_sink( *bucket_name, data_path, connection_pool); } else { - sink = - make_file_sink(data_path, *file_offset == 0); + sink = make_file_sink(data_path); } std::span shard_data(shard_ptr, shard_size); diff --git a/tests/unit-tests/common-construct-data-paths.cpp b/tests/unit-tests/common-construct-data-paths.cpp deleted file mode 100644 index b981d9b8..00000000 --- a/tests/unit-tests/common-construct-data-paths.cpp +++ /dev/null @@ -1,64 +0,0 @@ -#include "unit.test.macros.hh" -#include "zarr.common.hh" - -#include -#include -#include - -namespace { -auto -create_parts_fun(size_t parts) -{ - return [parts](const ZarrDimension&) { return parts; }; -} -} // namespace - -int -main() -{ - int retval = 1; - - try { - std::vector dims{ - { "time", ZarrDimensionType_Time, 50, 16, 2 }, - { "height", ZarrDimensionType_Space, 100, 32, 2 }, - { "width", ZarrDimensionType_Space, 100, 32, 2 } - }; - ArrayDimensions dimensions(std::move(dims), ZarrDataType_uint8); - { - const auto parts_fun = create_parts_fun(2); - const auto paths = - zarr::construct_data_paths("", dimensions, parts_fun); - - EXPECT_EQ(int, paths.size(), 4); - EXPECT_STR_EQ(paths[0].c_str(), "0/0"); - EXPECT_STR_EQ(paths[1].c_str(), "0/1"); - EXPECT_STR_EQ(paths[2].c_str(), "1/0"); - EXPECT_STR_EQ(paths[3].c_str(), "1/1"); - } - - { - const auto parts_fun = create_parts_fun(3); - const auto paths = - zarr::construct_data_paths("", dimensions, parts_fun); - - EXPECT_EQ(int, paths.size(), 9); - EXPECT_STR_EQ(paths[0].c_str(), "0/0"); - EXPECT_STR_EQ(paths[1].c_str(), "0/1"); - EXPECT_STR_EQ(paths[2].c_str(), "0/2"); - EXPECT_STR_EQ(paths[3].c_str(), "1/0"); - EXPECT_STR_EQ(paths[4].c_str(), "1/1"); - EXPECT_STR_EQ(paths[5].c_str(), "1/2"); - EXPECT_STR_EQ(paths[6].c_str(), "2/0"); - EXPECT_STR_EQ(paths[7].c_str(), "2/1"); - EXPECT_STR_EQ(paths[8].c_str(), "2/2"); - } - - retval = 0; - } catch (const std::exception& e) { - LOG_ERROR("Test failed: ", e.what()); - throw; - } - - return retval; -} diff --git a/tests/unit-tests/common-make-dirs.cpp b/tests/unit-tests/common-make-dirs.cpp deleted file mode 100644 index 4e0c1253..00000000 --- a/tests/unit-tests/common-make-dirs.cpp +++ /dev/null @@ -1,46 +0,0 @@ -#include "unit.test.macros.hh" -#include "zarr.common.hh" - -#include - -namespace fs = std::filesystem; - -int -main() -{ - int retval = 1; - auto temp_dir = fs::temp_directory_path() / TEST; - - auto thread_pool = std::make_shared( - std::thread::hardware_concurrency(), - [](const std::string& err) { LOG_ERROR("Error: ", err); }); - - std::vector dir_paths = { (temp_dir / "a").string(), - (temp_dir / "b/c").string(), - (temp_dir / "d/e/f").string() }; - - try { - for (const auto& dir_path : dir_paths) { - EXPECT( - !fs::exists(dir_path), "Directory ", dir_path, " already exists"); - } - - EXPECT(zarr::make_dirs(dir_paths, thread_pool), - "Failed to create dirs."); - for (const auto& dir_path : dir_paths) { - EXPECT(fs::is_directory(temp_dir / dir_path), - "Failed to create directory ", - dir_path); - } - retval = 0; - } catch (const std::exception& exc) { - LOG_ERROR("Exception: ", exc.what()); - } - - // cleanup - if (fs::exists(temp_dir)) { - fs::remove_all(temp_dir); - } - - return retval; -} diff --git a/tests/unit-tests/zarrv2-writer-write-ragged-append-dim.cpp b/tests/unit-tests/zarrv2-writer-write-ragged-append-dim.cpp index 245c35fc..324b7f6f 100644 --- a/tests/unit-tests/zarrv2-writer-write-ragged-append-dim.cpp +++ b/tests/unit-tests/zarrv2-writer-write-ragged-append-dim.cpp @@ -67,7 +67,7 @@ main() try { auto thread_pool = std::make_shared( - std::thread::hardware_concurrency(), [](const std::string& err) { + 1, [](const std::string& err) { LOG_ERROR("Error: ", err); });